Commit bb807669 authored by Alexander Polcyn's avatar Alexander Polcyn Committed by Brad Fitzpatrick

http2: Add opt-in option to Framer to allow DataFrame struct reuse

The existing Framer in net/http2 allocates a new DataFrame struct
for each DataFrame read on calls to ReadFrame. The SetReuseFrame
option introduced here, if set on a Framer, allows the
Framer to reuse Frame objects and changes the ReadFrame API
so that returned Frame objects are only valid until the next call
to ReadFrame. This opt-in API now only implements reuse of DataFrames,
but it allows the Framer to reuse of any type of Frame.

The footprint caused by creation of new DataFrame structs per data
frame was noticed in micro benchmarks of "gRPC" server "streaming
throuhgput", which uses the Framer in this package. This benchmark
happened to use long lived http2 streams that do client-server "ping-pong"
requests with small data frames, and DataFrames were seen to be a
significant source of allocations.

Running local benchmarks with: (from x/net/http2 directory)

$ go test -run=^$ -bench=BenchmarkServerToClientStream

example output:
* expect an alloc reduction of at least 1 and a small memory reduction between
"BenchmarkServerToClientStreamDefaultOptions" and
"BenchmarkServerToClientStreamReuseFrames"

BenchmarkServerToClientStreamDefaultOptions-12    	   30000
46216 ns/op	     971 B/op	      17 allocs/op
BenchmarkServerToClientStreamReuseFrames-12       	   30000
44952 ns/op	     924 B/op	      16 allocs/op

Fixes golang/go#18502

Change-Id: Iad93420ef6c3918f54249d867098f1dadfa324d8
Reviewed-on: https://go-review.googlesource.com/34812
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: 's avatarBrad Fitzpatrick <bradfitz@golang.org>
parent 10c134ea
......@@ -122,7 +122,7 @@ var flagName = map[FrameType]map[Flags]string{
// a frameParser parses a frame given its FrameHeader and payload
// bytes. The length of payload will always equal fh.Length (which
// might be 0).
type frameParser func(fh FrameHeader, payload []byte) (Frame, error)
type frameParser func(fc *frameCache, fh FrameHeader, payload []byte) (Frame, error)
var frameParsers = map[FrameType]frameParser{
FrameData: parseDataFrame,
......@@ -323,6 +323,8 @@ type Framer struct {
debugFramerBuf *bytes.Buffer
debugReadLoggerf func(string, ...interface{})
debugWriteLoggerf func(string, ...interface{})
frameCache *frameCache // nil if frames aren't reused (default)
}
func (fr *Framer) maxHeaderListSize() uint32 {
......@@ -398,6 +400,27 @@ const (
maxFrameSize = 1<<24 - 1
)
// SetReuseFrames allows the Framer to reuse Frames.
// If called on a Framer, Frames returned by calls to ReadFrame are only
// valid until the next call to ReadFrame.
func (fr *Framer) SetReuseFrames() {
if fr.frameCache != nil {
return
}
fr.frameCache = &frameCache{}
}
type frameCache struct {
dataFrame DataFrame
}
func (fc *frameCache) getDataFrame() *DataFrame {
if fc == nil {
return &DataFrame{}
}
return &fc.dataFrame
}
// NewFramer returns a Framer that writes frames to w and reads them from r.
func NewFramer(w io.Writer, r io.Reader) *Framer {
fr := &Framer{
......@@ -477,7 +500,7 @@ func (fr *Framer) ReadFrame() (Frame, error) {
if _, err := io.ReadFull(fr.r, payload); err != nil {
return nil, err
}
f, err := typeFrameParser(fh.Type)(fh, payload)
f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
if err != nil {
if ce, ok := err.(connError); ok {
return nil, fr.connError(ce.Code, ce.Reason)
......@@ -565,7 +588,7 @@ func (f *DataFrame) Data() []byte {
return f.data
}
func parseDataFrame(fh FrameHeader, payload []byte) (Frame, error) {
func parseDataFrame(fc *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
if fh.StreamID == 0 {
// DATA frames MUST be associated with a stream. If a
// DATA frame is received whose stream identifier
......@@ -574,9 +597,9 @@ func parseDataFrame(fh FrameHeader, payload []byte) (Frame, error) {
// PROTOCOL_ERROR.
return nil, connError{ErrCodeProtocol, "DATA frame with stream ID 0"}
}
f := &DataFrame{
FrameHeader: fh,
}
f := fc.getDataFrame()
f.FrameHeader = fh
var padSize byte
if fh.Flags.Has(FlagDataPadded) {
var err error
......@@ -672,7 +695,7 @@ type SettingsFrame struct {
p []byte
}
func parseSettingsFrame(fh FrameHeader, p []byte) (Frame, error) {
func parseSettingsFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
if fh.Flags.Has(FlagSettingsAck) && fh.Length > 0 {
// When this (ACK 0x1) bit is set, the payload of the
// SETTINGS frame MUST be empty. Receipt of a
......@@ -774,7 +797,7 @@ type PingFrame struct {
func (f *PingFrame) IsAck() bool { return f.Flags.Has(FlagPingAck) }
func parsePingFrame(fh FrameHeader, payload []byte) (Frame, error) {
func parsePingFrame(_ *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
if len(payload) != 8 {
return nil, ConnectionError(ErrCodeFrameSize)
}
......@@ -814,7 +837,7 @@ func (f *GoAwayFrame) DebugData() []byte {
return f.debugData
}
func parseGoAwayFrame(fh FrameHeader, p []byte) (Frame, error) {
func parseGoAwayFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
if fh.StreamID != 0 {
return nil, ConnectionError(ErrCodeProtocol)
}
......@@ -854,7 +877,7 @@ func (f *UnknownFrame) Payload() []byte {
return f.p
}
func parseUnknownFrame(fh FrameHeader, p []byte) (Frame, error) {
func parseUnknownFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
return &UnknownFrame{fh, p}, nil
}
......@@ -865,7 +888,7 @@ type WindowUpdateFrame struct {
Increment uint32 // never read with high bit set
}
func parseWindowUpdateFrame(fh FrameHeader, p []byte) (Frame, error) {
func parseWindowUpdateFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
if len(p) != 4 {
return nil, ConnectionError(ErrCodeFrameSize)
}
......@@ -930,7 +953,7 @@ func (f *HeadersFrame) HasPriority() bool {
return f.FrameHeader.Flags.Has(FlagHeadersPriority)
}
func parseHeadersFrame(fh FrameHeader, p []byte) (_ Frame, err error) {
func parseHeadersFrame(_ *frameCache, fh FrameHeader, p []byte) (_ Frame, err error) {
hf := &HeadersFrame{
FrameHeader: fh,
}
......@@ -1067,7 +1090,7 @@ func (p PriorityParam) IsZero() bool {
return p == PriorityParam{}
}
func parsePriorityFrame(fh FrameHeader, payload []byte) (Frame, error) {
func parsePriorityFrame(_ *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
if fh.StreamID == 0 {
return nil, connError{ErrCodeProtocol, "PRIORITY frame with stream ID 0"}
}
......@@ -1114,7 +1137,7 @@ type RSTStreamFrame struct {
ErrCode ErrCode
}
func parseRSTStreamFrame(fh FrameHeader, p []byte) (Frame, error) {
func parseRSTStreamFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
if len(p) != 4 {
return nil, ConnectionError(ErrCodeFrameSize)
}
......@@ -1144,7 +1167,7 @@ type ContinuationFrame struct {
headerFragBuf []byte
}
func parseContinuationFrame(fh FrameHeader, p []byte) (Frame, error) {
func parseContinuationFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
if fh.StreamID == 0 {
return nil, connError{ErrCodeProtocol, "CONTINUATION frame with stream ID 0"}
}
......@@ -1194,7 +1217,7 @@ func (f *PushPromiseFrame) HeadersEnded() bool {
return f.FrameHeader.Flags.Has(FlagPushPromiseEndHeaders)
}
func parsePushPromise(fh FrameHeader, p []byte) (_ Frame, err error) {
func parsePushPromise(_ *frameCache, fh FrameHeader, p []byte) (_ Frame, err error) {
pp := &PushPromiseFrame{
FrameHeader: fh,
}
......
......@@ -1096,6 +1096,95 @@ func TestMetaFrameHeader(t *testing.T) {
}
}
func TestSetReuseFrames(t *testing.T) {
fr, buf := testFramer()
fr.SetReuseFrames()
// Check that DataFrames are reused. Note that
// SetReuseFrames only currently implements reuse of DataFrames.
firstDf := readAndVerifyDataFrame("ABC", 3, fr, buf, t)
for i := 0; i < 10; i++ {
df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
if df != firstDf {
t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
}
}
for i := 0; i < 10; i++ {
df := readAndVerifyDataFrame("", 0, fr, buf, t)
if df != firstDf {
t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
}
}
for i := 0; i < 10; i++ {
df := readAndVerifyDataFrame("HHH", 3, fr, buf, t)
if df != firstDf {
t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
}
}
}
func TestSetReuseFramesMoreThanOnce(t *testing.T) {
fr, buf := testFramer()
fr.SetReuseFrames()
firstDf := readAndVerifyDataFrame("ABC", 3, fr, buf, t)
fr.SetReuseFrames()
for i := 0; i < 10; i++ {
df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
// SetReuseFrames should be idempotent
fr.SetReuseFrames()
if df != firstDf {
t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
}
}
}
func TestNoSetReuseFrames(t *testing.T) {
fr, buf := testFramer()
const numNewDataFrames = 10
dfSoFar := make([]interface{}, numNewDataFrames)
// Check that DataFrames are not reused if SetReuseFrames wasn't called.
// SetReuseFrames only currently implements reuse of DataFrames.
for i := 0; i < numNewDataFrames; i++ {
df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
for _, item := range dfSoFar {
if df == item {
t.Errorf("Expected Framer to return new DataFrames since SetNoReuseFrames not set.")
}
}
dfSoFar[i] = df
}
}
func readAndVerifyDataFrame(data string, length byte, fr *Framer, buf *bytes.Buffer, t *testing.T) *DataFrame {
var streamID uint32 = 1<<24 + 2<<16 + 3<<8 + 4
fr.WriteData(streamID, true, []byte(data))
wantEnc := "\x00\x00" + string(length) + "\x00\x01\x01\x02\x03\x04" + data
if buf.String() != wantEnc {
t.Errorf("encoded as %q; want %q", buf.Bytes(), wantEnc)
}
f, err := fr.ReadFrame()
if err != nil {
t.Fatal(err)
}
df, ok := f.(*DataFrame)
if !ok {
t.Fatalf("got %T; want *DataFrame", f)
}
if !bytes.Equal(df.Data(), []byte(data)) {
t.Errorf("got %q; want %q", df.Data(), []byte(data))
}
if f.Header().Flags&1 == 0 {
t.Errorf("didn't see END_STREAM flag")
}
return df
}
func encodeHeaderRaw(t *testing.T, pairs ...string) []byte {
var he hpackEncoder
return he.encodeHeaderRaw(t, pairs...)
......
......@@ -80,6 +80,7 @@ type serverTesterOpt string
var optOnlyServer = serverTesterOpt("only_server")
var optQuiet = serverTesterOpt("quiet_logging")
var optFramerReuseFrames = serverTesterOpt("frame_reuse_frames")
func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}) *serverTester {
resetHooks()
......@@ -91,7 +92,7 @@ func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}
NextProtos: []string{NextProtoTLS},
}
var onlyServer, quiet bool
var onlyServer, quiet, framerReuseFrames bool
h2server := new(Server)
for _, opt := range opts {
switch v := opt.(type) {
......@@ -107,6 +108,8 @@ func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}
onlyServer = true
case optQuiet:
quiet = true
case optFramerReuseFrames:
framerReuseFrames = true
}
case func(net.Conn, http.ConnState):
ts.Config.ConnState = v
......@@ -149,6 +152,9 @@ func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}
}
st.cc = cc
st.fr = NewFramer(cc, cc)
if framerReuseFrames {
st.fr.SetReuseFrames()
}
if !logFrameReads && !logFrameWrites {
st.fr.debugReadLoggerf = func(m string, v ...interface{}) {
m = time.Now().Format("2006-01-02 15:04:05.999999999 ") + strings.TrimPrefix(m, "http2: ") + "\n"
......@@ -2994,6 +3000,89 @@ func BenchmarkServerPosts(b *testing.B) {
}
}
// Send a stream of messages from server to client in separate data frames.
// Brings up performance issues seen in long streams.
// Created to show problem in go issue #18502
func BenchmarkServerToClientStreamDefaultOptions(b *testing.B) {
benchmarkServerToClientStream(b)
}
// Justification for Change-Id: Iad93420ef6c3918f54249d867098f1dadfa324d8
// Expect to see memory/alloc reduction by opting in to Frame reuse with the Framer.
func BenchmarkServerToClientStreamReuseFrames(b *testing.B) {
benchmarkServerToClientStream(b, optFramerReuseFrames)
}
func benchmarkServerToClientStream(b *testing.B, newServerOpts ...interface{}) {
defer disableGoroutineTracking()()
b.ReportAllocs()
const msgLen = 1
// default window size
const windowSize = 1<<16 - 1
// next message to send from the server and for the client to expect
nextMsg := func(i int) []byte {
msg := make([]byte, msgLen)
msg[0] = byte(i)
if len(msg) != msgLen {
panic("invalid test setup msg length")
}
return msg
}
st := newServerTester(b, func(w http.ResponseWriter, r *http.Request) {
// Consume the (empty) body from th peer before replying, otherwise
// the server will sometimes (depending on scheduling) send the peer a
// a RST_STREAM with the CANCEL error code.
if n, err := io.Copy(ioutil.Discard, r.Body); n != 0 || err != nil {
b.Errorf("Copy error; got %v, %v; want 0, nil", n, err)
}
for i := 0; i < b.N; i += 1 {
w.Write(nextMsg(i))
w.(http.Flusher).Flush()
}
}, newServerOpts...)
defer st.Close()
st.greet()
const id = uint32(1)
st.writeHeaders(HeadersFrameParam{
StreamID: id,
BlockFragment: st.encodeHeader(":method", "POST"),
EndStream: false,
EndHeaders: true,
})
st.writeData(id, true, nil)
st.wantHeaders()
var pendingWindowUpdate = uint32(0)
for i := 0; i < b.N; i += 1 {
expected := nextMsg(i)
df := st.wantData()
if bytes.Compare(expected, df.data) != 0 {
b.Fatalf("Bad message received; want %v; got %v", expected, df.data)
}
// try to send infrequent but large window updates so they don't overwhelm the test
pendingWindowUpdate += uint32(len(df.data))
if pendingWindowUpdate >= windowSize/2 {
if err := st.fr.WriteWindowUpdate(0, pendingWindowUpdate); err != nil {
b.Fatal(err)
}
if err := st.fr.WriteWindowUpdate(id, pendingWindowUpdate); err != nil {
b.Fatal(err)
}
pendingWindowUpdate = 0
}
}
df := st.wantData()
if !df.StreamEnded() {
b.Fatalf("DATA didn't have END_STREAM; got %v", df)
}
}
// go-fuzz bug, originally reported at https://github.com/bradfitz/http2/issues/53
// Verify we don't hang.
func TestIssue53(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment