git » go-net » commit 075e191

http2: adjust flow control on open streams when processing SETTINGS

author Brad Fitzpatrick
2016-08-05 06:12:51 UTC
committer Brad Fitzpatrick
2016-08-05 16:39:04 UTC
parent 7c62cfdcccc65f87b0120ec841012ba816fc1aec

http2: adjust flow control on open streams when processing SETTINGS

The http2 spec says:

> When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver
> MUST adjust the size of all stream flow-control windows that it
> maintains by the difference between the new value and the old value.

We didn't do this before, and it never mattered until
https://golang.org/cl/25362 for golang/go#16519 because we always knew
the peer's initial window size.

Once we started writing request bodies before hearing the peer's
setting (and thus assuming 64KB), it became very important that this
TODO was done. Should've done it earlier.

More details in the bug.

Updates golang/go#16612 (fixes after bundle into std)

Change-Id: I0ac0280bdd5f6e933ad82f8c9df3c4528295bac2
Reviewed-on: https://go-review.googlesource.com/25508
Reviewed-by: Ian Lance Taylor <iant@golang.org>
Reviewed-by: Chris Broadfoot <cbro@golang.org>

http2/transport.go +26 -13
http2/transport_test.go +70 -0

diff --git a/http2/transport.go b/http2/transport.go
index 64ec4e7..3cefc22 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -16,6 +16,7 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	"math"
 	"net"
 	"net/http"
 	"sort"
@@ -162,14 +163,14 @@ type ClientConn struct {
 	br              *bufio.Reader
 	fr              *Framer
 	lastActive      time.Time
-
-	// Settings from peer:
+	// Settings from peer: (also guarded by mu)
 	maxFrameSize         uint32
 	maxConcurrentStreams uint32
 	initialWindowSize    uint32
-	hbuf                 bytes.Buffer // HPACK encoder writes into this
-	henc                 *hpack.Encoder
-	freeBuf              [][]byte
+
+	hbuf    bytes.Buffer // HPACK encoder writes into this
+	henc    *hpack.Encoder
+	freeBuf [][]byte
 
 	wmu  sync.Mutex // held while writing; acquire AFTER mu if holding both
 	werr error      // first write error that has occurred
@@ -427,8 +428,9 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
 		wantSettingsAck:      true,
 	}
 	if VerboseLogs {
-		t.vlogf("http2: Transport creating client conn %#x to %v", cc, c.RemoteAddr())
+		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
 	}
+
 	cc.cond = sync.NewCond(&cc.mu)
 	cc.flow.add(int32(initialWindowSize))
 
@@ -498,7 +500,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
 	}
 	return cc.goAway == nil && !cc.closed &&
 		int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
-		cc.nextStreamID < 2147483647
+		cc.nextStreamID < math.MaxInt32
 }
 
 func (cc *ClientConn) closeIfIdle() {
@@ -513,9 +515,8 @@ func (cc *ClientConn) closeIfIdle() {
 	cc.mu.Unlock()
 
 	if VerboseLogs {
-		cc.vlogf("http2: Transport closing idle conn %#x (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
+		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
 	}
-
 	cc.tconn.Close()
 }
 
@@ -1229,7 +1230,7 @@ func (rl *clientConnReadLoop) run() error {
 	for {
 		f, err := cc.fr.ReadFrame()
 		if err != nil {
-			cc.vlogf("http2: Transport readFrame error on conn %#x: (%T) %v", cc, err, err)
+			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
 		}
 		if se, ok := err.(StreamError); ok {
 			if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
@@ -1282,7 +1283,7 @@ func (rl *clientConnReadLoop) run() error {
 		}
 		if err != nil {
 			if VerboseLogs {
-				cc.vlogf("http2: Transport conn %#x received error from processing frame %v: %v", cc, summarizeFrame(f), err)
+				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
 			}
 			return err
 		}
@@ -1698,11 +1699,23 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
 		case SettingMaxConcurrentStreams:
 			cc.maxConcurrentStreams = s.Val
 		case SettingInitialWindowSize:
-			// TODO: error if this is too large.
+			// Values above the maximum flow-control
+			// window size of 2^31-1 MUST be treated as a
+			// connection error (Section 5.4.1) of type
+			// FLOW_CONTROL_ERROR.
+			if s.Val > math.MaxInt32 {
+				return ConnectionError(ErrCodeFlowControl)
+			}
 
-			// TODO: adjust flow control of still-open
+			// Adjust flow control of currently-open
 			// frames by the difference of the old initial
 			// window size and this one.
+			delta := int32(s.Val) - int32(cc.initialWindowSize)
+			for _, cs := range cc.streams {
+				cs.flow.add(delta)
+			}
+			cc.cond.Broadcast()
+
 			cc.initialWindowSize = s.Val
 		default:
 			// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
diff --git a/http2/transport_test.go b/http2/transport_test.go
index a09b6c1..9ab4149 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2230,6 +2230,76 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) {
 	ct.run()
 }
 
+// Issue 16612: adjust flow control on open streams when transport
+// receives SETTINGS with INITIAL_WINDOW_SIZE from server.
+func TestTransportAdjustsFlowControl(t *testing.T) {
+	ct := newClientTester(t)
+	clientDone := make(chan struct{})
+
+	const bodySize = 1 << 20
+
+	ct.client = func() error {
+		defer ct.cc.(*net.TCPConn).CloseWrite()
+		defer close(clientDone)
+
+		req, _ := http.NewRequest("POST", "https://dummy.tld/", struct{ io.Reader }{io.LimitReader(neverEnding('A'), bodySize)})
+		res, err := ct.tr.RoundTrip(req)
+		if err != nil {
+			return err
+		}
+		res.Body.Close()
+		return nil
+	}
+	ct.server = func() error {
+		_, err := io.ReadFull(ct.sc, make([]byte, len(ClientPreface)))
+		if err != nil {
+			return fmt.Errorf("reading client preface: %v", err)
+		}
+
+		var gotBytes int64
+		var sentSettings bool
+		for {
+			f, err := ct.fr.ReadFrame()
+			if err != nil {
+				select {
+				case <-clientDone:
+					return nil
+				default:
+					return fmt.Errorf("ReadFrame while waiting for Headers: %v", err)
+				}
+			}
+			switch f := f.(type) {
+			case *DataFrame:
+				gotBytes += int64(len(f.Data()))
+				// After we've got half the client's
+				// initial flow control window's worth
+				// of request body data, give it just
+				// enough flow control to finish.
+				if gotBytes >= initialWindowSize/2 && !sentSettings {
+					sentSettings = true
+
+					ct.fr.WriteSettings(Setting{ID: SettingInitialWindowSize, Val: bodySize})
+					ct.fr.WriteWindowUpdate(0, bodySize)
+					ct.fr.WriteSettingsAck()
+				}
+
+				if f.StreamEnded() {
+					var buf bytes.Buffer
+					enc := hpack.NewEncoder(&buf)
+					enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
+					ct.fr.WriteHeaders(HeadersFrameParam{
+						StreamID:      f.StreamID,
+						EndHeaders:    true,
+						EndStream:     true,
+						BlockFragment: buf.Bytes(),
+					})
+				}
+			}
+		}
+	}
+	ct.run()
+}
+
 // See golang.org/issue/16556
 func TestTransportReturnsDataPaddingFlowControl(t *testing.T) {
 	ct := newClientTester(t)