Commit da705c62 authored by Rob Pike's avatar Rob Pike

netchan: provide a method (Importer.Errors()) to recover protocol errors.

R=rsc
CC=golang-dev
https://golang.org/cl/2229044
parent 8a313e20
...@@ -66,7 +66,7 @@ func newClient(exp *Exporter, conn net.Conn) *expClient { ...@@ -66,7 +66,7 @@ func newClient(exp *Exporter, conn net.Conn) *expClient {
func (client *expClient) sendError(hdr *header, err string) { func (client *expClient) sendError(hdr *header, err string) {
error := &error{err} error := &error{err}
expLog("sending error to client", error.error) expLog("sending error to client:", error.error)
client.encode(hdr, payError, error) // ignore any encode error, hope client gets it client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
client.mu.Lock() client.mu.Lock()
client.errored = true client.errored = true
......
...@@ -28,6 +28,7 @@ type Importer struct { ...@@ -28,6 +28,7 @@ type Importer struct {
conn net.Conn conn net.Conn
chanLock sync.Mutex // protects access to channel map chanLock sync.Mutex // protects access to channel map
chans map[string]*chanDir chans map[string]*chanDir
errors chan os.Error
} }
// NewImporter creates a new Importer object to import channels // NewImporter creates a new Importer object to import channels
...@@ -43,6 +44,7 @@ func NewImporter(network, remoteaddr string) (*Importer, os.Error) { ...@@ -43,6 +44,7 @@ func NewImporter(network, remoteaddr string) (*Importer, os.Error) {
imp.encDec = newEncDec(conn) imp.encDec = newEncDec(conn)
imp.conn = conn imp.conn = conn
imp.chans = make(map[string]*chanDir) imp.chans = make(map[string]*chanDir)
imp.errors = make(chan os.Error, 10)
go imp.run() go imp.run()
return imp, nil return imp, nil
} }
...@@ -86,15 +88,18 @@ func (imp *Importer) run() { ...@@ -86,15 +88,18 @@ func (imp *Importer) run() {
} }
if err.error != "" { if err.error != "" {
impLog("response error:", err.error) impLog("response error:", err.error)
imp.shutdown() if sent := imp.errors <- os.ErrorString(err.error); !sent {
return imp.shutdown()
return
}
continue // errors are not acknowledged.
} }
case payClosed: case payClosed:
ich := imp.getChan(hdr.name) ich := imp.getChan(hdr.name)
if ich != nil { if ich != nil {
ich.ch.Close() ich.ch.Close()
} }
continue continue // closes are not acknowledged.
default: default:
impLog("unexpected payload type:", hdr.payloadType) impLog("unexpected payload type:", hdr.payloadType)
return return
...@@ -132,6 +137,14 @@ func (imp *Importer) getChan(name string) *chanDir { ...@@ -132,6 +137,14 @@ func (imp *Importer) getChan(name string) *chanDir {
return ich return ich
} }
// Errors returns a channel from which transmission and protocol errors
// can be read. Clients of the importer are not required to read the error
// channel for correct execution. However, if too many errors occur
// without being read from the error channel, the importer will shut down.
func (imp *Importer) Errors() chan os.Error {
return imp.errors
}
// Import imports a channel of the given type and specified direction. // Import imports a channel of the given type and specified direction.
// It is equivalent to ImportNValues with a count of -1, meaning unbounded. // It is equivalent to ImportNValues with a count of -1, meaning unbounded.
func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error { func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error {
......
...@@ -4,7 +4,11 @@ ...@@ -4,7 +4,11 @@
package netchan package netchan
import "testing" import (
"strings"
"testing"
"time"
)
const count = 10 // number of items in most tests const count = 10 // number of items in most tests
const closeCount = 5 // number of items when sender closes early const closeCount = 5 // number of items when sender closes early
...@@ -134,6 +138,45 @@ func TestClosingImportSendExportReceive(t *testing.T) { ...@@ -134,6 +138,45 @@ func TestClosingImportSendExportReceive(t *testing.T) {
exportReceive(exp, t) exportReceive(exp, t)
} }
func TestErrorForIllegalChannel(t *testing.T) {
exp, err := NewExporter("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal("new exporter:", err)
}
imp, err := NewImporter("tcp", exp.Addr().String())
if err != nil {
t.Fatal("new importer:", err)
}
// Now export a channel.
ch := make(chan int, 1)
err = exp.Export("aChannel", ch, Send)
if err != nil {
t.Fatal("export:", err)
}
ch <- 1234
close(ch)
// Now try to import a different channel.
ch = make(chan int)
err = imp.Import("notAChannel", ch, Recv)
if err != nil {
t.Fatal("import:", err)
}
// Expect an error now. Start a timeout.
timeout := make(chan bool, 1) // buffered so closure will not hang around.
go func() {
time.Sleep(10e9) // very long, to give even really slow machines a chance.
timeout <- true
}()
select {
case err = <-imp.Errors():
if strings.Index(err.String(), "no such channel") < 0 {
t.Errorf("wrong error for nonexistent channel:", err)
}
case <-timeout:
t.Error("import of nonexistent channel did not receive an error")
}
}
// Not a great test but it does at least invoke Drain. // Not a great test but it does at least invoke Drain.
func TestExportDrain(t *testing.T) { func TestExportDrain(t *testing.T) {
exp, err := NewExporter("tcp", "127.0.0.1:0") exp, err := NewExporter("tcp", "127.0.0.1:0")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment