Skip to content

Commit c49df9f

Browse files
committed
43
Signed-off-by: Zhigao TONG <tongzhigao@pingcap.com>
1 parent 060069a commit c49df9f

File tree

7 files changed

+147
-90
lines changed

7 files changed

+147
-90
lines changed

pkg/mvs/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ go_library(
3333
"//pkg/util/logutil",
3434
"//pkg/util/memory",
3535
"//pkg/util/sqlexec",
36+
"@com_github_prometheus_client_golang//prometheus",
3637
"@org_uber_go_zap//:zap",
3738
],
3839
)

pkg/mvs/consistenthash.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,10 @@ func NewConsistentHash(replicas int) *ConsistentHash {
3535
}
3636

3737
// Rebuild rebuilds the consistent hash with the given nodes.
38-
func (c *ConsistentHash) Rebuild(nodes []string) {
38+
func (c *ConsistentHash) Rebuild(nodes map[string]serverInfo) {
3939
c.ring = make([]virtualNode, 0, len(nodes)*c.replicas)
4040
c.data = make(map[string][]virtualNode, len(nodes))
41-
for _, node := range nodes {
42-
c.doInsert(node)
43-
}
44-
c.doResort()
45-
}
46-
47-
// RebuildFromMap rebuilds the consistent hash from a template map keyed by node.
48-
func RebuildFromMap[T any](c *ConsistentHash, nodeMap map[string]T) {
49-
c.ring = make([]virtualNode, 0, len(nodeMap)*c.replicas)
50-
c.data = make(map[string][]virtualNode, len(nodeMap))
51-
for node := range nodeMap {
41+
for node := range nodes {
5242
c.doInsert(node)
5343
}
5444
c.doResort()

pkg/mvs/impl.go

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ import (
1717
basic "github.com/pingcap/tidb/pkg/util"
1818
"github.com/pingcap/tidb/pkg/util/chunk"
1919
"github.com/pingcap/tidb/pkg/util/sqlexec"
20+
"github.com/prometheus/client_golang/prometheus"
2021
)
2122

2223
type serverHelper struct {
23-
durationObserverCache metricCache[mvMetricTypeResultKey, mvMetricObserver]
24-
runEventCounterCache metricCache[string, mvMetricCounter]
24+
durationObserverCache durationObserverCache
25+
runEventCounterCache runEventCounterCache
2526

2627
reportCache struct {
2728
submittedCount int64
@@ -37,56 +38,39 @@ type mvMetricTypeResultKey struct {
3738
result string
3839
}
3940

40-
type mvMetricObserver interface {
41-
Observe(float64)
42-
}
43-
44-
type mvMetricCounter interface {
45-
Inc()
46-
}
47-
48-
type metricCache[K comparable, V any] struct {
41+
type durationObserverCache struct {
4942
mu sync.RWMutex
50-
data map[K]V
43+
data map[mvMetricTypeResultKey]prometheus.Observer
5144
}
5245

53-
// newMetricCache creates a map-backed cache with an optional initial capacity.
54-
func newMetricCache[K comparable, V any](capacity int) metricCache[K, V] {
46+
func newDurationObserverCache(capacity int) durationObserverCache {
5547
if capacity < 0 {
5648
capacity = 0
5749
}
58-
return metricCache[K, V]{
59-
data: make(map[K]V, capacity),
50+
return durationObserverCache{
51+
data: make(map[mvMetricTypeResultKey]prometheus.Observer, capacity),
6052
}
6153
}
6254

63-
// getOrCreate returns the cached value for key, creating and caching it on demand.
64-
func (c *metricCache[K, V]) getOrCreate(key K, create func() V) V {
65-
c.mu.RLock()
66-
if value, ok := c.data[key]; ok {
67-
c.mu.RUnlock()
68-
return value
69-
}
70-
c.mu.RUnlock()
55+
type runEventCounterCache struct {
56+
mu sync.RWMutex
57+
data map[string]prometheus.Counter
58+
}
7159

72-
c.mu.Lock()
73-
defer c.mu.Unlock()
74-
if c.data == nil {
75-
c.data = make(map[K]V)
60+
func newRunEventCounterCache(capacity int) runEventCounterCache {
61+
if capacity < 0 {
62+
capacity = 0
7663
}
77-
if value, ok := c.data[key]; ok {
78-
return value
64+
return runEventCounterCache{
65+
data: make(map[string]prometheus.Counter, capacity),
7966
}
80-
value := create()
81-
c.data[key] = value
82-
return value
8367
}
8468

8569
// newServerHelper builds a default helper used by MVService.
8670
func newServerHelper() *serverHelper {
8771
return &serverHelper{
88-
durationObserverCache: newMetricCache[mvMetricTypeResultKey, mvMetricObserver](8),
89-
runEventCounterCache: newMetricCache[string, mvMetricCounter](16),
72+
durationObserverCache: newDurationObserverCache(8),
73+
runEventCounterCache: newRunEventCounterCache(16),
9074
}
9175
}
9276

pkg/mvs/metrics_reporter.go

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
tidbmetrics "github.com/pingcap/tidb/pkg/metrics"
21+
"github.com/prometheus/client_golang/prometheus"
2122
)
2223

2324
// reportCounterDelta reports only the positive delta since last flush.
@@ -72,22 +73,46 @@ func (h *serverHelper) observeRunEvent(eventType string) {
7273
}
7374

7475
// getDurationObserver returns a cached observer for (type, result) labels.
75-
func (h *serverHelper) getDurationObserver(metricType, result string) mvMetricObserver {
76+
func (h *serverHelper) getDurationObserver(metricType, result string) prometheus.Observer {
7677
if h == nil {
7778
return tidbmetrics.MVServiceMetaFetchDurationHistogramVec.WithLabelValues(metricType, result)
7879
}
7980
key := mvMetricTypeResultKey{typ: metricType, result: result}
80-
return h.durationObserverCache.getOrCreate(key, func() mvMetricObserver {
81-
return tidbmetrics.MVServiceMetaFetchDurationHistogramVec.WithLabelValues(metricType, result)
82-
})
81+
h.durationObserverCache.mu.RLock()
82+
if observer, ok := h.durationObserverCache.data[key]; ok {
83+
h.durationObserverCache.mu.RUnlock()
84+
return observer
85+
}
86+
h.durationObserverCache.mu.RUnlock()
87+
88+
h.durationObserverCache.mu.Lock()
89+
defer h.durationObserverCache.mu.Unlock()
90+
if observer, ok := h.durationObserverCache.data[key]; ok {
91+
return observer
92+
}
93+
observer := tidbmetrics.MVServiceMetaFetchDurationHistogramVec.WithLabelValues(metricType, result)
94+
h.durationObserverCache.data[key] = observer
95+
return observer
8396
}
8497

8598
// getRunEventCounter returns a cached counter for eventType label.
86-
func (h *serverHelper) getRunEventCounter(eventType string) mvMetricCounter {
99+
func (h *serverHelper) getRunEventCounter(eventType string) prometheus.Counter {
87100
if h == nil {
88101
return tidbmetrics.MVServiceRunEventCounterVec.WithLabelValues(eventType)
89102
}
90-
return h.runEventCounterCache.getOrCreate(eventType, func() mvMetricCounter {
91-
return tidbmetrics.MVServiceRunEventCounterVec.WithLabelValues(eventType)
92-
})
103+
h.runEventCounterCache.mu.RLock()
104+
if counter, ok := h.runEventCounterCache.data[eventType]; ok {
105+
h.runEventCounterCache.mu.RUnlock()
106+
return counter
107+
}
108+
h.runEventCounterCache.mu.RUnlock()
109+
110+
h.runEventCounterCache.mu.Lock()
111+
defer h.runEventCounterCache.mu.Unlock()
112+
if counter, ok := h.runEventCounterCache.data[eventType]; ok {
113+
return counter
114+
}
115+
counter := tidbmetrics.MVServiceRunEventCounterVec.WithLabelValues(eventType)
116+
h.runEventCounterCache.data[eventType] = counter
117+
return counter
93118
}

pkg/mvs/server_maintainer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (sch *ServerConsistentHash) refresh() error {
130130
sch.mu.Lock() // Guard server map and hash-ring rebuild.
131131

132132
sch.servers = newServerInfos
133-
RebuildFromMap(&sch.chash, sch.servers)
133+
sch.chash.Rebuild(sch.servers)
134134

135135
sch.mu.Unlock() // Release guard after rebuild.
136136
}

pkg/mvs/utils_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ func TestConsistentHash_RebuildResetsRing(t *testing.T) {
145145
t.Fatalf("expected non-empty node before rebuild")
146146
}
147147

148-
c.Rebuild([]string{"nodeC"})
148+
c.Rebuild(map[string]serverInfo{
149+
"nodeC": {ID: "nodeC"},
150+
})
149151

150152
if got := c.NodeCount(); got != 1 {
151153
t.Fatalf("expected 1 node after rebuild, got %d", got)
@@ -161,7 +163,7 @@ func TestConsistentHash_RebuildResetsRing(t *testing.T) {
161163
}
162164
}
163165

164-
func TestConsistentHash_RebuildFromMap(t *testing.T) {
166+
func TestConsistentHash_Rebuild(t *testing.T) {
165167
installMockTimeForTest(t)
166168
mapping := map[string]uint32{
167169
"nodeA#0": 10,
@@ -174,11 +176,10 @@ func TestConsistentHash_RebuildFromMap(t *testing.T) {
174176
c := NewConsistentHash(2)
175177
c.hashFunc = mustHash(mapping)
176178

177-
nodes := map[string]int{
178-
"nodeA": 1,
179-
"nodeB": 2,
180-
}
181-
RebuildFromMap(c, nodes)
179+
c.Rebuild(map[string]serverInfo{
180+
"nodeA": {ID: "nodeA"},
181+
"nodeB": {ID: "nodeB"},
182+
})
182183

183184
if got := c.NodeCount(); got != 2 {
184185
t.Fatalf("expected 2 nodes, got %d", got)

pkg/server/handler/mvhandler/mv_service_handler.go

Lines changed: 83 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -74,28 +74,28 @@ type settingsFieldUpdater func(form url.Values, settings *mvServiceRuntimeSettin
7474

7575
// mvServiceSettingsFieldUpdaters defines how each form field is parsed, validated, and applied.
7676
var mvServiceSettingsFieldUpdaters = []settingsFieldUpdater{
77-
newSettingsFieldUpdater(mvServiceTaskMaxConcurrencyFormField, strconv.Atoi, func(v int) bool { return v > 0 }, func(settings *mvServiceRuntimeSettings, v int) {
77+
newIntSettingsFieldUpdater(mvServiceTaskMaxConcurrencyFormField, func(v int) bool { return v > 0 }, func(settings *mvServiceRuntimeSettings, v int) {
7878
settings.maxConcurrency = v
7979
}),
80-
newSettingsFieldUpdater(mvServiceTaskTimeoutFormField, time.ParseDuration, func(v time.Duration) bool { return v >= 0 }, func(settings *mvServiceRuntimeSettings, v time.Duration) {
80+
newDurationSettingsFieldUpdater(mvServiceTaskTimeoutFormField, func(v time.Duration) bool { return v >= 0 }, func(settings *mvServiceRuntimeSettings, v time.Duration) {
8181
settings.timeout = v
8282
}),
83-
newSettingsFieldUpdater(mvServiceBackpressureEnabledFormField, strconv.ParseBool, nil, func(settings *mvServiceRuntimeSettings, v bool) {
83+
newBoolSettingsFieldUpdater(mvServiceBackpressureEnabledFormField, nil, func(settings *mvServiceRuntimeSettings, v bool) {
8484
settings.backpressureCfg.Enabled = v
8585
}),
86-
newSettingsFieldUpdater(mvServiceBackpressureCPUThresholdFormField, parseFloat64FieldValue, nil, func(settings *mvServiceRuntimeSettings, v float64) {
86+
newFloat64SettingsFieldUpdater(mvServiceBackpressureCPUThresholdFormField, nil, func(settings *mvServiceRuntimeSettings, v float64) {
8787
settings.backpressureCfg.CPUThreshold = v
8888
}),
89-
newSettingsFieldUpdater(mvServiceBackpressureMemThresholdFormField, parseFloat64FieldValue, nil, func(settings *mvServiceRuntimeSettings, v float64) {
89+
newFloat64SettingsFieldUpdater(mvServiceBackpressureMemThresholdFormField, nil, func(settings *mvServiceRuntimeSettings, v float64) {
9090
settings.backpressureCfg.MemThreshold = v
9191
}),
92-
newSettingsFieldUpdater(mvServiceBackpressureDelayFormField, time.ParseDuration, nil, func(settings *mvServiceRuntimeSettings, v time.Duration) {
92+
newDurationSettingsFieldUpdater(mvServiceBackpressureDelayFormField, nil, func(settings *mvServiceRuntimeSettings, v time.Duration) {
9393
settings.backpressureCfg.Delay = v
9494
}),
95-
newSettingsFieldUpdater(mvServiceTaskFailRetryBaseDelayFormField, time.ParseDuration, nil, func(settings *mvServiceRuntimeSettings, v time.Duration) {
95+
newDurationSettingsFieldUpdater(mvServiceTaskFailRetryBaseDelayFormField, nil, func(settings *mvServiceRuntimeSettings, v time.Duration) {
9696
settings.retryBase = v
9797
}),
98-
newSettingsFieldUpdater(mvServiceTaskFailRetryMaxDelayFormField, time.ParseDuration, nil, func(settings *mvServiceRuntimeSettings, v time.Duration) {
98+
newDurationSettingsFieldUpdater(mvServiceTaskFailRetryMaxDelayFormField, nil, func(settings *mvServiceRuntimeSettings, v time.Duration) {
9999
settings.retryMax = v
100100
}),
101101
}
@@ -229,21 +229,42 @@ func parseMVServiceSettingsUpdateFromForm(form url.Values, current mvServiceRunt
229229
return updated, changed, nil
230230
}
231231

232-
// newSettingsFieldUpdater builds a reusable updater for one form field.
233-
func newSettingsFieldUpdater[T any](
232+
func newIntSettingsFieldUpdater(
234233
field string,
235-
parse func(string) (T, error),
236-
validate func(T) bool,
237-
assign func(settings *mvServiceRuntimeSettings, value T),
234+
validate func(int) bool,
235+
assign func(settings *mvServiceRuntimeSettings, value int),
238236
) settingsFieldUpdater {
239237
return func(form url.Values, settings *mvServiceRuntimeSettings) (changed bool, err error) {
240-
value, ok, err := parseOptionalFieldValue(form, field, parse)
238+
text, ok := parseOptionalFieldText(form, field)
239+
if !ok {
240+
return false, nil
241+
}
242+
value, err := strconv.Atoi(text)
241243
if err != nil {
242-
return false, err
244+
return false, newIllegalMVServiceSettingsFieldError(field)
245+
}
246+
if validate != nil && !validate(value) {
247+
return false, newIllegalMVServiceSettingsFieldError(field)
243248
}
249+
assign(settings, value)
250+
return true, nil
251+
}
252+
}
253+
254+
func newDurationSettingsFieldUpdater(
255+
field string,
256+
validate func(time.Duration) bool,
257+
assign func(settings *mvServiceRuntimeSettings, value time.Duration),
258+
) settingsFieldUpdater {
259+
return func(form url.Values, settings *mvServiceRuntimeSettings) (changed bool, err error) {
260+
text, ok := parseOptionalFieldText(form, field)
244261
if !ok {
245262
return false, nil
246263
}
264+
value, err := time.ParseDuration(text)
265+
if err != nil {
266+
return false, newIllegalMVServiceSettingsFieldError(field)
267+
}
247268
if validate != nil && !validate(value) {
248269
return false, newIllegalMVServiceSettingsFieldError(field)
249270
}
@@ -252,22 +273,57 @@ func newSettingsFieldUpdater[T any](
252273
}
253274
}
254275

255-
// parseOptionalFieldValue parses a field only when it is provided.
256-
func parseOptionalFieldValue[T any](form url.Values, field string, parse func(string) (T, error)) (value T, ok bool, err error) {
257-
text := form.Get(field)
258-
if text == "" {
259-
return value, false, nil
276+
func newBoolSettingsFieldUpdater(
277+
field string,
278+
validate func(bool) bool,
279+
assign func(settings *mvServiceRuntimeSettings, value bool),
280+
) settingsFieldUpdater {
281+
return func(form url.Values, settings *mvServiceRuntimeSettings) (changed bool, err error) {
282+
text, ok := parseOptionalFieldText(form, field)
283+
if !ok {
284+
return false, nil
285+
}
286+
value, err := strconv.ParseBool(text)
287+
if err != nil {
288+
return false, newIllegalMVServiceSettingsFieldError(field)
289+
}
290+
if validate != nil && !validate(value) {
291+
return false, newIllegalMVServiceSettingsFieldError(field)
292+
}
293+
assign(settings, value)
294+
return true, nil
260295
}
261-
value, err = parse(text)
262-
if err != nil {
263-
return value, false, newIllegalMVServiceSettingsFieldError(field)
296+
}
297+
298+
func newFloat64SettingsFieldUpdater(
299+
field string,
300+
validate func(float64) bool,
301+
assign func(settings *mvServiceRuntimeSettings, value float64),
302+
) settingsFieldUpdater {
303+
return func(form url.Values, settings *mvServiceRuntimeSettings) (changed bool, err error) {
304+
text, ok := parseOptionalFieldText(form, field)
305+
if !ok {
306+
return false, nil
307+
}
308+
value, err := strconv.ParseFloat(text, 64)
309+
if err != nil {
310+
return false, newIllegalMVServiceSettingsFieldError(field)
311+
}
312+
if validate != nil && !validate(value) {
313+
return false, newIllegalMVServiceSettingsFieldError(field)
314+
}
315+
assign(settings, value)
316+
return true, nil
264317
}
265-
return value, true, nil
266318
}
267319

268-
// parseFloat64FieldValue parses a float64 from text.
269-
func parseFloat64FieldValue(text string) (float64, error) {
270-
return strconv.ParseFloat(text, 64)
320+
// parseOptionalFieldText returns the field text only when it is provided.
321+
func parseOptionalFieldText(form url.Values, field string) (text string, ok bool) {
322+
text = form.Get(field)
323+
if text == "" {
324+
return "", false
325+
}
326+
return text, true
271327
}
272328

273329
// newIllegalMVServiceSettingsFieldError builds a uniform field validation error.

0 commit comments

Comments
 (0)