author | Andrew Gerrand
<adg@golang.org> 2016-06-06 05:43:46 UTC |
committer | Andrew Gerrand
<adg@golang.org> 2016-06-07 04:32:31 UTC |
parent | c4c3ea71919de159c9e246d7be66deb7f0a39a58 |
http2/transport.go | +6 | -2 |
http2/transport_test.go | +55 | -0 |
diff --git a/http2/transport.go b/http2/transport.go index 13a7540..2a4abfa 100644 --- a/http2/transport.go +++ b/http2/transport.go @@ -1459,8 +1459,12 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) { cc.inflow.add(connAdd) } if err == nil { // No need to refresh if the stream is over or failed. - if v := cs.inflow.available(); v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh { - streamAdd = transportDefaultStreamFlow - v + // Consider any buffered body data (read from the conn but not + // consumed by the client) when computing flow control for this + // stream. + v := int(cs.inflow.available()) + cs.bufPipe.b.Len() + if v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh { + streamAdd = int32(transportDefaultStreamFlow - v) cs.inflow.add(streamAdd) } } diff --git a/http2/transport_test.go b/http2/transport_test.go index 8f13ea4..631a04b 100644 --- a/http2/transport_test.go +++ b/http2/transport_test.go @@ -1956,3 +1956,58 @@ func TestTransportHandlerBodyClose(t *testing.T) { } } + +// https://golang.org/issue/15930 +func TestTransportFlowControl(t *testing.T) { + const ( + total = 100 << 20 // 100MB + bufLen = 1 << 16 + ) + + var wrote int64 // updated atomically + st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) { + b := make([]byte, bufLen) + for wrote < total { + n, err := w.Write(b) + atomic.AddInt64(&wrote, int64(n)) + if err != nil { + t.Errorf("ResponseWriter.Write error: %v", err) + break + } + w.(http.Flusher).Flush() + } + }, optOnlyServer) + + tr := &Transport{TLSClientConfig: tlsConfigInsecure} + defer tr.CloseIdleConnections() + req, err := http.NewRequest("GET", st.ts.URL, nil) + if err != nil { + t.Fatal("NewRequest error:", err) + } + resp, err := tr.RoundTrip(req) + if err != nil { + t.Fatal("RoundTrip error:", err) + } + defer resp.Body.Close() + + var read int64 + b := make([]byte, bufLen) + for { + n, err := resp.Body.Read(b) + if err == io.EOF { + break + } + if err != nil { + t.Fatal("Read error:", err) + } + read += int64(n) + + const max = transportDefaultStreamFlow + if w := atomic.LoadInt64(&wrote); -max > read-w || read-w > max { + t.Fatalf("Too much data inflight: server wrote %v bytes but client only received %v", w, read) + } + + // Let the server get ahead of the client. + time.Sleep(1 * time.Millisecond) + } +}