Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Features

- Add Kafka consumer instrumentation for Spring Boot 3 ([#5255](https://github.com/getsentry/sentry-java/pull/5255))
- Add Kafka producer instrumentation for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254))
- Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250))
- Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136))
Expand Down
15 changes: 15 additions & 0 deletions sentry-spring-jakarta/api/sentry-spring-jakarta.api
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ public final class io/sentry/spring/jakarta/graphql/SentrySpringSubscriptionHand
public fun onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object;
}

public final class io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered {
public fun <init> ()V
public fun getOrder ()I
public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;
}

public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered {
public fun <init> ()V
public fun getOrder ()I
Expand All @@ -254,6 +260,15 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerWrapper : o
public fun <init> (Lorg/springframework/kafka/core/KafkaTemplate;Lio/sentry/IScopes;)V
}

public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor {
public fun <init> (Lio/sentry/IScopes;)V
public fun <init> (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V
public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V
public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord;
public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
}

public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration {
public fun <init> ()V
public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.sentry.spring.jakarta.kafka;

import io.sentry.ScopesAdapter;
import java.lang.reflect.Field;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
import org.springframework.kafka.listener.RecordInterceptor;

/**
 * Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory}
 * beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate.
 */
@ApiStatus.Internal
public final class SentryKafkaConsumerBeanPostProcessor
    implements BeanPostProcessor, PriorityOrdered {

  /**
   * Wraps any {@link AbstractKafkaListenerContainerFactory} bean's record interceptor with a
   * Sentry interceptor. All other beans are returned unchanged.
   *
   * @param bean the freshly initialized bean
   * @param beanName the bean's name in the application context
   * @return the same bean instance, possibly with its record interceptor replaced
   */
  @Override
  @SuppressWarnings("unchecked")
  public @NotNull Object postProcessAfterInitialization(
      final @NotNull Object bean, final @NotNull String beanName) throws BeansException {
    if (bean instanceof AbstractKafkaListenerContainerFactory) {
      final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory =
          (AbstractKafkaListenerContainerFactory<?, ?, ?>) bean;

      // Idempotency guard: if this processor already ran for the factory, do not wrap the
      // Sentry interceptor inside another Sentry interceptor.
      final @Nullable RecordInterceptor<?, ?> existing = getExistingInterceptor(factory);
      if (existing instanceof SentryKafkaRecordInterceptor) {
        return bean;
      }

      // Compose any pre-existing user interceptor as a delegate so its behavior is preserved.
      @SuppressWarnings("rawtypes")
      final RecordInterceptor sentryInterceptor =
          new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), existing);
      factory.setRecordInterceptor(sentryInterceptor);
    }
    return bean;
  }

  /**
   * Reads the private {@code recordInterceptor} field reflectively, because
   * {@link AbstractKafkaListenerContainerFactory} only exposes a setter for it.
   *
   * @param factory the container factory to inspect
   * @return the currently configured interceptor, or {@code null} if none is set or the field
   *     cannot be read (renamed field, or reflective access denied by the module system)
   */
  @SuppressWarnings("unchecked")
  private @Nullable RecordInterceptor<?, ?> getExistingInterceptor(
      final @NotNull AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
    try {
      final @NotNull Field field =
          AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor");
      field.setAccessible(true);
      return (RecordInterceptor<?, ?>) field.get(factory);
    } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) {
      // RuntimeException covers InaccessibleObjectException / SecurityException thrown by
      // setAccessible on JPMS-restricted runtimes; a failed reflective read must degrade to
      // "no existing interceptor" rather than break application startup.
      return null;
    }
  }

  @Override
  public int getOrder() {
    // Run last so interceptors registered by other post processors are picked up as delegates.
    return Ordered.LOWEST_PRECEDENCE;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package io.sentry.spring.jakarta.kafka;

import io.sentry.BaggageHeader;
import io.sentry.IScopes;
import io.sentry.ISentryLifecycleToken;
import io.sentry.ITransaction;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanStatus;
import io.sentry.TransactionContext;
import io.sentry.TransactionOptions;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.kafka.listener.RecordInterceptor;

/**
 * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka
 * records with distributed tracing support.
 *
 * <p>Lifecycle: {@link #intercept} forks the scopes, makes them current, continues the incoming
 * trace from record headers and (when tracing is enabled) starts a transaction bound to the
 * scope. The matching {@link #success} or {@link #failure} callback then finishes the
 * transaction and restores the previous scopes via a lifecycle token stored in a thread-local.
 * An optional delegate interceptor is forwarded every callback.
 */
@ApiStatus.Internal
public final class SentryKafkaRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

  // Trace origin recorded on every transaction created by this interceptor.
  static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.consumer";

  private final @NotNull IScopes scopes;
  // Pre-existing user interceptor composed as a delegate; may be null.
  private final @Nullable RecordInterceptor<K, V> delegate;

  // Per-thread state linking intercept() to the later success()/failure() call. Static, so it is
  // shared by all interceptor instances on the same consumer thread; assumes at most one record
  // is in flight per consumer thread at a time — TODO confirm against spring-kafka's contract.
  private static final @NotNull ThreadLocal<SentryRecordContext> currentContext =
      new ThreadLocal<>();

  public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) {
    this(scopes, null);
  }

  public SentryKafkaRecordInterceptor(
      final @NotNull IScopes scopes, final @Nullable RecordInterceptor<K, V> delegate) {
    this.scopes = scopes;
    this.delegate = delegate;
  }

  /**
   * Called before the listener processes the record. Sets up Sentry state for this thread, then
   * forwards to the delegate (if any).
   *
   * <p>NOTE(review): if the delegate's intercept() throws, or returns null so the listener is
   * skipped, it is unclear whether success()/failure() still runs; if not, the context stored
   * here is only cleaned up when the next record overwrites it — confirm spring-kafka behavior.
   */
  @Override
  public @Nullable ConsumerRecord<K, V> intercept(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    // Queue tracing disabled: pure pass-through, no Sentry state is created.
    if (!scopes.getOptions().isEnableQueueTracing()) {
      return delegateIntercept(record, consumer);
    }

    // Fork and activate scopes so events captured while processing this record are isolated.
    final @NotNull IScopes forkedScopes = scopes.forkedScopes("SentryKafkaRecordInterceptor");
    final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent();

    continueTrace(forkedScopes, record);

    // Transaction may be null (tracing off or sampled out as no-op); still store the context so
    // the lifecycle token is closed in success()/failure().
    final @Nullable ITransaction transaction = startTransaction(forkedScopes, record);
    currentContext.set(new SentryRecordContext(lifecycleToken, transaction));

    return delegateIntercept(record, consumer);
  }

  /** Called after the listener processed the record without error; finishes the span as OK. */
  @Override
  public void success(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    try {
      if (delegate != null) {
        delegate.success(record, consumer);
      }
    } finally {
      // finally: Sentry state must be cleaned up even if the delegate throws.
      finishSpan(SpanStatus.OK, null);
    }
  }

  /** Called when the listener threw; finishes the span as INTERNAL_ERROR with the exception. */
  @Override
  public void failure(
      final @NotNull ConsumerRecord<K, V> record,
      final @NotNull Exception exception,
      final @NotNull Consumer<K, V> consumer) {
    try {
      if (delegate != null) {
        delegate.failure(record, exception, consumer);
      }
    } finally {
      // finally: Sentry state must be cleaned up even if the delegate throws.
      finishSpan(SpanStatus.INTERNAL_ERROR, exception);
    }
  }

  /** Pure delegate forwarding; Sentry cleanup happens in success()/failure(), not here. */
  @Override
  public void afterRecord(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (delegate != null) {
      delegate.afterRecord(record, consumer);
    }
  }

  // Forwards intercept() to the delegate, or passes the record through unchanged when there is
  // no delegate. A null return (from the delegate) means "skip this record".
  private @Nullable ConsumerRecord<K, V> delegateIntercept(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull Consumer<K, V> consumer) {
    if (delegate != null) {
      return delegate.intercept(record, consumer);
    }
    return record;
  }

  // Continues the incoming distributed trace from the sentry-trace / baggage record headers;
  // both may be absent, in which case continueTrace receives nulls.
  private void continueTrace(
      final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
    final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
    final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
    final @Nullable List<String> baggageHeaders =
        baggage != null ? Collections.singletonList(baggage) : null;
    forkedScopes.continueTrace(sentryTrace, baggageHeaders);
  }

  /**
   * Starts a {@code queue.process} transaction bound to the forked scope and attaches messaging
   * span data (system, destination, optional message id and receive latency).
   *
   * @return the started transaction, or {@code null} when tracing is disabled or the transaction
   *     is a no-op (e.g. not sampled)
   */
  private @Nullable ITransaction startTransaction(
      final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord<K, V> record) {
    if (!forkedScopes.getOptions().isTracingEnabled()) {
      return null;
    }

    final @NotNull TransactionOptions txOptions = new TransactionOptions();
    txOptions.setOrigin(TRACE_ORIGIN);
    // Bind to scope so spans created by the listener attach to this transaction.
    txOptions.setBindToScope(true);

    final @NotNull ITransaction transaction =
        forkedScopes.startTransaction(
            new TransactionContext("queue.process", "queue.process"), txOptions);

    // No-op transaction (not sampled): skip data attachment and let finishSpan treat it as absent.
    if (transaction.isNoOp()) {
      return null;
    }

    transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
    transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());

    final @Nullable String messageId = headerValue(record, "messaging.message.id");
    if (messageId != null) {
      transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId);
    }

    // Receive latency = now - producer-set enqueue timestamp header (epoch millis, presumably
    // written by SentryKafkaProducerWrapper — see SENTRY_ENQUEUED_TIME_HEADER).
    final @Nullable String enqueuedTimeStr =
        headerValue(record, SentryKafkaProducerWrapper.SENTRY_ENQUEUED_TIME_HEADER);
    if (enqueuedTimeStr != null) {
      try {
        final long enqueuedTime = Long.parseLong(enqueuedTimeStr);
        final long latencyMs = System.currentTimeMillis() - enqueuedTime;
        // Negative latency can occur with clock skew between producer and consumer hosts; skip it.
        if (latencyMs >= 0) {
          transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs);
        }
      } catch (NumberFormatException ignored) {
        // ignore malformed header
      }
    }

    return transaction;
  }

  // Finishes the thread's pending transaction with the given status and restores the previous
  // scopes. Safe to call when no context is pending (e.g. queue tracing was disabled).
  private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) {
    final @Nullable SentryRecordContext ctx = currentContext.get();
    if (ctx == null) {
      return;
    }
    // Remove before finishing so a reentrant call cannot finish the same context twice.
    currentContext.remove();

    try {
      final @Nullable ITransaction transaction = ctx.transaction;
      if (transaction != null) {
        transaction.setStatus(status);
        if (throwable != null) {
          transaction.setThrowable(throwable);
        }
        transaction.finish();
      }
    } finally {
      // finally: the lifecycle token must be closed even if finishing the transaction throws,
      // otherwise the forked scopes stay current on this consumer thread.
      ctx.lifecycleToken.close();
    }
  }

  // Returns the UTF-8 decoded value of the last header with the given name, or null if the
  // header is absent or carries a null value.
  private @Nullable String headerValue(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
    final @Nullable Header header = record.headers().lastHeader(headerName);
    if (header == null || header.value() == null) {
      return null;
    }
    return new String(header.value(), StandardCharsets.UTF_8);
  }

  // Immutable pair of (lifecycle token, optional transaction) stored per thread between
  // intercept() and success()/failure().
  private static final class SentryRecordContext {
    final @NotNull ISentryLifecycleToken lifecycleToken;
    final @Nullable ITransaction transaction;

    SentryRecordContext(
        final @NotNull ISentryLifecycleToken lifecycleToken,
        final @Nullable ITransaction transaction) {
      this.lifecycleToken = lifecycleToken;
      this.transaction = transaction;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.sentry.spring.jakarta.kafka

import kotlin.test.Test
import kotlin.test.assertSame
import kotlin.test.assertTrue
import org.mockito.kotlin.mock
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory

class SentryKafkaConsumerBeanPostProcessorTest {

    /** Builds a container factory backed by a mocked [ConsumerFactory]. */
    private fun buildFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = mock<ConsumerFactory<String, String>>()
        return factory
    }

    /** Reads the private `recordInterceptor` field of the factory via reflection. */
    private fun currentInterceptor(
        factory: ConcurrentKafkaListenerContainerFactory<String, String>,
    ): Any? =
        factory.javaClass.superclass
            .getDeclaredField("recordInterceptor")
            .also { it.isAccessible = true }
            .get(factory)

    @Test
    fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() {
        val factory = buildFactory()

        SentryKafkaConsumerBeanPostProcessor()
            .postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")

        assertTrue(currentInterceptor(factory) is SentryKafkaRecordInterceptor<*, *>)
    }

    @Test
    fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() {
        val factory = buildFactory()
        val processor = SentryKafkaConsumerBeanPostProcessor()

        // First pass installs the interceptor; second pass must be idempotent.
        processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")
        val afterFirstPass = currentInterceptor(factory)

        processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory")
        val afterSecondPass = currentInterceptor(factory)

        assertSame(afterFirstPass, afterSecondPass)
    }

    @Test
    fun `does not wrap non-factory beans`() {
        val plainBean = "not a factory"

        val returned =
            SentryKafkaConsumerBeanPostProcessor()
                .postProcessAfterInitialization(plainBean, "someBean")

        assertSame(plainBean, returned)
    }
}
Loading
Loading