Commit d4223d67 authored by Tom Bergan's avatar Tom Bergan

http2: refund connection flow control on DATA frames received after reset

If the transport had previously sent a RST_STREAM but had not yet
deleted the stream from its list of active streams, we should refund
connection-level flow control for any DATA frame received as such
DATA frames will never be read.

We already refund connection-level flow control if a stream closes with
unread data in bufPipe. However, when we receive a DATA frame after
reset, we don't bother writing it to bufPipe, so we have to refund the
flow control separately.

Updates golang/go#20469

Change-Id: I5a9810a5d6b1bd7e291173af53646246545a6665
Reviewed-on: https://go-review.googlesource.com/46591
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: 's avatarBrad Fitzpatrick <bradfitz@golang.org>
parent 8663ed5d
...@@ -1713,16 +1713,27 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error { ...@@ -1713,16 +1713,27 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
} }
// Return any padded flow control now, since we won't // Return any padded flow control now, since we won't
// refund it later on body reads. // refund it later on body reads.
if pad := int32(f.Length) - int32(len(data)); pad > 0 { var refund int
cs.inflow.add(pad) if pad := int(f.Length) - len(data); pad > 0 {
cc.inflow.add(pad) refund += pad
}
// Return len(data) now if the stream is already closed,
// since data will never be read.
didReset := cs.didReset
if didReset {
refund += len(data)
}
if refund > 0 {
cc.inflow.add(int32(refund))
cc.wmu.Lock() cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(pad)) cc.fr.WriteWindowUpdate(0, uint32(refund))
cc.fr.WriteWindowUpdate(cs.ID, uint32(pad)) if !didReset {
cs.inflow.add(int32(refund))
cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
}
cc.bw.Flush() cc.bw.Flush()
cc.wmu.Unlock() cc.wmu.Unlock()
} }
didReset := cs.didReset
cc.mu.Unlock() cc.mu.Unlock()
if len(data) > 0 && !didReset { if len(data) > 0 && !didReset {
......
...@@ -2210,12 +2210,11 @@ func testTransportUsesGoAwayDebugError(t *testing.T, failMidBody bool) { ...@@ -2210,12 +2210,11 @@ func testTransportUsesGoAwayDebugError(t *testing.T, failMidBody bool) {
ct.run() ct.run()
} }
// See golang.org/issue/16481 func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
func TestTransportReturnsUnusedFlowControl(t *testing.T) {
ct := newClientTester(t) ct := newClientTester(t)
clientClosed := make(chan bool, 1) clientClosed := make(chan struct{})
serverWroteBody := make(chan bool, 1) serverWroteFirstByte := make(chan struct{})
ct.client = func() error { ct.client = func() error {
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil) req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
...@@ -2223,13 +2222,13 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { ...@@ -2223,13 +2222,13 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) {
if err != nil { if err != nil {
return err return err
} }
<-serverWroteBody <-serverWroteFirstByte
if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 { if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 {
return fmt.Errorf("body read = %v, %v; want 1, nil", n, err) return fmt.Errorf("body read = %v, %v; want 1, nil", n, err)
} }
res.Body.Close() // leaving 4999 bytes unread res.Body.Close() // leaving 4999 bytes unread
clientClosed <- true close(clientClosed)
return nil return nil
} }
...@@ -2264,10 +2263,27 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { ...@@ -2264,10 +2263,27 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) {
EndStream: false, EndStream: false,
BlockFragment: buf.Bytes(), BlockFragment: buf.Bytes(),
}) })
ct.fr.WriteData(hf.StreamID, false, make([]byte, 5000)) // without ending stream
serverWroteBody <- true
<-clientClosed // Two cases:
// - Send one DATA frame with 5000 bytes.
// - Send two DATA frames with 1 and 4999 bytes each.
//
// In both cases, the client should consume one byte of data,
// refund that byte, then refund the following 4999 bytes.
//
// In the second case, the server waits for the client connection to
// close before seconding the second DATA frame. This tests the case
// where the client receives a DATA frame after it has reset the stream.
if oneDataFrame {
ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 5000))
close(serverWroteFirstByte)
<-clientClosed
} else {
ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 1))
close(serverWroteFirstByte)
<-clientClosed
ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 4999))
}
waitingFor := "RSTStreamFrame" waitingFor := "RSTStreamFrame"
for { for {
...@@ -2281,7 +2297,7 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { ...@@ -2281,7 +2297,7 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) {
switch waitingFor { switch waitingFor {
case "RSTStreamFrame": case "RSTStreamFrame":
if rf, ok := f.(*RSTStreamFrame); !ok || rf.ErrCode != ErrCodeCancel { if rf, ok := f.(*RSTStreamFrame); !ok || rf.ErrCode != ErrCodeCancel {
return fmt.Errorf("Expected a WindowUpdateFrame with code cancel; got %v", summarizeFrame(f)) return fmt.Errorf("Expected a RSTStreamFrame with code cancel; got %v", summarizeFrame(f))
} }
waitingFor = "WindowUpdateFrame" waitingFor = "WindowUpdateFrame"
case "WindowUpdateFrame": case "WindowUpdateFrame":
...@@ -2295,6 +2311,16 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) { ...@@ -2295,6 +2311,16 @@ func TestTransportReturnsUnusedFlowControl(t *testing.T) {
ct.run() ct.run()
} }
// See golang.org/issue/16481
func TestTransportReturnsUnusedFlowControlSingleWrite(t *testing.T) {
testTransportReturnsUnusedFlowControl(t, true)
}
// See golang.org/issue/20469
func TestTransportReturnsUnusedFlowControlMultipleWrites(t *testing.T) {
testTransportReturnsUnusedFlowControl(t, false)
}
// Issue 16612: adjust flow control on open streams when transport // Issue 16612: adjust flow control on open streams when transport
// receives SETTINGS with INITIAL_WINDOW_SIZE from server. // receives SETTINGS with INITIAL_WINDOW_SIZE from server.
func TestTransportAdjustsFlowControl(t *testing.T) { func TestTransportAdjustsFlowControl(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment