Commit 243266f6 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

http: fix Transport connection re-use race

A connection shouldn't be made available
for re-use until its body has been consumed.

(except in the case of pipelining, which isn't
implemented yet)

This CL fixes some issues seen with heavy load
against Amazon S3.

Subtle implementation detail: to prevent a race
with the client requesting a new connection
before previous one is returned, we actually
have to call putIdleConnection _before_ we
return from the final Read/Close call on the
http.Response.Body.

R=rsc, adg
CC=golang-dev
https://golang.org/cl/4351048
parent f3ad899a
...@@ -424,25 +424,37 @@ func (pc *persistConn) readLoop() { ...@@ -424,25 +424,37 @@ func (pc *persistConn) readLoop() {
rc := <-pc.reqch rc := <-pc.reqch
resp, err := pc.cc.Read(rc.req) resp, err := pc.cc.Read(rc.req)
if err == nil && !rc.req.Close {
pc.t.putIdleConn(pc)
}
if err == ErrPersistEOF { if err == ErrPersistEOF {
// Succeeded, but we can't send any more // Succeeded, but we can't send any more
// persistent connections on this again. We // persistent connections on this again. We
// hide this error to upstream callers. // hide this error to upstream callers.
alive = false alive = false
err = nil err = nil
} else if err != nil { } else if err != nil || rc.req.Close {
alive = false alive = false
} }
hasBody := resp != nil && resp.ContentLength != 0 hasBody := resp != nil && resp.ContentLength != 0
var waitForBodyRead chan bool
if alive {
if hasBody {
waitForBodyRead = make(chan bool)
resp.Body.(*bodyEOFSignal).fn = func() {
pc.t.putIdleConn(pc)
waitForBodyRead <- true
}
} else {
pc.t.putIdleConn(pc)
}
}
rc.ch <- responseAndError{resp, err} rc.ch <- responseAndError{resp, err}
// Wait for the just-returned response body to be fully consumed // Wait for the just-returned response body to be fully consumed
// before we race and peek on the underlying bufio reader. // before we race and peek on the underlying bufio reader.
if alive && hasBody { if waitForBodyRead != nil {
<-resp.Body.(*bodyEOFSignal).ch <-waitForBodyRead
} }
} }
} }
...@@ -514,33 +526,33 @@ func responseIsKeepAlive(res *Response) bool { ...@@ -514,33 +526,33 @@ func responseIsKeepAlive(res *Response) bool {
func readResponseWithEOFSignal(r *bufio.Reader, requestMethod string) (resp *Response, err os.Error) { func readResponseWithEOFSignal(r *bufio.Reader, requestMethod string) (resp *Response, err os.Error) {
resp, err = ReadResponse(r, requestMethod) resp, err = ReadResponse(r, requestMethod)
if err == nil && resp.ContentLength != 0 { if err == nil && resp.ContentLength != 0 {
resp.Body = &bodyEOFSignal{resp.Body, make(chan bool, 1), false} resp.Body = &bodyEOFSignal{resp.Body, nil}
} }
return return
} }
// bodyEOFSignal wraps a ReadCloser but sends on ch once once // bodyEOFSignal wraps a ReadCloser but runs fn (if non-nil) at most
// the wrapped ReadCloser is fully consumed (including on Close) // once, right before the final Read() or Close() call returns, but after
// EOF has been seen.
type bodyEOFSignal struct { type bodyEOFSignal struct {
body io.ReadCloser body io.ReadCloser
ch chan bool fn func()
done bool
} }
func (es *bodyEOFSignal) Read(p []byte) (n int, err os.Error) { func (es *bodyEOFSignal) Read(p []byte) (n int, err os.Error) {
n, err = es.body.Read(p) n, err = es.body.Read(p)
if err == os.EOF && !es.done { if err == os.EOF && es.fn != nil {
es.ch <- true es.fn()
es.done = true es.fn = nil
} }
return return
} }
func (es *bodyEOFSignal) Close() (err os.Error) { func (es *bodyEOFSignal) Close() (err os.Error) {
err = es.body.Close() err = es.body.Close()
if err == nil && !es.done { if err == nil && es.fn != nil {
es.ch <- true es.fn()
es.done = true es.fn = nil
} }
return return
} }
...@@ -85,6 +85,7 @@ func TestTransportConnectionCloseOnResponse(t *testing.T) { ...@@ -85,6 +85,7 @@ func TestTransportConnectionCloseOnResponse(t *testing.T) {
t.Fatalf("error in connectionClose=%v, req #%d, Do: %v", connectionClose, n, err) t.Fatalf("error in connectionClose=%v, req #%d, Do: %v", connectionClose, n, err)
} }
body, err := ioutil.ReadAll(res.Body) body, err := ioutil.ReadAll(res.Body)
defer res.Body.Close()
if err != nil { if err != nil {
t.Fatalf("error in connectionClose=%v, req #%d, ReadAll: %v", connectionClose, n, err) t.Fatalf("error in connectionClose=%v, req #%d, ReadAll: %v", connectionClose, n, err)
} }
...@@ -154,9 +155,11 @@ func TestTransportIdleCacheKeys(t *testing.T) { ...@@ -154,9 +155,11 @@ func TestTransportIdleCacheKeys(t *testing.T) {
t.Errorf("After CloseIdleConnections expected %d idle conn cache keys; got %d", e, g) t.Errorf("After CloseIdleConnections expected %d idle conn cache keys; got %d", e, g)
} }
if _, _, err := c.Get(ts.URL); err != nil { resp, _, err := c.Get(ts.URL)
if err != nil {
t.Error(err) t.Error(err)
} }
ioutil.ReadAll(resp.Body)
keys := tr.IdleConnKeysForTesting() keys := tr.IdleConnKeysForTesting()
if e, g := 1, len(keys); e != g { if e, g := 1, len(keys); e != g {
...@@ -187,7 +190,11 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) { ...@@ -187,7 +190,11 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) {
// ch) // ch)
donech := make(chan bool) donech := make(chan bool)
doReq := func() { doReq := func() {
c.Get(ts.URL) resp, _, err := c.Get(ts.URL)
if err != nil {
t.Error(err)
}
ioutil.ReadAll(resp.Body)
donech <- true donech <- true
} }
go doReq() go doReq()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment