diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fc8eb10d9..cb5772b3d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- Add Kafka consumer instrumentation for Spring Boot 3 ([#5255](https://github.com/getsentry/sentry-java/pull/5255)) - Add Kafka producer instrumentation for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)) - Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250)) - Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136)) diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index bc95af0859..c5ca7444c0 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -244,6 +244,12 @@ public final class io/sentry/spring/jakarta/graphql/SentrySpringSubscriptionHand public fun onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object; } +public final class io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { public fun ()V public fun getOrder ()I @@ -254,6 +260,15 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerWrapper : o public fun (Lorg/springframework/kafka/core/KafkaTemplate;Lio/sentry/IScopes;)V } +public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor { + public fun (Lio/sentry/IScopes;)V + public fun (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V + public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; + public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V +} + public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration { public fun ()V public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration; diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java new file mode 100644 index 0000000000..0fd52aa6c4 --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -0,0 +1,61 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.ScopesAdapter; +import java.lang.reflect.Field; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; +import org.springframework.kafka.listener.RecordInterceptor; + +/** + * Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory} + * beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate. + */ +@ApiStatus.Internal +public final class SentryKafkaConsumerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + @Override + @SuppressWarnings("unchecked") + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof AbstractKafkaListenerContainerFactory) { + final @NotNull AbstractKafkaListenerContainerFactory factory = + (AbstractKafkaListenerContainerFactory) bean; + + final @Nullable RecordInterceptor existing = getExistingInterceptor(factory); + if (existing instanceof SentryKafkaRecordInterceptor) { + return bean; + } + + @SuppressWarnings("rawtypes") + final RecordInterceptor sentryInterceptor = + new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), existing); + factory.setRecordInterceptor(sentryInterceptor); + } + return bean; + } + + @SuppressWarnings("unchecked") + private @Nullable RecordInterceptor getExistingInterceptor( + final @NotNull AbstractKafkaListenerContainerFactory factory) { + try { + final @NotNull Field field = + AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor"); + field.setAccessible(true); + return (RecordInterceptor) field.get(factory); + } catch (NoSuchFieldException | IllegalAccessException e) { + return null; + } + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java new file mode 100644 index 0000000000..419e7834a1 --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -0,0 +1,201 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.IScopes; +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.kafka.listener.RecordInterceptor; + +/** + * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka + * records with distributed tracing support. + */ +@ApiStatus.Internal +public final class SentryKafkaRecordInterceptor implements RecordInterceptor { + + static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.consumer"; + + private final @NotNull IScopes scopes; + private final @Nullable RecordInterceptor delegate; + + private static final @NotNull ThreadLocal currentContext = + new ThreadLocal<>(); + + public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) { + this(scopes, null); + } + + public SentryKafkaRecordInterceptor( + final @NotNull IScopes scopes, final @Nullable RecordInterceptor delegate) { + this.scopes = scopes; + this.delegate = delegate; + } + + @Override + public @Nullable ConsumerRecord intercept( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (!scopes.getOptions().isEnableQueueTracing()) { + return delegateIntercept(record, consumer); + } + + final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor"); + final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); + + continueTrace(forkedScopes, record); + + final @Nullable ITransaction transaction = startTransaction(forkedScopes, record); + currentContext.set(new SentryRecordContext(lifecycleToken, transaction)); + + return delegateIntercept(record, consumer); + } + + @Override + public void success( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.success(record, consumer); + } + } finally { + finishSpan(SpanStatus.OK, null); + } + } + + @Override + public void failure( + final @NotNull ConsumerRecord record, + final @NotNull Exception exception, + final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.failure(record, exception, consumer); + } + } finally { + finishSpan(SpanStatus.INTERNAL_ERROR, exception); + } + } + + @Override + public void afterRecord( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + delegate.afterRecord(record, consumer); + } + } + + private @Nullable ConsumerRecord delegateIntercept( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + return delegate.intercept(record, consumer); + } + return record; + } + + private void continueTrace( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); + final @Nullable List baggageHeaders = + baggage != null ? Collections.singletonList(baggage) : null; + forkedScopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable ITransaction startTransaction( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + if (!forkedScopes.getOptions().isTracingEnabled()) { + return null; + } + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(true); + + final @NotNull ITransaction transaction = + forkedScopes.startTransaction( + new TransactionContext("queue.process", "queue.process"), txOptions); + + if (transaction.isNoOp()) { + return null; + } + + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + final @Nullable String messageId = headerValue(record, "messaging.message.id"); + if (messageId != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); + } + + final @Nullable String enqueuedTimeStr = + headerValue(record, SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER); + if (enqueuedTimeStr != null) { + try { + final long enqueuedTime = Long.parseLong(enqueuedTimeStr); + final long latencyMs = System.currentTimeMillis() - enqueuedTime; + if (latencyMs >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs); + } + } catch (NumberFormatException ignored) { + // ignore malformed header + } + } + + return transaction; + } + + private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) { + final @Nullable SentryRecordContext ctx = currentContext.get(); + if (ctx == null) { + return; + } + currentContext.remove(); + + try { + final @Nullable ITransaction transaction = ctx.transaction; + if (transaction != null) { + transaction.setStatus(status); + if (throwable != null) { + transaction.setThrowable(throwable); + } + transaction.finish(); + } + } finally { + ctx.lifecycleToken.close(); + } + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } + + private static final class SentryRecordContext { + final @NotNull ISentryLifecycleToken lifecycleToken; + final @Nullable ITransaction transaction; + + SentryRecordContext( + final @NotNull ISentryLifecycleToken lifecycleToken, + final @Nullable ITransaction transaction) { + this.lifecycleToken = lifecycleToken; + this.transaction = transaction; + } + } +} diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt new file mode 100644 index 0000000000..8595cb9ae7 --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -0,0 +1,58 @@ +package io.sentry.spring.jakarta.kafka + +import kotlin.test.Test +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.mockito.kotlin.mock +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory + +class SentryKafkaConsumerBeanPostProcessorTest { + + @Test + fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + // Verify via reflection that the interceptor was set + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val interceptor = field.get(factory) + assertTrue(interceptor is SentryKafkaRecordInterceptor<*, *>) + } + + @Test + fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val processor = SentryKafkaConsumerBeanPostProcessor() + // First wrap + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val firstInterceptor = field.get(factory) + + // Second wrap — should be idempotent + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + val secondInterceptor = field.get(factory) + + assertSame(firstInterceptor, secondInterceptor) + } + + @Test + fun `does not wrap non-factory beans`() { + val someBean = "not a factory" + val processor = SentryKafkaConsumerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } +} diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt new file mode 100644 index 0000000000..9b92f19749 --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -0,0 +1,202 @@ +package io.sentry.spring.jakarta.kafka + +import io.sentry.BaggageHeader +import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.TransactionContext +import java.nio.charset.StandardCharsets +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.mockito.kotlin.any +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.kafka.listener.RecordInterceptor + +class SentryKafkaRecordInterceptorTest { + + private lateinit var scopes: IScopes + private lateinit var options: SentryOptions + private lateinit var consumer: Consumer + private lateinit var lifecycleToken: ISentryLifecycleToken + + @BeforeTest + fun setup() { + scopes = mock() + consumer = mock() + lifecycleToken = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + tracesSampleRate = 1.0 + } + whenever(scopes.options).thenReturn(options) + whenever(scopes.isEnabled).thenReturn(true) + + val forkedScopes = mock() + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) + + val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) + } + + private fun createRecord( + topic: String = "my-topic", + headers: RecordHeaders = RecordHeaders(), + ): ConsumerRecord { + val record = ConsumerRecord(topic, 0, 0L, "key", "value") + headers.forEach { record.headers().add(it) } + return record + } + + private fun createRecordWithHeaders( + sentryTrace: String? = null, + baggage: String? = null, + enqueuedTime: Long? = null, + ): ConsumerRecord { + val headers = RecordHeaders() + sentryTrace?.let { + headers.add(SentryTraceHeader.SENTRY_TRACE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + baggage?.let { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + enqueuedTime?.let { + headers.add( + SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER, + it.toString().toByteArray(StandardCharsets.UTF_8), + ) + } + val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") + headers.forEach { record.headers().add(it) } + return record + } + + @Test + fun `intercept creates forked scopes`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + interceptor.intercept(record, consumer) + + verify(scopes).forkedScopes("SentryKafkaRecordInterceptor") + } + + @Test + fun `intercept continues trace from headers`() { + val forkedScopes = mock() + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) + + val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) + + val interceptor = SentryKafkaRecordInterceptor(scopes) + val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val record = createRecordWithHeaders(sentryTrace = sentryTraceValue) + + interceptor.intercept(record, consumer) + + verify(forkedScopes) + .continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull()) + } + + @Test + fun `intercept calls continueTrace with null when no headers`() { + val forkedScopes = mock() + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + whenever(scopes.forkedScopes(any())).thenReturn(forkedScopes) + + val tx = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())).thenReturn(tx) + + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + interceptor.intercept(record, consumer) + + verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) + } + + @Test + fun `does not create span when queue tracing is disabled`() { + options.isEnableQueueTracing = false + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + val result = interceptor.intercept(record, consumer) + + verify(scopes, never()).forkedScopes(any()) + assertEquals(record, result) + } + + @Test + fun `delegates to existing interceptor`() { + val delegate = mock>() + val record = createRecord() + whenever(delegate.intercept(record, consumer)).thenReturn(record) + + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + interceptor.intercept(record, consumer) + + verify(delegate).intercept(record, consumer) + } + + @Test + fun `success finishes transaction and delegates`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + // intercept first to set up context + interceptor.intercept(record, consumer) + interceptor.success(record, consumer) + + verify(delegate).success(record, consumer) + } + + @Test + fun `failure finishes transaction with error and delegates`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + val exception = RuntimeException("processing failed") + + interceptor.intercept(record, consumer) + interceptor.failure(record, exception, consumer) + + verify(delegate).failure(record, exception, consumer) + } + + @Test + fun `afterRecord delegates to existing interceptor`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + interceptor.afterRecord(record, consumer) + + verify(delegate).afterRecord(record, consumer) + } + + @Test + fun `trace origin is set correctly`() { + assertEquals( + "auto.queue.spring_jakarta.kafka.consumer", + SentryKafkaRecordInterceptor.TRACE_ORIGIN, + ) + } +}