Commit df8c926

Merge pull request #56 from orbitalapi/allow-kafka-partitions
Add partition support to Kafka topic creation
2 parents: 12f5704 + 48d0c9b

File tree: 2 files changed (+42, -3 lines)


nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaDsl.kt

Lines changed: 7 additions & 3 deletions
@@ -36,10 +36,11 @@ class KafkaBuilder(private val imageName: String, private val componentName: Com
     private val producers = mutableListOf<ProducerConfig>()
 
     fun producer(frequency: Duration, topic: String,
+                 partitions: Int = 1,
                  keySerializer: MessageSerializer = MessageSerializer.String,
                  valueSerializer: MessageSerializer = MessageSerializer.String,
                  init: ProducerBuilder.() -> Unit) {
-        producers.add(ProducerBuilder(frequency, topic, keySerializer.serializerClass, valueSerializer.serializerClass).apply(init).build())
+        producers.add(ProducerBuilder(frequency, topic, partitions, keySerializer.serializerClass, valueSerializer.serializerClass).apply(init).build())
     }
 
     fun build(): KafkaConfig = KafkaConfig(imageName, producers, componentName)
@@ -49,6 +50,7 @@ class KafkaBuilder(private val imageName: String, private val componentName: Com
 class ProducerBuilder(
     private val frequency: Duration,
     private val topic: String,
+    private val partitions: Int,
     private val keySerializer: Class<out Serializer<*>>,
     private val valueSerializer: Class<out Serializer<*>>
 ) {
@@ -153,7 +155,7 @@ class ProducerBuilder(
     }
 
     fun build(): ProducerConfig {
-        return ProducerConfig(frequency, topic, keySerializer, valueSerializer, messageGenerator)
+        return ProducerConfig(frequency, topic, partitions, keySerializer, valueSerializer, messageGenerator)
     }
 }
 
@@ -165,7 +167,9 @@ data class KafkaConfig(
 )
 
 data class ProducerConfig(
-    val frequency: Duration, val topic: String,
+    val frequency: Duration,
+    val topic: String,
+    val partitions: Int,
    val keySerializer: Class<out Serializer<*>>,
    val valueSerializer: Class<out Serializer<*>>,
    val messageGenerator: () -> Any
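
The DSL change lets a producer declare how many partitions its topic should have; partitions defaults to 1, so existing stack definitions compile and behave as before. A minimal usage sketch follows. Only producer(...) and its new partitions parameter come from this diff; the kafka { } wrapper and the kotlin.time frequency are assumptions for illustration.

import kotlin.time.Duration.Companion.seconds

// Hypothetical wrapper block; only the producer(...) signature is defined in this commit.
kafka {
    // "orders" will be created with 6 partitions when the stack starts.
    producer(frequency = 1.seconds, topic = "orders", partitions = 6) {
        // ... message body configuration as before (unchanged by this PR)
    }
}

Omitting partitions keeps the previous single-partition behaviour.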

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaExecutor.kt

Lines changed: 35 additions & 0 deletions
@@ -14,16 +14,21 @@ import kotlinx.coroutines.Job
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.isActive
 import kotlinx.coroutines.launch
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
 import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
 import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.errors.TopicExistsException
 import org.testcontainers.containers.KafkaContainer
 import org.testcontainers.utility.DockerImageName
 import reactor.core.publisher.Flux
 import java.util.*
+import java.util.concurrent.TimeUnit
 
 val StackRunner.kafka: List<KafkaExecutor>
     get() {
@@ -60,6 +65,9 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent<K
         val bootstrapServers = kafkaContainer.bootstrapServers
         logger.info { "Kafka container started - bootstrap servers: $bootstrapServers" }
 
+        // Create topics with specified partitions
+        createTopics(bootstrapServers, config.producers)
+
         config.producers.forEach { producerConfig ->
             val producer = createKafkaProducer(bootstrapServers, producerConfig)
             producers.add(producer)
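
With this change, start() creates the declared topics (see createTopics in the next hunk) before any producer begins publishing. Below is a sketch, not part of the commit, of how a test could confirm the broker really has the requested partition count; it assumes kafka-clients 3.1+ for allTopicNames(), and bootstrapServers is the value logged above.

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import java.util.Properties

// Asks the broker how many partitions a topic actually has.
// Assumes kafka-clients 3.1+; on older clients, describeTopics(...).all() works instead.
fun partitionCount(bootstrapServers: String, topic: String): Int {
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    }
    return AdminClient.create(props).use { admin ->
        admin.describeTopics(listOf(topic))
            .allTopicNames()   // KafkaFuture<Map<String, TopicDescription>>
            .get()
            .getValue(topic)
            .partitions()
            .size
    }
}

For the DSL sketch earlier, partitionCount(bootstrapServers, "orders") would be expected to return 6.
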
@@ -131,6 +139,33 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent<K
         }
         return KafkaProducer(props)
     }
+
+    private fun createTopics(bootstrapServers: String, producers: List<ProducerConfig>) {
+        val adminProps = Properties().apply {
+            put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+        }
+
+        AdminClient.create(adminProps).use { adminClient ->
+            // Group topics by name to handle duplicates and use max partitions
+            val topicConfigs = producers.groupBy { it.topic }
+                .mapValues { (_, configs) -> configs.maxOf { it.partitions } }
+                .map { (topicName, partitions) ->
+                    NewTopic(topicName, partitions, 1.toShort()) // replication factor of 1 for single broker
+                }
+
+            if (topicConfigs.isNotEmpty()) {
+                try {
+                    val result = adminClient.createTopics(topicConfigs)
+                    result.all().get(30, TimeUnit.SECONDS) // Wait for completion with timeout
+                    logger.info { "Created topics: ${topicConfigs.map { "${it.name()}(${it.numPartitions()} partitions)" }}" }
+                } catch (e: TopicExistsException) {
+                    logger.warn(e) { "Some topics already exist: ${e.message}" }
+                } catch (e: Exception) {
+                    logger.error(e) { "Failed to create topics due to unexpected error: ${e.message}" }
+                }
+            }
+        }
+    }
 }
 
 data class KafkaContainerConfig(val bootstrapServers: String)
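
When several producers target the same topic, the grouping in createTopics creates the topic once, using the largest requested partition count. Here is a standalone sketch of just that rule, with ProducerConfig reduced to the two fields involved (the ProducerDecl name is hypothetical):

// Illustrates the groupBy/maxOf de-duplication used in createTopics above.
data class ProducerDecl(val topic: String, val partitions: Int)

fun main() {
    val producers = listOf(
        ProducerDecl("prices", 1),
        ProducerDecl("prices", 6),  // same topic, higher partition count
        ProducerDecl("orders", 3)
    )

    val topicPartitions = producers.groupBy { it.topic }
        .mapValues { (_, configs) -> configs.maxOf { it.partitions } }

    println(topicPartitions)  // {prices=6, orders=3}: "prices" is created with 6 partitions
}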
