Commit 8e5cb0da authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: conservatively flush Transport request headers by default

This changes the http.Transport to flush the bufio.Writer between
writing the request headers and the body.

That wasn't done in the past to minimize the number of TCP packets on
the wire, but that's just an optimization, and it causes problems when
servers are waiting for the headers and the client is blocked on
something before reading the body.

Instead, only do the don't-flush optimization if we know we're not
going to block, whitelisting a set of common in-memory Request.Body
types. (the same set of types special-cased by http.NewRequest)

Fixes #22088

Change-Id: I7717750aa6df32dd3eb92d181b45bc7af24b1144
Reviewed-on: https://go-review.googlesource.com/114316
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Andrew Bonventre <andybons@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: 's avatarAndrew Bonventre <andybons@golang.org>
parent b0b09212
......@@ -13,6 +13,7 @@ import (
"io/ioutil"
"net/http/internal"
"net/textproto"
"reflect"
"sort"
"strconv"
"strings"
......@@ -105,6 +106,17 @@ func newTransferWriter(r interface{}) (t *transferWriter, err error) {
if t.ContentLength < 0 && len(t.TransferEncoding) == 0 && t.shouldSendChunkedRequestBody() {
t.TransferEncoding = []string{"chunked"}
}
// If there's a body, conservatively flush the headers
// to any bufio.Writer we're writing to, just in case
// the server needs the headers early, before we copy
// the body and possibly block. We make an exception
// for the common standard library in-memory types,
// though, to avoid unnecessary TCP packets on the
// wire. (Issue 22088.)
if t.ContentLength != 0 && !isKnownInMemoryReader(t.Body) {
t.FlushHeaders = true
}
atLeastHTTP11 = true // Transport requests are always 1.1 or 2.0
case *Response:
t.IsResponse = true
......@@ -1009,3 +1021,19 @@ func (fr finishAsyncByteRead) Read(p []byte) (n int, err error) {
}
return
}
var nopCloserType = reflect.TypeOf(ioutil.NopCloser(nil))
// isKnownInMemoryReader reports whether r is a type known to not
// block on Read. Its caller uses this as an optional optimization to
// send fewer TCP packets.
func isKnownInMemoryReader(r io.Reader) bool {
switch r.(type) {
case *bytes.Reader, *bytes.Buffer, *strings.Reader:
return true
}
if reflect.TypeOf(r) == nopCloserType {
return isKnownInMemoryReader(reflect.ValueOf(r).Field(0).Interface().(io.Reader))
}
return false
}
......@@ -6,7 +6,9 @@ package http
import (
"bufio"
"bytes"
"io"
"io/ioutil"
"strings"
"testing"
)
......@@ -62,3 +64,29 @@ func TestFinalChunkedBodyReadEOF(t *testing.T) {
t.Errorf("buf = %q; want %q", buf, want)
}
}
func TestDetectInMemoryReaders(t *testing.T) {
pr, _ := io.Pipe()
tests := []struct {
r io.Reader
want bool
}{
{pr, false},
{bytes.NewReader(nil), true},
{bytes.NewBuffer(nil), true},
{strings.NewReader(""), true},
{ioutil.NopCloser(pr), false},
{ioutil.NopCloser(bytes.NewReader(nil)), true},
{ioutil.NopCloser(bytes.NewBuffer(nil)), true},
{ioutil.NopCloser(strings.NewReader("")), true},
}
for i, tt := range tests {
got := isKnownInMemoryReader(tt.r)
if got != tt.want {
t.Logf("%d: got = %v; want %v", i, got, tt.want)
}
}
}
......@@ -3264,8 +3264,8 @@ func TestTransportFlushesBodyChunks(t *testing.T) {
defer res.Body.Close()
want := []string{
"POST / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: x\r\nTransfer-Encoding: chunked\r\nAccept-Encoding: gzip\r\n\r\n" +
"5\r\nnum0\n\r\n",
"POST / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: x\r\nTransfer-Encoding: chunked\r\nAccept-Encoding: gzip\r\n\r\n",
"5\r\nnum0\n\r\n",
"5\r\nnum1\n\r\n",
"5\r\nnum2\n\r\n",
"0\r\n\r\n",
......@@ -3275,6 +3275,40 @@ func TestTransportFlushesBodyChunks(t *testing.T) {
}
}
// Issue 22088: flush Transport request headers if we're not sure the body won't block on read.
func TestTransportFlushesRequestHeader(t *testing.T) {
defer afterTest(t)
gotReq := make(chan struct{})
cst := newClientServerTest(t, h1Mode, HandlerFunc(func(w ResponseWriter, r *Request) {
close(gotReq)
}))
defer cst.close()
pr, pw := io.Pipe()
req, err := NewRequest("POST", cst.ts.URL, pr)
if err != nil {
t.Fatal(err)
}
gotRes := make(chan struct{})
go func() {
defer close(gotRes)
res, err := cst.tr.RoundTrip(req)
if err != nil {
t.Error(err)
return
}
res.Body.Close()
}()
select {
case <-gotReq:
pw.Close()
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for handler to get request")
}
<-gotRes
}
// Issue 11745.
func TestTransportPrefersResponseOverWriteError(t *testing.T) {
if testing.Short() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment