git » go-net » commit 6a513af

http2: return flow control for closed streams

author Brad Fitzpatrick
2016-07-26 08:19:14 UTC
committer Brad Fitzpatrick
2016-07-26 22:16:01 UTC
parent 9f2c271364b418388d150f9c12ecbf12794095c1

http2: return flow control for closed streams

For both the server and the transport, return connection-level flow control in
two cases: 1) when a stream is closed with buffered data the user never read,
or 2) when a DATA frame arrives for a stream that has since been closed.

Fixes golang/go#16481

Change-Id: Ic7404180ed04a2903e8fd6e9599a907f88b4f72e
Reviewed-on: https://go-review.googlesource.com/25231
Reviewed-by: Andrew Gerrand <adg@golang.org>

http2/server.go +20 -1
http2/server_test.go +43 -0
http2/transport.go +34 -6
http2/transport_test.go +80 -0
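
As background for the diffs below: HTTP/2 flow control is credit-based. The
receiver charges each DATA frame against its receive windows on arrival and
hands credit back with WINDOW_UPDATE frames as the bytes are consumed. If a
stream dies with bytes still charged, that connection-level credit leaks.
Here is a minimal sketch of the bookkeeping, using a stand-in flow type whose
available/take/add names mirror those visible in the diff; this is an
illustration, not the package's actual implementation:

package main

import "fmt"

// flow stands in for the package's receive-window accounting.
type flow struct{ n int32 }

func (f *flow) available() int32 { return f.n }
func (f *flow) take(n int32)     { f.n -= n }
func (f *flow) add(n int32)      { f.n += n }

func main() {
	inflow := flow{n: 65535} // initial conn-level receive window

	// A 16 KB DATA frame arrives for a stream that is already closed.
	n := int32(16 << 10)

	// The bytes still count against the connection window on receipt...
	inflow.take(n)

	// ...but nothing will ever consume them, so the receiver immediately
	// credits them back, pairing the add with a WINDOW_UPDATE on stream 0.
	inflow.add(n)

	fmt.Println(inflow.available()) // 65535 again; the sender never stalls
}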

diff --git a/http2/server.go b/http2/server.go
index f368738..dbe6c87 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -1176,6 +1176,10 @@ func (sc *serverConn) closeStream(st *stream, err error) {
 	}
 	delete(sc.streams, st.id)
 	if p := st.body; p != nil {
+		// Return any buffered unread bytes worth of conn-level flow control.
+		// See golang.org/issue/16481
+		sc.sendWindowUpdate(nil, p.Len())
+
 		p.CloseWithError(err)
 	}
 	st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
@@ -1277,6 +1281,8 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
 
 func (sc *serverConn) processData(f *DataFrame) error {
 	sc.serveG.check()
+	data := f.Data()
+
 	// "If a DATA frame is received whose stream is not in "open"
 	// or "half closed (local)" state, the recipient MUST respond
 	// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
@@ -1288,12 +1294,25 @@ func (sc *serverConn) processData(f *DataFrame) error {
 		// the http.Handler returned, so it's done reading &
 		// done writing). Try to stop the client from sending
 		// more DATA.
+
+		// But still enforce their connection-level flow control,
+		// and return any flow control bytes since we're not going
+		// to consume them.
+		if int(sc.inflow.available()) < len(data) {
+			return StreamError{id, ErrCodeFlowControl}
+		}
+		// Deduct the flow control from inflow, since we're
+		// going to immediately add it back in
+		// sendWindowUpdate, which also schedules sending the
+		// frames.
+		sc.inflow.take(int32(len(data)))
+		sc.sendWindowUpdate(nil, len(data)) // conn-level
+
 		return StreamError{id, ErrCodeStreamClosed}
 	}
 	if st.body == nil {
 		panic("internal error: should have a body in this state")
 	}
-	data := f.Data()
 
 	// Sender sending more than they'd declared?
 	if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
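
The server change has two halves: closeStream refunds whatever the Handler
left buffered (p.Len()), and processData refunds DATA frames that arrive for
a stream that is already gone. A sketch of the first half, assuming a
bodyPipe stand-in for the stream's internal body buffer:

package main

import (
	"bytes"
	"fmt"
)

// bodyPipe stands in for the stream's buffered request body.
type bodyPipe struct{ b bytes.Buffer }

func (p *bodyPipe) Len() int { return p.b.Len() }

func main() {
	connWindow := int32(65535)

	var p bodyPipe
	p.b.Write(make([]byte, 16<<10)) // DATA buffered for the Handler
	connWindow -= int32(p.Len())    // charged when the frames arrived

	// The Handler returns without reading the body; the stream closes.
	// Without the refund below, 16 KB of conn-level window leak on every
	// such request, and the connection eventually wedges.
	connWindow += int32(p.Len()) // sc.sendWindowUpdate(nil, p.Len()) above

	fmt.Println(connWindow) // 65535
}
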
diff --git a/http2/server_test.go b/http2/server_test.go
index a45905f..ac4d351 100644
--- a/http2/server_test.go
+++ b/http2/server_test.go
@@ -2167,6 +2167,9 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) {
 		// it did before.
 		st.writeData(1, true, []byte("foo"))
 
+		// Get our flow control bytes back, since the handler didn't get them.
+		st.wantWindowUpdate(0, uint32(len("foo")))
+
 		// Sent after a peer sends data anyway (admittedly the
 		// previous RST_STREAM might've still been in-flight),
 		// but they'll get the more friendly 'cancel' code
@@ -3301,3 +3304,43 @@ func TestExpect100ContinueAfterHandlerWrites(t *testing.T) {
 		t.Fatalf("second msg = %q; want %q", buf, msg2)
 	}
 }
+
+type funcReader func([]byte) (n int, err error)
+
+func (f funcReader) Read(p []byte) (n int, err error) { return f(p) }
+
+// golang.org/issue/16481 -- return flow control when streams close with unread data.
+// (The Server version of the bug. See also TestTransportReturnsUnusedFlowControl in transport_test.go.)
+func TestUnreadFlowControlReturned_Server(t *testing.T) {
+	unblock := make(chan bool, 1)
+	defer close(unblock)
+
+	st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+		// Don't read the 16KB request body. Wait until the client's
+		// done sending it and then return. This should cause the Server
+		// to then return those 16KB of flow control to the client.
+		<-unblock
+	}, optOnlyServer)
+	defer st.Close()
+
+	tr := &Transport{TLSClientConfig: tlsConfigInsecure}
+	defer tr.CloseIdleConnections()
+
+	// This previously hung on the 4th iteration.
+	for i := 0; i < 6; i++ {
+		body := io.MultiReader(
+			io.LimitReader(neverEnding('A'), 16<<10),
+			funcReader(func([]byte) (n int, err error) {
+				unblock <- true
+				return 0, io.EOF
+			}),
+		)
+		req, _ := http.NewRequest("POST", st.ts.URL, body)
+		res, err := tr.RoundTrip(req)
+		if err != nil {
+			t.Fatal(err)
+		}
+		res.Body.Close()
+	}
+
+}
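
The funcReader type above is the standard function-adapter idiom: a named
function type that satisfies io.Reader, which lets the test tack a "done
sending" signal onto the end of the request body. A standalone illustration
of the same pattern (none of this is from the package):

package main

import (
	"fmt"
	"io"
	"strings"
)

type funcReader func([]byte) (n int, err error)

func (f funcReader) Read(p []byte) (n int, err error) { return f(p) }

func main() {
	done := make(chan bool, 1)
	body := io.MultiReader(
		strings.NewReader("payload"),
		funcReader(func([]byte) (int, error) {
			done <- true // runs only once "payload" is fully consumed
			return 0, io.EOF
		}),
	)
	b, _ := io.ReadAll(body)
	fmt.Println(string(b), <-done) // payload true
}
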
diff --git a/http2/transport.go b/http2/transport.go
index 642f256..b6f6f95 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -1537,10 +1537,27 @@ var errClosedResponseBody = errors.New("http2: response body closed")
 
 func (b transportResponseBody) Close() error {
 	cs := b.cs
-	if cs.bufPipe.Err() != io.EOF {
-		// TODO: write test for this
-		cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+	cc := cs.cc
+
+	serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
+	unread := cs.bufPipe.Len()
+
+	if unread > 0 || !serverSentStreamEnd {
+		cc.mu.Lock()
+		cc.wmu.Lock()
+		if !serverSentStreamEnd {
+			cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
+		}
+		// Return connection-level flow control.
+		if unread > 0 {
+			cc.inflow.add(int32(unread))
+			cc.fr.WriteWindowUpdate(0, uint32(unread))
+		}
+		cc.bw.Flush()
+		cc.wmu.Unlock()
+		cc.mu.Unlock()
 	}
+
 	cs.bufPipe.BreakWithError(errClosedResponseBody)
 	return nil
 }
@@ -1548,6 +1565,7 @@ func (b transportResponseBody) Close() error {
 func (rl *clientConnReadLoop) processData(f *DataFrame) error {
 	cc := rl.cc
 	cs := cc.streamByID(f.StreamID, f.StreamEnded())
+	data := f.Data()
 	if cs == nil {
 		cc.mu.Lock()
 		neverSent := cc.nextStreamID
@@ -1561,9 +1579,17 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
 		// TODO: be stricter here? only silently ignore things which
 		// we canceled, but not things which were closed normally
 		// by the peer? Tough without accumulating too much state.
+
+		// But at least return their flow control:
+		if len(data) > 0 {
+			cc.wmu.Lock()
+			cc.fr.WriteWindowUpdate(0, uint32(len(data)))
+			cc.bw.Flush()
+			cc.wmu.Unlock()
+		}
 		return nil
 	}
-	if data := f.Data(); len(data) > 0 {
+	if len(data) > 0 {
 		if cs.bufPipe.b == nil {
 			// Data frame after it's already closed?
 			cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
@@ -1730,8 +1756,10 @@ func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
 }
 
 func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
-	// TODO: do something with err? send it as a debug frame to the peer?
-	// But that's only in GOAWAY. Invent a new frame type? Is there one already?
+	// TODO: map err to more interesting error codes, once the
+	// HTTP community comes up with some. But currently for
+	// RST_STREAM there's no equivalent to GOAWAY frame's debug
+	// data, and the error codes are all pretty vague ("cancel").
 	cc.wmu.Lock()
 	cc.fr.WriteRSTStream(streamID, code)
 	cc.bw.Flush()
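
From a caller's point of view, the transport change means closing a response
body early is now safe for the shared connection: Close sends RST_STREAM
(CANCEL) if the server hadn't finished the stream, and refunds the unread
bytes' conn-level credit with a WINDOW_UPDATE on stream 0. A hedged usage
sketch (the URL is a placeholder):

package main

import (
	"io"
	"log"
	"net/http"
)

func main() {
	// Placeholder URL; assume an HTTP/2 server with a large response.
	res, err := http.Get("https://example.com/large")
	if err != nil {
		log.Fatal(err)
	}
	// Read a little, then bail out. With this fix, Close returns the
	// unread remainder's flow-control credit, so later requests
	// multiplexed on the same connection don't stall.
	io.CopyN(io.Discard, res.Body, 1)
	res.Body.Close()
}
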
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 5887714..f22eeca 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2097,3 +2097,83 @@ func testTransportUsesGoAwayDebugError(t *testing.T, failMidBody bool) {
 	}
 	ct.run()
 }
+
+// See golang.org/issue/16481
+func TestTransportReturnsUnusedFlowControl(t *testing.T) {
+	ct := newClientTester(t)
+
+	clientClosed := make(chan bool, 1)
+	serverWroteBody := make(chan bool, 1)
+
+	ct.client = func() error {
+		req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
+		res, err := ct.tr.RoundTrip(req)
+		if err != nil {
+			return err
+		}
+		<-serverWroteBody
+
+		if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 {
+			return fmt.Errorf("body read = %v, %v; want 1, nil", n, err)
+		}
+		res.Body.Close() // leaving 4999 bytes unread
+		clientClosed <- true
+
+		return nil
+	}
+	ct.server = func() error {
+		ct.greet()
+
+		var hf *HeadersFrame
+		for {
+			f, err := ct.fr.ReadFrame()
+			if err != nil {
+				return fmt.Errorf("ReadFrame while waiting for Headers: %v", err)
+			}
+			switch f.(type) {
+			case *WindowUpdateFrame, *SettingsFrame:
+				continue
+			}
+			var ok bool
+			hf, ok = f.(*HeadersFrame)
+			if !ok {
+				return fmt.Errorf("Got %T; want HeadersFrame", f)
+			}
+			break
+		}
+
+		var buf bytes.Buffer
+		enc := hpack.NewEncoder(&buf)
+		enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
+		enc.WriteField(hpack.HeaderField{Name: "content-length", Value: "5000"})
+		ct.fr.WriteHeaders(HeadersFrameParam{
+			StreamID:      hf.StreamID,
+			EndHeaders:    true,
+			EndStream:     false,
+			BlockFragment: buf.Bytes(),
+		})
+		ct.fr.WriteData(hf.StreamID, false, make([]byte, 5000)) // without ending stream
+		serverWroteBody <- true
+
+		<-clientClosed
+
+		f, err := ct.fr.ReadFrame()
+		if err != nil {
+			return fmt.Errorf("ReadFrame while waiting for RSTStreamFrame: %v", err)
+		}
+		if rf, ok := f.(*RSTStreamFrame); !ok || rf.ErrCode != ErrCodeCancel {
+			return fmt.Errorf("Expected a RSTStreamFrame with code cancel; got %v", summarizeFrame(f))
+		}
+
+		// And wait for our flow control tokens back:
+		f, err = ct.fr.ReadFrame()
+		if err != nil {
+			return fmt.Errorf("ReadFrame while waiting for WindowUpdateFrame: %v", err)
+		}
+		if wuf, ok := f.(*WindowUpdateFrame); !ok || wuf.Increment != 4999 {
+			return fmt.Errorf("Expected WindowUpdateFrame for 4999 bytes; got %v", summarizeFrame(f))
+		}
+		return nil
+	}
+	ct.run()
+}
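
For reference, the exchange the server side of this test asserts, with values
taken from the test body (the first client-initiated stream is assumed to be
stream 1):

  client -> server  HEADERS        stream=1  GET https://dummy.tld/
  server -> client  HEADERS        stream=1  :status 200, content-length 5000
  server -> client  DATA           stream=1  5000 bytes, END_STREAM unset
  client reads 1 byte, then res.Body.Close()
  client -> server  RST_STREAM     stream=1  CANCEL
  client -> server  WINDOW_UPDATE  stream=0  increment=4999 (the refund)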