author | Brad Fitzpatrick
<bradfitz@golang.org> 2016-08-05 06:12:51 UTC |
committer | Brad Fitzpatrick
<bradfitz@golang.org> 2016-08-05 16:39:04 UTC |
parent | 7c62cfdcccc65f87b0120ec841012ba816fc1aec |
http2/transport.go | +26 | -13 |
http2/transport_test.go | +70 | -0 |
diff --git a/http2/transport.go b/http2/transport.go index 64ec4e7..3cefc22 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -16,6 +16,7 @@ import ( "io" "io/ioutil" "log" + "math" "net" "net/http" "sort" @@ -162,14 +163,14 @@ type ClientConn struct { br *bufio.Reader fr *Framer lastActive time.Time - - // Settings from peer: + // Settings from peer: (also guarded by mu) maxFrameSize uint32 maxConcurrentStreams uint32 initialWindowSize uint32 - hbuf bytes.Buffer // HPACK encoder writes into this - henc *hpack.Encoder - freeBuf [][]byte + + hbuf bytes.Buffer // HPACK encoder writes into this + henc *hpack.Encoder + freeBuf [][]byte wmu sync.Mutex // held while writing; acquire AFTER mu if holding both werr error // first write error that has occurred @@ -427,8 +428,9 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro wantSettingsAck: true, } if VerboseLogs { - t.vlogf("http2: Transport creating client conn %#x to %v", cc, c.RemoteAddr()) + t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) } + cc.cond = sync.NewCond(&cc.mu) cc.flow.add(int32(initialWindowSize)) @@ -498,7 +500,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool { } return cc.goAway == nil && !cc.closed && int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && - cc.nextStreamID < 2147483647 + cc.nextStreamID < math.MaxInt32 } func (cc *ClientConn) closeIfIdle() { @@ -513,9 +515,8 @@ func (cc *ClientConn) closeIfIdle() { cc.mu.Unlock() if VerboseLogs { - cc.vlogf("http2: Transport closing idle conn %#x (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2) + cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2) } - cc.tconn.Close() } @@ -1229,7 +1230,7 @@ func (rl *clientConnReadLoop) run() error { for { f, err := cc.fr.ReadFrame() if err != nil { - cc.vlogf("http2: Transport readFrame error on conn %#x: (%T) %v", cc, err, err) + cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(StreamError); ok { if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil { @@ -1282,7 +1283,7 @@ func (rl *clientConnReadLoop) run() error { } if err != nil { if VerboseLogs { - cc.vlogf("http2: Transport conn %#x received error from processing frame %v: %v", cc, summarizeFrame(f), err) + cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err) } return err } @@ -1698,11 +1699,23 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error { case SettingMaxConcurrentStreams: cc.maxConcurrentStreams = s.Val case SettingInitialWindowSize: - // TODO: error if this is too large. + // Values above the maximum flow-control + // window size of 2^31-1 MUST be treated as a + // connection error (Section 5.4.1) of type + // FLOW_CONTROL_ERROR. + if s.Val > math.MaxInt32 { + return ConnectionError(ErrCodeFlowControl) + } - // TODO: adjust flow control of still-open + // Adjust flow control of currently-open // frames by the difference of the old initial // window size and this one. + delta := int32(s.Val) - int32(cc.initialWindowSize) + for _, cs := range cc.streams { + cs.flow.add(delta) + } + cc.cond.Broadcast() + cc.initialWindowSize = s.Val default: // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. diff --git a/http2/transport_test.go b/http2/transport_test.go index a09b6c1..9ab4149 100644 --- a/http2/transport_test.go +++ b/http2/transport_test.go @@ -2230,6 +2230,76 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { ct.run() } +// Issue 16612: adjust flow control on open streams when transport +// receives SETTINGS with INITIAL_WINDOW_SIZE from server. +func TestTransportAdjustsFlowControl(t *testing.T) { + ct := newClientTester(t) + clientDone := make(chan struct{}) + + const bodySize = 1 << 20 + + ct.client = func() error { + defer ct.cc.(*net.TCPConn).CloseWrite() + defer close(clientDone) + + req, _ := http.NewRequest("POST", "https://dummy.tld/", struct{ io.Reader }{io.LimitReader(neverEnding('A'), bodySize)}) + res, err := ct.tr.RoundTrip(req) + if err != nil { + return err + } + res.Body.Close() + return nil + } + ct.server = func() error { + _, err := io.ReadFull(ct.sc, make([]byte, len(ClientPreface))) + if err != nil { + return fmt.Errorf("reading client preface: %v", err) + } + + var gotBytes int64 + var sentSettings bool + for { + f, err := ct.fr.ReadFrame() + if err != nil { + select { + case <-clientDone: + return nil + default: + return fmt.Errorf("ReadFrame while waiting for Headers: %v", err) + } + } + switch f := f.(type) { + case *DataFrame: + gotBytes += int64(len(f.Data())) + // After we've got half the client's + // initial flow control window's worth + // of request body data, give it just + // enough flow control to finish. + if gotBytes >= initialWindowSize/2 && !sentSettings { + sentSettings = true + + ct.fr.WriteSettings(Setting{ID: SettingInitialWindowSize, Val: bodySize}) + ct.fr.WriteWindowUpdate(0, bodySize) + ct.fr.WriteSettingsAck() + } + + if f.StreamEnded() { + var buf bytes.Buffer + enc := hpack.NewEncoder(&buf) + enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) + ct.fr.WriteHeaders(HeadersFrameParam{ + StreamID: f.StreamID, + EndHeaders: true, + EndStream: true, + BlockFragment: buf.Bytes(), + }) + } + } + } + } + ct.run() +} + // See golang.org/issue/16556 func TestTransportReturnsDataPaddingFlowControl(t *testing.T) { ct := newClientTester(t)