Commit 1a26cf06 authored by Tom Bergan's avatar Tom Bergan

http2: Discard DATA frames from the server after the response body is closed

After a response body is closed, we keep writing to the bufPipe. This
accumulates bytes that will never be read, wasting memory. The fix is to
discard the buffer on pipe.BreakWithError.

Updates golang/go#20448

Change-Id: Ia2cf46cb8c401fd8091ef3785eb48fe7b188bb57
Reviewed-on: https://go-review.googlesource.com/43810
Run-TryBot: Tom Bergan <tombergan@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: 's avatarBrad Fitzpatrick <bradfitz@golang.org>
parent 5961165d
...@@ -15,8 +15,8 @@ import ( ...@@ -15,8 +15,8 @@ import (
// underlying buffer is an interface. (io.Pipe is always unbuffered) // underlying buffer is an interface. (io.Pipe is always unbuffered)
type pipe struct { type pipe struct {
mu sync.Mutex mu sync.Mutex
c sync.Cond // c.L lazily initialized to &p.mu c sync.Cond // c.L lazily initialized to &p.mu
b pipeBuffer b pipeBuffer // nil when done reading
err error // read error once empty. non-nil means closed. err error // read error once empty. non-nil means closed.
breakErr error // immediate read error (caller doesn't see rest of b) breakErr error // immediate read error (caller doesn't see rest of b)
donec chan struct{} // closed on error donec chan struct{} // closed on error
...@@ -32,6 +32,9 @@ type pipeBuffer interface { ...@@ -32,6 +32,9 @@ type pipeBuffer interface {
func (p *pipe) Len() int { func (p *pipe) Len() int {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.b == nil {
return 0
}
return p.b.Len() return p.b.Len()
} }
...@@ -55,6 +58,7 @@ func (p *pipe) Read(d []byte) (n int, err error) { ...@@ -55,6 +58,7 @@ func (p *pipe) Read(d []byte) (n int, err error) {
p.readFn() // e.g. copy trailers p.readFn() // e.g. copy trailers
p.readFn = nil // not sticky like p.err p.readFn = nil // not sticky like p.err
} }
p.b = nil
return 0, p.err return 0, p.err
} }
p.c.Wait() p.c.Wait()
...@@ -75,6 +79,9 @@ func (p *pipe) Write(d []byte) (n int, err error) { ...@@ -75,6 +79,9 @@ func (p *pipe) Write(d []byte) (n int, err error) {
if p.err != nil { if p.err != nil {
return 0, errClosedPipeWrite return 0, errClosedPipeWrite
} }
if p.breakErr != nil {
return len(d), nil // discard when there is no reader
}
return p.b.Write(d) return p.b.Write(d)
} }
...@@ -109,6 +116,9 @@ func (p *pipe) closeWithError(dst *error, err error, fn func()) { ...@@ -109,6 +116,9 @@ func (p *pipe) closeWithError(dst *error, err error, fn func()) {
return return
} }
p.readFn = fn p.readFn = fn
if dst == &p.breakErr {
p.b = nil
}
*dst = err *dst = err
p.closeDoneLocked() p.closeDoneLocked()
} }
......
...@@ -92,6 +92,10 @@ func TestPipeCloseWithError(t *testing.T) { ...@@ -92,6 +92,10 @@ func TestPipeCloseWithError(t *testing.T) {
if err != a { if err != a {
t.Logf("read error = %v, %v", err, a) t.Logf("read error = %v, %v", err, a)
} }
// Write should fail.
if n, err := p.Write([]byte("abc")); err != errClosedPipeWrite || n != 0 {
t.Errorf("Write(abc) after close\ngot =%v, %v\nwant 0, %v", n, err, errClosedPipeWrite)
}
} }
func TestPipeBreakWithError(t *testing.T) { func TestPipeBreakWithError(t *testing.T) {
...@@ -106,4 +110,14 @@ func TestPipeBreakWithError(t *testing.T) { ...@@ -106,4 +110,14 @@ func TestPipeBreakWithError(t *testing.T) {
if err != a { if err != a {
t.Logf("read error = %v, %v", err, a) t.Logf("read error = %v, %v", err, a)
} }
if p.b != nil {
t.Errorf("buffer should be nil after BreakWithError")
}
// Write should succeed silently.
if n, err := p.Write([]byte("abc")); err != nil || n != 3 {
t.Errorf("Write(abc) after break\ngot =%v, %v\nwant 0, nil", n, err)
}
if p.b != nil {
t.Errorf("buffer should be nil after Write")
}
} }
...@@ -1655,6 +1655,7 @@ func (b transportResponseBody) Close() error { ...@@ -1655,6 +1655,7 @@ func (b transportResponseBody) Close() error {
cc.wmu.Lock() cc.wmu.Lock()
if !serverSentStreamEnd { if !serverSentStreamEnd {
cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel) cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
cs.didReset = true
} }
// Return connection-level flow control. // Return connection-level flow control.
if unread > 0 { if unread > 0 {
...@@ -1702,12 +1703,6 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error { ...@@ -1702,12 +1703,6 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
return nil return nil
} }
if f.Length > 0 { if f.Length > 0 {
if len(data) > 0 && cs.bufPipe.b == nil {
// Data frame after it's already closed?
cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
return ConnectionError(ErrCodeProtocol)
}
// Check connection-level flow control. // Check connection-level flow control.
cc.mu.Lock() cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) { if cs.inflow.available() >= int32(f.Length) {
......
...@@ -2959,8 +2959,7 @@ func TestTransportAllocationsAfterResponseBodyClose(t *testing.T) { ...@@ -2959,8 +2959,7 @@ func TestTransportAllocationsAfterResponseBodyClose(t *testing.T) {
t.Fatalf("res.Body = %T; want transportResponseBody", res.Body) t.Fatalf("res.Body = %T; want transportResponseBody", res.Body)
} }
if trb.cs.bufPipe.b != nil { if trb.cs.bufPipe.b != nil {
// TODO(tombergan,bradfitz): turn this into an error: t.Errorf("response body pipe is still open")
t.Logf("response body pipe is still open")
} }
gotErr := <-writeErr gotErr := <-writeErr
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment