@@ -3,6 +3,7 @@ package relay
33import (
44 "context"
55 "log/slog"
6+ "strconv"
67 "sync"
78 "time"
89
@@ -22,7 +23,7 @@ type Relay struct {
2223 target * Target
2324 log * slog.Logger
2425
25- topics Topics
26+ topic Topic
2627
2728 // signalCh is used to signal when the relay poll loop should look for a new healthy server.
2829 signalCh chan struct {}
@@ -33,21 +34,21 @@ type Relay struct {
3334 targetOffsets TopicOffsets
3435
3536 // Live topic offsets from source.
36- srcOffsets map [string ] map [ int32 ]int64
37+ srcOffsets map [int32 ]int64
3738
3839 // list of filter implementations for skipping messages
3940 filters map [string ]filter.Provider
4041}
4142
42- func NewRelay (cfg RelayCfg , src * SourcePool , target * Target , topics Topics , filters map [string ]filter.Provider , log * slog.Logger ) (* Relay , error ) {
43+ func NewRelay (cfg RelayCfg , src * SourcePool , target * Target , topic Topic , filters map [string ]filter.Provider , log * slog.Logger ) (* Relay , error ) {
4344 // If stop-at-end is set, fetch and cache the offsets to determine
4445 // when end is reached.
4546 var offsets TopicOffsets
4647 if cfg .StopAtEnd {
4748 if o , err := target .GetHighWatermark (); err != nil {
4849 return nil , err
4950 } else {
50- offsets = o .KOffsets ()
51+ offsets = o .KOffsets ()[ topic . TargetTopic ]
5152 }
5253 }
5354
@@ -57,10 +58,10 @@ func NewRelay(cfg RelayCfg, src *SourcePool, target *Target, topics Topics, filt
5758 target : target ,
5859 log : log ,
5960
60- topics : topics ,
61+ topic : topic ,
6162 signalCh : make (chan struct {}, 1 ),
6263
63- srcOffsets : make (map [string ] map [ int32 ]int64 ),
64+ srcOffsets : make (map [int32 ]int64 ),
6465 targetOffsets : offsets ,
6566 filters : filters ,
6667 }
@@ -116,9 +117,6 @@ func (re *Relay) Start(globalCtx context.Context) error {
116117 defer wg .Done ()
117118 // Wait till main ctx is cancelled.
118119 <- ctx .Done ()
119-
120- // Stop consumer group.
121- re .source .Close ()
122120 }()
123121
124122 // Start the indefinite poll that asks for new connections
@@ -195,7 +193,12 @@ loop:
195193 continue loop
196194 }
197195
198- re .cacheSrcOffsets (of )
196+ srcOffsets := make (map [int32 ]int64 )
197+ of .Each (func (lo kadm.ListedOffset ) {
198+ srcOffsets [lo .Partition ] = lo .Offset
199+ })
200+
201+ re .cacheSrcOffsets (srcOffsets )
199202 firstPoll = false
200203 }
201204
@@ -237,7 +240,6 @@ loop:
237240 }
238241
239242 re .log .Debug ("processed fetches" )
240- server .Client .AllowRebalance ()
241243 }
242244 }
243245}
@@ -246,7 +248,7 @@ loop:
246248func (re * Relay ) processMessage (ctx context.Context , rec * kgo.Record ) error {
247249 // Decrement the end offsets for the given topic and partition till we reach 0
248250 if re .cfg .StopAtEnd {
249- re .decrementSourceOffset (rec .Topic , rec . Partition )
251+ re .decrementSourceOffset (rec .Partition )
250252 }
251253
252254 // If there are filters, run the message through them to decide whether
@@ -266,17 +268,17 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
266268 }
267269 }
268270
269- t , ok := re .topics [rec .Topic ]
270- if ! ok {
271- return nil
272- }
273-
271+ // Add the src message time as a meta header to the target.
272+ // The target consumer can check the lag between the src and target message time if required.
274273 // Repurpose &kgo.Record and forward it to producer to reduce allocs.
275- rec .Headers = nil
274+ rec .Headers = append (rec .Headers , kgo.RecordHeader {
275+ Key : "_t" ,
276+ Value : nsToBytes (rec .Timestamp .UnixNano ()),
277+ })
276278 rec .Timestamp = time.Time {}
277- rec .Topic = t .TargetTopic
278- if ! t .AutoTargetPartition {
279- rec .Partition = int32 (t .TargetPartition )
279+ rec .Topic = re . topic .TargetTopic
280+ if ! re . topic .AutoTargetPartition {
281+ rec .Partition = int32 (re . topic .TargetPartition )
280282 }
281283 rec .Attrs = kgo.RecordAttrs {}
282284 rec .ProducerEpoch = 0
@@ -299,47 +301,35 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
299301}
300302
301303// decrementSourceOffset decrements the offset count for the given topic and partition in the source offsets map.
302- func (re * Relay ) decrementSourceOffset (topic string , partition int32 ) {
303- if o , ok := re .srcOffsets [topic ]; ok {
304- if offset , found := o [partition ]; found && offset > 0 {
305- o [partition ]--
306- re .srcOffsets [topic ] = o
307- }
304+ func (re * Relay ) decrementSourceOffset (partition int32 ) {
305+ if offset , found := re .srcOffsets [partition ]; found && offset > 0 {
306+ re .srcOffsets [partition ] -= 1
308307 }
309308}
310309
311310// cacheSrcOffsets sets the end offsets of the consumer during bootup to exit on consuming everything.
312- func (re * Relay ) cacheSrcOffsets (of kadm.ListedOffsets ) {
313- of .Each (func (lo kadm.ListedOffset ) {
314- ct , ok := re .srcOffsets [lo .Topic ]
315- if ! ok {
316- ct = make (map [int32 ]int64 )
317- }
318- ct [lo .Partition ] = lo .Offset
319- re .srcOffsets [lo .Topic ] = ct
320- })
321-
311+ func (re * Relay ) cacheSrcOffsets (of map [int32 ]int64 ) {
312+ re .srcOffsets = of
322313 // Read till the destination offsets and reduce it from the target weight.
323- for t , po := range re .targetOffsets {
324- for p , o := range po {
325- if ct , ok := re .srcOffsets [t ]; ok {
326- if _ , found := ct [p ]; found {
327- re.srcOffsets [t ][p ] -= o .EpochOffset ().Offset
328- }
329- }
314+ for p , o := range re .targetOffsets {
315+ if _ , ok := re .srcOffsets [p ]; ok {
316+ re .srcOffsets [p ] -= o .EpochOffset ().Offset
330317 }
331318 }
332319}
333320
334321// hasReachedEnd reports if there is any pending messages in given topic-partition.
335322func (re * Relay ) hasReachedEnd () bool {
336- for _ , p := range re .srcOffsets {
337- for _ , o := range p {
338- if o > 0 {
339- return false
340- }
323+ for _ , o := range re .srcOffsets {
324+ if o > 0 {
325+ return false
341326 }
342327 }
343-
344328 return true
345329}
330+
331+ func nsToBytes (ns int64 ) []byte {
332+ // Preallocate a buffer for the byte slice
333+ buf := make ([]byte , 0 , 10 ) // 10 is enough for most integers
334+ return strconv .AppendInt (buf , ns , 10 )
335+ }
0 commit comments