Skip to content

Commit e495324

Browse files
author
Angelo Marletta
committed
Shard aggregator lock
1 parent 00fb40e commit e495324

File tree

5 files changed

+199
-88
lines changed

5 files changed

+199
-88
lines changed

statsd/aggregator.go

Lines changed: 88 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,31 @@ type (
1414
bufferedMetricMap map[string]*bufferedMetric
1515
)
1616

17+
type countShard struct {
18+
sync.RWMutex
19+
counts countsMap
20+
}
21+
22+
type gaugeShard struct {
23+
sync.RWMutex
24+
gauges gaugesMap
25+
}
26+
27+
type setShard struct {
28+
sync.RWMutex
29+
sets setsMap
30+
}
31+
1732
type aggregator struct {
1833
nbContextGauge uint64
1934
nbContextCount uint64
2035
nbContextSet uint64
2136

22-
countsM sync.RWMutex
23-
gaugesM sync.RWMutex
24-
setsM sync.RWMutex
37+
shardsCount int
38+
countShards []*countShard
39+
gaugeShards []*gaugeShard
40+
setShards []*setShard
2541

26-
gauges gaugesMap
27-
counts countsMap
28-
sets setsMap
2942
histograms bufferedMetricContexts
3043
distributions bufferedMetricContexts
3144
timings bufferedMetricContexts
@@ -43,18 +56,25 @@ type aggregator struct {
4356
wg sync.WaitGroup
4457
}
4558

46-
func newAggregator(c *ClientEx, maxSamplesPerContext int64) *aggregator {
47-
return &aggregator{
59+
func newAggregator(c *ClientEx, maxSamplesPerContext int64, shardsCount int) *aggregator {
60+
agg := &aggregator{
4861
client: c,
49-
counts: countsMap{},
50-
gauges: gaugesMap{},
51-
sets: setsMap{},
62+
shardsCount: shardsCount,
63+
countShards: make([]*countShard, shardsCount),
64+
gaugeShards: make([]*gaugeShard, shardsCount),
65+
setShards: make([]*setShard, shardsCount),
5266
histograms: newBufferedContexts(newHistogramMetric, maxSamplesPerContext),
5367
distributions: newBufferedContexts(newDistributionMetric, maxSamplesPerContext),
5468
timings: newBufferedContexts(newTimingMetric, maxSamplesPerContext),
5569
closed: make(chan struct{}),
5670
stopChannelMode: make(chan struct{}),
5771
}
72+
for i := 0; i < shardsCount; i++ {
73+
agg.countShards[i] = &countShard{counts: countsMap{}}
74+
agg.gaugeShards[i] = &gaugeShard{gauges: gaugesMap{}}
75+
agg.setShards[i] = &setShard{sets: setsMap{}}
76+
}
77+
return agg
5878
}
5979

6080
func (a *aggregator) start(flushInterval time.Duration) {
@@ -135,40 +155,43 @@ func (a *aggregator) flushMetrics() []metric {
135155
// We reset the values to avoid sending 'zero' values for metrics not
136156
// sampled during this flush interval
137157

138-
a.setsM.Lock()
139-
sets := a.sets
140-
a.sets = setsMap{}
141-
a.setsM.Unlock()
142-
143-
for _, s := range sets {
144-
metrics = append(metrics, s.flushUnsafe()...)
158+
for _, shard := range a.setShards {
159+
shard.Lock()
160+
sets := shard.sets
161+
shard.sets = setsMap{}
162+
shard.Unlock()
163+
for _, s := range sets {
164+
metrics = append(metrics, s.flushUnsafe()...)
165+
}
166+
atomic.AddUint64(&a.nbContextSet, uint64(len(sets)))
145167
}
146168

147-
a.gaugesM.Lock()
148-
gauges := a.gauges
149-
a.gauges = gaugesMap{}
150-
a.gaugesM.Unlock()
151-
152-
for _, g := range gauges {
153-
metrics = append(metrics, g.flushUnsafe())
169+
for _, shard := range a.gaugeShards {
170+
shard.Lock()
171+
gauges := shard.gauges
172+
shard.gauges = gaugesMap{}
173+
shard.Unlock()
174+
for _, g := range gauges {
175+
metrics = append(metrics, g.flushUnsafe())
176+
}
177+
atomic.AddUint64(&a.nbContextGauge, uint64(len(gauges)))
154178
}
155179

156-
a.countsM.Lock()
157-
counts := a.counts
158-
a.counts = countsMap{}
159-
a.countsM.Unlock()
160-
161-
for _, c := range counts {
162-
metrics = append(metrics, c.flushUnsafe())
180+
for _, shard := range a.countShards {
181+
shard.Lock()
182+
counts := shard.counts
183+
shard.counts = countsMap{}
184+
shard.Unlock()
185+
for _, c := range counts {
186+
metrics = append(metrics, c.flushUnsafe())
187+
}
188+
atomic.AddUint64(&a.nbContextCount, uint64(len(counts)))
163189
}
164190

165191
metrics = a.histograms.flush(metrics)
166192
metrics = a.distributions.flush(metrics)
167193
metrics = a.timings.flush(metrics)
168194

169-
atomic.AddUint64(&a.nbContextCount, uint64(len(counts)))
170-
atomic.AddUint64(&a.nbContextGauge, uint64(len(gauges)))
171-
atomic.AddUint64(&a.nbContextSet, uint64(len(sets)))
172195
return metrics
173196
}
174197

@@ -226,72 +249,75 @@ func getContextAndTags(name string, tags []string, cardinality Cardinality) (str
226249
func (a *aggregator) count(name string, value int64, tags []string, cardinality Cardinality) error {
227250
resolvedCardinality := resolveCardinality(cardinality)
228251
context := getContext(name, tags, resolvedCardinality)
229-
a.countsM.RLock()
230-
if count, found := a.counts[context]; found {
252+
shard := a.countShards[hashString32(context)%uint32(a.shardsCount)]
253+
shard.RLock()
254+
if count, found := shard.counts[context]; found {
231255
count.sample(value)
232-
a.countsM.RUnlock()
256+
shard.RUnlock()
233257
return nil
234258
}
235-
a.countsM.RUnlock()
259+
shard.RUnlock()
236260

237-
a.countsM.Lock()
261+
shard.Lock()
238262
// Check if another goroutines hasn't created the value betwen the RUnlock and 'Lock'
239-
if count, found := a.counts[context]; found {
263+
if count, found := shard.counts[context]; found {
240264
count.sample(value)
241-
a.countsM.Unlock()
265+
shard.Unlock()
242266
return nil
243267
}
244268

245-
a.counts[context] = newCountMetric(name, value, tags, resolvedCardinality)
246-
a.countsM.Unlock()
269+
shard.counts[context] = newCountMetric(name, value, tags, resolvedCardinality)
270+
shard.Unlock()
247271
return nil
248272
}
249273

250274
func (a *aggregator) gauge(name string, value float64, tags []string, cardinality Cardinality) error {
251275
resolvedCardinality := resolveCardinality(cardinality)
252276
context := getContext(name, tags, resolvedCardinality)
253-
a.gaugesM.RLock()
254-
if gauge, found := a.gauges[context]; found {
277+
shard := a.gaugeShards[hashString32(context)%uint32(a.shardsCount)]
278+
shard.RLock()
279+
if gauge, found := shard.gauges[context]; found {
255280
gauge.sample(value)
256-
a.gaugesM.RUnlock()
281+
shard.RUnlock()
257282
return nil
258283
}
259-
a.gaugesM.RUnlock()
284+
shard.RUnlock()
260285

261286
gauge := newGaugeMetric(name, value, tags, resolvedCardinality)
262287

263-
a.gaugesM.Lock()
288+
shard.Lock()
264289
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
265-
if gauge, found := a.gauges[context]; found {
290+
if gauge, found := shard.gauges[context]; found {
266291
gauge.sample(value)
267-
a.gaugesM.Unlock()
292+
shard.Unlock()
268293
return nil
269294
}
270-
a.gauges[context] = gauge
271-
a.gaugesM.Unlock()
295+
shard.gauges[context] = gauge
296+
shard.Unlock()
272297
return nil
273298
}
274299

275300
func (a *aggregator) set(name string, value string, tags []string, cardinality Cardinality) error {
276301
resolvedCardinality := resolveCardinality(cardinality)
277302
context := getContext(name, tags, resolvedCardinality)
278-
a.setsM.RLock()
279-
if set, found := a.sets[context]; found {
303+
shard := a.setShards[hashString32(context)%uint32(a.shardsCount)]
304+
shard.RLock()
305+
if set, found := shard.sets[context]; found {
280306
set.sample(value)
281-
a.setsM.RUnlock()
307+
shard.RUnlock()
282308
return nil
283309
}
284-
a.setsM.RUnlock()
310+
shard.RUnlock()
285311

286-
a.setsM.Lock()
312+
shard.Lock()
287313
// Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock'
288-
if set, found := a.sets[context]; found {
314+
if set, found := shard.sets[context]; found {
289315
set.sample(value)
290-
a.setsM.Unlock()
316+
shard.Unlock()
291317
return nil
292318
}
293-
a.sets[context] = newSetMetric(name, value, tags, resolvedCardinality)
294-
a.setsM.Unlock()
319+
shard.sets[context] = newSetMetric(name, value, tags, resolvedCardinality)
320+
shard.Unlock()
295321
return nil
296322
}
297323

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package statsd
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
)
7+
8+
func BenchmarkAggregatorSharding(b *testing.B) {
9+
shardCounts := []int{1, 8, 16, 32, 64}
10+
11+
for _, shards := range shardCounts {
12+
b.Run(fmt.Sprintf("Shards_%d", shards), func(b *testing.B) {
13+
a := newAggregator(nil, 0, shards)
14+
tags := []string{"tag:1", "tag:2"}
15+
16+
b.ResetTimer()
17+
b.RunParallel(func(pb *testing.PB) {
18+
i := 0
19+
for pb.Next() {
20+
i++
21+
name := fmt.Sprintf("metric.%d", i%100000)
22+
a.count(name, 1, tags, CardinalityLow)
23+
a.gauge(name, 10.0, tags, CardinalityLow)
24+
a.set(name, "val", tags, CardinalityLow)
25+
}
26+
})
27+
})
28+
}
29+
}

0 commit comments

Comments
 (0)