author | Brad Fitzpatrick
<bradfitz@golang.org> 2016-07-26 08:19:14 UTC |
committer | Brad Fitzpatrick
<bradfitz@golang.org> 2016-07-26 22:16:01 UTC |
parent | 9f2c271364b418388d150f9c12ecbf12794095c1 |
http2/server.go | +20 | -1 |
http2/server_test.go | +43 | -0 |
http2/transport.go | +34 | -6 |
http2/transport_test.go | +80 | -0 |
diff --git a/http2/server.go b/http2/server.go index f368738..dbe6c87 100644 --- a/http2/server.go +++ b/http2/server.go @@ -1176,6 +1176,10 @@ func (sc *serverConn) closeStream(st *stream, err error) { } delete(sc.streams, st.id) if p := st.body; p != nil { + // Return any buffered unread bytes worth of conn-level flow control. + // See golang.org/issue/16481 + sc.sendWindowUpdate(nil, p.Len()) + p.CloseWithError(err) } st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc @@ -1277,6 +1281,8 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error { func (sc *serverConn) processData(f *DataFrame) error { sc.serveG.check() + data := f.Data() + // "If a DATA frame is received whose stream is not in "open" // or "half closed (local)" state, the recipient MUST respond // with a stream error (Section 5.4.2) of type STREAM_CLOSED." @@ -1288,12 +1294,25 @@ func (sc *serverConn) processData(f *DataFrame) error { // the http.Handler returned, so it's done reading & // done writing). Try to stop the client from sending // more DATA. + + // But still enforce their connection-level flow control, + // and return any flow control bytes since we're not going + // to consume them. + if int(sc.inflow.available()) < len(data) { + return StreamError{id, ErrCodeFlowControl} + } + // Deduct the flow control from inflow, since we're + // going to immediately add it back in + // sendWindowUpdate, which also schedules sending the + // frames. + sc.inflow.take(int32(len(data))) + sc.sendWindowUpdate(nil, len(data)) // conn-level + return StreamError{id, ErrCodeStreamClosed} } if st.body == nil { panic("internal error: should have a body in this state") } - data := f.Data() // Sender sending more than they'd declared? if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { diff --git a/http2/server_test.go b/http2/server_test.go index a45905f..ac4d351 100644 --- a/http2/server_test.go +++ b/http2/server_test.go @@ -2167,6 +2167,9 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) { // it did before. st.writeData(1, true, []byte("foo")) + // Get our flow control bytes back, since the handler didn't get them. + st.wantWindowUpdate(0, uint32(len("foo"))) + // Sent after a peer sends data anyway (admittedly the // previous RST_STREAM might've still been in-flight), // but they'll get the more friendly 'cancel' code @@ -3301,3 +3304,43 @@ func TestExpect100ContinueAfterHandlerWrites(t *testing.T) { t.Fatalf("second msg = %q; want %q", buf, msg2) } } + +type funcReader func([]byte) (n int, err error) + +func (f funcReader) Read(p []byte) (n int, err error) { return f(p) } + +// golang.org/issue/16481 -- return flow control when streams close with unread data. +// (The Server version of the bug. See also TestUnreadFlowControlReturned_Transport) +func TestUnreadFlowControlReturned_Server(t *testing.T) { + unblock := make(chan bool, 1) + defer close(unblock) + + st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) { + // Don't read the 16KB request body. Wait until the client's + // done sending it and then return. This should cause the Server + // to then return those 16KB of flow control to the client. + <-unblock + }, optOnlyServer) + defer st.Close() + + tr := &Transport{TLSClientConfig: tlsConfigInsecure} + defer tr.CloseIdleConnections() + + // This previously hung on the 4th iteration. + for i := 0; i < 6; i++ { + body := io.MultiReader( + io.LimitReader(neverEnding('A'), 16<<10), + funcReader(func([]byte) (n int, err error) { + unblock <- true + return 0, io.EOF + }), + ) + req, _ := http.NewRequest("POST", st.ts.URL, body) + res, err := tr.RoundTrip(req) + if err != nil { + t.Fatal(err) + } + res.Body.Close() + } + +} diff --git a/http2/transport.go b/http2/transport.go index 642f256..b6f6f95 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -1537,10 +1537,27 @@ var errClosedResponseBody = errors.New("http2: response body closed") func (b transportResponseBody) Close() error { cs := b.cs - if cs.bufPipe.Err() != io.EOF { - // TODO: write test for this - cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) + cc := cs.cc + + serverSentStreamEnd := cs.bufPipe.Err() == io.EOF + unread := cs.bufPipe.Len() + + if unread > 0 || !serverSentStreamEnd { + cc.mu.Lock() + cc.wmu.Lock() + if !serverSentStreamEnd { + cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel) + } + // Return connection-level flow control. + if unread > 0 { + cc.inflow.add(int32(unread)) + cc.fr.WriteWindowUpdate(0, uint32(unread)) + } + cc.bw.Flush() + cc.wmu.Unlock() + cc.mu.Unlock() } + cs.bufPipe.BreakWithError(errClosedResponseBody) return nil } @@ -1548,6 +1565,7 @@ func (b transportResponseBody) Close() error { func (rl *clientConnReadLoop) processData(f *DataFrame) error { cc := rl.cc cs := cc.streamByID(f.StreamID, f.StreamEnded()) + data := f.Data() if cs == nil { cc.mu.Lock() neverSent := cc.nextStreamID @@ -1561,9 +1579,17 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error { // TODO: be stricter here? only silently ignore things which // we canceled, but not things which were closed normally // by the peer? Tough without accumulating too much state. + + // But at least return their flow control: + if len(data) > 0 { + cc.wmu.Lock() + cc.fr.WriteWindowUpdate(0, uint32(len(data))) + cc.bw.Flush() + cc.wmu.Unlock() + } return nil } - if data := f.Data(); len(data) > 0 { + if len(data) > 0 { if cs.bufPipe.b == nil { // Data frame after it's already closed? cc.logf("http2: Transport received DATA frame for closed stream; closing connection") @@ -1730,8 +1756,10 @@ func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error { } func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) { - // TODO: do something with err? send it as a debug frame to the peer? - // But that's only in GOAWAY. Invent a new frame type? Is there one already? + // TODO: map err to more interesting error codes, once the + // HTTP community comes up with some. But currently for + // RST_STREAM there's no equivalent to GOAWAY frame's debug + // data, and the error codes are all pretty vague ("cancel"). cc.wmu.Lock() cc.fr.WriteRSTStream(streamID, code) cc.bw.Flush() diff --git a/http2/transport_test.go b/http2/transport_test.go index 5887714..f22eeca 100644 --- a/http2/transport_test.go +++ b/http2/transport_test.go @@ -2097,3 +2097,83 @@ func testTransportUsesGoAwayDebugError(t *testing.T, failMidBody bool) { } ct.run() } + +// See golang.org/issue/16481 +func TestTransportReturnsUnusedFlowControl(t *testing.T) { + ct := newClientTester(t) + + clientClosed := make(chan bool, 1) + serverWroteBody := make(chan bool, 1) + + ct.client = func() error { + req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) + res, err := ct.tr.RoundTrip(req) + if err != nil { + return err + } + <-serverWroteBody + + if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 { + return fmt.Errorf("body read = %v, %v; want 1, nil", n, err) + } + res.Body.Close() // leaving 4999 bytes unread + clientClosed <- true + + return nil + } + ct.server = func() error { + ct.greet() + + var hf *HeadersFrame + for { + f, err := ct.fr.ReadFrame() + if err != nil { + return fmt.Errorf("ReadFrame while waiting for Headers: %v", err) + } + switch f.(type) { + case *WindowUpdateFrame, *SettingsFrame: + continue + } + var ok bool + hf, ok = f.(*HeadersFrame) + if !ok { + return fmt.Errorf("Got %T; want HeadersFrame", f) + } + break + } + + var buf bytes.Buffer + enc := hpack.NewEncoder(&buf) + enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) + enc.WriteField(hpack.HeaderField{Name: "content-length", Value: "5000"}) + ct.fr.WriteHeaders(HeadersFrameParam{ + StreamID: hf.StreamID, + EndHeaders: true, + EndStream: false, + BlockFragment: buf.Bytes(), + }) + ct.fr.WriteData(hf.StreamID, false, make([]byte, 5000)) // without ending stream + serverWroteBody <- true + + <-clientClosed + + f, err := ct.fr.ReadFrame() + if err != nil { + return fmt.Errorf("ReadFrame while waiting for RSTStreamFrame: %v", err) + } + if rf, ok := f.(*RSTStreamFrame); !ok || rf.ErrCode != ErrCodeCancel { + return fmt.Errorf("Expected a WindowUpdateFrame with code cancel; got %v", summarizeFrame(f)) + } + + // And wait for our flow control tokens back: + f, err = ct.fr.ReadFrame() + if err != nil { + return fmt.Errorf("ReadFrame while waiting for WindowUpdateFrame: %v", err) + } + if wuf, ok := f.(*WindowUpdateFrame); !ok || wuf.Increment != 4999 { + return fmt.Errorf("Expected WindowUpdateFrame for 4999 bytes; got %v", summarizeFrame(f)) + } + return nil + } + ct.run() +}