Commit c3922f0a authored by Dmitriy Vyukov's avatar Dmitriy Vyukov

testing: ease writing parallel benchmarks

Add b.RunParallel function that captures parallel benchmark boilerplate:
creates worker goroutines, joins worker goroutines, distributes work
among them in an efficient way, auto-tunes grain size.
Fixes #7090.

R=bradfitz, iant, josharian, tracey.brendan, r, rsc, gobot
CC=golang-codereviews
https://golang.org/cl/57270043
parent a1aee55b
......@@ -4,5 +4,6 @@ misc/dist: renamed misc/makerelease (CL 39920043)
runtime: output how long goroutines are blocked (CL 50420043)
syscall: add NewCallbackCDecl to use for windows callbacks (CL 36180044)
testing: diagnose buggy tests that panic(nil) (CL 55780043)
testing: add b.RunParallel function (CL 57270043)
misc/benchcmp has been replaced by go tool benchcmp (CL 47980043)
cmd/go, go/build: support .m files (CL 60590044)
......@@ -455,17 +455,11 @@ func BenchmarkChanNonblocking(b *testing.B) {
}
func BenchmarkSelectUncontended(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
for p := 0; p < procs; p++ {
go func() {
b.RunParallel(func(pb *testing.PB) {
myc1 := make(chan int, 1)
myc2 := make(chan int, 1)
myc1 <- 0
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
for pb.Next() {
select {
case <-myc1:
myc2 <- 0
......@@ -473,27 +467,16 @@ func BenchmarkSelectUncontended(b *testing.B) {
myc1 <- 0
}
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
})
}
func BenchmarkSelectContended(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
procs := runtime.GOMAXPROCS(0)
myc1 := make(chan int, procs)
myc2 := make(chan int, procs)
for p := 0; p < procs; p++ {
b.RunParallel(func(pb *testing.PB) {
myc1 <- 0
go func() {
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
for pb.Next() {
select {
case <-myc1:
myc2 <- 0
......@@ -501,28 +484,16 @@ func BenchmarkSelectContended(b *testing.B) {
myc1 <- 0
}
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
})
}
func BenchmarkSelectNonblock(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
for p := 0; p < procs; p++ {
go func() {
b.RunParallel(func(pb *testing.PB) {
myc1 := make(chan int)
myc2 := make(chan int)
myc3 := make(chan int, 1)
myc4 := make(chan int, 1)
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
for pb.Next() {
select {
case <-myc1:
default:
......@@ -540,61 +511,37 @@ func BenchmarkSelectNonblock(b *testing.B) {
default:
}
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
})
}
func BenchmarkChanUncontended(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
for p := 0; p < procs; p++ {
go func() {
myc := make(chan int, CallsPerSched)
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
const C = 100
b.RunParallel(func(pb *testing.PB) {
myc := make(chan int, C)
for pb.Next() {
for i := 0; i < C; i++ {
myc <- 0
}
for g := 0; g < CallsPerSched; g++ {
for i := 0; i < C; i++ {
<-myc
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
})
}
func BenchmarkChanContended(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
myc := make(chan int, procs*CallsPerSched)
for p := 0; p < procs; p++ {
go func() {
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
const C = 100
myc := make(chan int, C*runtime.GOMAXPROCS(0))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for i := 0; i < C; i++ {
myc <- 0
}
for g := 0; g < CallsPerSched; g++ {
for i := 0; i < C; i++ {
<-myc
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
})
}
func BenchmarkChanSync(b *testing.B) {
......@@ -755,25 +702,13 @@ func BenchmarkSelectProdCons(b *testing.B) {
}
func BenchmarkChanCreation(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, procs)
for p := 0; p < procs; p++ {
go func() {
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
myc := make(chan int, 1)
myc <- 0
<-myc
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
})
}
func BenchmarkChanSem(b *testing.B) {
......
......@@ -10,6 +10,7 @@ import (
"os"
"runtime"
"sync"
"sync/atomic"
"time"
)
......@@ -35,11 +36,14 @@ type InternalBenchmark struct {
type B struct {
common
N int
previousN int // number of iterations in the previous run
previousDuration time.Duration // total duration of the previous run
benchmark InternalBenchmark
bytes int64
timerOn bool
showAllocResult bool
result BenchmarkResult
parallelism int // RunParallel creates parallelism*GOMAXPROCS goroutines
// The initial states of memStats.Mallocs and memStats.TotalAlloc.
startAllocs uint64
startBytes uint64
......@@ -114,10 +118,13 @@ func (b *B) runN(n int) {
// by clearing garbage from previous runs.
runtime.GC()
b.N = n
b.parallelism = 1
b.ResetTimer()
b.StartTimer()
b.benchmark.F(b)
b.StopTimer()
b.previousN = n
b.previousDuration = b.duration
}
func min(x, y int) int {
......@@ -343,6 +350,84 @@ func (b *B) trimOutput() {
}
}
// A PB is used by RunParallel for running parallel benchmarks.
type PB struct {
globalN *uint64 // shared between all worker goroutines iteration counter
grain uint64 // acquire that many iterations from globalN at once
cache uint64 // local cache of acquired iterations
bN uint64 // total number of iterations to execute (b.N)
}
// Next reports whether there are more iterations to execute.
func (pb *PB) Next() bool {
if pb.cache == 0 {
n := atomic.AddUint64(pb.globalN, pb.grain)
if n <= pb.bN {
pb.cache = pb.grain
} else if n < pb.bN+pb.grain {
pb.cache = pb.bN + pb.grain - n
} else {
return false
}
}
pb.cache--
return true
}
// RunParallel runs a benchmark in parallel.
// It creates multiple goroutines and distributes b.N iterations among them.
// The number of goroutines defaults to GOMAXPROCS. To increase parallelism for
// non-CPU-bound benchmarks, call SetParallelism before RunParallel.
// RunParallel is usually used with the go test -cpu flag.
//
// The body function will be run in each goroutine. It should set up any
// goroutine-local state and then iterate until pb.Next returns false.
// It should not use the StartTimer, StopTimer, or ResetTimer functions,
// because they have global effect.
func (b *B) RunParallel(body func(*PB)) {
// Calculate grain size as number of iterations that take ~100µs.
// 100µs is enough to amortize the overhead and provide sufficient
// dynamic load balancing.
grain := uint64(0)
if b.previousN > 0 && b.previousDuration > 0 {
grain = 1e5 * uint64(b.previousN) / uint64(b.previousDuration)
}
if grain < 1 {
grain = 1
}
// We expect the inner loop and function call to take at least 10ns,
// so do not do more than 100µs/10ns=1e4 iterations.
if grain > 1e4 {
grain = 1e4
}
n := uint64(0)
numProcs := b.parallelism * runtime.GOMAXPROCS(0)
var wg sync.WaitGroup
wg.Add(numProcs)
for p := 0; p < numProcs; p++ {
go func() {
defer wg.Done()
pb := &PB{
globalN: &n,
grain: grain,
bN: uint64(b.N),
}
body(pb)
}()
}
wg.Wait()
}
// SetParallelism sets the number of goroutines used by RunParallel to p*GOMAXPROCS.
// There is usually no need to call SetParallelism for CPU-bound benchmarks.
// If p is less than 1, this call will have no effect.
func (b *B) SetParallelism(p int) {
if p >= 1 {
b.parallelism = p
}
}
// Benchmark benchmarks a single function. Useful for creating
// custom benchmarks that do not use the "go test" command.
func Benchmark(f func(b *B)) BenchmarkResult {
......
......@@ -5,7 +5,11 @@
package testing_test
import (
"bytes"
"runtime"
"sync/atomic"
"testing"
"text/template"
)
var roundDownTests = []struct {
......@@ -56,3 +60,53 @@ func TestRoundUp(t *testing.T) {
}
}
}
func TestRunParallel(t *testing.T) {
testing.Benchmark(func(b *testing.B) {
procs := uint32(0)
iters := uint64(0)
b.SetParallelism(3)
b.RunParallel(func(pb *testing.PB) {
atomic.AddUint32(&procs, 1)
for pb.Next() {
atomic.AddUint64(&iters, 1)
}
})
if want := uint32(3 * runtime.GOMAXPROCS(0)); procs != want {
t.Errorf("got %v procs, want %v", procs, want)
}
if iters != uint64(b.N) {
t.Errorf("got %v iters, want %v", iters, b.N)
}
})
}
func TestRunParallelFail(t *testing.T) {
testing.Benchmark(func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
// The function must be able to log/abort
// w/o crashing/deadlocking the whole benchmark.
b.Log("log")
b.Error("error")
b.Fatal("fatal")
})
})
}
func ExampleB_RunParallel() {
// Parallel benchmark for text/template.Template.Execute on a single object.
testing.Benchmark(func(b *testing.B) {
templ := template.Must(template.New("test").Parse("Hello, {{.}}!"))
// RunParallel will create GOMAXPROCS goroutines
// and distribute work among them.
b.RunParallel(func(pb *testing.PB) {
// Each goroutine has its own bytes.Buffer.
var buf bytes.Buffer
for pb.Next() {
// The loop body is executed b.N times total across all goroutines.
buf.Reset()
templ.Execute(&buf, "World")
}
})
})
}
......@@ -43,6 +43,7 @@
//
// If a benchmark needs some expensive setup before running, the timer
// may be reset:
//
// func BenchmarkBigLen(b *testing.B) {
// big := NewBig()
// b.ResetTimer()
......@@ -51,6 +52,21 @@
// }
// }
//
// If a benchmark needs to test performance in a parallel setting, it may use
// the RunParallel helper function; such benchmarks are intended to be used with
// the go test -cpu flag:
//
// func BenchmarkTemplateParallel(b *testing.B) {
// templ := template.Must(template.New("test").Parse("Hello, {{.}}!"))
// b.RunParallel(func(pb *testing.PB) {
// var buf bytes.Buffer
// for pb.Next() {
// buf.Reset()
// templ.Execute(&buf, "World")
// }
// })
// }
//
// Examples
//
// The package also runs and verifies example code. Example functions may
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment