@@ -34,8 +34,11 @@ type Target struct {
3434 // Map of target topics and their config.
3535 targetTopics Topics
3636
37- batchCh chan * kgo.Record
38- batch []* kgo.Record
37+ // Inlet receives relayed messages into target for batching
38+ inletCh chan * kgo.Record
39+
40+ // Holds the active batch that is produced to destination topic
41+ batch []* kgo.Record
3942}
4043
4144// NewTarget returns a new producer relay that handles target Kafka instances.
@@ -49,7 +52,7 @@ func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topic
4952 targetTopics : topics ,
5053
5154 batch : make ([]* kgo.Record , 0 , pCfg .BatchSize ),
52- batchCh : make (chan * kgo.Record ),
55+ inletCh : make (chan * kgo.Record , pCfg . BatchSize * 10 ),
5356 }
5457
5558 // Initialize the actual Kafka client.
@@ -73,12 +76,12 @@ func (tg *Target) Close() {
7376
7477// CloseBatchCh closes the Producer batch channel.
7578func (tg * Target ) CloseBatchCh () {
76- close (tg .batchCh )
79+ close (tg .inletCh )
7780}
7881
7982// GetBatchCh returns the Producer batch channel.
8083func (tg * Target ) GetBatchCh () chan * kgo.Record {
81- return tg .batchCh
84+ return tg .inletCh
8285}
8386
8487// prepareRecord checks if custom topic partition mapping is defined.
@@ -111,7 +114,7 @@ func (tg *Target) Start(ctx context.Context) error {
111114 return ctx .Err ()
112115
113116 // Queue the message to and flush if the batch size is reached.
114- case msg , ok := <- tg .batchCh :
117+ case msg , ok := <- tg .inletCh :
115118 if ! ok {
116119 // Flush and cleanup on exit.
117120 if err := tg .drain (); err != nil {
@@ -246,7 +249,7 @@ outerLoop:
246249// drain drains and flushes any pending messages in the producer.
247250func (tg * Target ) drain () error {
248251 now := time .Now ()
249- for rec := range tg .batchCh {
252+ for rec := range tg .inletCh {
250253 tg .prepareRecord (rec )
251254 tg .batch = append (tg .batch , rec )
252255 }
0 commit comments