Skip to content

Commit f596599

Browse files
committed
feat: relay meta key-values from src messages to target
1 parent 3da2883 commit f596599

File tree

2 files changed

+82
-4
lines changed

2 files changed

+82
-4
lines changed

internal/relay/relay.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -268,13 +268,13 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
268268
}
269269
}
270270

271-
// Pass the src message time as a meta header to the target.
271+
// Add the src message time as a meta header to the target.
272272
// The target consumer can check the lag between the src and target message time if required.
273273
// Repurpose &kgo.Record and forward it to producer to reduce allocs.
274-
rec.Headers = []kgo.RecordHeader{{
275-
Key: "t",
274+
rec.Headers = append(rec.Headers, kgo.RecordHeader{
275+
Key: "_t",
276276
Value: nsToBytes(rec.Timestamp.UnixNano()),
277-
}}
277+
})
278278
rec.Timestamp = time.Time{}
279279
rec.Topic = re.topic.TargetTopic
280280
if !re.topic.AutoTargetPartition {

scripts/produce_messages.sh

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#!/bin/bash
2+
3+
# Configuration variables
4+
TOPIC_NAME="test-topic" # Target Kafka topic
5+
NUM_MESSAGES=1000000 # Number of messages to produce
6+
BATCH_SIZE=10000 # Number of messages per batch
7+
#BOOTSTRAP_SERVER="localhost:9091" # Kafka broker address
8+
BOOTSTRAP_SERVER="$1" # Kafka broker address
9+
10+
# Function to check if Kafka is running
11+
check_kafka() {
12+
if ! nc -z localhost 9092; then
13+
echo "Error: Kafka broker is not running on localhost:9092"
14+
echo "Please start Kafka before running this script"
15+
exit 1
16+
fi
17+
}
18+
19+
# Function to create topic if it doesn't exist
20+
create_topic() {
21+
kaf topic create $TOPIC_NAME
22+
}
23+
24+
# Function to produce messages in batches
25+
produce_messages() {
26+
local batch_count=$((NUM_MESSAGES / BATCH_SIZE))
27+
local message_count=0
28+
29+
echo "Starting message production..."
30+
echo "Total messages to produce: $NUM_MESSAGES"
31+
echo "Batch size: $BATCH_SIZE"
32+
echo "Number of batches: $batch_count"
33+
34+
for ((batch=1; batch<=batch_count; batch++)); do
35+
# Create a temporary file for the batch
36+
local temp_file=$(mktemp)
37+
38+
# Generate batch of messages
39+
for ((i=1; i<=BATCH_SIZE; i++)); do
40+
message_count=$((message_count + 1))
41+
echo $message_count >> $temp_file
42+
done
43+
44+
# Send batch to Kafka using kaf
45+
cat $temp_file | kaf produce $TOPIC_NAME --brokers $BOOTSTRAP_SERVER
46+
47+
# Clean up temp file
48+
rm $temp_file
49+
50+
# Progress update
51+
if ((batch % 10 == 0)); then
52+
echo "Progress: $message_count / $NUM_MESSAGES messages produced"
53+
fi
54+
done
55+
56+
echo "Message production completed!"
57+
echo "Total messages produced: $message_count"
58+
}
59+
60+
# Main execution
61+
echo "Kafka Message Producer Script"
62+
echo "----------------------------"
63+
64+
# Check if kaf is installed
65+
if ! command -v kaf &> /dev/null; then
66+
echo "Error: kaf is not installed"
67+
echo "Please install kaf before running this script"
68+
exit 1
69+
fi
70+
71+
# Check if Kafka is running
72+
check_kafka
73+
74+
# Create topic if it doesn't exist
75+
create_topic
76+
77+
# Start producing messages
78+
produce_messages

0 commit comments

Comments
 (0)