Commit 5db0a8b9 authored by Petar Maymounkov's avatar Petar Maymounkov Committed by Russ Cox

http: fixed race condition in persist.go

R=rsc, bradfitzgo, bradfitzwork
CC=golang-dev
https://golang.org/cl/4266042
parent 7bc90eda
...@@ -25,15 +25,15 @@ var ( ...@@ -25,15 +25,15 @@ var (
// i.e. requests can be read out of sync (but in the same order) while the // i.e. requests can be read out of sync (but in the same order) while the
// respective responses are sent. // respective responses are sent.
type ServerConn struct { type ServerConn struct {
lk sync.Mutex // read-write protects the following fields
c net.Conn c net.Conn
r *bufio.Reader r *bufio.Reader
clsd bool // indicates a graceful close
re, we os.Error // read/write errors re, we os.Error // read/write errors
lastbody io.ReadCloser lastbody io.ReadCloser
nread, nwritten int nread, nwritten int
pipe textproto.Pipeline
pipereq map[*Request]uint pipereq map[*Request]uint
lk sync.Mutex // protected read/write to re,we
pipe textproto.Pipeline
} }
// NewServerConn returns a new ServerConn reading and writing c. If r is not // NewServerConn returns a new ServerConn reading and writing c. If r is not
...@@ -90,15 +90,21 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) { ...@@ -90,15 +90,21 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {
defer sc.lk.Unlock() defer sc.lk.Unlock()
return nil, sc.re return nil, sc.re
} }
if sc.r == nil { // connection closed by user in the meantime
defer sc.lk.Unlock()
return nil, os.EBADF
}
r := sc.r
lastbody := sc.lastbody
sc.lastbody = nil
sc.lk.Unlock() sc.lk.Unlock()
// Make sure body is fully consumed, even if user does not call body.Close // Make sure body is fully consumed, even if user does not call body.Close
if sc.lastbody != nil { if lastbody != nil {
// body.Close is assumed to be idempotent and multiple calls to // body.Close is assumed to be idempotent and multiple calls to
// it should return the error that its first invokation // it should return the error that its first invokation
// returned. // returned.
err = sc.lastbody.Close() err = lastbody.Close()
sc.lastbody = nil
if err != nil { if err != nil {
sc.lk.Lock() sc.lk.Lock()
defer sc.lk.Unlock() defer sc.lk.Unlock()
...@@ -107,10 +113,10 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) { ...@@ -107,10 +113,10 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {
} }
} }
req, err = ReadRequest(sc.r) req, err = ReadRequest(r)
if err != nil {
sc.lk.Lock() sc.lk.Lock()
defer sc.lk.Unlock() defer sc.lk.Unlock()
if err != nil {
if err == io.ErrUnexpectedEOF { if err == io.ErrUnexpectedEOF {
// A close from the opposing client is treated as a // A close from the opposing client is treated as a
// graceful close, even if there was some unparse-able // graceful close, even if there was some unparse-able
...@@ -119,18 +125,16 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) { ...@@ -119,18 +125,16 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {
return nil, sc.re return nil, sc.re
} else { } else {
sc.re = err sc.re = err
return return req, err
} }
} }
sc.lastbody = req.Body sc.lastbody = req.Body
sc.nread++ sc.nread++
if req.Close { if req.Close {
sc.lk.Lock()
defer sc.lk.Unlock()
sc.re = ErrPersistEOF sc.re = ErrPersistEOF
return req, sc.re return req, sc.re
} }
return return req, err
} }
// Pending returns the number of unanswered requests // Pending returns the number of unanswered requests
...@@ -165,24 +169,27 @@ func (sc *ServerConn) Write(req *Request, resp *Response) os.Error { ...@@ -165,24 +169,27 @@ func (sc *ServerConn) Write(req *Request, resp *Response) os.Error {
defer sc.lk.Unlock() defer sc.lk.Unlock()
return sc.we return sc.we
} }
sc.lk.Unlock() if sc.c == nil { // connection closed by user in the meantime
defer sc.lk.Unlock()
return os.EBADF
}
c := sc.c
if sc.nread <= sc.nwritten { if sc.nread <= sc.nwritten {
defer sc.lk.Unlock()
return os.NewError("persist server pipe count") return os.NewError("persist server pipe count")
} }
if resp.Close { if resp.Close {
// After signaling a keep-alive close, any pipelined unread // After signaling a keep-alive close, any pipelined unread
// requests will be lost. It is up to the user to drain them // requests will be lost. It is up to the user to drain them
// before signaling. // before signaling.
sc.lk.Lock()
sc.re = ErrPersistEOF sc.re = ErrPersistEOF
sc.lk.Unlock()
} }
sc.lk.Unlock()
err := resp.Write(sc.c) err := resp.Write(c)
if err != nil {
sc.lk.Lock() sc.lk.Lock()
defer sc.lk.Unlock() defer sc.lk.Unlock()
if err != nil {
sc.we = err sc.we = err
return err return err
} }
...@@ -196,14 +203,15 @@ func (sc *ServerConn) Write(req *Request, resp *Response) os.Error { ...@@ -196,14 +203,15 @@ func (sc *ServerConn) Write(req *Request, resp *Response) os.Error {
// responsible for closing the underlying connection. One must call Close to // responsible for closing the underlying connection. One must call Close to
// regain control of that connection and deal with it as desired. // regain control of that connection and deal with it as desired.
type ClientConn struct { type ClientConn struct {
lk sync.Mutex // read-write protects the following fields
c net.Conn c net.Conn
r *bufio.Reader r *bufio.Reader
re, we os.Error // read/write errors re, we os.Error // read/write errors
lastbody io.ReadCloser lastbody io.ReadCloser
nread, nwritten int nread, nwritten int
pipe textproto.Pipeline
pipereq map[*Request]uint pipereq map[*Request]uint
lk sync.Mutex // protects read/write to re,we,pipereq,etc.
pipe textproto.Pipeline
} }
// NewClientConn returns a new ClientConn reading and writing c. If r is not // NewClientConn returns a new ClientConn reading and writing c. If r is not
...@@ -221,11 +229,11 @@ func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn { ...@@ -221,11 +229,11 @@ func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
// logic. The user should not call Close while Read or Write is in progress. // logic. The user should not call Close while Read or Write is in progress.
func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) { func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) {
cc.lk.Lock() cc.lk.Lock()
defer cc.lk.Unlock()
c = cc.c c = cc.c
r = cc.r r = cc.r
cc.c = nil cc.c = nil
cc.r = nil cc.r = nil
cc.lk.Unlock()
return return
} }
...@@ -261,20 +269,22 @@ func (cc *ClientConn) Write(req *Request) (err os.Error) { ...@@ -261,20 +269,22 @@ func (cc *ClientConn) Write(req *Request) (err os.Error) {
defer cc.lk.Unlock() defer cc.lk.Unlock()
return cc.we return cc.we
} }
cc.lk.Unlock() if cc.c == nil { // connection closed by user in the meantime
defer cc.lk.Unlock()
return os.EBADF
}
c := cc.c
if req.Close { if req.Close {
// We write the EOF to the write-side error, because there // We write the EOF to the write-side error, because there
// still might be some pipelined reads // still might be some pipelined reads
cc.lk.Lock()
cc.we = ErrPersistEOF cc.we = ErrPersistEOF
cc.lk.Unlock()
} }
cc.lk.Unlock()
err = req.Write(cc.c) err = req.Write(c)
if err != nil {
cc.lk.Lock() cc.lk.Lock()
defer cc.lk.Unlock() defer cc.lk.Unlock()
if err != nil {
cc.we = err cc.we = err
return err return err
} }
...@@ -316,15 +326,21 @@ func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) { ...@@ -316,15 +326,21 @@ func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) {
defer cc.lk.Unlock() defer cc.lk.Unlock()
return nil, cc.re return nil, cc.re
} }
if cc.r == nil { // connection closed by user in the meantime
defer cc.lk.Unlock()
return nil, os.EBADF
}
r := cc.r
lastbody := cc.lastbody
cc.lastbody = nil
cc.lk.Unlock() cc.lk.Unlock()
// Make sure body is fully consumed, even if user does not call body.Close // Make sure body is fully consumed, even if user does not call body.Close
if cc.lastbody != nil { if lastbody != nil {
// body.Close is assumed to be idempotent and multiple calls to // body.Close is assumed to be idempotent and multiple calls to
// it should return the error that its first invokation // it should return the error that its first invokation
// returned. // returned.
err = cc.lastbody.Close() err = lastbody.Close()
cc.lastbody = nil
if err != nil { if err != nil {
cc.lk.Lock() cc.lk.Lock()
defer cc.lk.Unlock() defer cc.lk.Unlock()
...@@ -333,24 +349,22 @@ func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) { ...@@ -333,24 +349,22 @@ func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) {
} }
} }
resp, err = ReadResponse(cc.r, req.Method) resp, err = ReadResponse(r, req.Method)
if err != nil {
cc.lk.Lock() cc.lk.Lock()
defer cc.lk.Unlock() defer cc.lk.Unlock()
if err != nil {
cc.re = err cc.re = err
return return resp, err
} }
cc.lastbody = resp.Body cc.lastbody = resp.Body
cc.nread++ cc.nread++
if resp.Close { if resp.Close {
cc.lk.Lock()
defer cc.lk.Unlock()
cc.re = ErrPersistEOF // don't send any more requests cc.re = ErrPersistEOF // don't send any more requests
return resp, cc.re return resp, cc.re
} }
return return resp, err
} }
// Do is convenience method that writes a request and reads a response. // Do is convenience method that writes a request and reads a response.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment