Commit ee6e1a3f authored by Dmitriy Vyukov's avatar Dmitriy Vyukov Committed by Russ Cox

sync: add fast paths to WaitGroup

benchmark                                        old ns/op    new ns/op    delta
BenchmarkWaitGroupUncontended                        93.50        33.60  -64.06%
BenchmarkWaitGroupUncontended-2                      44.30        16.90  -61.85%
BenchmarkWaitGroupUncontended-4                      21.80         8.47  -61.15%
BenchmarkWaitGroupUncontended-8                      12.10         4.86  -59.83%
BenchmarkWaitGroupUncontended-16                      7.38         3.35  -54.61%
BenchmarkWaitGroupAddDone                            58.40        33.70  -42.29%
BenchmarkWaitGroupAddDone-2                         293.00        85.80  -70.72%
BenchmarkWaitGroupAddDone-4                         243.00        51.10  -78.97%
BenchmarkWaitGroupAddDone-8                         236.00        52.20  -77.88%
BenchmarkWaitGroupAddDone-16                        215.00        43.30  -79.86%
BenchmarkWaitGroupAddDoneWork                       826.00       794.00   -3.87%
BenchmarkWaitGroupAddDoneWork-2                     450.00       424.00   -5.78%
BenchmarkWaitGroupAddDoneWork-4                     277.00       220.00  -20.58%
BenchmarkWaitGroupAddDoneWork-8                     440.00       116.00  -73.64%
BenchmarkWaitGroupAddDoneWork-16                    569.00        66.50  -88.31%
BenchmarkWaitGroupWait                               29.00         8.04  -72.28%
BenchmarkWaitGroupWait-2                             74.10         4.15  -94.40%
BenchmarkWaitGroupWait-4                            117.00         2.30  -98.03%
BenchmarkWaitGroupWait-8                            111.00         1.31  -98.82%
BenchmarkWaitGroupWait-16                           104.00         1.27  -98.78%
BenchmarkWaitGroupWaitWork                          802.00       792.00   -1.25%
BenchmarkWaitGroupWaitWork-2                        411.00       401.00   -2.43%
BenchmarkWaitGroupWaitWork-4                        210.00       199.00   -5.24%
BenchmarkWaitGroupWaitWork-8                        206.00       105.00  -49.03%
BenchmarkWaitGroupWaitWork-16                       334.00        54.40  -83.71%

R=rsc
CC=golang-dev
https://golang.org/cl/4672050
parent 92c6061b
......@@ -85,3 +85,12 @@ addloop:
MOVL BX, retlo+12(FP)
MOVL CX, rethi+16(FP)
RET
TEXT ·LoadInt32(SB),7,$0
JMP ·LoadUint32(SB)
TEXT ·LoadUint32(SB),7,$0
MOVL addrptr+0(FP), AX
MOVL 0(AX), AX
MOVL AX, ret+4(FP)
RET
......@@ -57,3 +57,13 @@ TEXT ·AddUint64(SB),7,$0
ADDQ AX, CX
MOVQ CX, ret+16(FP)
RET
TEXT ·LoadInt32(SB),7,$0
JMP ·LoadUint32(SB)
TEXT ·LoadUint32(SB),7,$0
MOVQ addrptr+0(FP), AX
MOVL 0(AX), AX
MOVL AX, ret+8(FP)
RET
......@@ -83,3 +83,16 @@ TEXT ·AddInt64(SB),7,$0
TEXT ·AddUint64(SB),7,$0
B ·armAddUint64(SB)
TEXT ·LoadInt32(SB),7,$0
B ·LoadUint32(SB)
TEXT ·LoadUint32(SB),7,$0
MOVW addrptr+0(FP), R2
loadloop1:
MOVW 0(R2), R0
MOVW R0, R1
BL cas<>(SB)
BCC loadloop1
MOVW R0, val+4(FP)
RET
......@@ -308,6 +308,46 @@ func TestCompareAndSwapUintptr(t *testing.T) {
}
}
func TestLoadInt32(t *testing.T) {
var x struct {
before int32
i int32
after int32
}
x.before = magic32
x.after = magic32
for delta := int32(1); delta+delta > delta; delta += delta {
k := LoadInt32(&x.i)
if k != x.i {
t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
}
x.i += delta
}
if x.before != magic32 || x.after != magic32 {
t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
}
}
func TestLoadUint32(t *testing.T) {
var x struct {
before uint32
i uint32
after uint32
}
x.before = magic32
x.after = magic32
for delta := uint32(1); delta+delta > delta; delta += delta {
k := LoadUint32(&x.i)
if k != x.i {
t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
}
x.i += delta
}
if x.before != magic32 || x.after != magic32 {
t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
}
}
// Tests of correct behavior, with contention.
// (Is the function atomic?)
//
......@@ -537,3 +577,65 @@ func TestHammer64(t *testing.T) {
}
}
}
func hammerLoadInt32(t *testing.T, uval *uint32) {
val := (*int32)(unsafe.Pointer(uval))
for {
v := LoadInt32(val)
vlo := v & ((1 << 16) - 1)
vhi := v >> 16
if vlo != vhi {
t.Fatalf("LoadInt32: %#x != %#x", vlo, vhi)
}
new := v + 1 + 1<<16
if vlo == 1e4 {
new = 0
}
if CompareAndSwapInt32(val, v, new) {
break
}
}
}
func hammerLoadUint32(t *testing.T, val *uint32) {
for {
v := LoadUint32(val)
vlo := v & ((1 << 16) - 1)
vhi := v >> 16
if vlo != vhi {
t.Fatalf("LoadUint32: %#x != %#x", vlo, vhi)
}
new := v + 1 + 1<<16
if vlo == 1e4 {
new = 0
}
if CompareAndSwapUint32(val, v, new) {
break
}
}
}
func TestHammerLoad(t *testing.T) {
tests := [...]func(*testing.T, *uint32){hammerLoadInt32, hammerLoadUint32}
n := 100000
if testing.Short() {
n = 10000
}
const procs = 8
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(procs))
for _, tt := range tests {
c := make(chan int)
var val uint32
for p := 0; p < procs; p++ {
go func() {
for i := 0; i < n; i++ {
tt(t, &val)
}
c <- 1
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
}
......@@ -56,6 +56,12 @@ func AddUint64(val *uint64, delta uint64) (new uint64)
// AddUintptr atomically adds delta to *val and returns the new value.
func AddUintptr(val *uintptr, delta uintptr) (new uintptr)
// LoadInt32 atomically loads *addr.
func LoadInt32(addr *int32) (val int32)
// LoadUint32 atomically loads *addr.
func LoadUint32(addr *uint32) (val uint32)
// Helper for ARM. Linker will discard on other systems
func panic64() {
panic("sync/atomic: broken 64-bit atomic operations (buggy QEMU)")
......
......@@ -4,7 +4,10 @@
package sync
import "runtime"
import (
"runtime"
"sync/atomic"
)
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
......@@ -28,8 +31,8 @@ import "runtime"
//
type WaitGroup struct {
m Mutex
counter int
waiters int
counter int32
waiters int32
sema *uint32
}
......@@ -48,19 +51,19 @@ type WaitGroup struct {
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait() are released.
func (wg *WaitGroup) Add(delta int) {
wg.m.Lock()
if delta < -wg.counter {
wg.m.Unlock()
v := atomic.AddInt32(&wg.counter, int32(delta))
if v < 0 {
panic("sync: negative WaitGroup count")
}
wg.counter += delta
if wg.counter == 0 && wg.waiters > 0 {
for i := 0; i < wg.waiters; i++ {
runtime.Semrelease(wg.sema)
}
wg.waiters = 0
wg.sema = nil
if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
return
}
wg.m.Lock()
for i := int32(0); i < wg.waiters; i++ {
runtime.Semrelease(wg.sema)
}
wg.waiters = 0
wg.sema = nil
wg.m.Unlock()
}
......@@ -71,12 +74,20 @@ func (wg *WaitGroup) Done() {
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
if atomic.LoadInt32(&wg.counter) == 0 {
return
}
wg.m.Lock()
if wg.counter == 0 {
atomic.AddInt32(&wg.waiters, 1)
// This code is racing with the unlocked path in Add above.
// The code above modifies counter and then reads waiters.
// We must modify waiters and then read counter (the opposite order)
// to avoid missing an Add.
if atomic.LoadInt32(&wg.counter) == 0 {
atomic.AddInt32(&wg.waiters, -1)
wg.m.Unlock()
return
}
wg.waiters++
if wg.sema == nil {
wg.sema = new(uint32)
}
......
......@@ -5,7 +5,9 @@
package sync_test
import (
"runtime"
. "sync"
"sync/atomic"
"testing"
)
......@@ -58,3 +60,106 @@ func TestWaitGroupMisuse(t *testing.T) {
wg.Done()
t.Fatal("Should panic")
}
func BenchmarkWaitGroupUncontended(b *testing.B) {
type PaddedWaitGroup struct {
WaitGroup
pad [128]uint8
}
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
for p := 0; p < procs; p++ {
go func() {
var wg PaddedWaitGroup
for atomic.AddInt32(&N, -1) >= 0 {
runtime.Gosched()
for g := 0; g < CallsPerSched; g++ {
wg.Add(1)
wg.Done()
wg.Wait()
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func benchmarkWaitGroupAddDone(b *testing.B, localWork int) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
var wg WaitGroup
for p := 0; p < procs; p++ {
go func() {
foo := 0
for atomic.AddInt32(&N, -1) >= 0 {
runtime.Gosched()
for g := 0; g < CallsPerSched; g++ {
wg.Add(1)
for i := 0; i < localWork; i++ {
foo *= 2
foo /= 2
}
wg.Done()
}
}
c <- foo == 42
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func BenchmarkWaitGroupAddDone(b *testing.B) {
benchmarkWaitGroupAddDone(b, 0)
}
func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
benchmarkWaitGroupAddDone(b, 100)
}
func benchmarkWaitGroupWait(b *testing.B, localWork int) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
var wg WaitGroup
wg.Add(procs)
for p := 0; p < procs; p++ {
go wg.Done()
}
for p := 0; p < procs; p++ {
go func() {
foo := 0
for atomic.AddInt32(&N, -1) >= 0 {
runtime.Gosched()
for g := 0; g < CallsPerSched; g++ {
wg.Wait()
for i := 0; i < localWork; i++ {
foo *= 2
foo /= 2
}
}
}
c <- foo == 42
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func BenchmarkWaitGroupWait(b *testing.B) {
benchmarkWaitGroupWait(b, 0)
}
func BenchmarkWaitGroupWaitWork(b *testing.B) {
benchmarkWaitGroupWait(b, 100)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment