Commit 6defc25c authored by Russ Cox's avatar Russ Cox

Publish types PipeReader and PipeWriter

to expose new CloseWithError methods.

R=r
DELTA=161  (72 added, 15 deleted, 74 changed)
OCL=29980
CL=30003
parent 3c06bd62
...@@ -21,7 +21,9 @@ type pipeReturn struct { ...@@ -21,7 +21,9 @@ type pipeReturn struct {
// Shared pipe structure. // Shared pipe structure.
type pipe struct { type pipe struct {
rclosed bool; // Read end closed? rclosed bool; // Read end closed?
rerr os.Error; // Error supplied to CloseReader
wclosed bool; // Write end closed? wclosed bool; // Write end closed?
werr os.Error; // Error supplied to CloseWriter
wpend []byte; // Written data waiting to be read. wpend []byte; // Written data waiting to be read.
wtot int; // Bytes consumed so far in current write. wtot int; // Bytes consumed so far in current write.
cr chan []byte; // Write sends data here... cr chan []byte; // Write sends data here...
...@@ -39,7 +41,7 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) { ...@@ -39,7 +41,7 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) {
p.wpend = <-p.cr; p.wpend = <-p.cr;
} }
if p.wpend == nil { if p.wpend == nil {
return 0, nil; return 0, p.werr;
} }
p.wtot = 0; p.wtot = 0;
} }
...@@ -70,7 +72,7 @@ func (p *pipe) Write(data []byte) (n int, err os.Error) { ...@@ -70,7 +72,7 @@ func (p *pipe) Write(data []byte) (n int, err os.Error) {
return 0, os.EINVAL; return 0, os.EINVAL;
} }
if p.rclosed { if p.rclosed {
return 0, os.EPIPE; return 0, p.rerr;
} }
// Send data to reader. // Send data to reader.
...@@ -81,29 +83,34 @@ func (p *pipe) Write(data []byte) (n int, err os.Error) { ...@@ -81,29 +83,34 @@ func (p *pipe) Write(data []byte) (n int, err os.Error) {
return res.n, res.err; return res.n, res.err;
} }
func (p *pipe) CloseReader() os.Error { func (p *pipe) CloseReader(rerr os.Error) os.Error {
if p == nil || p.rclosed { if p == nil || p.rclosed {
return os.EINVAL; return os.EINVAL;
} }
// Stop any future writes. // Stop any future writes.
p.rclosed = true; p.rclosed = true;
if rerr == nil {
rerr = os.EPIPE;
}
p.rerr = rerr;
// Stop the current write. // Stop the current write.
if !p.wclosed { if !p.wclosed {
p.cw <- pipeReturn{p.wtot, os.EPIPE}; p.cw <- pipeReturn{p.wtot, rerr};
} }
return nil; return nil;
} }
func (p *pipe) CloseWriter() os.Error { func (p *pipe) CloseWriter(werr os.Error) os.Error {
if p == nil || p.wclosed { if p == nil || p.wclosed {
return os.EINVAL; return os.EINVAL;
} }
// Stop any future reads. // Stop any future reads.
p.wclosed = true; p.wclosed = true;
p.werr = werr;
// Stop the current read. // Stop the current read.
if !p.rclosed { if !p.rclosed {
...@@ -121,70 +128,98 @@ func (p *pipe) CloseWriter() os.Error { ...@@ -121,70 +128,98 @@ func (p *pipe) CloseWriter() os.Error {
// 2. Clients cannot use interface conversions on the // 2. Clients cannot use interface conversions on the
// read end to find the Write method, and vice versa. // read end to find the Write method, and vice versa.
// Read half of pipe. // A PipeReader is the read half of a pipe.
type pipeRead struct { type PipeReader struct {
lock sync.Mutex; lock sync.Mutex;
p *pipe; p *pipe;
} }
func (r *pipeRead) Read(data []byte) (n int, err os.Error) { // Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is nil.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
r.lock.Lock(); r.lock.Lock();
defer r.lock.Unlock(); defer r.lock.Unlock();
return r.p.Read(data); return r.p.Read(data);
} }
func (r *pipeRead) Close() os.Error { // Close closes the reader; subsequent writes to the
// write half of the pipe will return the error os.EPIPE.
func (r *PipeReader) Close() os.Error {
r.lock.Lock();
defer r.lock.Unlock();
return r.p.CloseReader(nil);
}
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error rerr.
func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
r.lock.Lock(); r.lock.Lock();
defer r.lock.Unlock(); defer r.lock.Unlock();
return r.p.CloseReader(); return r.p.CloseReader(rerr);
} }
func (r *pipeRead) finish() { func (r *PipeReader) finish() {
r.Close(); r.Close();
} }
// Write half of pipe. // Write half of pipe.
type pipeWrite struct { type PipeWriter struct {
lock sync.Mutex; lock sync.Mutex;
p *pipe; p *pipe;
} }
func (w *pipeWrite) Write(data []byte) (n int, err os.Error) { // Write implements the standard Write interface:
// it writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is os.EPIPE.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
w.lock.Lock(); w.lock.Lock();
defer w.lock.Unlock(); defer w.lock.Unlock();
return w.p.Write(data); return w.p.Write(data);
} }
func (w *pipeWrite) Close() os.Error { // Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and a nil error.
func (w *PipeWriter) Close() os.Error {
w.lock.Lock();
defer w.lock.Unlock();
return w.p.CloseWriter(nil);
}
// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error werr.
func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
w.lock.Lock(); w.lock.Lock();
defer w.lock.Unlock(); defer w.lock.Unlock();
return w.p.CloseWriter(); return w.p.CloseWriter(werr);
} }
func (w *pipeWrite) finish() { func (w *PipeWriter) finish() {
w.Close(); w.Close();
} }
// Pipe creates a synchronous in-memory pipe. // Pipe creates a synchronous in-memory pipe.
// Used to connect code expecting an io.Reader // It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer. // with code expecting an io.Writer.
// // Reads on one end are matched with writes on the other,
// Reads on one end are matched by writes on the other. // copying data directly between the two; there is no internal buffering.
// Writes don't complete until all the data has been func Pipe() (*PipeReader, *PipeWriter) {
// written or the read end is closed. Reads return
// any available data or block until the next write
// or the write end is closed.
func Pipe() (io.ReadCloser, io.WriteCloser) {
p := new(pipe); p := new(pipe);
p.cr = make(chan []byte, 1); p.cr = make(chan []byte, 1);
p.cw = make(chan pipeReturn, 1); p.cw = make(chan pipeReturn, 1);
r := new(pipeRead); r := new(PipeReader);
r.p = p; r.p = p;
w := new(pipeWrite); w := new(PipeWriter);
w.p = p; w.p = p;
return r, w; return r, w;
} }
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package io package io
import ( import (
"fmt";
"io"; "io";
"os"; "os";
"testing"; "testing";
...@@ -132,27 +133,58 @@ func TestPipe3(t *testing.T) { ...@@ -132,27 +133,58 @@ func TestPipe3(t *testing.T) {
// Test read after/before writer close. // Test read after/before writer close.
func delayClose(t *testing.T, cl Closer, ch chan int) { type closer interface {
time.Sleep(1000*1000); // 1 ms CloseWithError(os.Error) os.Error;
if err := cl.Close(); err != nil { Close() os.Error;
}
type pipeTest struct {
async bool;
err os.Error;
closeWithError bool;
}
func (p pipeTest) String() string {
return fmt.Sprintf("async=%v err=%v closeWithError=%v", p.async, p.err, p.closeWithError);
}
var pipeTests = []pipeTest {
pipeTest{ true, nil, false },
pipeTest{ true, nil, true },
pipeTest{ true, io.ErrShortWrite, true },
pipeTest{ false, nil, false },
pipeTest{ false, nil, true },
pipeTest{ false, io.ErrShortWrite, true },
}
func delayClose(t *testing.T, cl closer, ch chan int, tt pipeTest) {
time.Sleep(1e6); // 1 ms
var err os.Error;
if tt.closeWithError {
err = cl.CloseWithError(tt.err);
} else {
err = cl.Close();
}
if err != nil {
t.Errorf("delayClose: %v", err); t.Errorf("delayClose: %v", err);
} }
ch <- 0; ch <- 0;
} }
func testPipeReadClose(t *testing.T, async bool) { func TestPipeReadClose(t *testing.T) {
for _, tt := range pipeTests {
c := make(chan int, 1); c := make(chan int, 1);
r, w := Pipe(); r, w := Pipe();
if async { if tt.async {
go delayClose(t, w, c); go delayClose(t, w, c, tt);
} else { } else {
delayClose(t, w, c); delayClose(t, w, c, tt);
} }
var buf = make([]byte, 64); var buf = make([]byte, 64);
n, err := r.Read(buf); n, err := r.Read(buf);
<-c; <-c;
if err != nil { if err != tt.err {
t.Errorf("read from closed pipe: %v", err); t.Errorf("read from closed pipe: %v want %v", err, tt.err);
} }
if n != 0 { if n != 0 {
t.Errorf("read on closed pipe returned %d", n); t.Errorf("read on closed pipe returned %d", n);
...@@ -160,22 +192,28 @@ func testPipeReadClose(t *testing.T, async bool) { ...@@ -160,22 +192,28 @@ func testPipeReadClose(t *testing.T, async bool) {
if err = r.Close(); err != nil { if err = r.Close(); err != nil {
t.Errorf("r.Close: %v", err); t.Errorf("r.Close: %v", err);
} }
}
} }
// Test write after/before reader close. // Test write after/before reader close.
func testPipeWriteClose(t *testing.T, async bool) { func TestPipeWriteClose(t *testing.T) {
for _, tt := range pipeTests {
c := make(chan int, 1); c := make(chan int, 1);
r, w := Pipe(); r, w := Pipe();
if async { if tt.async {
go delayClose(t, r, c); go delayClose(t, r, c, tt);
} else { } else {
delayClose(t, r, c); delayClose(t, r, c, tt);
} }
n, err := WriteString(w, "hello, world"); n, err := WriteString(w, "hello, world");
<-c; <-c;
if err != os.EPIPE { expect := tt.err;
t.Errorf("write on closed pipe: %v", err); if expect == nil {
expect = os.EPIPE;
}
if err != expect {
t.Errorf("write on closed pipe: %v want %v", err, expect);
} }
if n != 0 { if n != 0 {
t.Errorf("write on closed pipe returned %d", n); t.Errorf("write on closed pipe returned %d", n);
...@@ -183,21 +221,5 @@ func testPipeWriteClose(t *testing.T, async bool) { ...@@ -183,21 +221,5 @@ func testPipeWriteClose(t *testing.T, async bool) {
if err = w.Close(); err != nil { if err = w.Close(); err != nil {
t.Errorf("w.Close: %v", err); t.Errorf("w.Close: %v", err);
} }
}
} }
func TestPipeReadCloseAsync(t *testing.T) {
testPipeReadClose(t, true);
}
func TestPipeReadCloseSync(t *testing.T) {
testPipeReadClose(t, false);
}
func TestPipeWriteCloseAsync(t *testing.T) {
testPipeWriteClose(t, true);
}
func TestPipeWriteCloseSync(t *testing.T) {
testPipeWriteClose(t, false);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment