Skip to content

Commit edfcd06

Browse files
committed
create a custom protocol for each network address for Kafka
1 parent 204e3d3 commit edfcd06

File tree

13 files changed

+108
-28
lines changed

13 files changed

+108
-28
lines changed

nebula-cli/src/main/kotlin/com/orbitalhq/nebula/cli/NebulaCli.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.orbitalhq.nebula.cli
22

3+
import com.orbitalhq.nebula.HostConfig
34
import com.orbitalhq.nebula.NebulaConfig
45
import com.orbitalhq.nebula.NebulaStackWithSource
56
import com.orbitalhq.nebula.StackRunner
@@ -87,7 +88,7 @@ class Nebula : Callable<Int> {
8788

8889
val scriptRunner = NebulaScriptExecutor()
8990
val stack = scriptRunner.runScript(file)
90-
val stackWithSource = NebulaStackWithSource(stack, file.readText())
91+
val stackWithSource = NebulaStackWithSource(stack, file.readText(), HostConfig.UNKNOWN)
9192

9293
val stackRunner = StackRunner(nebulaConfig)
9394
stackRunner.submit(stackWithSource)

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/InfrastructureComponent.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ interface InfrastructureComponent<T> {
2222
* Used for display / diagnostics
2323
*/
2424
val type: ComponentType
25-
fun start(nebulaConfig: NebulaConfig): ComponentInfo<T>
25+
fun start(nebulaConfig: NebulaConfig, hostConfig: HostConfig = HostConfig.UNKNOWN): ComponentInfo<T>
2626
fun stop()
2727

2828
val componentInfo: ComponentInfo<T>?

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/NebulaStack.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ typealias StackName = String
1818

1919
data class NebulaStackWithSource(
2020
val stack: NebulaStack,
21-
val source: String
21+
val source: String,
22+
val hostConfig: HostConfig
2223
) {
2324
fun withName(id: String):NebulaStackWithSource {
2425
return this.copy(stack = stack.withName(id))
@@ -54,10 +55,10 @@ class NebulaStack(
5455
return component
5556
}
5657

57-
fun startComponents(config: NebulaConfig): Map<String, ComponentInfo<out Any?>> {
58+
fun startComponents(config: NebulaConfig, hostConfig: HostConfig): Map<String, ComponentInfo<out Any?>> {
5859
stackStateEventSource.listenForEvents(name, components)
5960
return components.associate { component ->
60-
component.type to component.start(config)
61+
component.type to component.start(config, hostConfig)
6162
}
6263
}
6364

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/StackRunner.kt

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,21 @@ data class NebulaConfig(
1313
val networkName: String = "nebula_network",
1414
val network: Network = Network.newNetwork()
1515
)
16+
17+
/**
18+
* Captures the details of the host, as captured when a stack
19+
* is submitted.
20+
* We can use this to determine things like the IP address that the
21+
* stack is accessible via, to configure components who are strict
22+
* about which network interfaces they listen on (eg., Kafka)
23+
*/
24+
data class HostConfig(
25+
val hostAddresses:List<String>
26+
) {
27+
companion object {
28+
val UNKNOWN = HostConfig(emptyList())
29+
}
30+
}
1631
class StackRunner(private val config: NebulaConfig = NebulaConfig()) {
1732
private val logger = KotlinLogging.logger {}
1833
val stacks = ConcurrentHashMap<StackName, NebulaStackWithSource>()
@@ -63,12 +78,14 @@ class StackRunner(private val config: NebulaConfig = NebulaConfig()) {
6378

6479

6580
private fun start(name: String) {
66-
val stack = this.stacks[name]?.stack ?: error("Stack $name not found")
81+
val stackWithSource = this.stacks[name] ?: error("Stack $name not found")
82+
val stack = stackWithSource.stack
83+
val hostConfig = stackWithSource.hostConfig
6784
stack.lifecycleEvents.subscribe { event ->
6885
logger.info { event.toString() }
6986
}
7087
logger.info { "Starting ${stack.name}" }
71-
val state: Map<String, ComponentInfo<out Any?>> = stack.startComponents(config)
88+
val state: Map<String, ComponentInfo<out Any?>> = stack.startComponents(config, hostConfig)
7289

7390
_stackState[name] = state
7491
}
@@ -114,7 +131,7 @@ fun NebulaStackWithSource.start(): StackRunner {
114131
*/
115132
fun NebulaStack.start(): StackRunner {
116133
val executor = StackRunner()
117-
val stackWithSource = NebulaStackWithSource(this, "Source not provided - Random UUID follows - ${UUID.randomUUID()}" )
134+
val stackWithSource = NebulaStackWithSource(this, "Source not provided - Random UUID follows - ${UUID.randomUUID()}" , hostConfig = HostConfig.UNKNOWN)
118135
executor.submit(stackWithSource)
119136
return executor
120137
}

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/hazelcast/HazelcastExecutor.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.orbitalhq.nebula.hazelcast
22

3+
import com.orbitalhq.nebula.HostConfig
34
import com.orbitalhq.nebula.InfrastructureComponent
45
import com.orbitalhq.nebula.NebulaConfig
56
import com.orbitalhq.nebula.StackRunner
@@ -34,7 +35,7 @@ class HazelcastExecutor(private val config: HazelcastConfig) : InfrastructureCom
3435
override val type: ComponentType = "hazelcast"
3536
private val eventSource = ComponentLifecycleEventSource()
3637

37-
override fun start(nebulaConfig: NebulaConfig): ComponentInfo<HazelcastContainerConfig> {
38+
override fun start(nebulaConfig: NebulaConfig, hostConfig: HostConfig): ComponentInfo<HazelcastContainerConfig> {
3839
eventSource.starting()
3940
container = GenericContainer(DockerImageName.parse(config.imageName))
4041
.withExposedPorts(5701)

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/http/HttpExecutor.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.orbitalhq.nebula.http
22

3+
import com.orbitalhq.nebula.HostConfig
34
import com.orbitalhq.nebula.InfrastructureComponent
45
import com.orbitalhq.nebula.NebulaConfig
56
import com.orbitalhq.nebula.StackRunner
@@ -47,7 +48,7 @@ class HttpExecutor(private val config: HttpConfig) : InfrastructureComponent<Htt
4748
override var componentInfo: ComponentInfo<HttpServerConfig>? = null
4849
private set
4950

50-
override fun start(nebulaConfig: NebulaConfig):ComponentInfo<HttpServerConfig> {
51+
override fun start(nebulaConfig: NebulaConfig, hostConfig: HostConfig):ComponentInfo<HttpServerConfig> {
5152
eventSource.starting()
5253
server = embeddedServer(Netty, port = port) {
5354
routing {

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaExecutor.kt

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.orbitalhq.nebula.kafka
22

3+
import com.orbitalhq.nebula.HostConfig
34
import com.orbitalhq.nebula.InfrastructureComponent
45
import com.orbitalhq.nebula.NebulaConfig
56
import com.orbitalhq.nebula.StackRunner
@@ -10,6 +11,7 @@ import com.orbitalhq.nebula.core.HostNameAwareContainerConfig
1011
import com.orbitalhq.nebula.events.ComponentLifecycleEventSource
1112
import com.orbitalhq.nebula.utils.updateHostReferences
1213
import io.github.oshai.kotlinlogging.KotlinLogging
14+
import io.ktor.http.*
1315
import kotlinx.coroutines.CoroutineScope
1416
import kotlinx.coroutines.Dispatchers
1517
import kotlinx.coroutines.Job
@@ -58,10 +60,15 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent<K
5860
override var componentInfo: ComponentInfo<KafkaContainerConfig>? = null
5961
private set
6062

61-
override fun start(nebulaConfig: NebulaConfig): ComponentInfo<KafkaContainerConfig> {
63+
override fun start(nebulaConfig: NebulaConfig, hostConfig: HostConfig): ComponentInfo<KafkaContainerConfig> {
64+
6265
kafkaContainer = KafkaContainer(DockerImageName.parse(config.imageName))
66+
.let { container ->
67+
configureExternalListenerAddresses(hostConfig, container)
68+
}
6369
.withNetwork(nebulaConfig.network)
6470
.withNetworkAliases(config.componentName)
71+
6572
eventSource.startContainerAndEmitEvents(kafkaContainer)
6673

6774
val bootstrapServers = kafkaContainer.bootstrapServers
@@ -104,7 +111,8 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent<K
104111
componentInfo = ComponentInfo(
105112
containerInfoFrom(kafkaContainer),
106113
KafkaContainerConfig(
107-
kafkaContainer.bootstrapServers
114+
kafkaContainer.bootstrapServers,
115+
addressToProtocol
108116
),
109117
type = type,
110118
name = name,
@@ -115,6 +123,41 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent<K
115123
eventSource.running()
116124
return componentInfo!!
117125
}
126+
private val addressToProtocol = mutableMapOf<String,String>()
127+
128+
/**
129+
* Configures listeners for the external addresses that
130+
* we're known by. Otherwise consumers who are connecting from
131+
* an external IP address can't connect to Kafka.
132+
*
133+
* Works by establishing a custom protocol mapping (test containers adds a TC-0 / TC-1 etc.. to the front)
134+
*/
135+
private fun configureExternalListenerAddresses(
136+
hostConfig: HostConfig,
137+
container: KafkaContainer
138+
): KafkaContainer {
139+
hostConfig.hostAddresses.forEachIndexed { index, externalAddress ->
140+
// each custom address gets a port incrementally higher than 9094
141+
val port = 9094 + index
142+
143+
container.addExposedPort(port)
144+
container.withListener {
145+
try {
146+
// If this doesn't throw an error, we're building the KAFKA_ADVERTISED_LISTENER part
147+
val mappedPort = container.getMappedPort(port)
148+
logger.info { "Configuring external Kafka listener for $externalAddress:$port" }
149+
// Test containers maps these as TC-0://0.0.0.0:909X
150+
val protocol = "TC-$index"
151+
addressToProtocol.put(externalAddress, "$protocol://$externalAddress:$mappedPort")
152+
"$externalAddress:$mappedPort"
153+
} catch (e: IllegalStateException) {
154+
// The container isn't ready yet, so we're building the internal KAFKA_LISTENER
155+
"0.0.0.0:$port"
156+
}
157+
}
158+
}
159+
return container
160+
}
118161

119162
override fun stop() {
120163
eventSource.stopping()
@@ -146,15 +189,15 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent<K
146189
val adminProps = Properties().apply {
147190
put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
148191
}
149-
192+
150193
AdminClient.create(adminProps).use { adminClient ->
151194
// Group topics by name to handle duplicates and use max partitions
152195
val topicConfigs = producers.groupBy { it.topic }
153196
.mapValues { (_, configs) -> configs.maxOf { it.partitions } }
154197
.map { (topicName, partitions) ->
155198
NewTopic(topicName, partitions, 1.toShort()) // replication factor of 1 for single broker
156199
}
157-
200+
158201
if (topicConfigs.isNotEmpty()) {
159202
try {
160203
val result = adminClient.createTopics(topicConfigs)
@@ -170,8 +213,14 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent<K
170213
}
171214
}
172215

173-
data class KafkaContainerConfig(val bootstrapServers: String) : HostNameAwareContainerConfig<KafkaContainerConfig> {
216+
data class KafkaContainerConfig(val bootstrapServers: String, private val externalAddressToListenerProtocol:Map<String,String>) : HostNameAwareContainerConfig<KafkaContainerConfig> {
174217
override fun updateHostReferences(containerHost: String, publicHost: String): KafkaContainerConfig {
175-
return copy(bootstrapServers = bootstrapServers.updateHostReferences(containerHost, publicHost))
218+
// If we configured a dedicated bootstrap server protocol for this host, then use that
219+
val dedicatedBootstrapServer = externalAddressToListenerProtocol[publicHost]
220+
return if (dedicatedBootstrapServer != null) {
221+
copy(bootstrapServers = dedicatedBootstrapServer)
222+
} else {
223+
this
224+
}
176225
}
177226
}

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/mongo/MongoExecutor.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.orbitalhq.nebula.mongo
22

33
import com.mongodb.client.MongoClients
4+
import com.orbitalhq.nebula.HostConfig
45
import com.orbitalhq.nebula.InfrastructureComponent
56
import com.orbitalhq.nebula.NebulaConfig
67
import com.orbitalhq.nebula.StackRunner
@@ -34,7 +35,7 @@ class MongoExecutor(private val config: MongoConfig) : InfrastructureComponent<M
3435
private val logger = KotlinLogging.logger {}
3536
}
3637

37-
override fun start(nebulaConfig: NebulaConfig): ComponentInfo<MongoContainerConfig> {
38+
override fun start(nebulaConfig: NebulaConfig, hostConfig: HostConfig): ComponentInfo<MongoContainerConfig> {
3839
mongoContainer = MongoDBContainer(DockerImageName.parse(config.imageName))
3940
.withNetwork(nebulaConfig.network)
4041
.withNetworkAliases(config.componentName)

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/s3/S3Executor.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.orbitalhq.nebula.s3
22

3+
import com.orbitalhq.nebula.HostConfig
34
import com.orbitalhq.nebula.InfrastructureComponent
45
import com.orbitalhq.nebula.NebulaConfig
56
import com.orbitalhq.nebula.StackRunner
@@ -43,7 +44,7 @@ class S3Executor(private val config: S3Config) : InfrastructureComponent<Localst
4344
override var componentInfo: ComponentInfo<LocalstackContainerConfig>? = null
4445
private set
4546

46-
override fun start(nebulaConfig: NebulaConfig):ComponentInfo<LocalstackContainerConfig> {
47+
override fun start(nebulaConfig: NebulaConfig, hostConfig: HostConfig):ComponentInfo<LocalstackContainerConfig> {
4748
localstack = LocalStackContainer(DockerImageName.parse(config.imageName))
4849
.withServices(LocalStackContainer.Service.S3)
4950
.withNetwork(nebulaConfig.network)

nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/sql/DatabaseExecutor.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.orbitalhq.nebula.sql
22

3+
import com.orbitalhq.nebula.HostConfig
34
import com.orbitalhq.nebula.InfrastructureComponent
45
import com.orbitalhq.nebula.NebulaConfig
56
import com.orbitalhq.nebula.StackRunner
@@ -45,7 +46,7 @@ class DatabaseExecutor(private val config: DatabaseConfig) : InfrastructureCompo
4546
override var componentInfo: ComponentInfo<DatabaseContainerConfig>? = null
4647
private set
4748

48-
override fun start(nebulaConfig: NebulaConfig): ComponentInfo<DatabaseContainerConfig> {
49+
override fun start(nebulaConfig: NebulaConfig, hostConfig: HostConfig): ComponentInfo<DatabaseContainerConfig> {
4950
databaseContainer = config.container.withDatabaseName(config.databaseName)
5051
.withNetwork(nebulaConfig.network)
5152
.withNetworkAliases(config.componentName)

0 commit comments

Comments
 (0)