Commit 371eda45 authored by Joe Tsai's avatar Joe Tsai Committed by Joe Tsai

io: simplify pipe implementation

In the distant past, Pipe was implemented with channels and a
long running pipe.run goroutine (see CL 994043).
This approach of having all communication serialized through the
run method was error prone giving Pipe a history of deadlocks
and race conditions.

After the introduction of sync.Cond, the implementation was rewritten
(see CL 4252057) to use condition variables and avoid the
long running pipe.run goroutine. While this implementation is superior
to the previous one, this implementation is strange in that the
p.data field is always set immediately prior to signaling the other
goroutine with Cond.Signal, effectively making the combination of the
two a channel-like operation. Inferior to a channel, however, this still
requires explicit locking around the p.data field.

The data+rwait can be effectively be replaced by a "chan []byte" to
inform a reader that there is data available.
The data+wwait can be effectively be replaced by a "chan int" to
inform a writer of how many bytes were read.

This implementation is a simplified from net.Pipe in CL 37402.

Change-Id: Ia5b26320b0525934fd87a3b69a091c787167f5aa
Reviewed-on: https://go-review.googlesource.com/65330
Run-TryBot: Joe Tsai <thebrokentoaster@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: 's avatarBryan Mills <bcmills@google.com>
parent e2dd8ca9
......@@ -35,7 +35,7 @@ import (
var pkgDeps = map[string][]string{
// L0 is the lowest level, core, nearly unavoidable packages.
"errors": {},
"io": {"errors", "sync"},
"io": {"errors", "sync", "sync/atomic"},
"runtime": {"unsafe", "runtime/internal/atomic", "runtime/internal/sys"},
"runtime/internal/sys": {},
"runtime/internal/atomic": {"unsafe", "runtime/internal/sys"},
......
......@@ -10,6 +10,7 @@ package io
import (
"errors"
"sync"
"sync/atomic"
)
// ErrClosedPipe is the error used for read or write operations on a closed pipe.
......@@ -17,103 +18,87 @@ var ErrClosedPipe = errors.New("io: read/write on closed pipe")
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
rl sync.Mutex // gates readers one at a time
wl sync.Mutex // gates writers one at a time
l sync.Mutex // protects remaining fields
data []byte // data remaining in pending write
rwait sync.Cond // waiting reader
wwait sync.Cond // waiting writer
rerr error // if reader closed, error to give writes
werr error // if writer closed, error to give reads
}
func (p *pipe) read(b []byte) (n int, err error) {
// One reader at a time.
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
for {
if p.rerr != nil {
return 0, ErrClosedPipe
}
if p.data != nil {
break
}
if p.werr != nil {
return 0, p.werr
}
p.rwait.Wait()
}
n = copy(b, p.data)
p.data = p.data[n:]
if len(p.data) == 0 {
p.data = nil
p.wwait.Signal()
}
return
wrMu sync.Mutex // Serializes Write operations
wrCh chan []byte
rdCh chan int
once sync.Once // Protects closing done
done chan struct{}
rerr atomic.Value
werr atomic.Value
}
var zero [0]byte
func (p *pipe) Read(b []byte) (n int, err error) {
select {
case <-p.done:
return 0, p.readCloseError()
default:
}
func (p *pipe) write(b []byte) (n int, err error) {
// pipe uses nil to mean not available
if b == nil {
b = zero[:]
select {
case bw := <-p.wrCh:
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}
// One writer at a time.
p.wl.Lock()
defer p.wl.Unlock()
func (p *pipe) readCloseError() error {
_, rok := p.rerr.Load().(error)
if werr, wok := p.werr.Load().(error); !rok && wok {
return werr
}
return ErrClosedPipe
}
p.l.Lock()
defer p.l.Unlock()
if p.werr != nil {
func (p *pipe) CloseRead(err error) error {
if err == nil {
err = ErrClosedPipe
return
}
p.data = b
p.rwait.Signal()
for {
if p.data == nil {
break
}
if p.rerr != nil {
err = p.rerr
break
p.rerr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}
func (p *pipe) Write(b []byte) (n int, err error) {
select {
case <-p.done:
return 0, p.writeCloseError()
default:
p.wrMu.Lock()
defer p.wrMu.Unlock()
}
if p.werr != nil {
err = ErrClosedPipe
break
for once := true; once || len(b) > 0; once = false {
select {
case p.wrCh <- b:
nw := <-p.rdCh
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
p.wwait.Wait()
}
n = len(b) - len(p.data)
p.data = nil // in case of rerr or werr
return
return n, nil
}
func (p *pipe) rclose(err error) {
if err == nil {
err = ErrClosedPipe
func (p *pipe) writeCloseError() error {
_, wok := p.werr.Load().(error)
if rerr, rok := p.rerr.Load().(error); !wok && rok {
return rerr
}
p.l.Lock()
defer p.l.Unlock()
p.rerr = err
p.rwait.Signal()
p.wwait.Signal()
return ErrClosedPipe
}
func (p *pipe) wclose(err error) {
func (p *pipe) CloseWrite(err error) error {
if err == nil {
err = EOF
}
p.l.Lock()
defer p.l.Unlock()
p.werr = err
p.rwait.Signal()
p.wwait.Signal()
p.werr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}
// A PipeReader is the read half of a pipe.
......@@ -127,7 +112,7 @@ type PipeReader struct {
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
func (r *PipeReader) Read(data []byte) (n int, err error) {
return r.p.read(data)
return r.p.Read(data)
}
// Close closes the reader; subsequent writes to the
......@@ -139,8 +124,7 @@ func (r *PipeReader) Close() error {
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
func (r *PipeReader) CloseWithError(err error) error {
r.p.rclose(err)
return nil
return r.p.CloseRead(err)
}
// A PipeWriter is the write half of a pipe.
......@@ -154,7 +138,7 @@ type PipeWriter struct {
// If the read end is closed with an error, that err is
// returned as err; otherwise err is ErrClosedPipe.
func (w *PipeWriter) Write(data []byte) (n int, err error) {
return w.p.write(data)
return w.p.Write(data)
}
// Close closes the writer; subsequent reads from the
......@@ -169,8 +153,7 @@ func (w *PipeWriter) Close() error {
//
// CloseWithError always returns nil.
func (w *PipeWriter) CloseWithError(err error) error {
w.p.wclose(err)
return nil
return w.p.CloseWrite(err)
}
// Pipe creates a synchronous in-memory pipe.
......@@ -189,10 +172,10 @@ func (w *PipeWriter) CloseWithError(err error) error {
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func Pipe() (*PipeReader, *PipeWriter) {
p := new(pipe)
p.rwait.L = &p.l
p.wwait.L = &p.l
r := &PipeReader{p}
w := &PipeWriter{p}
return r, w
p := &pipe{
wrCh: make(chan []byte),
rdCh: make(chan int),
done: make(chan struct{}),
}
return &PipeReader{p}, &PipeWriter{p}
}
......@@ -5,8 +5,11 @@
package io_test
import (
"bytes"
"fmt"
. "io"
"sort"
"strings"
"testing"
"time"
)
......@@ -312,3 +315,84 @@ func TestWriteAfterWriterClose(t *testing.T) {
t.Errorf("got: %q; want: %q", writeErr, ErrClosedPipe)
}
}
func TestPipeConcurrent(t *testing.T) {
const (
input = "0123456789abcdef"
count = 8
readSize = 2
)
t.Run("Write", func(t *testing.T) {
r, w := Pipe()
for i := 0; i < count; i++ {
go func() {
time.Sleep(time.Millisecond) // Increase probability of race
if n, err := w.Write([]byte(input)); n != len(input) || err != nil {
t.Errorf("Write() = (%d, %v); want (%d, nil)", n, err, len(input))
}
}()
}
buf := make([]byte, count*len(input))
for i := 0; i < len(buf); i += readSize {
if n, err := r.Read(buf[i : i+readSize]); n != readSize || err != nil {
t.Errorf("Read() = (%d, %v); want (%d, nil)", n, err, readSize)
}
}
// Since each Write is fully gated, if multiple Read calls were needed,
// the contents of Write should still appear together in the output.
got := string(buf)
want := strings.Repeat(input, count)
if got != want {
t.Errorf("got: %q; want: %q", got, want)
}
})
t.Run("Read", func(t *testing.T) {
r, w := Pipe()
c := make(chan []byte, count*len(input)/readSize)
for i := 0; i < cap(c); i++ {
go func() {
time.Sleep(time.Millisecond) // Increase probability of race
buf := make([]byte, readSize)
if n, err := r.Read(buf); n != readSize || err != nil {
t.Errorf("Read() = (%d, %v); want (%d, nil)", n, err, readSize)
}
c <- buf
}()
}
for i := 0; i < count; i++ {
if n, err := w.Write([]byte(input)); n != len(input) || err != nil {
t.Errorf("Write() = (%d, %v); want (%d, nil)", n, err, len(input))
}
}
// Since each read is independent, the only guarantee about the output
// is that it is a permutation of the input in readSized groups.
got := make([]byte, 0, count*len(input))
for i := 0; i < cap(c); i++ {
got = append(got, (<-c)...)
}
got = sortBytesInGroups(got, readSize)
want := bytes.Repeat([]byte(input), count)
want = sortBytesInGroups(want, readSize)
if string(got) != string(want) {
t.Errorf("got: %q; want: %q", got, want)
}
})
}
func sortBytesInGroups(b []byte, n int) []byte {
var groups [][]byte
for len(b) > 0 {
groups = append(groups, b[:n])
b = b[n:]
}
sort.Slice(groups, func(i, j int) bool { return bytes.Compare(groups[i], groups[j]) < 0 })
return bytes.Join(groups, nil)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment