Commit 075e191f authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

http2: adjust flow control on open streams when processing SETTINGS

The http2 spec says:

> When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver
> MUST adjust the size of all stream flow-control windows that it
> maintains by the difference between the new value and the old value.

We didn't do this before, and it never mattered until
https://golang.org/cl/25362 for golang/go#16519 because we always knew
the peer's initial window size.

Once we started writing request bodies before hearing the peer's
setting (and thus assuming 64KB), it became very important that this
TODO was done. Should've done it earlier.

More details in the bug.

Updates golang/go#16612 (fixes after bundle into std)

Change-Id: I0ac0280bdd5f6e933ad82f8c9df3c4528295bac2
Reviewed-on: https://go-review.googlesource.com/25508Reviewed-by: 's avatarIan Lance Taylor <iant@golang.org>
Reviewed-by: 's avatarChris Broadfoot <cbro@golang.org>
parent 7c62cfdc
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"math"
"net" "net"
"net/http" "net/http"
"sort" "sort"
...@@ -162,14 +163,14 @@ type ClientConn struct { ...@@ -162,14 +163,14 @@ type ClientConn struct {
br *bufio.Reader br *bufio.Reader
fr *Framer fr *Framer
lastActive time.Time lastActive time.Time
// Settings from peer: (also guarded by mu)
// Settings from peer:
maxFrameSize uint32 maxFrameSize uint32
maxConcurrentStreams uint32 maxConcurrentStreams uint32
initialWindowSize uint32 initialWindowSize uint32
hbuf bytes.Buffer // HPACK encoder writes into this
henc *hpack.Encoder hbuf bytes.Buffer // HPACK encoder writes into this
freeBuf [][]byte henc *hpack.Encoder
freeBuf [][]byte
wmu sync.Mutex // held while writing; acquire AFTER mu if holding both wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
werr error // first write error that has occurred werr error // first write error that has occurred
...@@ -427,8 +428,9 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro ...@@ -427,8 +428,9 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
wantSettingsAck: true, wantSettingsAck: true,
} }
if VerboseLogs { if VerboseLogs {
t.vlogf("http2: Transport creating client conn %#x to %v", cc, c.RemoteAddr()) t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
} }
cc.cond = sync.NewCond(&cc.mu) cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(initialWindowSize)) cc.flow.add(int32(initialWindowSize))
...@@ -498,7 +500,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool { ...@@ -498,7 +500,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
} }
return cc.goAway == nil && !cc.closed && return cc.goAway == nil && !cc.closed &&
int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
cc.nextStreamID < 2147483647 cc.nextStreamID < math.MaxInt32
} }
func (cc *ClientConn) closeIfIdle() { func (cc *ClientConn) closeIfIdle() {
...@@ -513,9 +515,8 @@ func (cc *ClientConn) closeIfIdle() { ...@@ -513,9 +515,8 @@ func (cc *ClientConn) closeIfIdle() {
cc.mu.Unlock() cc.mu.Unlock()
if VerboseLogs { if VerboseLogs {
cc.vlogf("http2: Transport closing idle conn %#x (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2) cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
} }
cc.tconn.Close() cc.tconn.Close()
} }
...@@ -1229,7 +1230,7 @@ func (rl *clientConnReadLoop) run() error { ...@@ -1229,7 +1230,7 @@ func (rl *clientConnReadLoop) run() error {
for { for {
f, err := cc.fr.ReadFrame() f, err := cc.fr.ReadFrame()
if err != nil { if err != nil {
cc.vlogf("http2: Transport readFrame error on conn %#x: (%T) %v", cc, err, err) cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
} }
if se, ok := err.(StreamError); ok { if se, ok := err.(StreamError); ok {
if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil { if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
...@@ -1282,7 +1283,7 @@ func (rl *clientConnReadLoop) run() error { ...@@ -1282,7 +1283,7 @@ func (rl *clientConnReadLoop) run() error {
} }
if err != nil { if err != nil {
if VerboseLogs { if VerboseLogs {
cc.vlogf("http2: Transport conn %#x received error from processing frame %v: %v", cc, summarizeFrame(f), err) cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
} }
return err return err
} }
...@@ -1698,11 +1699,23 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error { ...@@ -1698,11 +1699,23 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
case SettingMaxConcurrentStreams: case SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val cc.maxConcurrentStreams = s.Val
case SettingInitialWindowSize: case SettingInitialWindowSize:
// TODO: error if this is too large. // Values above the maximum flow-control
// window size of 2^31-1 MUST be treated as a
// connection error (Section 5.4.1) of type
// FLOW_CONTROL_ERROR.
if s.Val > math.MaxInt32 {
return ConnectionError(ErrCodeFlowControl)
}
// TODO: adjust flow control of still-open // Adjust flow control of currently-open
// frames by the difference of the old initial // frames by the difference of the old initial
// window size and this one. // window size and this one.
delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {
cs.flow.add(delta)
}
cc.cond.Broadcast()
cc.initialWindowSize = s.Val cc.initialWindowSize = s.Val
default: default:
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
......
...@@ -2230,6 +2230,76 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { ...@@ -2230,6 +2230,76 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) {
ct.run() ct.run()
} }
// Issue 16612: adjust flow control on open streams when transport
// receives SETTINGS with INITIAL_WINDOW_SIZE from server.
func TestTransportAdjustsFlowControl(t *testing.T) {
ct := newClientTester(t)
clientDone := make(chan struct{})
const bodySize = 1 << 20
ct.client = func() error {
defer ct.cc.(*net.TCPConn).CloseWrite()
defer close(clientDone)
req, _ := http.NewRequest("POST", "https://dummy.tld/", struct{ io.Reader }{io.LimitReader(neverEnding('A'), bodySize)})
res, err := ct.tr.RoundTrip(req)
if err != nil {
return err
}
res.Body.Close()
return nil
}
ct.server = func() error {
_, err := io.ReadFull(ct.sc, make([]byte, len(ClientPreface)))
if err != nil {
return fmt.Errorf("reading client preface: %v", err)
}
var gotBytes int64
var sentSettings bool
for {
f, err := ct.fr.ReadFrame()
if err != nil {
select {
case <-clientDone:
return nil
default:
return fmt.Errorf("ReadFrame while waiting for Headers: %v", err)
}
}
switch f := f.(type) {
case *DataFrame:
gotBytes += int64(len(f.Data()))
// After we've got half the client's
// initial flow control window's worth
// of request body data, give it just
// enough flow control to finish.
if gotBytes >= initialWindowSize/2 && !sentSettings {
sentSettings = true
ct.fr.WriteSettings(Setting{ID: SettingInitialWindowSize, Val: bodySize})
ct.fr.WriteWindowUpdate(0, bodySize)
ct.fr.WriteSettingsAck()
}
if f.StreamEnded() {
var buf bytes.Buffer
enc := hpack.NewEncoder(&buf)
enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
ct.fr.WriteHeaders(HeadersFrameParam{
StreamID: f.StreamID,
EndHeaders: true,
EndStream: true,
BlockFragment: buf.Bytes(),
})
}
}
}
}
ct.run()
}
// See golang.org/issue/16556 // See golang.org/issue/16556
func TestTransportReturnsDataPaddingFlowControl(t *testing.T) { func TestTransportReturnsDataPaddingFlowControl(t *testing.T) {
ct := newClientTester(t) ct := newClientTester(t)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment