Commit 3d9a20a7 authored by Tom Bergan's avatar Tom Bergan Committed by Brad Fitzpatrick

http2: don't flush a stream's write queue in sc.resetStream

resetStream(st) queues a RST_STREAM frame and then calls closeStream(st).
Unfortunately, closeStream(st) flushes any writes pending on st, which
can drop the RST_STREAM that was just queued. (If we are lucky, the
RST_STREAM will fit in the send buffer and won't be dropped, however,
if we are unlucky the RST_STREAM will be dropped.)

I fixed this bug by removing closeStream(st) from resetStream. Instead,
closeStream happens after the RST_STREAM frame is written. This is a more
direct implementation of the diagram in RFC 7540 Section 5.1, which says
that a stream does not transition to "closed" until after the RST_STREAM
has been sent.

A side-effect is that the stream may stay open for longer than it did
previously (since it won't close until *after* the RST_STREAM frame is
actually written). Situations like the following are a problem:

- Client sends a DATA frame that exceeds its flow-control window
- Server returns streamError(ErrCodeFlowControl) from processData
- RST_STREAM is queued behind other frames
- Server process the request body and releases flow-control
- Client sends another DATA frame, this one fits in the flow-control
  window. Server should NOT process this frame.

To avoid the above problem, we set a bool st.resetQueued=true when
RST_STREAM is queued, then ignore all future incoming HEADERS and DATA
frames on that stream.

I also removed st.sentReset and st.gotReset, which were used to ignore
frames after RST_STREAM is sent. Now we just check if the stream is closed.

Fixes golang/go#18111

Change-Id: Ieb7c848989431add5b7d95f40d6d91db7edc4980
Reviewed-on: https://go-review.googlesource.com/34238Reviewed-by: 's avatarBrad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
parent e31bd588
...@@ -453,8 +453,7 @@ type stream struct { ...@@ -453,8 +453,7 @@ type stream struct {
numTrailerValues int64 numTrailerValues int64
weight uint8 weight uint8
state streamState state streamState
sentReset bool // only true once detached from streams map resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotReset bool // only true once detacted from streams map
gotTrailerHeader bool // HEADER frame for trailers was seen gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100) wroteHeaders bool // whether we wrote headers (not status 100)
reqBuf []byte // if non-nil, body pipe buffer to return later at EOF reqBuf []byte // if non-nil, body pipe buffer to return later at EOF
...@@ -874,8 +873,34 @@ func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error { ...@@ -874,8 +873,34 @@ func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
func (sc *serverConn) writeFrame(wr FrameWriteRequest) { func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
sc.serveG.check() sc.serveG.check()
// If true, wr will not be written and wr.done will not be signaled.
var ignoreWrite bool var ignoreWrite bool
// We are not allowed to write frames on closed streams. RFC 7540 Section
// 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
// a closed stream." Our server never sends PRIORITY, so that exception
// does not apply.
//
// The serverConn might close an open stream while the stream's handler
// is still running. For example, the server might close a stream when it
// receives bad data from the client. If this happens, the handler might
// attempt to write a frame after the stream has been closed (since the
// handler hasn't yet been notified of the close). In this case, we simply
// ignore the frame. The handler will notice that the stream is closed when
// it waits for the frame to be written.
//
// As an exception to this rule, we allow sending RST_STREAM after close.
// This allows us to immediately reject new streams without tracking any
// state for those streams (except for the queued RST_STREAM frame). This
// may result in duplicate RST_STREAMs in some cases, but the client should
// ignore those.
if wr.StreamID() != 0 {
_, isReset := wr.write.(StreamError)
if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset {
ignoreWrite = true
}
}
// Don't send a 100-continue response if we've already sent headers. // Don't send a 100-continue response if we've already sent headers.
// See golang.org/issue/14030. // See golang.org/issue/14030.
switch wr.write.(type) { switch wr.write.(type) {
...@@ -883,6 +908,11 @@ func (sc *serverConn) writeFrame(wr FrameWriteRequest) { ...@@ -883,6 +908,11 @@ func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
wr.stream.wroteHeaders = true wr.stream.wroteHeaders = true
case write100ContinueHeadersFrame: case write100ContinueHeadersFrame:
if wr.stream.wroteHeaders { if wr.stream.wroteHeaders {
// We do not need to notify wr.done because this frame is
// never written with wr.done != nil.
if wr.done != nil {
panic("wr.done != nil for write100ContinueHeadersFrame")
}
ignoreWrite = true ignoreWrite = true
} }
} }
...@@ -908,11 +938,6 @@ func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) { ...@@ -908,11 +938,6 @@ func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
case stateHalfClosedLocal: case stateHalfClosedLocal:
panic("internal error: attempt to send frame on half-closed-local stream") panic("internal error: attempt to send frame on half-closed-local stream")
case stateClosed: case stateClosed:
if st.sentReset || st.gotReset {
// Skip this frame.
sc.scheduleFrameWrite()
return
}
panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wr)) panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wr))
} }
} }
...@@ -921,9 +946,7 @@ func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) { ...@@ -921,9 +946,7 @@ func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
wpp.promisedID, err = wpp.allocatePromisedID() wpp.promisedID, err = wpp.allocatePromisedID()
if err != nil { if err != nil {
sc.writingFrameAsync = false sc.writingFrameAsync = false
if wr.done != nil { wr.replyToWriter(err)
wr.done <- err
}
return return
} }
} }
...@@ -956,25 +979,9 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) { ...@@ -956,25 +979,9 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
sc.writingFrameAsync = false sc.writingFrameAsync = false
wr := res.wr wr := res.wr
st := wr.stream
closeStream := endsStream(wr.write)
if _, ok := wr.write.(handlerPanicRST); ok {
sc.closeStream(st, errHandlerPanicked)
}
// Reply (if requested) to the blocked ServeHTTP goroutine. if writeEndsStream(wr.write) {
if ch := wr.done; ch != nil { st := wr.stream
select {
case ch <- res.err:
default:
panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
}
wr.write = nil // prevent use (assume it's tainted after wr.done send)
if closeStream {
if st == nil { if st == nil {
panic("internal error: expecting non-nil stream") panic("internal error: expecting non-nil stream")
} }
...@@ -987,15 +994,29 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) { ...@@ -987,15 +994,29 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// reading data (see possible TODO at top of // reading data (see possible TODO at top of
// this file), we go into closed state here // this file), we go into closed state here
// anyway, after telling the peer we're // anyway, after telling the peer we're
// hanging up on them. // hanging up on them. We'll transition to
st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream // stateClosed after the RST_STREAM frame is
errCancel := streamError(st.id, ErrCodeCancel) // written.
sc.resetStream(errCancel) st.state = stateHalfClosedLocal
sc.resetStream(streamError(st.id, ErrCodeCancel))
case stateHalfClosedRemote: case stateHalfClosedRemote:
sc.closeStream(st, errHandlerComplete) sc.closeStream(st, errHandlerComplete)
} }
} else {
switch v := wr.write.(type) {
case StreamError:
// st may be unknown if the RST_STREAM was generated to reject bad input.
if st, ok := sc.streams[v.StreamID]; ok {
sc.closeStream(st, v)
}
case handlerPanicRST:
sc.closeStream(wr.stream, errHandlerPanicked)
}
} }
// Reply (if requested) to unblock the ServeHTTP goroutine.
wr.replyToWriter(res.err)
sc.scheduleFrameWrite() sc.scheduleFrameWrite()
} }
...@@ -1092,8 +1113,7 @@ func (sc *serverConn) resetStream(se StreamError) { ...@@ -1092,8 +1113,7 @@ func (sc *serverConn) resetStream(se StreamError) {
sc.serveG.check() sc.serveG.check()
sc.writeFrame(FrameWriteRequest{write: se}) sc.writeFrame(FrameWriteRequest{write: se})
if st, ok := sc.streams[se.StreamID]; ok { if st, ok := sc.streams[se.StreamID]; ok {
st.sentReset = true st.resetQueued = true
sc.closeStream(st, se)
} }
} }
...@@ -1257,7 +1277,6 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error { ...@@ -1257,7 +1277,6 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
return ConnectionError(ErrCodeProtocol) return ConnectionError(ErrCodeProtocol)
} }
if st != nil { if st != nil {
st.gotReset = true
st.cancelCtx() st.cancelCtx()
sc.closeStream(st, streamError(f.StreamID, f.ErrCode)) sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
} }
...@@ -1396,7 +1415,7 @@ func (sc *serverConn) processData(f *DataFrame) error { ...@@ -1396,7 +1415,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
// type PROTOCOL_ERROR." // type PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol) return ConnectionError(ErrCodeProtocol)
} }
if st == nil || state != stateOpen || st.gotTrailerHeader { if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued {
// This includes sending a RST_STREAM if the stream is // This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that // in stateHalfClosedLocal (which currently means that
// the http.Handler returned, so it's done reading & // the http.Handler returned, so it's done reading &
...@@ -1416,6 +1435,10 @@ func (sc *serverConn) processData(f *DataFrame) error { ...@@ -1416,6 +1435,10 @@ func (sc *serverConn) processData(f *DataFrame) error {
sc.inflow.take(int32(f.Length)) sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
if st != nil && st.resetQueued {
// Already have a stream error in flight. Don't send another.
return nil
}
return streamError(id, ErrCodeStreamClosed) return streamError(id, ErrCodeStreamClosed)
} }
if st.body == nil { if st.body == nil {
...@@ -1524,6 +1547,11 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { ...@@ -1524,6 +1547,11 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// open, let it process its own HEADERS frame (trailers at this // open, let it process its own HEADERS frame (trailers at this
// point, if it's valid). // point, if it's valid).
if st := sc.streams[f.StreamID]; st != nil { if st := sc.streams[f.StreamID]; st != nil {
if st.resetQueued {
// We're sending RST_STREAM to close the stream, so don't bother
// processing this frame.
return nil
}
return st.processTrailerHeaders(f) return st.processTrailerHeaders(f)
} }
......
...@@ -1262,6 +1262,7 @@ func testServerPostUnblock(t *testing.T, ...@@ -1262,6 +1262,7 @@ func testServerPostUnblock(t *testing.T,
inHandler <- true inHandler <- true
errc <- handler(w, r) errc <- handler(w, r)
}) })
defer st.Close()
st.greet() st.greet()
st.writeHeaders(HeadersFrameParam{ st.writeHeaders(HeadersFrameParam{
StreamID: 1, StreamID: 1,
...@@ -1279,7 +1280,6 @@ func testServerPostUnblock(t *testing.T, ...@@ -1279,7 +1280,6 @@ func testServerPostUnblock(t *testing.T,
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for Handler to return") t.Fatal("timeout waiting for Handler to return")
} }
st.Close()
} }
func TestServer_RSTStream_Unblocks_Read(t *testing.T) { func TestServer_RSTStream_Unblocks_Read(t *testing.T) {
......
...@@ -45,9 +45,10 @@ type writeContext interface { ...@@ -45,9 +45,10 @@ type writeContext interface {
HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer)
} }
// endsStream reports whether the given frame writer w will locally // writeEndsStream reports whether w writes a frame that will transition
// close the stream. // the stream to a half-closed local state. This returns false for RST_STREAM,
func endsStream(w writeFramer) bool { // which closes the entire stream (not just the local half).
func writeEndsStream(w writeFramer) bool {
switch v := w.(type) { switch v := w.(type) {
case *writeData: case *writeData:
return v.endStream return v.endStream
...@@ -57,7 +58,7 @@ func endsStream(w writeFramer) bool { ...@@ -57,7 +58,7 @@ func endsStream(w writeFramer) bool {
// This can only happen if the caller reuses w after it's // This can only happen if the caller reuses w after it's
// been intentionally nil'ed out to prevent use. Keep this // been intentionally nil'ed out to prevent use. Keep this
// here to catch future refactoring breaking it. // here to catch future refactoring breaking it.
panic("endsStream called on nil writeFramer") panic("writeEndsStream called on nil writeFramer")
} }
return false return false
} }
......
...@@ -160,6 +160,20 @@ func (wr FrameWriteRequest) String() string { ...@@ -160,6 +160,20 @@ func (wr FrameWriteRequest) String() string {
return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des) return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
} }
// replyToWriter sends err to wr.done and panics if the send must block
// This does nothing if wr.done is nil.
func (wr *FrameWriteRequest) replyToWriter(err error) {
if wr.done == nil {
return
}
select {
case wr.done <- err:
default:
panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
wr.write = nil // prevent use (assume it's tainted after wr.done send)
}
// writeQueue is used by implementations of WriteScheduler. // writeQueue is used by implementations of WriteScheduler.
type writeQueue struct { type writeQueue struct {
s []FrameWriteRequest s []FrameWriteRequest
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment