Commit 90f3cb13 authored by Dmitriy Vyukov's avatar Dmitriy Vyukov Committed by Russ Cox

runtime: improve performance of sync channels

1. SudoG always contains a pointer to the element
(thus no variable size, and less copying).
2. chansend/chanrecv allocate SudoG on the stack.
3. Copying of elements and gorotuine notifications
are moved out of critical sections.

benchmark                        old ns/op    new ns/op    delta
BenchmarkSelectUncontended          515.00       514.00   -0.19%
BenchmarkSelectUncontended-2        291.00       281.00   -3.44%
BenchmarkSelectUncontended-4        213.00       189.00  -11.27%
BenchmarkSelectUncontended-8         78.30        79.00   +0.89%
BenchmarkSelectContended            518.00       514.00   -0.77%
BenchmarkSelectContended-2          655.00       631.00   -3.66%
BenchmarkSelectContended-4         1026.00      1051.00   +2.44%
BenchmarkSelectContended-8         2026.00      2128.00   +5.03%
BenchmarkSelectNonblock             175.00       173.00   -1.14%
BenchmarkSelectNonblock-2            85.10        87.70   +3.06%
BenchmarkSelectNonblock-4            60.10        43.30  -27.95%
BenchmarkSelectNonblock-8            37.60        25.50  -32.18%
BenchmarkChanUncontended            109.00       114.00   +4.59%
BenchmarkChanUncontended-2           54.60        57.20   +4.76%
BenchmarkChanUncontended-4           27.40        28.70   +4.74%
BenchmarkChanUncontended-8           14.60        15.10   +3.42%
BenchmarkChanContended              108.00       114.00   +5.56%
BenchmarkChanContended-2            621.00       617.00   -0.64%
BenchmarkChanContended-4            759.00       677.00  -10.80%
BenchmarkChanContended-8           1635.00      1517.00   -7.22%
BenchmarkChanSync                   299.00       256.00  -14.38%
BenchmarkChanSync-2                5055.00      4624.00   -8.53%
BenchmarkChanSync-4                4998.00      4680.00   -6.36%
BenchmarkChanSync-8                5019.00      4760.00   -5.16%
BenchmarkChanProdCons0              316.00       274.00  -13.29%
BenchmarkChanProdCons0-2           1280.00       617.00  -51.80%
BenchmarkChanProdCons0-4           2433.00      1332.00  -45.25%
BenchmarkChanProdCons0-8           3651.00      1934.00  -47.03%
BenchmarkChanProdCons10             153.00       152.00   -0.65%
BenchmarkChanProdCons10-2           626.00       581.00   -7.19%
BenchmarkChanProdCons10-4          1440.00      1323.00   -8.12%
BenchmarkChanProdCons10-8          2036.00      2017.00   -0.93%

R=rsc, ken
CC=golang-dev
https://golang.org/cl/4790042
parent ba2e3af1
...@@ -21,7 +21,7 @@ struct SudoG ...@@ -21,7 +21,7 @@ struct SudoG
int16 offset; // offset of case number int16 offset; // offset of case number
int8 isfree; // offset of case number int8 isfree; // offset of case number
SudoG* link; SudoG* link;
byte elem[8]; // synch data element (+ more) byte* elem; // data element
}; };
struct WaitQ struct WaitQ
...@@ -170,6 +170,7 @@ void ...@@ -170,6 +170,7 @@ void
runtime·chansend(Hchan *c, byte *ep, bool *pres) runtime·chansend(Hchan *c, byte *ep, bool *pres)
{ {
SudoG *sg; SudoG *sg;
SudoG mysg;
G* gp; G* gp;
if(c == nil) if(c == nil)
...@@ -185,7 +186,6 @@ runtime·chansend(Hchan *c, byte *ep, bool *pres) ...@@ -185,7 +186,6 @@ runtime·chansend(Hchan *c, byte *ep, bool *pres)
} }
runtime·lock(c); runtime·lock(c);
loop:
if(c->closed) if(c->closed)
goto closed; goto closed;
...@@ -194,12 +194,12 @@ loop: ...@@ -194,12 +194,12 @@ loop:
sg = dequeue(&c->recvq, c); sg = dequeue(&c->recvq, c);
if(sg != nil) { if(sg != nil) {
if(ep != nil) runtime·unlock(c);
c->elemalg->copy(c->elemsize, sg->elem, ep);
gp = sg->g; gp = sg->g;
gp->param = sg; gp->param = sg;
runtime·unlock(c); if(sg->elem != nil)
c->elemalg->copy(c->elemsize, sg->elem, ep);
runtime·ready(gp); runtime·ready(gp);
if(pres != nil) if(pres != nil)
...@@ -213,21 +213,22 @@ loop: ...@@ -213,21 +213,22 @@ loop:
return; return;
} }
sg = allocsg(c); mysg.elem = ep;
if(ep != nil) mysg.g = g;
c->elemalg->copy(c->elemsize, sg->elem, ep); mysg.selgen = g->selgen;
g->param = nil; g->param = nil;
g->status = Gwaiting; g->status = Gwaiting;
enqueue(&c->sendq, sg); enqueue(&c->sendq, &mysg);
runtime·unlock(c); runtime·unlock(c);
runtime·gosched(); runtime·gosched();
if(g->param == nil) {
runtime·lock(c); runtime·lock(c);
sg = g->param; if(!c->closed)
if(sg == nil) runtime·throw("chansend: spurious wakeup");
goto loop; goto closed;
freesg(c, sg); }
runtime·unlock(c);
return; return;
asynch: asynch:
...@@ -240,16 +241,17 @@ asynch: ...@@ -240,16 +241,17 @@ asynch:
*pres = false; *pres = false;
return; return;
} }
sg = allocsg(c); mysg.g = g;
mysg.elem = nil;
mysg.selgen = g->selgen;
g->status = Gwaiting; g->status = Gwaiting;
enqueue(&c->sendq, sg); enqueue(&c->sendq, &mysg);
runtime·unlock(c); runtime·unlock(c);
runtime·gosched(); runtime·gosched();
runtime·lock(c); runtime·lock(c);
goto asynch; goto asynch;
} }
if(ep != nil)
c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep); c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
if(++c->sendx == c->dataqsiz) if(++c->sendx == c->dataqsiz)
c->sendx = 0; c->sendx = 0;
...@@ -258,7 +260,6 @@ asynch: ...@@ -258,7 +260,6 @@ asynch:
sg = dequeue(&c->recvq, c); sg = dequeue(&c->recvq, c);
if(sg != nil) { if(sg != nil) {
gp = sg->g; gp = sg->g;
freesg(c, sg);
runtime·unlock(c); runtime·unlock(c);
runtime·ready(gp); runtime·ready(gp);
} else } else
...@@ -277,6 +278,7 @@ void ...@@ -277,6 +278,7 @@ void
runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received) runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received)
{ {
SudoG *sg; SudoG *sg;
SudoG mysg;
G *gp; G *gp;
if(c == nil) if(c == nil)
...@@ -289,8 +291,6 @@ runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received) ...@@ -289,8 +291,6 @@ runtime·chanrecv(Hchan* c, byte *ep, bool *selected, bool *received)
runtime·printf("chanrecv: chan=%p\n", c); runtime·printf("chanrecv: chan=%p\n", c);
runtime·lock(c); runtime·lock(c);
loop:
if(c->dataqsiz > 0) if(c->dataqsiz > 0)
goto asynch; goto asynch;
...@@ -299,13 +299,12 @@ loop: ...@@ -299,13 +299,12 @@ loop:
sg = dequeue(&c->sendq, c); sg = dequeue(&c->sendq, c);
if(sg != nil) { if(sg != nil) {
runtime·unlock(c);
if(ep != nil) if(ep != nil)
c->elemalg->copy(c->elemsize, ep, sg->elem); c->elemalg->copy(c->elemsize, ep, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
gp = sg->g; gp = sg->g;
gp->param = sg; gp->param = sg;
runtime·unlock(c);
runtime·ready(gp); runtime·ready(gp);
if(selected != nil) if(selected != nil)
...@@ -321,25 +320,24 @@ loop: ...@@ -321,25 +320,24 @@ loop:
return; return;
} }
sg = allocsg(c); mysg.elem = ep;
mysg.g = g;
mysg.selgen = g->selgen;
g->param = nil; g->param = nil;
g->status = Gwaiting; g->status = Gwaiting;
enqueue(&c->recvq, sg); enqueue(&c->recvq, &mysg);
runtime·unlock(c); runtime·unlock(c);
runtime·gosched(); runtime·gosched();
if(g->param == nil) {
runtime·lock(c); runtime·lock(c);
sg = g->param; if(!c->closed)
if(sg == nil) runtime·throw("chanrecv: spurious wakeup");
goto loop; goto closed;
}
if(ep != nil)
c->elemalg->copy(c->elemsize, ep, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
if(received != nil) if(received != nil)
*received = true; *received = true;
freesg(c, sg);
runtime·unlock(c);
return; return;
asynch: asynch:
...@@ -354,9 +352,11 @@ asynch: ...@@ -354,9 +352,11 @@ asynch:
*received = false; *received = false;
return; return;
} }
sg = allocsg(c); mysg.g = g;
mysg.elem = nil;
mysg.selgen = g->selgen;
g->status = Gwaiting; g->status = Gwaiting;
enqueue(&c->recvq, sg); enqueue(&c->recvq, &mysg);
runtime·unlock(c); runtime·unlock(c);
runtime·gosched(); runtime·gosched();
...@@ -369,10 +369,10 @@ asynch: ...@@ -369,10 +369,10 @@ asynch:
if(++c->recvx == c->dataqsiz) if(++c->recvx == c->dataqsiz)
c->recvx = 0; c->recvx = 0;
c->qcount--; c->qcount--;
sg = dequeue(&c->sendq, c); sg = dequeue(&c->sendq, c);
if(sg != nil) { if(sg != nil) {
gp = sg->g; gp = sg->g;
freesg(c, sg);
runtime·unlock(c); runtime·unlock(c);
runtime·ready(gp); runtime·ready(gp);
} else } else
...@@ -721,7 +721,6 @@ selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so ...@@ -721,7 +721,6 @@ selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so
cas->so = so; cas->so = so;
cas->kind = CaseRecv; cas->kind = CaseRecv;
cas->u.recv.elemp = elem; cas->u.recv.elemp = elem;
cas->u.recv.receivedp = nil;
cas->u.recv.receivedp = received; cas->u.recv.receivedp = received;
if(debug) if(debug)
...@@ -911,6 +910,7 @@ loop: ...@@ -911,6 +910,7 @@ loop:
} }
if(dfl != nil) { if(dfl != nil) {
selunlock(sel);
cas = dfl; cas = dfl;
goto retc; goto retc;
} }
...@@ -926,12 +926,12 @@ loop: ...@@ -926,12 +926,12 @@ loop:
switch(cas->kind) { switch(cas->kind) {
case CaseRecv: case CaseRecv:
sg->elem = cas->u.recv.elemp;
enqueue(&c->recvq, sg); enqueue(&c->recvq, sg);
break; break;
case CaseSend: case CaseSend:
if(c->dataqsiz == 0) sg->elem = cas->u.elem;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
enqueue(&c->sendq, sg); enqueue(&c->sendq, sg);
break; break;
} }
...@@ -965,10 +965,8 @@ loop: ...@@ -965,10 +965,8 @@ loop:
cas = sel->scase[o]; cas = sel->scase[o];
c = cas->chan; c = cas->chan;
if(c->dataqsiz > 0) { if(c->dataqsiz > 0)
// prints("shouldnt happen\n"); runtime·throw("selectgo: shouldnt happen");
goto loop;
}
if(debug) if(debug)
runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d o=%d\n", runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d o=%d\n",
...@@ -977,12 +975,10 @@ loop: ...@@ -977,12 +975,10 @@ loop:
if(cas->kind == CaseRecv) { if(cas->kind == CaseRecv) {
if(cas->u.recv.receivedp != nil) if(cas->u.recv.receivedp != nil)
*cas->u.recv.receivedp = true; *cas->u.recv.receivedp = true;
if(cas->u.recv.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
} }
freesg(c, sg); freesg(c, sg);
selunlock(sel);
goto retc; goto retc;
asyncrecv: asyncrecv:
...@@ -998,14 +994,15 @@ asyncrecv: ...@@ -998,14 +994,15 @@ asyncrecv:
sg = dequeue(&c->sendq, c); sg = dequeue(&c->sendq, c);
if(sg != nil) { if(sg != nil) {
gp = sg->g; gp = sg->g;
freesg(c, sg); selunlock(sel);
runtime·ready(gp); runtime·ready(gp);
} else {
selunlock(sel);
} }
goto retc; goto retc;
asyncsend: asyncsend:
// can send to buffer // can send to buffer
if(cas->u.elem != nil)
c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem); c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->u.elem);
if(++c->sendx == c->dataqsiz) if(++c->sendx == c->dataqsiz)
c->sendx = 0; c->sendx = 0;
...@@ -1013,20 +1010,22 @@ asyncsend: ...@@ -1013,20 +1010,22 @@ asyncsend:
sg = dequeue(&c->recvq, c); sg = dequeue(&c->recvq, c);
if(sg != nil) { if(sg != nil) {
gp = sg->g; gp = sg->g;
freesg(c, sg); selunlock(sel);
runtime·ready(gp); runtime·ready(gp);
} else {
selunlock(sel);
} }
goto retc; goto retc;
syncrecv: syncrecv:
// can receive from sleeping sender (sg) // can receive from sleeping sender (sg)
selunlock(sel);
if(debug) if(debug)
runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o); runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
if(cas->u.recv.receivedp != nil) if(cas->u.recv.receivedp != nil)
*cas->u.recv.receivedp = true; *cas->u.recv.receivedp = true;
if(cas->u.recv.elemp != nil) if(cas->u.recv.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem); c->elemalg->copy(c->elemsize, cas->u.recv.elemp, sg->elem);
c->elemalg->copy(c->elemsize, sg->elem, nil);
gp = sg->g; gp = sg->g;
gp->param = sg; gp->param = sg;
runtime·ready(gp); runtime·ready(gp);
...@@ -1034,6 +1033,7 @@ syncrecv: ...@@ -1034,6 +1033,7 @@ syncrecv:
rclose: rclose:
// read at end of closed channel // read at end of closed channel
selunlock(sel);
if(cas->u.recv.receivedp != nil) if(cas->u.recv.receivedp != nil)
*cas->u.recv.receivedp = false; *cas->u.recv.receivedp = false;
if(cas->u.recv.elemp != nil) if(cas->u.recv.elemp != nil)
...@@ -1042,18 +1042,16 @@ rclose: ...@@ -1042,18 +1042,16 @@ rclose:
syncsend: syncsend:
// can send to sleeping receiver (sg) // can send to sleeping receiver (sg)
selunlock(sel);
if(debug) if(debug)
runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o); runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
if(c->closed) if(sg->elem != nil)
goto sclose;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem); c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
gp = sg->g; gp = sg->g;
gp->param = sg; gp->param = sg;
runtime·ready(gp); runtime·ready(gp);
retc: retc:
selunlock(sel);
// return to pc corresponding to chosen case // return to pc corresponding to chosen case
pc = cas->pc; pc = cas->pc;
as = (byte*)selp + cas->so; as = (byte*)selp + cas->so;
...@@ -1093,7 +1091,6 @@ runtime·closechan(Hchan *c) ...@@ -1093,7 +1091,6 @@ runtime·closechan(Hchan *c)
break; break;
gp = sg->g; gp = sg->g;
gp->param = nil; gp->param = nil;
freesg(c, sg);
runtime·ready(gp); runtime·ready(gp);
} }
...@@ -1104,7 +1101,6 @@ runtime·closechan(Hchan *c) ...@@ -1104,7 +1101,6 @@ runtime·closechan(Hchan *c)
break; break;
gp = sg->g; gp = sg->g;
gp->param = nil; gp->param = nil;
freesg(c, sg);
runtime·ready(gp); runtime·ready(gp);
} }
...@@ -1203,7 +1199,7 @@ allocsg(Hchan *c) ...@@ -1203,7 +1199,7 @@ allocsg(Hchan *c)
if(sg != nil) { if(sg != nil) {
c->free = sg->link; c->free = sg->link;
} else } else
sg = runtime·mal(sizeof(*sg) + c->elemsize - sizeof(sg->elem)); sg = runtime·mal(sizeof(*sg));
sg->selgen = g->selgen; sg->selgen = g->selgen;
sg->g = g; sg->g = g;
sg->offset = 0; sg->offset = 0;
...@@ -1220,6 +1216,7 @@ freesg(Hchan *c, SudoG *sg) ...@@ -1220,6 +1216,7 @@ freesg(Hchan *c, SudoG *sg)
runtime·throw("chan.freesg: already free"); runtime·throw("chan.freesg: already free");
sg->isfree = 1; sg->isfree = 1;
sg->link = c->free; sg->link = c->free;
sg->elem = nil;
c->free = sg; c->free = sg;
} }
} }
......
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime_test
import (
"runtime"
"sync/atomic"
"testing"
)
func BenchmarkSelectUncontended(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
for p := 0; p < procs; p++ {
go func() {
myc1 := make(chan int, 1)
myc2 := make(chan int, 1)
myc1 <- 0
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
select {
case <-myc1:
myc2 <- 0
case <-myc2:
myc1 <- 0
}
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func BenchmarkSelectContended(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
myc1 := make(chan int, procs)
myc2 := make(chan int, procs)
for p := 0; p < procs; p++ {
myc1 <- 0
go func() {
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
select {
case <-myc1:
myc2 <- 0
case <-myc2:
myc1 <- 0
}
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func BenchmarkSelectNonblock(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
for p := 0; p < procs; p++ {
go func() {
myc1 := make(chan int)
myc2 := make(chan int)
myc3 := make(chan int, 1)
myc4 := make(chan int, 1)
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
select {
case <-myc1:
default:
}
select {
case myc2 <- 0:
default:
}
select {
case <-myc3:
default:
}
select {
case myc4 <- 0:
default:
}
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func BenchmarkChanUncontended(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
for p := 0; p < procs; p++ {
go func() {
myc := make(chan int, CallsPerSched)
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
myc <- 0
}
for g := 0; g < CallsPerSched; g++ {
<-myc
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func BenchmarkChanContended(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
myc := make(chan int, procs*CallsPerSched)
for p := 0; p < procs; p++ {
go func() {
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
myc <- 0
}
for g := 0; g < CallsPerSched; g++ {
<-myc
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func BenchmarkChanSync(b *testing.B) {
const CallsPerSched = 1000
procs := 2
N := int32(b.N / CallsPerSched / procs * procs)
c := make(chan bool, procs)
myc := make(chan int)
for p := 0; p < procs; p++ {
go func() {
for {
i := atomic.AddInt32(&N, -1)
if i < 0 {
break
}
for g := 0; g < CallsPerSched; g++ {
if i%2 == 0 {
<-myc
myc <- 0
} else {
myc <- 0
<-myc
}
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, 2*procs)
myc := make(chan int, chanSize)
for p := 0; p < procs; p++ {
go func() {
foo := 0
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
for i := 0; i < localWork; i++ {
foo *= 2
foo /= 2
}
myc <- 1
}
}
myc <- 0
c <- foo == 42
}()
go func() {
foo := 0
for {
v := <-myc
if v == 0 {
break
}
for i := 0; i < localWork; i++ {
foo *= 2
foo /= 2
}
}
c <- foo == 42
}()
}
for p := 0; p < procs; p++ {
<-c
<-c
}
}
func BenchmarkChanProdCons0(b *testing.B) {
benchmarkChanProdCons(b, 0, 0)
}
func BenchmarkChanProdCons10(b *testing.B) {
benchmarkChanProdCons(b, 10, 0)
}
func BenchmarkChanProdCons100(b *testing.B) {
benchmarkChanProdCons(b, 100, 0)
}
func BenchmarkChanProdConsWork0(b *testing.B) {
benchmarkChanProdCons(b, 0, 100)
}
func BenchmarkChanProdConsWork10(b *testing.B) {
benchmarkChanProdCons(b, 10, 100)
}
func BenchmarkChanProdConsWork100(b *testing.B) {
benchmarkChanProdCons(b, 100, 100)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment