Skip to content

Commit 882bcf4

Browse files
authored
chore: add flag for worker pool (#21)
1 parent 771fb9e commit 882bcf4

File tree

3 files changed

+51
-3
lines changed

3 files changed

+51
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ Metrics will be available at `http://localhost:2112/metrics`
7575
- `--balance.addresses`: Comma-separated Celestia addresses to monitor (enables balance checking)
7676
- `--balance.consensus-rpc-urls`: Comma-separated consensus RPC URLs for balance queries (required if balance.addresses is set)
7777
- `--balance.scrape-interval`: Balance check scrape interval in seconds (default: 30)
78+
- `--verifier.workers`: Number of concurrent workers for block verification (default: 50)
7879
- `--verbose`: Enable verbose logging (default: false)
7980

8081
### Example with Custom Endpoints

cmd/monitor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
flagBalanceAddresses = "balance.addresses"
4343
flagBalanceRpcUrls = "balance.consensus-rpc-urls"
4444
flagBalanceScrapeInterval = "balance.scrape-interval"
45+
flagVerifierWorkers = "verifier.workers"
4546

4647
metricsPath = "/metrics"
4748
)
@@ -67,6 +68,7 @@ type flagValues struct {
6768
balanceAddresses string
6869
balanceRpcUrls string
6970
balanceScrapeInterval int
71+
verifierWorkers int
7072
}
7173

7274
func NewMonitorCmd() *cobra.Command {
@@ -99,6 +101,7 @@ func NewMonitorCmd() *cobra.Command {
99101
cmd.Flags().StringVar(&flags.balanceAddresses, flagBalanceAddresses, "", "Comma-separated celestia addresses to monitor (enables balance checking)")
100102
cmd.Flags().StringVar(&flags.balanceRpcUrls, flagBalanceRpcUrls, "", "Comma-separated consensus rpc urls for balance queries (required if balance.addresses is set)")
101103
cmd.Flags().IntVar(&flags.balanceScrapeInterval, flagBalanceScrapeInterval, 30, "Balance check scrape interval in seconds (default: 30)")
104+
cmd.Flags().IntVar(&flags.verifierWorkers, flagVerifierWorkers, 50, "Number of concurrent workers for block verification (default: 50)")
102105

103106
if err := cmd.MarkFlagRequired(flagHeaderNS); err != nil {
104107
panic(err)
@@ -204,6 +207,7 @@ func monitorAndExportMetrics(_ *cobra.Command, _ []string) error {
204207
cfg.HeaderNS,
205208
cfg.DataNS,
206209
flags.chainID,
210+
flags.verifierWorkers,
207211
logger,
208212
),
209213
}

pkg/exporters/verifier/verifier.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package verifier
22

33
import (
44
"context"
5+
"sync"
6+
"time"
7+
58
"github.com/ethereum/go-ethereum/core/types"
69
"github.com/evstack/ev-metrics/internal/clients/celestia"
710
"github.com/evstack/ev-metrics/internal/clients/evm"
811
"github.com/evstack/ev-metrics/internal/clients/evnode"
912
"github.com/evstack/ev-metrics/pkg/metrics"
1013
"github.com/rs/zerolog"
11-
"time"
1214
)
1315

1416
var _ metrics.Exporter = &exporter{}
@@ -20,6 +22,7 @@ func NewMetricsExporter(
2022
evmClient *evm.Client,
2123
headerNS, dataNS []byte,
2224
chainID string,
25+
workers int,
2326
logger zerolog.Logger,
2427
) metrics.Exporter {
2528
return &exporter{
@@ -29,6 +32,7 @@ func NewMetricsExporter(
2932
headerNS: headerNS,
3033
dataNS: dataNS,
3134
chainID: chainID,
35+
workers: workers,
3236
logger: logger.With().Str("component", "verification_monitor").Logger(),
3337
}
3438
}
@@ -41,6 +45,7 @@ type exporter struct {
4145
headerNS []byte
4246
dataNS []byte
4347
chainID string
48+
workers int
4449
logger zerolog.Logger
4550
}
4651

@@ -53,14 +58,33 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
5358
}
5459
defer sub.Unsubscribe()
5560

61+
// create buffered channel for block queue
62+
blockQueue := make(chan *types.Header, e.workers*2)
63+
64+
// start work pool
65+
var workerGroup sync.WaitGroup
66+
for i := 0; i < e.workers; i++ {
67+
workerGroup.Add(1)
68+
workerID := i
69+
go func() {
70+
defer workerGroup.Done()
71+
e.processBlocks(ctx, m, workerID, blockQueue)
72+
}()
73+
}
74+
75+
e.logger.Info().Int("workers", e.workers).Msg("started verification work pool")
76+
5677
// ticker to refresh submission duration metric every 10 seconds
5778
refreshTicker := time.NewTicker(10 * time.Second)
5879
defer refreshTicker.Stop()
5980

81+
// main subscription loop
6082
for {
6183
select {
6284
case <-ctx.Done():
6385
e.logger.Info().Msg("stopping block verification")
86+
close(blockQueue)
87+
workerGroup.Wait()
6488
return nil
6589
case <-refreshTicker.C:
6690
// ensure that submission duration is always included in the 60 second window.
@@ -75,12 +99,31 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
7599
Time("arrival_time", arrivalTime).
76100
Msg("received block header from subscription")
77101

78-
// spawn a goroutine to handle this block's retries
79-
go e.verifyBlock(ctx, m, header)
102+
// send block to work pool, blocking until space is available
103+
select {
104+
case blockQueue <- header:
105+
// block queued successfully
106+
case <-ctx.Done():
107+
close(blockQueue)
108+
workerGroup.Wait()
109+
return nil
110+
}
80111
}
81112
}
82113
}
83114

115+
// processBlocks processes blocks from the queue
116+
func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue <-chan *types.Header) {
117+
logger := e.logger.With().Int("worker_id", workerID).Logger()
118+
logger.Debug().Msg("worker started")
119+
120+
for header := range blockQueue {
121+
e.verifyBlock(ctx, m, header)
122+
}
123+
124+
logger.Debug().Msg("worker stopped")
125+
}
126+
84127
func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight, daHeight uint64, verified bool, submissionDuration time.Duration) {
85128
if verified {
86129
m.RecordSubmissionDaHeight(e.chainID, namespace, daHeight)

0 commit comments

Comments
 (0)