Commit 3fc7cfd5 authored by Rob Pike's avatar Rob Pike

improve server handling of errors now that Decoder grabs full message.

R=rsc
DELTA=23  (4 added, 7 deleted, 12 changed)
OCL=31701
CL=31703
parent e76a335a
...@@ -66,7 +66,7 @@ func (client *Client) send(c *Call) { ...@@ -66,7 +66,7 @@ func (client *Client) send(c *Call) {
client.sending.Unlock(); client.sending.Unlock();
} }
func (client *Client) serve() { func (client *Client) input() {
var err os.Error; var err os.Error;
for err == nil { for err == nil {
response := new(Response); response := new(Response);
...@@ -107,7 +107,7 @@ func NewClient(conn io.ReadWriteCloser) *Client { ...@@ -107,7 +107,7 @@ func NewClient(conn io.ReadWriteCloser) *Client {
client.enc = gob.NewEncoder(conn); client.enc = gob.NewEncoder(conn);
client.dec = gob.NewDecoder(conn); client.dec = gob.NewDecoder(conn);
client.pending = make(map[uint64] *Call); client.pending = make(map[uint64] *Call);
go client.serve(); go client.input();
return client; return client;
} }
......
...@@ -14,7 +14,6 @@ import ( ...@@ -14,7 +14,6 @@ import (
"reflect"; "reflect";
"strings"; "strings";
"sync"; "sync";
"time"; // See TODO in serve()
"unicode"; "unicode";
"utf8"; "utf8";
) )
...@@ -174,7 +173,7 @@ func (s *service) call(sending *sync.Mutex, function *reflect.FuncValue, req *Re ...@@ -174,7 +173,7 @@ func (s *service) call(sending *sync.Mutex, function *reflect.FuncValue, req *Re
sendResponse(sending, req, replyv.Interface(), enc, errmsg); sendResponse(sending, req, replyv.Interface(), enc, errmsg);
} }
func (server *serverType) serve(conn io.ReadWriteCloser) { func (server *serverType) input(conn io.ReadWriteCloser) {
dec := gob.NewDecoder(conn); dec := gob.NewDecoder(conn);
enc := gob.NewEncoder(conn); enc := gob.NewEncoder(conn);
sending := new(sync.Mutex); sending := new(sync.Mutex);
...@@ -183,28 +182,32 @@ func (server *serverType) serve(conn io.ReadWriteCloser) { ...@@ -183,28 +182,32 @@ func (server *serverType) serve(conn io.ReadWriteCloser) {
req := new(Request); req := new(Request);
err := dec.Decode(req); err := dec.Decode(req);
if err != nil { if err != nil {
if err == os.EOF || err == io.ErrUnexpectedEOF {
log.Stderr("rpc: ", err);
break;
}
s := "rpc: server cannot decode request: " + err.String(); s := "rpc: server cannot decode request: " + err.String();
sendResponse(sending, req, invalidRequest, enc, s); sendResponse(sending, req, invalidRequest, enc, s);
break; continue;
} }
serviceMethod := strings.Split(req.ServiceMethod, ".", 0); serviceMethod := strings.Split(req.ServiceMethod, ".", 0);
if len(serviceMethod) != 2 { if len(serviceMethod) != 2 {
s := "rpc: service/method request ill:formed: " + req.ServiceMethod; s := "rpc: service/method request ill:formed: " + req.ServiceMethod;
sendResponse(sending, req, invalidRequest, enc, s); sendResponse(sending, req, invalidRequest, enc, s);
break; continue;
} }
// Look up the request. // Look up the request.
service, ok := server.serviceMap[serviceMethod[0]]; service, ok := server.serviceMap[serviceMethod[0]];
if !ok { if !ok {
s := "rpc: can't find service " + req.ServiceMethod; s := "rpc: can't find service " + req.ServiceMethod;
sendResponse(sending, req, invalidRequest, enc, s); sendResponse(sending, req, invalidRequest, enc, s);
break; continue;
} }
mtype, ok := service.method[serviceMethod[1]]; mtype, ok := service.method[serviceMethod[1]];
if !ok { if !ok {
s := "rpc: can't find method " + req.ServiceMethod; s := "rpc: can't find method " + req.ServiceMethod;
sendResponse(sending, req, invalidRequest, enc, s); sendResponse(sending, req, invalidRequest, enc, s);
break; continue;
} }
method := mtype.method; method := mtype.method;
// Decode the argument value. // Decode the argument value.
...@@ -212,18 +215,12 @@ func (server *serverType) serve(conn io.ReadWriteCloser) { ...@@ -212,18 +215,12 @@ func (server *serverType) serve(conn io.ReadWriteCloser) {
replyv := _new(mtype.replyType); replyv := _new(mtype.replyType);
err = dec.Decode(argv.Interface()); err = dec.Decode(argv.Interface());
if err != nil { if err != nil {
log.Stderr("tearing down connection:", err); log.Stderr("rpc: tearing down", serviceMethod[0], "connection:", err);
sendResponse(sending, req, replyv.Interface(), enc, err.String()); sendResponse(sending, req, replyv.Interface(), enc, err.String());
break; continue;
} }
go service.call(sending, method.Func, req, argv, replyv, enc); go service.call(sending, method.Func, req, argv, replyv, enc);
} }
// TODO(r): Gobs cannot handle unexpected data yet. Once they can, we can
// ignore it and the connection can persist. For now, though, bad data
// ruins the connection and we must shut down. The sleep is necessary to
// guarantee all the data gets out before we close the connection, so the
// client can see the error description.
time.Sleep(2e9);
conn.Close(); conn.Close();
} }
...@@ -233,7 +230,7 @@ func (server *serverType) accept(lis net.Listener) { ...@@ -233,7 +230,7 @@ func (server *serverType) accept(lis net.Listener) {
if err != nil { if err != nil {
log.Exit("rpc.Serve: accept:", err.String()); // TODO(r): exit? log.Exit("rpc.Serve: accept:", err.String()); // TODO(r): exit?
} }
go server.serve(conn); go server.input(conn);
} }
} }
...@@ -250,7 +247,7 @@ func Add(rcvr interface{}) os.Error { ...@@ -250,7 +247,7 @@ func Add(rcvr interface{}) os.Error {
// ServeConn runs the server on a single connection. When the connection // ServeConn runs the server on a single connection. When the connection
// completes, service terminates. // completes, service terminates.
func ServeConn(conn io.ReadWriteCloser) { func ServeConn(conn io.ReadWriteCloser) {
go server.serve(conn) go server.input(conn)
} }
// Accept accepts connections on the listener and serves requests // Accept accepts connections on the listener and serves requests
...@@ -276,7 +273,7 @@ func serveHTTP(c *http.Conn, req *http.Request) { ...@@ -276,7 +273,7 @@ func serveHTTP(c *http.Conn, req *http.Request) {
return; return;
} }
io.WriteString(conn, "HTTP/1.0 " + connected + "\n\n"); io.WriteString(conn, "HTTP/1.0 " + connected + "\n\n");
server.serve(conn); server.input(conn);
} }
// HandleHTTP registers an HTTP handler for RPC messages. // HandleHTTP registers an HTTP handler for RPC messages.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment