Skip to content

Commit a29e844

Browse files
authored
chore: ensure submission duration always has a reading within the 60 second window (#16)
* chore: add periodic refresh of last observed value * chore: adding some unit tests
1 parent 1330973 commit a29e844

File tree

3 files changed

+63
-3
lines changed

3 files changed

+63
-3
lines changed

pkg/exporters/verifier/verifier.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package verifier
22

33
import (
44
"context"
5+
"github.com/ethereum/go-ethereum/core/types"
56
"github.com/evstack/ev-metrics/internal/clients/celestia"
67
"github.com/evstack/ev-metrics/internal/clients/evm"
78
"github.com/evstack/ev-metrics/internal/clients/evnode"
89
"github.com/evstack/ev-metrics/pkg/metrics"
9-
"github.com/ethereum/go-ethereum/core/types"
1010
"github.com/rs/zerolog"
1111
"time"
1212
)
@@ -53,11 +53,18 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
5353
}
5454
defer sub.Unsubscribe()
5555

56+
// ticker to refresh submission duration metric every 10 seconds
57+
refreshTicker := time.NewTicker(10 * time.Second)
58+
defer refreshTicker.Stop()
59+
5660
for {
5761
select {
5862
case <-ctx.Done():
5963
e.logger.Info().Msg("stopping block verification")
6064
return nil
65+
case <-refreshTicker.C:
66+
// ensure that submission duration is always included in the 60 second window.
67+
m.RefreshSubmissionDuration()
6168
case header := <-headers:
6269
// record block arrival time for millisecond precision
6370
arrivalTime := time.Now()

pkg/metrics/metrics.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package metrics
33
import (
44
"fmt"
55
"sort"
6+
"strings"
67
"sync"
78
"time"
89

@@ -60,6 +61,9 @@ type Metrics struct {
6061
// internal tracking for block time calculation (uses arrival time for ms precision)
6162
lastBlockArrivalTime map[string]time.Time // key: chainID
6263

64+
// lastSubmissionDurations tracks the most recent submission durations.
65+
lastSubmissionDurations map[string]time.Duration // key: chainID:namespace
66+
6367
mu sync.Mutex
6468
ranges map[string][]*blockRange // key: blobType -> sorted slice of ranges
6569
}
@@ -257,8 +261,9 @@ func NewWithRegistry(namespace string, registerer prometheus.Registerer) *Metric
257261
},
258262
[]string{"chain_id", "endpoint", "error_type"},
259263
),
260-
ranges: make(map[string][]*blockRange),
261-
lastBlockArrivalTime: make(map[string]time.Time),
264+
ranges: make(map[string][]*blockRange),
265+
lastBlockArrivalTime: make(map[string]time.Time),
266+
lastSubmissionDurations: make(map[string]time.Duration),
262267
}
263268

264269
return m
@@ -490,7 +495,27 @@ func (m *Metrics) RecordBlockHeightDrift(chainID, targetEndpoint string, referen
490495

491496
// RecordSubmissionDuration records the da submission duration for a given submission type
492497
func (m *Metrics) RecordSubmissionDuration(chainID, submissionType string, duration time.Duration) {
498+
m.mu.Lock()
499+
defer m.mu.Unlock()
500+
493501
m.SubmissionDuration.WithLabelValues(chainID, submissionType).Observe(duration.Seconds())
502+
503+
key := fmt.Sprintf("%s:%s", chainID, submissionType)
504+
m.lastSubmissionDurations[key] = duration
505+
}
506+
507+
// RefreshSubmissionDuration re-observes the last known submission duration to keep the metric alive.
508+
func (m *Metrics) RefreshSubmissionDuration() {
509+
m.mu.Lock()
510+
defer m.mu.Unlock()
511+
512+
for key, duration := range m.lastSubmissionDurations {
513+
// assuming format "chainID:namespace"
514+
parts := strings.Split(key, ":")
515+
if len(parts) == 2 {
516+
m.SubmissionDuration.WithLabelValues(parts[0], parts[1]).Observe(duration.Seconds())
517+
}
518+
}
494519
}
495520

496521
// RecordBlockTime records the time between consecutive blocks using arrival time

pkg/metrics/metrics_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package metrics
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/prometheus/client_golang/prometheus"
78
"github.com/stretchr/testify/require"
@@ -535,6 +536,33 @@ func TestMetrics_ComplexScenario(t *testing.T) {
535536
}
536537
}
537538

539+
func TestMetrics_RecordSubmissionDuration(t *testing.T) {
540+
reg := prometheus.NewRegistry()
541+
m := NewWithRegistry("test", reg)
542+
543+
// record submission durations
544+
m.RecordSubmissionDuration("chain1", "header", 5*time.Second)
545+
// overwrite submission duration
546+
m.RecordSubmissionDuration("chain1", "header", 6*time.Second)
547+
m.RecordSubmissionDuration("chain1", "data", 10*time.Second)
548+
m.RecordSubmissionDuration("chain2", "header", 3*time.Second)
549+
550+
// verify stored in memory
551+
require.Equal(t, 6*time.Second, m.lastSubmissionDurations["chain1:header"], "last submission duration should be present")
552+
require.Equal(t, 10*time.Second, m.lastSubmissionDurations["chain1:data"])
553+
require.Equal(t, 3*time.Second, m.lastSubmissionDurations["chain2:header"])
554+
}
555+
556+
func TestMetrics_RefreshSubmissionDuration_Empty(t *testing.T) {
557+
reg := prometheus.NewRegistry()
558+
m := NewWithRegistry("test", reg)
559+
560+
// call refresh without any recorded values - should not panic
561+
require.NotPanics(t, func() {
562+
m.RefreshSubmissionDuration()
563+
})
564+
}
565+
538566
// helper types for table tests
539567
type blockToRecord struct {
540568
chain string

0 commit comments

Comments
 (0)