Commit f6ceefa2 authored by Dmitriy Vyukov's avatar Dmitriy Vyukov

runtime: add fast paths to non-blocking channel operations

benchmark                      old ns/op     new ns/op     delta
BenchmarkChanNonblocking       27.8          7.80          -71.94%
BenchmarkChanNonblocking-2     79.1          3.94          -95.02%
BenchmarkChanNonblocking-4     71.2          2.04          -97.13%

LGTM=rsc
R=golang-codereviews, rsc, dave
CC=golang-codereviews
https://golang.org/cl/110580043
parent 639dc6c7
......@@ -96,17 +96,37 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
println("chansend: chan=", c)
}
if raceenabled {
fn := chansend
pc := **(**uintptr)(unsafe.Pointer(&fn))
racereadpc(unsafe.Pointer(c), pc, callerpc)
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = gocputicks()
}
golock(&c.lock)
if raceenabled {
fn := chansend
pc := **(**uintptr)(unsafe.Pointer(&fn))
racereadpc(unsafe.Pointer(c), pc, callerpc)
}
if c.closed != 0 {
gounlock(&c.lock)
panic("send on closed channel")
......
......@@ -57,6 +57,27 @@ chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
runtime·prints("\n");
}
if(raceenabled)
runtime·racereadpc(c, pc, chansend);
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if(!block && !c->closed && ((c->dataqsiz == 0 && c->recvq.first == nil) ||
(c->dataqsiz > 0 && c->qcount == c->dataqsiz)))
return false;
t0 = 0;
mysg.releasetime = 0;
if(runtime·blockprofilerate > 0) {
......@@ -65,8 +86,6 @@ chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
}
runtime·lock(&c->lock);
if(raceenabled)
runtime·racereadpc(c, pc, chansend);
if(c->closed)
goto closed;
......@@ -183,6 +202,23 @@ chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received)
return false; // not reached
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if(!block && ((c->dataqsiz == 0 && c->sendq.first == nil) ||
(c->dataqsiz > 0 && runtime·atomicloadp((void**)&c->qcount) == 0)) &&
!runtime·atomicload(&c->closed))
return false;
t0 = 0;
mysg.releasetime = 0;
if(runtime·blockprofilerate > 0) {
......
......@@ -20,7 +20,7 @@ struct Hchan
uintgo dataqsiz; // size of the circular q
byte* buf;
uint16 elemsize;
bool closed;
uint32 closed;
Type* elemtype; // element type
uintgo sendx; // send index
uintgo recvx; // receive index
......
......@@ -198,6 +198,26 @@ func TestChan(t *testing.T) {
}
}
func TestNonblockRecvRace(t *testing.T) {
n := 10000
if testing.Short() {
n = 100
}
for i := 0; i < n; i++ {
c := make(chan int, 1)
c <- 1
go func() {
select {
case <-c:
default:
t.Fatal("chan is not ready")
}
}()
close(c)
<-c
}
}
func TestSelfSelect(t *testing.T) {
// Ensure that send/recv on the same chan in select
// does not crash nor deadlock.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment