Commit e5febf95 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: flush request body chunks in Transport

The Transport's writer to the remote server is wrapped in a
bufio.Writer to suppress many small writes while writing headers and
trailers. However, when writing the request body, the buffering may get
in the way if the request body is arriving slowly.

Because the io.Copy from the Request.Body to the writer is already
buffered, the outer bufio.Writer is unnecessary and prevents small
Request.Body.Reads from going to the server right away. (and the
io.Reader contract does say to return when you've got something,
instead of blocking waiting for more). After the body is finished, the
Transport's bufio.Writer is still used for any trailers following.

A previous attempted fix for this made the chunk writer always flush
if the underlying type was a bufio.Writer, but that is not quite
correct. This CL instead makes it opt-in by using a private sentinel
type (wrapping a *bufio.Writer) to the chunk writer that requests
Flushes after each chunk body (the chunk header & chunk body are still
buffered together into one write).

Fixes #6574

Change-Id: Icefcdf17130c9e285c80b69af295bfd3e72c3a70
Reviewed-on: https://go-review.googlesource.com/10021Reviewed-by: 's avatarAndrew Gerrand <adg@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
parent 5c7f9442
......@@ -173,8 +173,12 @@ func (cw *chunkedWriter) Write(data []byte) (n int, err error) {
err = io.ErrShortWrite
return
}
_, err = io.WriteString(cw.Wire, "\r\n")
if _, err = io.WriteString(cw.Wire, "\r\n"); err != nil {
return
}
if bw, ok := cw.Wire.(*FlushAfterChunkWriter); ok {
err = bw.Flush()
}
return
}
......@@ -183,6 +187,15 @@ func (cw *chunkedWriter) Close() error {
return err
}
// FlushAfterChunkWriter signals from the caller of NewChunkedWriter
// that each chunk should be followed by a flush. It is used by the
// http.Transport code to keep the buffering behavior for headers and
// trailers, but flush out chunks aggressively in the middle for
// request bodies which may be generated slowly. See Issue 6574.
type FlushAfterChunkWriter struct {
*bufio.Writer
}
func parseHexUint(v []byte) (n uint64, err error) {
for _, b := range v {
n <<= 4
......
......@@ -43,6 +43,7 @@ type transferWriter struct {
Close bool
TransferEncoding []string
Trailer Header
IsResponse bool
}
func newTransferWriter(r interface{}) (t *transferWriter, err error) {
......@@ -89,6 +90,7 @@ func newTransferWriter(r interface{}) (t *transferWriter, err error) {
}
}
case *Response:
t.IsResponse = true
if rr.Request != nil {
t.Method = rr.Request.Method
}
......@@ -206,6 +208,9 @@ func (t *transferWriter) WriteBody(w io.Writer) error {
// Write body
if t.Body != nil {
if chunked(t.TransferEncoding) {
if bw, ok := w.(*bufio.Writer); ok && !t.IsResponse {
w = &internal.FlushAfterChunkWriter{bw}
}
cw := internal.NewChunkedWriter(w)
_, err = io.Copy(cw, t.Body)
if err == nil {
......
......@@ -23,6 +23,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"reflect"
"runtime"
"strconv"
"strings"
......@@ -2447,6 +2448,104 @@ func TestTransportDialCancelRace(t *testing.T) {
}
}
// logWritesConn is a net.Conn that logs each Write call to writes
// and then proxies to w.
// It proxies Read calls to a reader it receives from rch.
type logWritesConn struct {
net.Conn // nil. crash on use.
w io.Writer
rch <-chan io.Reader
r io.Reader // nil until received by rch
mu sync.Mutex
writes []string
}
func (c *logWritesConn) Write(p []byte) (n int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
c.writes = append(c.writes, string(p))
return c.w.Write(p)
}
func (c *logWritesConn) Read(p []byte) (n int, err error) {
if c.r == nil {
c.r = <-c.rch
}
return c.r.Read(p)
}
func (c *logWritesConn) Close() error { return nil }
// Issue 6574
func TestTransportFlushesBodyChunks(t *testing.T) {
defer afterTest(t)
resBody := make(chan io.Reader, 1)
connr, connw := io.Pipe() // connection pipe pair
lw := &logWritesConn{
rch: resBody,
w: connw,
}
tr := &Transport{
Dial: func(network, addr string) (net.Conn, error) {
return lw, nil
},
}
bodyr, bodyw := io.Pipe() // body pipe pair
go func() {
defer bodyw.Close()
for i := 0; i < 3; i++ {
fmt.Fprintf(bodyw, "num%d\n", i)
}
}()
resc := make(chan *Response)
go func() {
req, _ := NewRequest("POST", "http://localhost:8080", bodyr)
req.Header.Set("User-Agent", "x") // known value for test
res, err := tr.RoundTrip(req)
if err != nil {
t.Error("RoundTrip: %v", err)
close(resc)
return
}
resc <- res
}()
// Fully consume the request before checking the Write log vs. want.
req, err := ReadRequest(bufio.NewReader(connr))
if err != nil {
t.Fatal(err)
}
io.Copy(ioutil.Discard, req.Body)
// Unblock the transport's roundTrip goroutine.
resBody <- strings.NewReader("HTTP/1.1 204 No Content\r\nConnection: close\r\n\r\n")
res, ok := <-resc
if !ok {
return
}
defer res.Body.Close()
want := []string{
// Because Request.ContentLength = 0, the body is sniffed for 1 byte to determine whether there's content.
// That explains the initial "num0" being split into "n" and "um0".
// The first byte is included with the request headers Write. Perhaps in the future
// we will want to flush the headers out early if the first byte of the request body is
// taking a long time to arrive. But not yet.
"POST / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: x\r\nTransfer-Encoding: chunked\r\nAccept-Encoding: gzip\r\n\r\n" +
"1\r\nn\r\n",
"4\r\num0\n\r\n",
"5\r\nnum1\n\r\n",
"5\r\nnum2\n\r\n",
"0\r\n\r\n",
}
if !reflect.DeepEqual(lw.writes, want) {
t.Errorf("Writes differed.\n Got: %q\nWant: %q\n", lw.writes, want)
}
}
func wantBody(res *http.Response, err error, want string) error {
if err != nil {
return err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment