Commit 33563d1c authored by Austin Clements's avatar Austin Clements

internal/trace: support for mutator utilization distributions

This adds support for computing the quantiles of a mutator utilization
distribution.

Change-Id: Ia8b3ed14bf415c234e2f567360fd1b361d28bd40
Reviewed-on: https://go-review.googlesource.com/c/60799
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: 's avatarHyang-Ah Hana Kim <hyangah@gmail.com>
parent b2e8dd18
......@@ -273,6 +273,10 @@ func NewMMUCurve(utils [][]MutatorUtil) *MMUCurve {
return &MMUCurve{series}
}
// bandsPerSeries is the number of bands to divide each series into.
// This is only changed by tests.
var bandsPerSeries = 1000
func newMMUSeries(util []MutatorUtil) mmuSeries {
// Compute cumulative sum.
sums := make([]totalUtil, len(util))
......@@ -289,7 +293,7 @@ func newMMUSeries(util []MutatorUtil) mmuSeries {
// these bands.
//
// Compute the duration of each band.
numBands := 1000
numBands := bandsPerSeries
if numBands > len(util) {
// There's no point in having lots of bands if there
// aren't many events.
......@@ -393,8 +397,8 @@ func (h *utilHeap) Pop() interface{} {
return x
}
// An accumulator collects different MMU-related statistics depending
// on what's desired.
// An accumulator takes a windowed mutator utilization function and
// tracks various statistics for that function.
type accumulator struct {
mmu float64
......@@ -406,10 +410,30 @@ type accumulator struct {
// Worst N window tracking
nWorst int
wHeap utilHeap
}
// addMU records mutator utilization mu over the given window starting
// at time.
// Mutator utilization distribution tracking
mud *mud
// preciseMass is the distribution mass that must be precise
// before accumulation is stopped.
preciseMass float64
// lastTime and lastMU are the previous point added to the
// windowed mutator utilization function.
lastTime int64
lastMU float64
}
// resetTime declares a discontinuity in the windowed mutator
// utilization function by resetting the current time.
func (acc *accumulator) resetTime() {
// This only matters for distribution collection, since that's
// the only thing that depends on the progression of the
// windowed mutator utilization function.
acc.lastTime = math.MaxInt64
}
// addMU adds a point to the windowed mutator utilization function at
// (time, mu). This must be called for monotonically increasing values
// of time.
//
// It returns true if further calls to addMU would be pointless.
func (acc *accumulator) addMU(time int64, mu float64, window time.Duration) bool {
......@@ -458,6 +482,25 @@ func (acc *accumulator) addMU(time int64, mu float64, window time.Duration) bool
acc.bound = math.Max(acc.bound, acc.wHeap[0].MutatorUtil)
}
if acc.mud != nil {
if acc.lastTime != math.MaxInt64 {
// Update distribution.
acc.mud.add(acc.lastMU, mu, float64(time-acc.lastTime))
}
acc.lastTime, acc.lastMU = time, mu
if _, mudBound, ok := acc.mud.approxInvCumulativeSum(); ok {
acc.bound = math.Max(acc.bound, mudBound)
} else {
// We haven't accumulated enough total precise
// mass yet to even reach our goal, so keep
// accumulating.
acc.bound = 1
}
// It's not worth checking percentiles every time, so
// just keep accumulating this band.
return false
}
// If we've found enough 0 utilizations, we can stop immediately.
return len(acc.wHeap) == acc.nWorst && acc.wHeap[0].MutatorUtil == 0
}
......@@ -484,6 +527,85 @@ func (c *MMUCurve) Examples(window time.Duration, n int) (worst []UtilWindow) {
return ([]UtilWindow)(acc.wHeap)
}
// MUD returns mutator utilization distribution quantiles for the
// given window size.
//
// The mutator utilization distribution is the distribution of mean
// mutator utilization across all windows of the given window size in
// the trace.
//
// The minimum mutator utilization is the minimum (0th percentile) of
// this distribution. (However, if only the minimum is desired, it's
// more efficient to use the MMU method.)
func (c *MMUCurve) MUD(window time.Duration, quantiles []float64) []float64 {
if len(quantiles) == 0 {
return []float64{}
}
// Each unrefined band contributes a known total mass to the
// distribution (bandDur except at the end), but in an unknown
// way. However, we know that all the mass it contributes must
// be at or above its worst-case mean mutator utilization.
//
// Hence, we refine bands until the highest desired
// distribution quantile is less than the next worst-case mean
// mutator utilization. At this point, all further
// contributions to the distribution must be beyond the
// desired quantile and hence cannot affect it.
//
// First, find the highest desired distribution quantile.
maxQ := quantiles[0]
for _, q := range quantiles {
if q > maxQ {
maxQ = q
}
}
// The distribution's mass is in units of time (it's not
// normalized because this would make it more annoying to
// account for future contributions of unrefined bands). The
// total final mass will be the duration of the trace itself
// minus the window size. Using this, we can compute the mass
// corresponding to quantile maxQ.
var duration int64
for _, s := range c.series {
duration1 := s.util[len(s.util)-1].Time - s.util[0].Time
if duration1 >= int64(window) {
duration += duration1 - int64(window)
}
}
qMass := float64(duration) * maxQ
// Accumulate the MUD until we have precise information for
// everything to the left of qMass.
acc := accumulator{mmu: 1.0, bound: 1.0, preciseMass: qMass, mud: new(mud)}
acc.mud.setTrackMass(qMass)
c.mmu(window, &acc)
// Evaluate the quantiles on the accumulated MUD.
out := make([]float64, len(quantiles))
for i := range out {
mu, _ := acc.mud.invCumulativeSum(float64(duration) * quantiles[i])
if math.IsNaN(mu) {
// There are a few legitimate ways this can
// happen:
//
// 1. If the window is the full trace
// duration, then the windowed MU function is
// only defined at a single point, so the MU
// distribution is not well-defined.
//
// 2. If there are no events, then the MU
// distribution has no mass.
//
// Either way, all of the quantiles will have
// converged toward the MMU at this point.
mu = acc.mmu
}
out[i] = mu
}
return out
}
func (c *MMUCurve) mmu(window time.Duration, acc *accumulator) {
if window <= 0 {
acc.mmu = 0
......@@ -607,12 +729,16 @@ func (c *mmuSeries) bandMMU(bandIdx int, window time.Duration, acc *accumulator)
if utilEnd := util[len(util)-1].Time - int64(window); utilEnd < endTime {
endTime = utilEnd
}
acc.resetTime()
for {
// Advance edges to time and time+window.
mu := (right.advance(time+int64(window)) - left.advance(time)).mean(window)
if acc.addMU(time, mu, window) {
break
}
if time == endTime {
break
}
// The maximum slope of the windowed mutator
// utilization function is 1/window, so we can always
......@@ -632,7 +758,10 @@ func (c *mmuSeries) bandMMU(bandIdx int, window time.Duration, acc *accumulator)
time = minTime
}
if time >= endTime {
break
// For MMUs we could stop here, but for MUDs
// it's important that we span the entire
// band.
time = endTime
}
}
}
......
......@@ -75,7 +75,8 @@ func TestMMU(t *testing.T) {
}
func TestMMUTrace(t *testing.T) {
t.Parallel()
// Can't be t.Parallel() because it modifies the
// testingOneBand package variable.
p, err := New("../trace/testdata/stress_1_10_good")
if err != nil {
......@@ -96,6 +97,25 @@ func TestMMUTrace(t *testing.T) {
t.Errorf("want %f, got %f mutator utilization in window %s", want, got, window)
}
}
// Test MUD with band optimization against MUD without band
// optimization. We don't have a simple testing implementation
// of MUDs (the simplest implementation is still quite
// complex), but this is still a pretty good test.
defer func(old int) { bandsPerSeries = old }(bandsPerSeries)
bandsPerSeries = 1
mmuCurve2 := NewMMUCurve(mu)
quantiles := []float64{0, 1 - .999, 1 - .99}
for window := time.Microsecond; window < time.Second; window *= 10 {
mud1 := mmuCurve.MUD(window, quantiles)
mud2 := mmuCurve2.MUD(window, quantiles)
for i := range mud1 {
if !aeq(mud1[i], mud2[i]) {
t.Errorf("for quantiles %v at window %v, want %v, got %v", quantiles, window, mud2, mud1)
break
}
}
}
}
func BenchmarkMMU(b *testing.B) {
......
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package traceparser
import (
"math"
"sort"
)
// mud is an updatable mutator utilization distribution.
//
// This is a continuous distribution of duration over mutator
// utilization. For example, the integral from mutator utilization a
// to b is the total duration during which the mutator utilization was
// in the range [a, b].
//
// This distribution is *not* normalized (it is not a probability
// distribution). This makes it easier to work with as it's being
// updated.
//
// It is represented as the sum of scaled uniform distribution
// functions and Dirac delta functions (which are treated as
// degenerate uniform distributions).
type mud struct {
sorted, unsorted []edge
// trackMass is the inverse cumulative sum to track as the
// distribution is updated.
trackMass float64
// trackBucket is the bucket in which trackMass falls. If the
// total mass of the distribution is < trackMass, this is
// len(hist).
trackBucket int
// trackSum is the cumulative sum of hist[:trackBucket]. Once
// trackSum >= trackMass, trackBucket must be recomputed.
trackSum float64
// hist is a hierarchical histogram of distribution mass.
hist [mudDegree]float64
}
const (
// mudDegree is the number of buckets in the MUD summary
// histogram.
mudDegree = 1024
)
type edge struct {
// At x, the function increases by y.
x, delta float64
// Additionally at x is a Dirac delta function with area dirac.
dirac float64
}
// add adds a uniform function over [l, r] scaled so the total weight
// of the uniform is area. If l==r, this adds a Dirac delta function.
func (d *mud) add(l, r, area float64) {
if area == 0 {
return
}
if r < l {
l, r = r, l
}
// Add the edges.
if l == r {
d.unsorted = append(d.unsorted, edge{l, 0, area})
} else {
delta := area / (r - l)
d.unsorted = append(d.unsorted, edge{l, delta, 0}, edge{r, -delta, 0})
}
// Update the histogram.
h := &d.hist
lbFloat, lf := math.Modf(l * mudDegree)
lb := int(lbFloat)
if lb >= mudDegree {
lb, lf = mudDegree-1, 1
}
if l == r {
h[lb] += area
} else {
rbFloat, rf := math.Modf(r * mudDegree)
rb := int(rbFloat)
if rb >= mudDegree {
rb, rf = mudDegree-1, 1
}
if lb == rb {
h[lb] += area
} else {
perBucket := area / (r - l) / mudDegree
h[lb] += perBucket * (1 - lf)
h[rb] += perBucket * rf
for i := lb + 1; i < rb; i++ {
h[i] += perBucket
}
}
}
// Update mass tracking.
if thresh := float64(d.trackBucket) / mudDegree; l < thresh {
if r < thresh {
d.trackSum += area
} else {
d.trackSum += area * (thresh - l) / (r - l)
}
if d.trackSum >= d.trackMass {
// The tracked mass now falls in a different
// bucket. Recompute the inverse cumulative sum.
d.setTrackMass(d.trackMass)
}
}
}
// setTrackMass sets the mass to track the inverse cumulative sum for.
//
// Specifically, mass is a cumulative duration, and the mutator
// utilization bounds for this duration can be queried using
// approxInvCumulativeSum.
func (d *mud) setTrackMass(mass float64) {
d.trackMass = mass
// Find the bucket currently containing trackMass by computing
// the cumulative sum.
sum := 0.0
for i, val := range d.hist[:] {
newSum := sum + val
if newSum > mass {
// mass falls in bucket i.
d.trackBucket = i
d.trackSum = sum
return
}
sum = newSum
}
d.trackBucket = len(d.hist)
d.trackSum = sum
}
// approxInvCumulativeSum is like invCumulativeSum, but specifically
// operates on the tracked mass and returns an upper and lower bound
// approximation of the inverse cumulative sum.
//
// The true inverse cumulative sum will be in the range [lower, upper).
func (d *mud) approxInvCumulativeSum() (float64, float64, bool) {
if d.trackBucket == len(d.hist) {
return math.NaN(), math.NaN(), false
}
return float64(d.trackBucket) / mudDegree, float64(d.trackBucket+1) / mudDegree, true
}
// invCumulativeSum returns x such that the integral of d from -∞ to x
// is y. If the total weight of d is less than y, it returns the
// maximum of the distribution and false.
//
// Specifically, y is a cumulative duration, and invCumulativeSum
// returns the mutator utilization x such that at least y time has
// been spent with mutator utilization <= x.
func (d *mud) invCumulativeSum(y float64) (float64, bool) {
if len(d.sorted) == 0 && len(d.unsorted) == 0 {
return math.NaN(), false
}
// Sort edges.
edges := d.unsorted
sort.Slice(edges, func(i, j int) bool {
return edges[i].x < edges[j].x
})
// Merge with sorted edges.
d.unsorted = nil
if d.sorted == nil {
d.sorted = edges
} else {
oldSorted := d.sorted
newSorted := make([]edge, len(oldSorted)+len(edges))
i, j := 0, 0
for o := range newSorted {
if i >= len(oldSorted) {
copy(newSorted[o:], edges[j:])
break
} else if j >= len(edges) {
copy(newSorted[o:], oldSorted[i:])
break
} else if oldSorted[i].x < edges[j].x {
newSorted[o] = oldSorted[i]
i++
} else {
newSorted[o] = edges[j]
j++
}
}
d.sorted = newSorted
}
// Traverse edges in order computing a cumulative sum.
csum, rate, prevX := 0.0, 0.0, 0.0
for _, e := range d.sorted {
newCsum := csum + (e.x-prevX)*rate
if newCsum >= y {
// y was exceeded between the previous edge
// and this one.
if rate == 0 {
// Anywhere between prevX and
// e.x will do. We return e.x
// because that takes care of
// the y==0 case naturally.
return e.x, true
}
return (y-csum)/rate + prevX, true
}
newCsum += e.dirac
if newCsum >= y {
// y was exceeded by the Dirac delta at e.x.
return e.x, true
}
csum, prevX = newCsum, e.x
rate += e.delta
}
return prevX, false
}
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package traceparser
import (
"math/rand"
"testing"
)
func TestMUD(t *testing.T) {
// Insert random uniforms and check histogram mass and
// cumulative sum approximations.
rnd := rand.New(rand.NewSource(42))
mass := 0.0
var mud mud
for i := 0; i < 100; i++ {
area, l, r := rnd.Float64(), rnd.Float64(), rnd.Float64()
if rnd.Intn(10) == 0 {
r = l
}
t.Log(l, r, area)
mud.add(l, r, area)
mass += area
// Check total histogram weight.
hmass := 0.0
for _, val := range mud.hist {
hmass += val
}
if !aeq(mass, hmass) {
t.Fatalf("want mass %g, got %g", mass, hmass)
}
// Check inverse cumulative sum approximations.
for j := 0.0; j < mass; j += mass * 0.099 {
mud.setTrackMass(j)
l, u, ok := mud.approxInvCumulativeSum()
inv, ok2 := mud.invCumulativeSum(j)
if !ok || !ok2 {
t.Fatalf("inverse cumulative sum failed: approx %v, exact %v", ok, ok2)
}
if !(l <= inv && inv < u) {
t.Fatalf("inverse(%g) = %g, not ∈ [%g, %g)", j, inv, l, u)
}
}
}
}
func TestMUDTracking(t *testing.T) {
// Test that the tracked mass is tracked correctly across
// updates.
rnd := rand.New(rand.NewSource(42))
const uniforms = 100
for trackMass := 0.0; trackMass < uniforms; trackMass += uniforms / 50 {
var mud mud
mass := 0.0
mud.setTrackMass(trackMass)
for i := 0; i < uniforms; i++ {
area, l, r := rnd.Float64(), rnd.Float64(), rnd.Float64()
mud.add(l, r, area)
mass += area
l, u, ok := mud.approxInvCumulativeSum()
inv, ok2 := mud.invCumulativeSum(trackMass)
if mass < trackMass {
if ok {
t.Errorf("approx(%g) = [%g, %g), but mass = %g", trackMass, l, u, mass)
}
if ok2 {
t.Errorf("exact(%g) = %g, but mass = %g", trackMass, inv, mass)
}
} else {
if !ok {
t.Errorf("approx(%g) failed, but mass = %g", trackMass, mass)
}
if !ok2 {
t.Errorf("exact(%g) failed, but mass = %g", trackMass, mass)
}
if ok && ok2 && !(l <= inv && inv < u) {
t.Errorf("inverse(%g) = %g, not ∈ [%g, %g)", trackMass, inv, l, u)
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment