Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,16 @@ else if (listener instanceof MessageListener) {
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
this.commonErrorHandler = determineCommonErrorHandler();
// Setup async failure callback for suspend functions when CommonErrorHandler is explicitly configured
if (getCommonErrorHandler() != null && this.listener != null) {
MessageListener<?, ?> target = unwrapDelegateIfAny(this.listener);
if (target instanceof RecordMessagingMessageListenerAdapter<?, ?>) {
@SuppressWarnings("unchecked")
RecordMessagingMessageListenerAdapter<K, V> adapter =
(RecordMessagingMessageListenerAdapter<K, V>) target;
adapter.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
}
}
Assert.state(!this.isBatchListener || !this.isRecordAck,
"Cannot use AckMode.RECORD with a batch listener");
if (this.containerProperties.getScheduler() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.kafka.listener.KafkaListenerErrorHandler
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.test.EmbeddedKafkaBroker
Expand All @@ -59,7 +60,8 @@ import java.util.concurrent.TimeUnit
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"], partitions = 1)
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1",
"kotlinAsyncTestTopicCommonHandler"], partitions = 1)
class EnableKafkaKotlinCoroutinesTests {

@Autowired
Expand Down Expand Up @@ -108,6 +110,13 @@ class EnableKafkaKotlinCoroutinesTests {
assertThat(cr?.value() ?: "null").isEqualTo("FOO")
}

@Test
fun `test suspend function with CommonErrorHandler`() {
this.template.send("kotlinAsyncTestTopicCommonHandler", "fail")
assertThat(this.config.commonHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue()
assertThat(this.config.commonHandlerInvoked).isTrue()
}

@KafkaListener(id = "sendTopic", topics = ["kotlinAsyncTestTopic3"],
containerFactory = "kafkaListenerContainerFactory")
class Listener {
Expand Down Expand Up @@ -138,6 +147,9 @@ class EnableKafkaKotlinCoroutinesTests {
@Volatile
var batchError: Boolean = false

@Volatile
var commonHandlerInvoked: Boolean = false

val latch1 = CountDownLatch(1)

val latch2 = CountDownLatch(1)
Expand All @@ -146,6 +158,8 @@ class EnableKafkaKotlinCoroutinesTests {

val batchLatch2 = CountDownLatch(1)

val commonHandlerLatch = CountDownLatch(1)

@Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private lateinit var brokerAddresses: String

Expand Down Expand Up @@ -217,6 +231,23 @@ class EnableKafkaKotlinCoroutinesTests {
return factory
}

@Bean
fun commonErrorHandler(): DefaultErrorHandler {
return DefaultErrorHandler { record, exception ->
commonHandlerInvoked = true
commonHandlerLatch.countDown()
}
}

@Bean
fun kafkaListenerContainerFactoryWithCommonHandler(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
= ConcurrentKafkaListenerContainerFactory()
factory.setConsumerFactory(kcf())
factory.setCommonErrorHandler(commonErrorHandler())
return factory
}

@KafkaListener(id = "kotlin", topics = ["kotlinAsyncTestTopic1"],
containerFactory = "kafkaListenerContainerFactory")
suspend fun listen(value: String, acknowledgment: Acknowledgment) {
Expand Down Expand Up @@ -247,6 +278,14 @@ class EnableKafkaKotlinCoroutinesTests {
}
}

@KafkaListener(id = "kotlin-common-handler", topics = ["kotlinAsyncTestTopicCommonHandler"],
containerFactory = "kafkaListenerContainerFactoryWithCommonHandler")
suspend fun listenWithCommonHandler(value: String) {
if (value == "fail") {
throw RuntimeException("Test exception for CommonErrorHandler")
}
}

}

}