Skip to content

Commit 43e191d

Browse files
authored
Merge pull request #24 from zerodha/perf
Add several performance optimisations and fixes to the control flow.
2 parents 3e082b8 + e5c601e commit 43e191d

File tree

9 files changed

+81
-81
lines changed

9 files changed

+81
-81
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (f *TestFilter) IsAllowed(msg []byte) bool {
6666
* Copy this plugin code to a directory. `mkdir testfilter && cp sample.go testfilter`
6767
* Build the plugin. `CGO_ENABLED=1 go build -a -ldflags="-s -w" -buildmode=plugin -o testfilter.filter sample.go`
6868
* Change the config.toml to add the filter provider config.
69-
* Run kaf-relay with the filter pluing. `./kaf-relay.bin --mode single --stop-at-end --filter ./testfilter/testfilter.filter`
69+
* Run kaf-relay with the filter plugin. `./kaf-relay.bin --mode single --stop-at-end --filter ./testfilter/testfilter.filter`
7070

7171
## Metrics
7272

config.sample.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ source_topic2 = "target_topic2"
1515
[source_pool]
1616
# Kafka client config common to all upstream sources ([[sources]]).
1717
initial_offset = "start"
18+
# Static memmbership to pin the member for the consumer group for respawn / reconnect and fence other members from connecting using the same id.
1819
instance_id = "client_instance_id"
20+
# Consumer group id.
1921
group_id = "consumer_group"
2022

2123
# Frequency at which source servers are polled for health/lag.

go.mod

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ require (
88
github.com/knadh/koanf/providers/posflag v0.1.0
99
github.com/knadh/koanf/v2 v2.0.1
1010
github.com/spf13/pflag v1.0.5
11-
github.com/twmb/franz-go v1.15.4
12-
github.com/twmb/franz-go/pkg/kmsg v1.7.0
11+
github.com/twmb/franz-go v1.17.0
12+
github.com/twmb/franz-go/pkg/kmsg v1.8.0
1313
)
1414

1515
require (
@@ -19,15 +19,15 @@ require (
1919
github.com/mitchellh/mapstructure v1.5.0 // indirect
2020
github.com/mitchellh/reflectwalk v1.0.2 // indirect
2121
github.com/pelletier/go-toml v1.9.5 // indirect
22-
github.com/pierrec/lz4/v4 v4.1.19 // indirect
22+
github.com/pierrec/lz4/v4 v4.1.21 // indirect
2323
github.com/valyala/fastrand v1.1.0 // indirect
2424
github.com/valyala/histogram v1.2.0 // indirect
25-
golang.org/x/sys v0.15.0 // indirect
25+
golang.org/x/sys v0.20.0 // indirect
2626
)
2727

2828
require (
2929
github.com/VictoriaMetrics/metrics v1.23.1
30-
github.com/klauspost/compress v1.16.7 // indirect
30+
github.com/klauspost/compress v1.17.8 // indirect
3131
github.com/twmb/franz-go/pkg/kadm v1.8.1
32-
golang.org/x/crypto v0.17.0 // indirect
32+
golang.org/x/crypto v0.23.0 // indirect
3333
)

go.sum

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
44
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
55
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
66
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
7-
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
8-
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
7+
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
8+
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
99
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
1010
github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
1111
github.com/knadh/koanf/parsers/toml v0.1.0 h1:S2hLqS4TgWZYj4/7mI5m1CQQcWurxUz6ODgOub/6LCI=
@@ -24,28 +24,28 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx
2424
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
2525
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
2626
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
27-
github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4=
28-
github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
27+
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
28+
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
2929
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3030
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3131
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
3232
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
3333
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
3434
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
35-
github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw=
36-
github.com/twmb/franz-go v1.15.4/go.mod h1:rC18hqNmfo8TMc1kz7CQmHL74PLNF8KVvhflxiiJZCU=
35+
github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk=
36+
github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
3737
github.com/twmb/franz-go/pkg/kadm v1.8.1 h1:SrzL855I7gQTGdMtOYGTHhebs7TPgPN29FPtjusqwlE=
3838
github.com/twmb/franz-go/pkg/kadm v1.8.1/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8=
39-
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
40-
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
39+
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
40+
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
4141
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
4242
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
4343
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
4444
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
45-
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
46-
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
45+
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
46+
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
4747
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
48-
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
49-
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
48+
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
49+
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
5050
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
5151
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/relay/common.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,14 @@ func retryBackoff(min, max time.Duration) func(int) time.Duration {
228228

229229
// waitTries waits for the timer to hit for the deadline with the backoff duration.
230230
func waitTries(ctx context.Context, waitDuration time.Duration) {
231+
if waitDuration == 0 {
232+
return
233+
}
234+
231235
deadline := time.Now().Add(waitDuration)
232236
after := time.NewTimer(time.Until(deadline))
237+
defer after.Stop()
238+
233239
select {
234240
case <-ctx.Done():
235241
return

internal/relay/relay.go

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"log/slog"
66
"sync"
7+
"time"
78

89
"github.com/twmb/franz-go/pkg/kadm"
910
"github.com/twmb/franz-go/pkg/kgo"
@@ -70,7 +71,6 @@ func NewRelay(cfg RelayCfg, src *SourcePool, target *Target, topics Topics, filt
7071
// Start starts the consumer loop on kafka (A), fetch messages and relays over to kafka (B) using an async
7172
func (re *Relay) Start(globalCtx context.Context) error {
7273
wg := &sync.WaitGroup{}
73-
defer wg.Wait()
7474

7575
// Derive a cancellable context from the global context (which captures kill signals) to use
7676
// for subsequent connections/health tracking/retries etc.
@@ -95,41 +95,44 @@ func (re *Relay) Start(globalCtx context.Context) error {
9595
re.log.Info("starting producer worker")
9696
go func() {
9797
defer wg.Done()
98-
if err := re.target.Start(ctx); err != nil {
98+
if err := re.target.Start(); err != nil {
9999
re.log.Error("error starting producer worker", "err", err)
100100
}
101-
102-
if ctx.Err() != context.Canceled {
103-
cancel()
104-
}
105101
}()
106102

107103
// Start the consumer group worker by trigger a signal to the relay loop to fetch
108104
// a consumer worker to fetch initial healthy node.
109105
re.log.Info("starting consumer worker")
110106
re.signalCh <- struct{}{}
111107

108+
wg.Add(1)
109+
// Relay teardown.
110+
go func() {
111+
defer wg.Done()
112+
// Wait till main ctx is cancelled.
113+
<-globalCtx.Done()
114+
115+
// Stop consumer group.
116+
re.source.Close()
117+
}()
118+
112119
// Start the indefinite poll that asks for new connections
113120
// and then consumes messages from them.
114121
if err := re.startPoll(ctx); err != nil {
115122
re.log.Error("error starting consumer worker", "err", err)
116123
}
117124

118-
// Close the target/producer on exit.
119-
re.target.CloseBatchCh()
125+
// Close the producer inlet channel.
126+
close(re.target.inletCh)
127+
128+
// Close producer.
129+
re.target.Close()
120130

121131
wg.Wait()
122132

123133
return nil
124134
}
125135

126-
// Close close the underlying kgo.Client(s)
127-
func (re *Relay) Close() {
128-
re.log.Debug("closing relay consumer, producer...")
129-
re.source.Close()
130-
re.target.Close()
131-
}
132-
133136
// startPoll starts the consumer worker which polls the kafka cluster for messages.
134137
func (re *Relay) startPoll(ctx context.Context) error {
135138
var (
@@ -246,21 +249,28 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
246249
return nil
247250
}
248251

249-
msg := &kgo.Record{
250-
Key: rec.Key,
251-
Value: rec.Value,
252-
Topic: t.TargetTopic,
253-
Partition: rec.Partition,
252+
// Repurpose &kgo.Record and forward it to producer to reduce allocs.
253+
rec.Headers = nil
254+
rec.Timestamp = time.Time{}
255+
rec.Topic = t.TargetTopic
256+
if !t.AutoTargetPartition {
257+
rec.Partition = int32(t.TargetPartition)
254258
}
259+
rec.Attrs = kgo.RecordAttrs{}
260+
rec.ProducerEpoch = 0
261+
rec.ProducerID = 0
262+
rec.LeaderEpoch = 0
263+
rec.Offset = 0
264+
rec.Context = nil
255265

256266
// Queue the message for writing to target.
257267
select {
258268
case <-ctx.Done():
259269
return ctx.Err()
260-
case re.target.GetBatchCh() <- msg:
270+
case re.target.GetBatchCh() <- rec:
261271
default:
262272
re.log.Error("target inlet channel blocked")
263-
re.target.GetBatchCh() <- msg
273+
re.target.GetBatchCh() <- rec
264274
}
265275

266276
return nil

internal/relay/source_pool.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) {
138138
// indefinitely long to return based on the config.
139139
func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error) {
140140
retries := 0
141+
loop:
141142
for {
142143
select {
143144
case <-globalCtx.Done():
@@ -155,6 +156,7 @@ func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error) {
155156
retries++
156157
sp.log.Error("new source connection failed", "id", s.ID, "broker", s.Config.BootstrapBrokers, "error", err, "retries", retries)
157158
waitTries(globalCtx, sp.backoffFn(retries))
159+
continue loop
158160
}
159161

160162
// Cache the current live connection internally.
@@ -201,18 +203,19 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
201203
// RecordOffsets records the offsets of the latest fetched records per topic.
202204
// This is used to resume consumption on new connections/reconnections from the source during runtime.
203205
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) {
204-
oMap := make(map[int32]kgo.Offset)
205-
oMap[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
206-
if sp.offsets != nil {
207-
if o, ok := sp.offsets[rec.Topic]; ok {
208-
o[rec.Partition] = oMap[rec.Partition]
209-
sp.offsets[rec.Topic] = o
210-
} else {
211-
sp.offsets[rec.Topic] = oMap
212-
}
213-
} else {
206+
if sp.offsets == nil {
214207
sp.offsets = make(map[string]map[int32]kgo.Offset)
215-
sp.offsets[rec.Topic] = oMap
208+
}
209+
210+
if o, ok := sp.offsets[rec.Topic]; ok {
211+
// If the topic already exists, update the offset for the partition.
212+
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
213+
sp.offsets[rec.Topic] = o
214+
} else {
215+
// If the topic does not exist, create a new map for the topic.
216+
o := make(map[int32]kgo.Offset)
217+
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
218+
sp.offsets[rec.Topic] = o
216219
}
217220
}
218221

internal/relay/target.go

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ type Target struct {
2828
metrics *metrics.Set
2929
log *slog.Logger
3030

31-
// Map of optional destination topic partitions.
32-
topicPartitions map[string]int32
33-
3431
// Map of target topics and their config.
3532
targetTopics Topics
3633

@@ -66,34 +63,24 @@ func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topic
6663
return p, nil
6764
}
6865

69-
// Close closes the kafka client.
66+
// Close remove the producer topics from &kgo.Client.
7067
func (tg *Target) Close() {
7168
if tg.client != nil {
7269
// prevent blocking on close
7370
tg.client.PurgeTopicsFromProducing()
7471
}
7572
}
7673

77-
// CloseBatchCh closes the Producer batch channel.
78-
func (tg *Target) CloseBatchCh() {
79-
close(tg.inletCh)
80-
}
81-
8274
// GetBatchCh returns the Producer batch channel.
8375
func (tg *Target) GetBatchCh() chan *kgo.Record {
8476
return tg.inletCh
8577
}
8678

87-
// prepareRecord checks if custom topic partition mapping is defined.
88-
// If required, it updates the records partition
89-
func (tg *Target) prepareRecord(rec *kgo.Record) {
90-
if part, ok := tg.topicPartitions[rec.Topic]; ok {
91-
rec.Partition = part
92-
}
93-
}
94-
9579
// Start starts the blocking producer which flushes messages to the target Kafka.
96-
func (tg *Target) Start(ctx context.Context) error {
80+
func (tg *Target) Start() error {
81+
ctx, cancel := context.WithCancel(context.Background())
82+
defer cancel()
83+
9784
tick := time.NewTicker(tg.pCfg.FlushFrequency)
9885
defer tick.Stop()
9986

@@ -106,13 +93,6 @@ func (tg *Target) Start(ctx context.Context) error {
10693

10794
for {
10895
select {
109-
case <-ctx.Done():
110-
if err := tg.drain(); err != nil {
111-
return err
112-
}
113-
114-
return ctx.Err()
115-
11696
// Queue the message to and flush if the batch size is reached.
11797
case msg, ok := <-tg.inletCh:
11898
if !ok {
@@ -124,7 +104,6 @@ func (tg *Target) Start(ctx context.Context) error {
124104
return nil
125105
}
126106

127-
tg.prepareRecord(msg)
128107
tg.batch = append(tg.batch, msg)
129108
if len(tg.batch) >= tg.pCfg.FlushBatchSize {
130109
if err := tg.flush(ctx); err != nil {
@@ -250,7 +229,6 @@ outerLoop:
250229
func (tg *Target) drain() error {
251230
now := time.Now()
252231
for rec := range tg.inletCh {
253-
tg.prepareRecord(rec)
254232
tg.batch = append(tg.batch, rec)
255233
}
256234

main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ func main() {
8686
log.Fatalf("error starting relay controller: %v", err)
8787
}
8888

89-
metrSrv.Shutdown(globalCtx)
90-
relay.Close()
89+
if metrSrv != nil {
90+
metrSrv.Shutdown(globalCtx)
91+
}
9192
lo.Info("bye")
9293
}

0 commit comments

Comments
 (0)