Commit 1fe39339 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: fix Transport race returning bodyless responses and reusing conns

The Transport had a delicate protocol between its readLoop goroutine
and the goroutine calling RoundTrip. The basic concern is that the
caller's RoundTrip goroutine wants to wait for either a
connection-level error (the conn dying) or the response. But sometimes
both happen: there's a valid response (without a body), but the conn
is also going away. Both goroutines' logic dealing with this had grown
large and complicated with hard-to-follow comments over the years.

Simplify and document. Pull some bits into functions and do all
bodyless stuff in one place (it's special enough), rather than having
a bunch of conditionals scattered everywhere. One test is no longer
even applicable since the race it tested is no longer possible (the
code doesn't exist).

The bug that this fixes is that when the Transport reads a bodyless
response from a server, it was returning that response before
returning the persistent connection to the idle pool. As a result,
~1/1000 of serial requests would end up creating a new connection
rather than re-using the just-used connection due to goroutine
scheduling chance. Instead, this now adds bodyless responses'
connections back to the idle pool first, then sends the response to
the RoundTrip goroutine, but making sure that the RoundTrip goroutine
is outside of its select on the connection dying.

There's a new buffered channel involved now, which is a minor
complication, but it's much more self-contained and well-documented
than the previous complexity. (The alternative of making the
responseAndError channel itself unbuffered is too invasive and risky
at this point; it would require a number of changes to avoid
deadlocked goroutines in error cases)

In any case, flakes look to be gone now. We'll see if trybots agree.

Fixes #13633

Change-Id: I95a22942b2aa334ae7c87331fddd751d4cdfdffc
Reviewed-on: https://go-review.googlesource.com/17890Reviewed-by: 's avatarRuss Cox <rsc@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
parent 1babba2e
...@@ -34,10 +34,9 @@ func init() { ...@@ -34,10 +34,9 @@ func init() {
} }
var ( var (
SetInstallConnClosedHook = hookSetter(&testHookPersistConnClosedGotRes) SetEnterRoundTripHook = hookSetter(&testHookEnterRoundTrip)
SetEnterRoundTripHook = hookSetter(&testHookEnterRoundTrip) SetTestHookWaitResLoop = hookSetter(&testHookWaitResLoop)
SetTestHookWaitResLoop = hookSetter(&testHookWaitResLoop) SetRoundTripRetried = hookSetter(&testHookRoundTripRetried)
SetRoundTripRetried = hookSetter(&testHookRoundTripRetried)
) )
func SetReadLoopBeforeNextReadHook(f func()) { func SetReadLoopBeforeNextReadHook(f func()) {
......
...@@ -970,7 +970,9 @@ func (pc *persistConn) cancelRequest() { ...@@ -970,7 +970,9 @@ func (pc *persistConn) cancelRequest() {
} }
func (pc *persistConn) readLoop() { func (pc *persistConn) readLoop() {
// eofc is used to block http.Handler goroutines reading from Response.Body defer pc.close()
// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection // at EOF until this goroutines has (potentially) added the connection
// back to the idle pool. // back to the idle pool.
eofc := make(chan struct{}) eofc := make(chan struct{})
...@@ -983,20 +985,14 @@ func (pc *persistConn) readLoop() { ...@@ -983,20 +985,14 @@ func (pc *persistConn) readLoop() {
alive := true alive := true
for alive { for alive {
pb, err := pc.br.Peek(1) _, err := pc.br.Peek(1)
if err != nil { if err != nil {
err = beforeRespHeaderError{err} err = beforeRespHeaderError{err}
} }
pc.lk.Lock() pc.lk.Lock()
if pc.numExpectedResponses == 0 { if pc.numExpectedResponses == 0 {
if !pc.closed { pc.readLoopPeekFailLocked(err)
pc.closeLocked()
if len(pb) > 0 {
buf, _ := pc.br.Peek(pc.br.Buffered())
log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, err)
}
}
pc.lk.Unlock() pc.lk.Unlock()
return return
} }
...@@ -1006,122 +1002,152 @@ func (pc *persistConn) readLoop() { ...@@ -1006,122 +1002,152 @@ func (pc *persistConn) readLoop() {
var resp *Response var resp *Response
if err == nil { if err == nil {
resp, err = ReadResponse(pc.br, rc.req) resp, err = pc.readResponse(rc)
if err == nil {
if rc.continueCh != nil {
if resp.StatusCode == 100 {
rc.continueCh <- struct{}{}
} else {
close(rc.continueCh)
}
}
if resp.StatusCode == 100 {
resp, err = ReadResponse(pc.br, rc.req)
}
}
}
if resp != nil {
resp.TLS = pc.tlsState
} }
hasBody := resp != nil && rc.req.Method != "HEAD" && resp.ContentLength != 0
if err != nil { if err != nil {
pc.close() // If we won't be able to retry this request later (from the
} else { // roundTrip goroutine), mark it as done now.
if rc.addedGzip && hasBody && resp.Header.Get("Content-Encoding") == "gzip" { // BEFORE the send on rc.ch, as the client might re-use the
resp.Header.Del("Content-Encoding") // same *Request pointer, and we don't want to set call
resp.Header.Del("Content-Length") // t.setReqCanceler from this persistConn while the Transport
resp.ContentLength = -1 // potentially spins up a different persistConn for the
resp.Body = &gzipReader{body: resp.Body} // caller's subsequent request.
if checkTransportResend(err, rc.req, pc) != nil {
pc.t.setReqCanceler(rc.req, nil)
} }
resp.Body = &bodyEOFSignal{body: resp.Body} rc.ch <- responseAndError{err: err}
return
} }
if err != nil || resp.Close || rc.req.Close || resp.StatusCode <= 199 { pc.lk.Lock()
pc.numExpectedResponses--
pc.lk.Unlock()
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
if resp.Close || rc.req.Close || resp.StatusCode <= 199 {
// Don't do keep-alive on error if either party requested a close // Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response. // or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above. // StatusCode 100 is already handled above.
alive = false alive = false
} }
var waitForBodyRead chan bool // channel is nil when there's no body if !hasBody {
if hasBody { pc.t.setReqCanceler(rc.req, nil)
waitForBodyRead = make(chan bool, 2) resc := make(chan *Response) // unbuffered matters; see below
resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error { rc.ch <- responseAndError{ch: resc} // buffered send
waitForBodyRead <- false
return nil // Put the idle conn back into the pool before we send the response
} // so if they process it quickly and make another request, they'll
resp.Body.(*bodyEOFSignal).fn = func(err error) error { // get this same conn. But we use the unbuffered channel 'rc'
isEOF := err == io.EOF // to guarantee that persistConn.roundTrip got out of its select
waitForBodyRead <- isEOF // potentially waiting for this persistConn to close.
if isEOF { // but after
<-eofc // see comment at top alive = alive &&
} else if err != nil && pc.isCanceled() { !pc.sawEOF &&
return errRequestCanceled pc.wroteRequest() &&
} pc.t.putIdleConn(pc)
return err
} resc <- resp // unbuffered send
} else {
// Before send on rc.ch, as client might re-use the // Now that they've read from the unbuffered channel, they're safely
// same *Request pointer, and we don't want to set this // out of the select that also waits on this goroutine to die, so
// on t from this persistConn while the Transport // we're allowed to exit now if needed (if alive is false)
// potentially spins up a different persistConn for the testHookReadLoopBeforeNextRead()
// caller's subsequent request. continue
//
// If this request will be retried, don't clear the reqCanceler
// yet or else roundTrip thinks it's been canceled.
if err == nil ||
checkTransportResend(err, rc.req, pc) != nil {
pc.t.setReqCanceler(rc.req, nil)
}
} }
pc.lk.Lock() if rc.addedGzip {
pc.numExpectedResponses-- maybeUngzipResponse(resp)
pc.lk.Unlock() }
resp.Body = &bodyEOFSignal{body: resp.Body}
// The connection might be going away when we put the waitForBodyRead := make(chan bool, 2)
// idleConn below. When that happens, we close the response channel to signal resp.Body.(*bodyEOFSignal).earlyCloseFn = func() error {
// to roundTrip that the connection is gone. roundTrip waits for waitForBodyRead <- false
// both closing and a response in a select, so it might choose return nil
// the close channel, rather than the response. }
// We send the response first so that roundTrip can check resp.Body.(*bodyEOFSignal).fn = func(err error) error {
// if there is a pending one with a non-blocking select isEOF := err == io.EOF
// on the response channel before erroring out. waitForBodyRead <- isEOF
rc.ch <- responseAndError{resp, err} if isEOF {
<-eofc // see comment above eofc declaration
if hasBody { } else if err != nil && pc.isCanceled() {
// To avoid a race, wait for the just-returned return errRequestCanceled
// response body to be fully consumed before peek on
// the underlying bufio reader.
select {
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case bodyEOF := <-waitForBodyRead:
pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
pc.t.putIdleConn(pc)
if bodyEOF {
eofc <- struct{}{}
}
case <-pc.closech:
alive = false
} }
} else { return err
}
rc.ch <- responseAndError{r: resp}
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancelation or death)
select {
case bodyEOF := <-waitForBodyRead:
pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
alive = alive && alive = alive &&
bodyEOF &&
!pc.sawEOF && !pc.sawEOF &&
pc.wroteRequest() && pc.wroteRequest() &&
pc.t.putIdleConn(pc) pc.t.putIdleConn(pc)
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-pc.closech:
alive = false
} }
testHookReadLoopBeforeNextRead() testHookReadLoopBeforeNextRead()
} }
pc.close() }
func maybeUngzipResponse(resp *Response) {
if resp.Header.Get("Content-Encoding") == "gzip" {
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Body = &gzipReader{body: resp.Body}
}
}
func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
if pc.closed {
return
}
if n := pc.br.Buffered(); n > 0 {
buf, _ := pc.br.Peek(n)
log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
}
pc.closeLocked()
}
// readResponse reads an HTTP response (or two, in the case of "Expect:
// 100-continue") from the server. It returns the final non-100 one.
func (pc *persistConn) readResponse(rc requestAndChan) (resp *Response, err error) {
resp, err = ReadResponse(pc.br, rc.req)
if err != nil {
return
}
if rc.continueCh != nil {
if resp.StatusCode == 100 {
rc.continueCh <- struct{}{}
} else {
close(rc.continueCh)
}
}
if resp.StatusCode == 100 {
resp, err = ReadResponse(pc.br, rc.req)
if err != nil {
return
}
}
resp.TLS = pc.tlsState
return
} }
// waitForContinue returns the function to block until // waitForContinue returns the function to block until
...@@ -1198,11 +1224,25 @@ func (pc *persistConn) wroteRequest() bool { ...@@ -1198,11 +1224,25 @@ func (pc *persistConn) wroteRequest() bool {
} }
} }
// responseAndError is how the goroutine reading from an HTTP/1 server
// communicates with the goroutine doing the RoundTrip.
type responseAndError struct { type responseAndError struct {
res *Response ch chan *Response // if non-nil, res should be read from here
r *Response // else use this response (see res method)
err error err error
} }
func (re responseAndError) res() *Response {
switch {
case re.err != nil:
return nil
case re.ch != nil:
return <-re.ch
default:
return re.r
}
}
type requestAndChan struct { type requestAndChan struct {
req *Request req *Request
ch chan responseAndError ch chan responseAndError
...@@ -1250,12 +1290,11 @@ func nop() {} ...@@ -1250,12 +1290,11 @@ func nop() {}
// testHooks. Always non-nil. // testHooks. Always non-nil.
var ( var (
testHookPersistConnClosedGotRes = nop testHookEnterRoundTrip = nop
testHookEnterRoundTrip = nop testHookWaitResLoop = nop
testHookWaitResLoop = nop testHookRoundTripRetried = nop
testHookRoundTripRetried = nop testHookPrePendingDial = nop
testHookPrePendingDial = nop testHookPostPendingDial = nop
testHookPostPendingDial = nop
testHookMu sync.Locker = fakeLocker{} // guards following testHookMu sync.Locker = fakeLocker{} // guards following
testHookReadLoopBeforeNextRead = nop testHookReadLoopBeforeNextRead = nop
...@@ -1347,7 +1386,7 @@ WaitResponse: ...@@ -1347,7 +1386,7 @@ WaitResponse:
} }
} }
if err != nil { if err != nil {
re = responseAndError{nil, beforeRespHeaderError{err}} re = responseAndError{err: beforeRespHeaderError{err}}
pc.close() pc.close()
break WaitResponse break WaitResponse
} }
...@@ -1357,25 +1396,13 @@ WaitResponse: ...@@ -1357,25 +1396,13 @@ WaitResponse:
respHeaderTimer = timer.C respHeaderTimer = timer.C
} }
case <-pc.closech: case <-pc.closech:
// The persist connection is dead. This shouldn't var err error
// usually happen (only with Connection: close responses if pc.isCanceled() {
// with no response bodies), but if it does happen it err = errRequestCanceled
// means either a) the remote server hung up on us } else {
// prematurely, or b) the readLoop sent us a response & err = beforeRespHeaderError{errClosed}
// closed its closech at roughly the same time, and we
// selected this case first. If we got a response, readLoop makes sure
// to send it before it puts the conn and closes the channel.
// That way, we can fetch the response, if there is one,
// with a non-blocking receive.
select {
case re = <-resc:
testHookPersistConnClosedGotRes()
default:
re = responseAndError{err: beforeRespHeaderError{errClosed}}
if pc.isCanceled() {
re = responseAndError{err: errRequestCanceled}
}
} }
re = responseAndError{err: err}
break WaitResponse break WaitResponse
case <-respHeaderTimer: case <-respHeaderTimer:
pc.close() pc.close()
...@@ -1392,7 +1419,7 @@ WaitResponse: ...@@ -1392,7 +1419,7 @@ WaitResponse:
if re.err != nil { if re.err != nil {
pc.t.setReqCanceler(req.Request, nil) pc.t.setReqCanceler(req.Request, nil)
} }
return re.res, re.err return re.res(), re.err
} }
// markBroken marks a connection as broken (so it's not reused). // markBroken marks a connection as broken (so it's not reused).
......
...@@ -602,6 +602,7 @@ func TestTransportHeadChunkedResponse(t *testing.T) { ...@@ -602,6 +602,7 @@ func TestTransportHeadChunkedResponse(t *testing.T) {
tr := &Transport{DisableKeepAlives: false} tr := &Transport{DisableKeepAlives: false}
c := &Client{Transport: tr} c := &Client{Transport: tr}
defer tr.CloseIdleConnections()
// Ensure that we wait for the readLoop to complete before // Ensure that we wait for the readLoop to complete before
// calling Head again // calling Head again
...@@ -2661,62 +2662,6 @@ func TestTransportRangeAndGzip(t *testing.T) { ...@@ -2661,62 +2662,6 @@ func TestTransportRangeAndGzip(t *testing.T) {
res.Body.Close() res.Body.Close()
} }
// Previously, we used to handle a logical race within RoundTrip by waiting for 100ms
// in the case of an error. Changing the order of the channel operations got rid of this
// race.
//
// In order to test that the channel op reordering works, we install a hook into the
// roundTrip function which gets called if we saw the connection go away and
// we subsequently received a response.
func TestTransportResponseCloseRace(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}
defer afterTest(t)
ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
}))
defer ts.Close()
sawRace := false
SetInstallConnClosedHook(func() {
sawRace = true
})
defer SetInstallConnClosedHook(nil)
SetTestHookWaitResLoop(func() {
// Make the select race much more likely by blocking before
// the select, so both will be ready by the time the
// select runs.
time.Sleep(50 * time.Millisecond)
})
defer SetTestHookWaitResLoop(nil)
tr := &Transport{
DisableKeepAlives: true,
}
req, err := NewRequest("GET", ts.URL, nil)
if err != nil {
t.Fatal(err)
}
// selects are not deterministic, so do this a bunch
// and see if we handle the logical race at least once.
for i := 0; i < 10000; i++ {
resp, err := tr.RoundTrip(req)
if err != nil {
t.Fatalf("unexpected error: %s", err)
continue
}
resp.Body.Close()
if sawRace {
t.Logf("saw race after %d iterations", i+1)
break
}
}
if !sawRace {
t.Errorf("didn't see response/connection going away race")
}
}
// Test for issue 10474 // Test for issue 10474
func TestTransportResponseCancelRace(t *testing.T) { func TestTransportResponseCancelRace(t *testing.T) {
defer afterTest(t) defer afterTest(t)
...@@ -2953,6 +2898,40 @@ func TestTransportAutomaticHTTP2(t *testing.T) { ...@@ -2953,6 +2898,40 @@ func TestTransportAutomaticHTTP2(t *testing.T) {
} }
} }
// Issue 13633: there was a race where we returned bodyless responses
// to callers before recycling the persistent connection, which meant
// a client doing two subsequent requests could end up on different
// connections. It's somewhat harmless but enough tests assume it's
// not true in order to test other things that it's worth fixing.
// Plus it's nice to be consistent and not have timing-dependent
// behavior.
func TestTransportReuseConnEmptyResponseBody(t *testing.T) {
defer afterTest(t)
cst := newClientServerTest(t, h1Mode, HandlerFunc(func(w ResponseWriter, r *Request) {
w.Header().Set("X-Addr", r.RemoteAddr)
// Empty response body.
}))
defer cst.close()
n := 100
if testing.Short() {
n = 10
}
var firstAddr string
for i := 0; i < n; i++ {
res, err := cst.c.Get(cst.ts.URL)
if err != nil {
log.Fatal(err)
}
addr := res.Header.Get("X-Addr")
if i == 0 {
firstAddr = addr
} else if addr != firstAddr {
t.Fatalf("On request %d, addr %q != original addr %q", i+1, addr, firstAddr)
}
res.Body.Close()
}
}
func wantBody(res *Response, err error, want string) error { func wantBody(res *Response, err error, want string) error {
if err != nil { if err != nil {
return err return err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment