Skip to content

Commit db85799

Browse files
authored
telemetry: refactor annotator interface (#2526)
## Summary of Changes This refactors the Annotator interface of the flow enricher to make it less clumsy when adding annotators that require other external dependencies. Prior, external dependencies were passed as part of the Init method into all annotators whether they needed them or not, which then requires touching all Init method signatures when a single dependency is added. These dependencies are now moved into the annotator constructors themselves. ## Testing Verification Existing unit/integration tests should pass. There's no functional change that should be introduced.
1 parent f888ec0 commit db85799

File tree

4 files changed

+31
-23
lines changed

4 files changed

+31
-23
lines changed

telemetry/flow-enricher/cmd/flow-enricher/main.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,16 @@ func main() {
123123
rpcClient := rpc.NewWithRetries(networkConfig.LedgerPublicRPCURL, nil)
124124
serviceabilityClient := serviceability.New(rpcClient, networkConfig.ServiceabilityProgramID)
125125

126-
enricherOpts := []enricher.EnricherOption{
126+
e := enricher.NewEnricher(
127127
enricher.WithClickhouseWriter(chWriter),
128128
enricher.WithFlowConsumer(flowConsumer),
129129
enricher.WithLogger(logger),
130130
enricher.WithEnricherMetrics(enricher.NewEnricherMetrics(reg)),
131131
enricher.WithServiceabilityFetcher(serviceabilityClient),
132-
enricher.WithServiceabilityFetchInterval(10 * time.Second),
133-
}
134-
enricher := enricher.NewEnricher(enricherOpts...)
132+
enricher.WithServiceabilityFetchInterval(10*time.Second),
133+
)
134+
135+
e.AddAnnotator(enricher.NewServiceabilityAnnotator(e.ServiceabilityData))
135136

136137
// start prometheus
137138
go func() {
@@ -141,7 +142,7 @@ func main() {
141142
}()
142143

143144
logger.Info("starting enricher...")
144-
if err := enricher.Run(ctx); err != nil {
145+
if err := e.Run(ctx); err != nil {
145146
logger.Error("error while running enricher", "error", err)
146147
os.Exit(1)
147148
}

telemetry/flow-enricher/internal/flow-enricher/enricher.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
// records in protobuf format, enriches the flow with additional information from
44
// each annotator, and writes the flows as a batch to clickhouse.
55
//
6-
// Annotators must be registered in the RegisterAnnotators method of the enricher
7-
// and must implement the Annotator interface.
6+
// Annotators are added via the AddAnnotator method and
7+
// must implement the Annotator interface.
88
package enricher
99

1010
import (
@@ -106,6 +106,11 @@ func NewEnricher(opts ...EnricherOption) *Enricher {
106106
return e
107107
}
108108

109+
// AddAnnotator adds an annotator to the enricher after construction.
110+
func (e *Enricher) AddAnnotator(a Annotator) {
111+
e.annotators = append(e.annotators, a)
112+
}
113+
109114
// Run starts the enricher instance and begins processing flow records.
110115
func (e *Enricher) Run(ctx context.Context) error {
111116
if e.flowConsumer == nil {
@@ -128,8 +133,8 @@ func (e *Enricher) Run(ctx context.Context) error {
128133

129134
go e.fetchServiceabilityData(ctx)
130135

131-
// some annotators depends on serviceability data, so we need to register them after
132-
if err := e.RegisterAnnotators(ctx); err != nil {
136+
// initialize annotators before starting enrichment
137+
if err := e.initAnnotators(ctx); err != nil {
133138
return fmt.Errorf("error while initializing annotators: %v", err)
134139
}
135140

@@ -202,7 +207,9 @@ func (e *Enricher) fetchServiceabilityData(ctx context.Context) {
202207
}
203208
}
204209

205-
func (e *Enricher) serviceabilityData() serviceability.ProgramData {
210+
// ServiceabilityData returns a copy of the current serviceability program data.
211+
// This method is safe for concurrent use.
212+
func (e *Enricher) ServiceabilityData() serviceability.ProgramData {
206213
e.programDataMutex.Lock()
207214
defer e.programDataMutex.Unlock()
208215
if e.programData == nil {
@@ -211,21 +218,19 @@ func (e *Enricher) serviceabilityData() serviceability.ProgramData {
211218
return *e.programData
212219
}
213220

221+
// Annotator defines the interface for flow enrichment annotators.
222+
// Dependencies should be passed via the annotator's constructor.
214223
type Annotator interface {
215-
Init(context.Context, func() serviceability.ProgramData) error
224+
// Init initializes the annotator. Called once before any Annotate calls.
225+
Init(context.Context) error
216226
Annotate(*FlowSample) error
217227
String() string
218228
}
219229

220-
// RegisterAnnotators initializes a set of annotators for use during enrichment.
221-
// Annotators must implement the Annotator interface.
222-
func (e *Enricher) RegisterAnnotators(ctx context.Context) error {
223-
e.annotators = []Annotator{
224-
NewServiceabilityAnnotator(),
225-
}
226-
230+
// initAnnotators initializes all registered annotators.
231+
func (e *Enricher) initAnnotators(ctx context.Context) error {
227232
for _, a := range e.annotators {
228-
if err := a.Init(ctx, e.serviceabilityData); err != nil {
233+
if err := a.Init(ctx); err != nil {
229234
return fmt.Errorf("error initializing annotator %s: %v", a.String(), err)
230235
}
231236
}

telemetry/flow-enricher/internal/flow-enricher/enricher_integration_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,8 @@ func TestFlowEnrichment(t *testing.T) {
246246
WithEnricherMetrics(NewEnricherMetrics(reg)),
247247
WithServiceabilityFetcher(mockServiceability),
248248
)
249+
enricher.AddAnnotator(NewServiceabilityAnnotator(enricher.ServiceabilityData))
250+
249251
go func() {
250252
if err := enricher.Run(ctx); err != nil {
251253
logger.Error("error during enrichment", "error", err)

telemetry/flow-enricher/internal/flow-enricher/serviceability.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ type ServiceabilityAnnotator struct {
2121
mu sync.RWMutex
2222
}
2323

24-
func NewServiceabilityAnnotator() *ServiceabilityAnnotator {
24+
func NewServiceabilityAnnotator(getProgramData func() serviceability.ProgramData) *ServiceabilityAnnotator {
2525
return &ServiceabilityAnnotator{
26-
name: "serviceability annotator",
26+
name: "serviceability annotator",
27+
getProgramData: getProgramData,
2728
}
2829
}
2930

30-
func (s *ServiceabilityAnnotator) Init(ctx context.Context, getProgramData func() serviceability.ProgramData) error {
31-
s.getProgramData = getProgramData
31+
func (s *ServiceabilityAnnotator) Init(ctx context.Context) error {
3232
s.updateServiceabilityCache()
3333

3434
go func() {

0 commit comments

Comments
 (0)