Commit 67fa7970 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: update bundled http2; fixes TestTransportAndServerSharedBodyRace_h2

Update bundled http2 to git rev d1ba260648 (https://golang.org/cl/18288).

Fixes the flaky TestTransportAndServerSharedBodyRace_h2.

Also adds some debugging to TestTransportAndServerSharedBodyRace_h2
which I hope won't ever be necessary again, but I know will be.

Fixes #13556

Change-Id: Ibcf2fc23ec0122dcac8891fdc3bd7f8acddd880e
Reviewed-on: https://go-review.googlesource.com/18289Reviewed-by: 's avatarAndrew Gerrand <adg@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
parent e9fc5221
......@@ -4219,6 +4219,8 @@ type http2clientStream struct {
peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed
done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu
// owned by clientConnReadLoop:
headersDone bool // got HEADERS w/ END_HEADERS
trailersDone bool // got second HEADERS frame w/ END_HEADERS
......@@ -4227,7 +4229,11 @@ type http2clientStream struct {
resTrailer Header // client's Response.Trailer
}
// awaitRequestCancel runs in its own goroutine and waits for the user's
// awaitRequestCancel runs in its own goroutine and waits for the user
// to either cancel a RoundTrip request (using the provided
// Request.Cancel channel), or for the request to be done (any way it
// might be removed from the cc.streams map: peer reset, successful
// completion, TCP connection breakage, etc)
func (cs *http2clientStream) awaitRequestCancel(cancel <-chan struct{}) {
if cancel == nil {
return
......@@ -4235,7 +4241,8 @@ func (cs *http2clientStream) awaitRequestCancel(cancel <-chan struct{}) {
select {
case <-cancel:
cs.bufPipe.CloseWithError(http2errRequestCanceled)
case <-cs.bufPipe.Done():
cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
case <-cs.done:
}
}
......@@ -4594,6 +4601,11 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
cc.mu.Unlock()
if werr != nil {
if hasBody {
req.Body.Close()
}
cc.forgetStreamID(cs.ID)
return nil, werr
}
......@@ -4605,26 +4617,47 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
}()
}
readLoopResCh := cs.resc
requestCanceledCh := http2requestCancel(req)
requestCanceled := false
for {
select {
case re := <-cs.resc:
case re := <-readLoopResCh:
res := re.res
if re.err != nil || res.StatusCode > 299 {
cs.abortRequestBodyWrite()
}
if re.err != nil {
cc.forgetStreamID(cs.ID)
return nil, re.err
}
res.Request = req
res.TLS = cc.tlsState
return res, nil
case <-http2requestCancel(req):
case <-requestCanceledCh:
cc.forgetStreamID(cs.ID)
cs.abortRequestBodyWrite()
return nil, http2errRequestCanceled
if !hasBody {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
return nil, http2errRequestCanceled
}
requestCanceled = true
requestCanceledCh = nil
readLoopResCh = nil
case <-cs.peerReset:
if requestCanceled {
return nil, http2errRequestCanceled
}
return nil, cs.resetErr
case err := <-bodyCopyErrc:
if requestCanceled {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
return nil, http2errRequestCanceled
}
if err != nil {
return nil, err
}
......@@ -4856,6 +4889,7 @@ func (cc *http2ClientConn) newStream() *http2clientStream {
ID: cc.nextStreamID,
resc: make(chan http2resAndError, 1),
peerReset: make(chan struct{}),
done: make(chan struct{}),
}
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
......@@ -4866,12 +4900,17 @@ func (cc *http2ClientConn) newStream() *http2clientStream {
return cs
}
func (cc *http2ClientConn) forgetStreamID(id uint32) {
cc.streamByID(id, true)
}
func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStream {
cc.mu.Lock()
defer cc.mu.Unlock()
cs := cc.streams[id]
if andRemove {
if andRemove && cs != nil {
delete(cc.streams, id)
close(cs.done)
}
return cs
}
......@@ -4926,6 +4965,7 @@ func (rl *http2clientConnReadLoop) cleanup() {
case cs.resc <- http2resAndError{err: err}:
default:
}
close(cs.done)
}
cc.closed = true
cc.cond.Broadcast()
......@@ -5291,6 +5331,7 @@ func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode,
cc.wmu.Lock()
cc.fr.WriteRSTStream(streamID, code)
cc.bw.Flush()
cc.wmu.Unlock()
}
......
......@@ -27,6 +27,7 @@ import (
"os/exec"
"reflect"
"runtime"
"runtime/debug"
"sort"
"strconv"
"strings"
......@@ -3038,7 +3039,6 @@ func TestTransportAndServerSharedBodyRace_h1(t *testing.T) {
testTransportAndServerSharedBodyRace(t, h1Mode)
}
func TestTransportAndServerSharedBodyRace_h2(t *testing.T) {
t.Skip("failing in http2 mode; golang.org/issue/13556")
testTransportAndServerSharedBodyRace(t, h2Mode)
}
func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
......@@ -3046,11 +3046,40 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
const bodySize = 1 << 20
// errorf is like t.Errorf, but also writes to println. When
// this test fails, it hangs. This helps debugging and I've
// added this enough times "temporarily". It now gets added
// full time.
errorf := func(format string, args ...interface{}) {
v := fmt.Sprintf(format, args...)
println(v)
t.Error(v)
}
unblockBackend := make(chan bool)
backend := newClientServerTest(t, h2, HandlerFunc(func(rw ResponseWriter, req *Request) {
io.CopyN(rw, req.Body, bodySize)
gone := rw.(CloseNotifier).CloseNotify()
didCopy := make(chan interface{})
go func() {
n, err := io.CopyN(rw, req.Body, bodySize)
didCopy <- []interface{}{n, err}
}()
isGone := false
Loop:
for {
select {
case <-didCopy:
break Loop
case <-gone:
isGone = true
case <-time.After(time.Second):
println("1 second passes in backend, proxygone=", isGone)
}
}
<-unblockBackend
}))
var quitTimer *time.Timer
defer func() { quitTimer.Stop() }()
defer backend.close()
backendRespc := make(chan *Response, 1)
......@@ -3063,17 +3092,17 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
bresp, err := proxy.c.Do(req2)
if err != nil {
t.Errorf("Proxy outbound request: %v", err)
errorf("Proxy outbound request: %v", err)
return
}
_, err = io.CopyN(ioutil.Discard, bresp.Body, bodySize/2)
if err != nil {
t.Errorf("Proxy copy error: %v", err)
errorf("Proxy copy error: %v", err)
return
}
backendRespc <- bresp // to close later
// Try to cause a race: Both the DefaultTransport and the proxy handler's Server
// Try to cause a race: Both the Transport and the proxy handler's Server
// will try to read/close req.Body (aka req2.Body)
if h2 {
close(cancel)
......@@ -3083,6 +3112,20 @@ func testTransportAndServerSharedBodyRace(t *testing.T, h2 bool) {
rw.Write([]byte("OK"))
}))
defer proxy.close()
defer func() {
// Before we shut down our two httptest.Servers, start a timer.
// We choose 7 seconds because httptest.Server starts logging
// warnings to stderr at 5 seconds. If we don't disarm this bomb
// in 7 seconds (after the two httptest.Server.Close calls above),
// then we explode with stacks.
quitTimer = time.AfterFunc(7*time.Second, func() {
debug.SetTraceback("ALL")
stacks := make([]byte, 1<<20)
stacks = stacks[:runtime.Stack(stacks, true)]
fmt.Fprintf(os.Stderr, "%s", stacks)
log.Fatalf("Timeout.")
})
}()
defer close(unblockBackend)
req, _ := NewRequest("POST", proxy.ts.URL, io.LimitReader(neverEnding('a'), bodySize))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment