From 7fdf04375ae157a6a206d8802f124a8a207f1769 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Fri, 28 Nov 2025 12:06:55 -0800
Subject: [PATCH 01/12] Adding a Flux based subscribeToEvents method
Signed-off-by: Artur Ciocanu
---
.../java/io/dapr/client/DaprClientImpl.java | 155 ++++++++++++++++++
.../io/dapr/client/DaprPreviewClient.java | 29 +++-
.../client/DaprPreviewClientGrpcTest.java | 69 ++++++++
3 files changed, 252 insertions(+), 1 deletion(-)
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
index 012921a89e..9854d96051 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
@@ -96,6 +96,7 @@
import io.dapr.utils.DefaultContentTypeConverter;
import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
+import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.dapr.v1.DaprProtos.ActiveActorsCount;
@@ -133,6 +134,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -475,6 +480,139 @@ public Subscription subscribeToEvents(
return buildSubscription(listener, type, request);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef type) {
+ DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
+ DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
+ .setTopic(topic)
+ .setPubsubName(pubsubName)
+ .build();
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
+ DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
+ .setInitialRequest(initialRequest)
+ .build();
+
+ return Flux.create(sink -> {
+ var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
+ BlockingQueue ackQueue = new LinkedBlockingQueue<>(50);
+ AtomicReference> streamRef =
+ new AtomicReference<>();
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ // Thread to send acknowledgments back to Dapr
+ Thread acker = new Thread(() -> {
+ while (running.get()) {
+ try {
+ var ackResponse = ackQueue.take();
+ if (ackResponse == null) {
+ continue;
+ }
+
+ var stream = streamRef.get();
+ if (stream == null) {
+ Thread.sleep(100);
+ continue;
+ }
+
+ stream.onNext(ackResponse);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ });
+
+ // Create the gRPC streaming observer
+ var stream = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
+ @Override
+ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
+ try {
+ if (response.getEventMessage() == null) {
+ return;
+ }
+
+ var message = response.getEventMessage();
+ if ((message.getPubsubName() == null) || message.getPubsubName().isEmpty()) {
+ return;
+ }
+
+ var id = message.getId();
+ if ((id == null) || id.isEmpty()) {
+ return;
+ }
+
+ // Deserialize the event data
+ T data = null;
+ if (type != null) {
+ data = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type);
+ }
+
+ // Emit the data to the Flux (only if not null)
+ if (data != null) {
+ sink.next(data);
+ }
+
+ // Send SUCCESS acknowledgment
+ var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS);
+ ackQueue.put(ack);
+
+ } catch (Exception e) {
+ // On error during processing, send RETRY acknowledgment
+ try {
+ var id = response.getEventMessage().getId();
+ if (id != null && !id.isEmpty()) {
+ var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY);
+ ackQueue.put(ack);
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ sink.error(DaprException.propagate(e));
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ running.set(false);
+ sink.error(DaprException.propagate(throwable));
+ }
+
+ @Override
+ public void onCompleted() {
+ running.set(false);
+ sink.complete();
+ }
+ });
+
+ streamRef.set(stream);
+ acker.start();
+
+ // Send initial request to start receiving events
+ stream.onNext(request);
+
+ // Cleanup when Flux is cancelled or completed
+ sink.onDispose(() -> {
+ running.set(false);
+ acker.interrupt();
+ try {
+ stream.onCompleted();
+ } catch (Exception e) {
+ // Ignore cleanup errors
+ }
+ });
+ }, FluxSink.OverflowStrategy.BUFFER);
+ }
+
@Nonnull
private Subscription buildSubscription(
SubscriptionListener listener,
@@ -513,6 +651,23 @@ private Subscription buildSubscription(
return subscription;
}
+ @Nonnull
+ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(
+ String id, SubscriptionListener.Status status) {
+ DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed =
+ DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder()
+ .setId(id)
+ .setStatus(
+ DaprAppCallbackProtos.TopicEventResponse.newBuilder()
+ .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.valueOf(
+ status.name()))
+ .build())
+ .build();
+ return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
+ .setEventProcessed(eventProcessed)
+ .build();
+ }
+
@Override
public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) {
try {
diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
index 92c6a61c3e..8c66979df9 100644
--- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
+++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
@@ -32,6 +32,7 @@
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.query.Query;
import io.dapr.utils.TypeRef;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
@@ -271,12 +272,38 @@ Mono> publishEvents(String pubsubName, String topicNa
* @param topic Name of the topic to subscribe to.
* @param listener Callback methods to process events.
* @param type Type for object deserialization.
- * @return An active subscription.
* @param Type of object deserialization.
+ * @return An active subscription.
+ * @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach.
*/
+ @Deprecated
Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener listener, TypeRef type);
+ /**
+ * Subscribe to pubsub events via streaming using Project Reactor Flux.
+ *
+ * This method returns a reactive stream of events that can be processed using standard Reactor operators.
+ * Events are automatically acknowledged as SUCCESS when processed successfully, or RETRY when an error occurs.
+ * Users can control retry behavior using standard Flux operators like {@code retry()} and {@code onErrorResume()}.
+ *
+ *
+ * Example usage:
+ *
{@code
+ * client.subscribeToEvents("pubsub", "orders", TypeRef.get(Order.class))
+ * .doOnNext(order -> processOrder(order))
+ * .retry(3) // Retry up to 3 times on errors
+ * .subscribe();
+ * }
+ *
+ * @param pubsubName Name of the pubsub component.
+ * @param topic Name of the topic to subscribe to.
+ * @param type Type for object deserialization.
+ * @return A Flux of deserialized event payloads.
+ * @param Type of the event payload.
+ */
+ Flux subscribeToEvents(String pubsubName, String topic, TypeRef type);
+
/**
* Schedules a job using the provided job request details.
*
diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
index f7b5584cc7..e0db8174e2 100644
--- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
@@ -586,6 +586,75 @@ public void onError(RuntimeException exception) {
assertEquals(numErrors, errors.size());
}
+ @Test
+ public void subscribeEventFluxTest() throws Exception {
+ var numEvents = 100;
+ var pubsubName = "pubsubName";
+ var topicName = "topicName";
+ var data = "my message";
+
+ var started = new Semaphore(0);
+
+ doAnswer((Answer>) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[0];
+ var emitterThread = new Thread(() -> {
+ try {
+ started.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance());
+ for (int i = 0; i < numEvents; i++) {
+ observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
+ .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder()
+ .setId(Integer.toString(i))
+ .setPubsubName(pubsubName)
+ .setTopic(topicName)
+ .setData(ByteString.copyFromUtf8("\"" + data + "\""))
+ .setDataContentType("application/json")
+ .build())
+ .build());
+ }
+ observer.onCompleted();
+ });
+ emitterThread.start();
+ return new StreamObserver<>() {
+
+ @Override
+ public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) {
+ started.release();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ };
+ }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
+
+ final Set receivedEvents = Collections.synchronizedSet(new HashSet<>());
+ final Semaphore gotAll = new Semaphore(0);
+
+ var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING)
+ .doOnNext(eventData -> {
+ assertEquals(data, eventData);
+ receivedEvents.add(eventData);
+ if (receivedEvents.size() >= numEvents) {
+ gotAll.release();
+ }
+ })
+ .subscribe();
+
+ gotAll.acquire();
+ disposable.dispose();
+
+ assertEquals(numEvents, receivedEvents.size());
+ }
+
@Test
public void converseShouldThrowIllegalArgumentExceptionWhenComponentNameIsNull() throws Exception {
List inputs = new ArrayList<>();
From 4606c8d4d268c62ecb5fa3db44b2b097beee5c88 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Fri, 28 Nov 2025 12:25:41 -0800
Subject: [PATCH 02/12] Simplify GRPC stream handling
Signed-off-by: Artur Ciocanu
---
.../java/io/dapr/client/DaprClientImpl.java | 69 +++++--------------
1 file changed, 16 insertions(+), 53 deletions(-)
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
index 9854d96051..b2a951ce2e 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
@@ -134,10 +134,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -497,43 +493,15 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef
return Flux.create(sink -> {
var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
- BlockingQueue ackQueue = new LinkedBlockingQueue<>(50);
- AtomicReference> streamRef =
- new AtomicReference<>();
- AtomicBoolean running = new AtomicBoolean(true);
-
- // Thread to send acknowledgments back to Dapr
- Thread acker = new Thread(() -> {
- while (running.get()) {
- try {
- var ackResponse = ackQueue.take();
- if (ackResponse == null) {
- continue;
- }
- var stream = streamRef.get();
- if (stream == null) {
- Thread.sleep(100);
- continue;
- }
+ // Use array wrapper to allow assignment within anonymous class (Java's effectively final requirement)
+ // This is simpler than AtomicReference since we don't need atomicity - just mutability
+ @SuppressWarnings("unchecked")
+ StreamObserver[] streamHolder = new StreamObserver[1];
- stream.onNext(ackResponse);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- } catch (Exception e) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return;
- }
- }
- }
- });
-
- // Create the gRPC streaming observer
- var stream = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
+ // Create the gRPC bidirectional streaming observer
+ // Note: StreamObserver.onNext() is thread-safe, so we can send acks directly
+ streamHolder[0] = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
@Override
public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
try {
@@ -562,9 +530,9 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
sink.next(data);
}
- // Send SUCCESS acknowledgment
+ // Send SUCCESS acknowledgment directly (no blocking queue or thread needed)
var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS);
- ackQueue.put(ack);
+ streamHolder[0].onNext(ack);
} catch (Exception e) {
// On error during processing, send RETRY acknowledgment
@@ -572,10 +540,12 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
var id = response.getEventMessage().getId();
if (id != null && !id.isEmpty()) {
var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY);
- ackQueue.put(ack);
+ streamHolder[0].onNext(ack);
}
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
+ } catch (Exception ex) {
+ // If we can't send ack, propagate the error
+ sink.error(DaprException.propagate(ex));
+ return;
}
sink.error(DaprException.propagate(e));
}
@@ -583,29 +553,22 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
@Override
public void onError(Throwable throwable) {
- running.set(false);
sink.error(DaprException.propagate(throwable));
}
@Override
public void onCompleted() {
- running.set(false);
sink.complete();
}
});
- streamRef.set(stream);
- acker.start();
-
// Send initial request to start receiving events
- stream.onNext(request);
+ streamHolder[0].onNext(request);
// Cleanup when Flux is cancelled or completed
sink.onDispose(() -> {
- running.set(false);
- acker.interrupt();
try {
- stream.onCompleted();
+ streamHolder[0].onCompleted();
} catch (Exception e) {
// Ignore cleanup errors
}
From b363e49a76f510b627daa02ff2c8b9aa8b0491a4 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Fri, 28 Nov 2025 12:27:24 -0800
Subject: [PATCH 03/12] Simplify Javadoc
Signed-off-by: Artur Ciocanu
---
.../java/io/dapr/client/DaprPreviewClient.java | 14 --------------
1 file changed, 14 deletions(-)
diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
index 8c66979df9..ba97931117 100644
--- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
+++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
@@ -282,20 +282,6 @@ Subscription subscribeToEvents(
/**
* Subscribe to pubsub events via streaming using Project Reactor Flux.
- *
- * This method returns a reactive stream of events that can be processed using standard Reactor operators.
- * Events are automatically acknowledged as SUCCESS when processed successfully, or RETRY when an error occurs.
- * Users can control retry behavior using standard Flux operators like {@code retry()} and {@code onErrorResume()}.
- *
- *
- * Example usage:
- *
{@code
- * client.subscribeToEvents("pubsub", "orders", TypeRef.get(Order.class))
- * .doOnNext(order -> processOrder(order))
- * .retry(3) // Retry up to 3 times on errors
- * .subscribe();
- * }
- *
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param type Type for object deserialization.
From 305baee24d071a19055083b232f77e8b2f58c238 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Fri, 28 Nov 2025 16:47:28 -0800
Subject: [PATCH 04/12] Fix unit tests and simplify implementation
Signed-off-by: Artur Ciocanu
---
.../java/io/dapr/client/DaprClientImpl.java | 38 ++++++++++++-------
.../client/DaprPreviewClientGrpcTest.java | 8 ++--
2 files changed, 28 insertions(+), 18 deletions(-)
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
index b2a951ce2e..25185306ba 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
@@ -134,6 +134,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -494,14 +495,16 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef
return Flux.create(sink -> {
var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
- // Use array wrapper to allow assignment within anonymous class (Java's effectively final requirement)
- // This is simpler than AtomicReference since we don't need atomicity - just mutability
- @SuppressWarnings("unchecked")
- StreamObserver[] streamHolder = new StreamObserver[1];
+ // We need AtomicReference because we're accessing the stream reference from within the anonymous
+ // StreamObserver implementation (to send acks). Java requires variables used in lambdas/anonymous
+ // classes to be effectively final, so we can't use a plain variable. AtomicReference provides
+ // the mutable container we need while keeping the reference itself final.
+ AtomicReference> streamRef =
+ new AtomicReference<>();
// Create the gRPC bidirectional streaming observer
// Note: StreamObserver.onNext() is thread-safe, so we can send acks directly
- streamHolder[0] = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
+ streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
@Override
public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
try {
@@ -509,18 +512,22 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
return;
}
- var message = response.getEventMessage();
- if ((message.getPubsubName() == null) || message.getPubsubName().isEmpty()) {
+ DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
+ String pubsubName = message.getPubsubName();
+
+ if (pubsubName == null || pubsubName.isEmpty()) {
return;
}
var id = message.getId();
- if ((id == null) || id.isEmpty()) {
+
+ if (id == null || id.isEmpty()) {
return;
}
// Deserialize the event data
T data = null;
+
if (type != null) {
data = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type);
}
@@ -530,23 +537,26 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
sink.next(data);
}
- // Send SUCCESS acknowledgment directly (no blocking queue or thread needed)
+ // Send SUCCESS acknowledgment directly
var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS);
- streamHolder[0].onNext(ack);
+ streamRef.get().onNext(ack);
} catch (Exception e) {
// On error during processing, send RETRY acknowledgment
try {
var id = response.getEventMessage().getId();
+
if (id != null && !id.isEmpty()) {
var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY);
- streamHolder[0].onNext(ack);
+
+ streamRef.get().onNext(ack);
}
} catch (Exception ex) {
// If we can't send ack, propagate the error
sink.error(DaprException.propagate(ex));
return;
}
+
sink.error(DaprException.propagate(e));
}
}
@@ -560,15 +570,15 @@ public void onError(Throwable throwable) {
public void onCompleted() {
sink.complete();
}
- });
+ }));
// Send initial request to start receiving events
- streamHolder[0].onNext(request);
+ streamRef.get().onNext(request);
// Cleanup when Flux is cancelled or completed
sink.onDispose(() -> {
try {
- streamHolder[0].onCompleted();
+ streamRef.get().onCompleted();
} catch (Exception e) {
// Ignore cleanup errors
}
diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
index e0db8174e2..35bc300590 100644
--- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
@@ -636,14 +636,14 @@ public void onCompleted() {
};
}).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
- final Set receivedEvents = Collections.synchronizedSet(new HashSet<>());
+ final AtomicInteger eventCount = new AtomicInteger(0);
final Semaphore gotAll = new Semaphore(0);
var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING)
.doOnNext(eventData -> {
assertEquals(data, eventData);
- receivedEvents.add(eventData);
- if (receivedEvents.size() >= numEvents) {
+ int count = eventCount.incrementAndGet();
+ if (count >= numEvents) {
gotAll.release();
}
})
@@ -652,7 +652,7 @@ public void onCompleted() {
gotAll.acquire();
disposable.dispose();
- assertEquals(numEvents, receivedEvents.size());
+ assertEquals(numEvents, eventCount.get());
}
@Test
From 47ed16c9ce3e804fc5d7dea56d8127c660e5e07c Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Fri, 28 Nov 2025 17:07:57 -0800
Subject: [PATCH 05/12] Adding event subscriber stream observer to simplify
subscription logic
Signed-off-by: Artur Ciocanu
---
.../java/io/dapr/client/DaprClientImpl.java | 78 +------
.../EventSubscriberStreamObserver.java | 191 ++++++++++++++++++
2 files changed, 201 insertions(+), 68 deletions(-)
create mode 100644 sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
index 25185306ba..71c4bfc939 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
@@ -91,6 +91,7 @@
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
+import io.dapr.internal.subscription.EventSubscriberStreamObserver;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
@@ -495,8 +496,8 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef
return Flux.create(sink -> {
var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
- // We need AtomicReference because we're accessing the stream reference from within the anonymous
- // StreamObserver implementation (to send acks). Java requires variables used in lambdas/anonymous
+ // We need AtomicReference because we're accessing the stream reference from within the
+ // EventSubscriberStreamObserver (to send acks). Java requires variables used in lambdas/anonymous
// classes to be effectively final, so we can't use a plain variable. AtomicReference provides
// the mutable container we need while keeping the reference itself final.
AtomicReference> streamRef =
@@ -504,73 +505,14 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef
// Create the gRPC bidirectional streaming observer
// Note: StreamObserver.onNext() is thread-safe, so we can send acks directly
- streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
- @Override
- public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
- try {
- if (response.getEventMessage() == null) {
- return;
- }
-
- DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
- String pubsubName = message.getPubsubName();
-
- if (pubsubName == null || pubsubName.isEmpty()) {
- return;
- }
-
- var id = message.getId();
-
- if (id == null || id.isEmpty()) {
- return;
- }
-
- // Deserialize the event data
- T data = null;
-
- if (type != null) {
- data = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type);
- }
-
- // Emit the data to the Flux (only if not null)
- if (data != null) {
- sink.next(data);
- }
-
- // Send SUCCESS acknowledgment directly
- var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS);
-
- streamRef.get().onNext(ack);
- } catch (Exception e) {
- // On error during processing, send RETRY acknowledgment
- try {
- var id = response.getEventMessage().getId();
-
- if (id != null && !id.isEmpty()) {
- var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY);
-
- streamRef.get().onNext(ack);
- }
- } catch (Exception ex) {
- // If we can't send ack, propagate the error
- sink.error(DaprException.propagate(ex));
- return;
- }
-
- sink.error(DaprException.propagate(e));
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- sink.error(DaprException.propagate(throwable));
- }
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ sink,
+ type,
+ this.objectSerializer,
+ streamRef
+ );
- @Override
- public void onCompleted() {
- sink.complete();
- }
- }));
+ streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(observer));
// Send initial request to start receiving events
streamRef.get().onNext(request);
diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
new file mode 100644
index 0000000000..b6c7d47558
--- /dev/null
+++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2024 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.internal.subscription;
+
+import io.dapr.exceptions.DaprException;
+import io.dapr.serializer.DaprObjectSerializer;
+import io.dapr.utils.TypeRef;
+import io.dapr.v1.DaprAppCallbackProtos;
+import io.dapr.v1.DaprProtos;
+import io.grpc.stub.StreamObserver;
+import reactor.core.publisher.FluxSink;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * StreamObserver implementation for subscribing to Dapr pub/sub events.
+ *
+ * This class handles the bidirectional gRPC streaming for event subscriptions, including:
+ *
+ * - Receiving events from Dapr
+ * - Deserializing event payloads
+ * - Emitting deserialized data to a Reactor Flux
+ * - Sending acknowledgments (SUCCESS/RETRY) back to Dapr
+ *
+ *
+ *
+ * @param The type of the event payload
+ */
+public class EventSubscriberStreamObserver implements StreamObserver {
+
+ private final FluxSink sink;
+ private final TypeRef type;
+ private final DaprObjectSerializer objectSerializer;
+ private final AtomicReference> requestStreamRef;
+
+ /**
+ * Creates a new EventSubscriberStreamObserver.
+ *
+ * @param sink The FluxSink to emit deserialized events to
+ * @param type The TypeRef for deserializing event payloads
+ * @param objectSerializer The serializer to use for deserialization
+ * @param requestStreamRef Reference to the request stream for sending acknowledgments
+ */
+ public EventSubscriberStreamObserver(
+ FluxSink sink,
+ TypeRef type,
+ DaprObjectSerializer objectSerializer,
+ AtomicReference> requestStreamRef) {
+ this.sink = sink;
+ this.type = type;
+ this.objectSerializer = objectSerializer;
+ this.requestStreamRef = requestStreamRef;
+ }
+
+ @Override
+ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
+ try {
+ if (response.getEventMessage() == null) {
+ return;
+ }
+
+ DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
+ String pubsubName = message.getPubsubName();
+
+ if (pubsubName == null || pubsubName.isEmpty()) {
+ return;
+ }
+
+ String id = message.getId();
+
+ if (id == null || id.isEmpty()) {
+ return;
+ }
+
+ // Deserialize the event data
+ T data = null;
+
+ if (type != null) {
+ data = objectSerializer.deserialize(message.getData().toByteArray(), type);
+ }
+
+ // Emit the data to the Flux (only if not null)
+ if (data != null) {
+ sink.next(data);
+ }
+
+ // Send SUCCESS acknowledgment directly
+ var ack = buildSuccessAck(id);
+
+ requestStreamRef.get().onNext(ack);
+ } catch (Exception e) {
+ // On error during processing, send RETRY acknowledgment
+ try {
+ var id = response.getEventMessage().getId();
+
+ if (id != null && !id.isEmpty()) {
+ var ack = buildRetryAck(id);
+
+ requestStreamRef.get().onNext(ack);
+ }
+ } catch (Exception ex) {
+ // If we can't send ack, propagate the error
+ sink.error(DaprException.propagate(ex));
+ return;
+ }
+
+ sink.error(DaprException.propagate(e));
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ sink.error(DaprException.propagate(throwable));
+ }
+
+ @Override
+ public void onCompleted() {
+ sink.complete();
+ }
+
+ /**
+ * Builds a SUCCESS acknowledgment request.
+ *
+ * @param eventId The ID of the event to acknowledge
+ * @return The acknowledgment request
+ */
+ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildSuccessAck(String eventId) {
+ return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS);
+ }
+
+ /**
+ * Builds a RETRY acknowledgment request.
+ *
+ * @param eventId The ID of the event to acknowledge
+ * @return The acknowledgment request
+ */
+ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildRetryAck(String eventId) {
+ return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY);
+ }
+
+ /**
+ * Builds a DROP acknowledgment request.
+ *
+ * @param eventId The ID of the event to acknowledge
+ * @return The acknowledgment request
+ */
+ @SuppressWarnings("unused") // May be used in the future
+ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildDropAck(String eventId) {
+ return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP);
+ }
+
+ /**
+ * Builds an acknowledgment request with the specified status.
+ *
+ * This method directly uses the protobuf enum instead of depending on
+ * {@code SubscriptionListener.Status} to keep this class independent
+ * of the older callback-based API.
+ *
+ *
+ * @param eventId The ID of the event to acknowledge
+ * @param status The acknowledgment status (SUCCESS, RETRY, or DROP)
+ * @return The acknowledgment request
+ */
+ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(
+ String eventId,
+ DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus status) {
+ DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed =
+ DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder()
+ .setId(eventId)
+ .setStatus(
+ DaprAppCallbackProtos.TopicEventResponse.newBuilder()
+ .setStatus(status)
+ .build())
+ .build();
+
+ return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
+ .setEventProcessed(eventProcessed)
+ .build();
+ }
+}
From ae0b5330ea4a305d4c97c72407ab6a4c5e1fb311 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Fri, 28 Nov 2025 17:49:39 -0800
Subject: [PATCH 06/12] Use start() method to start stream subscription
Signed-off-by: Artur Ciocanu
---
.../java/io/dapr/client/DaprClientImpl.java | 27 +++-------
.../EventSubscriberStreamObserver.java | 53 ++++++++++---------
2 files changed, 35 insertions(+), 45 deletions(-)
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
index 71c4bfc939..96e990dd25 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
@@ -135,7 +135,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -494,33 +493,19 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef
.build();
return Flux.create(sink -> {
- var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
-
- // We need AtomicReference because we're accessing the stream reference from within the
- // EventSubscriberStreamObserver (to send acks). Java requires variables used in lambdas/anonymous
- // classes to be effectively final, so we can't use a plain variable. AtomicReference provides
- // the mutable container we need while keeping the reference itself final.
- AtomicReference> streamRef =
- new AtomicReference<>();
-
- // Create the gRPC bidirectional streaming observer
- // Note: StreamObserver.onNext() is thread-safe, so we can send acks directly
- EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ DaprGrpc.DaprStub interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
+ EventSubscriberStreamObserver eventSubscriber = new EventSubscriberStreamObserver<>(
+ interceptedStub,
sink,
type,
- this.objectSerializer,
- streamRef
+ this.objectSerializer
);
-
- streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(observer));
-
- // Send initial request to start receiving events
- streamRef.get().onNext(request);
+ StreamObserver requestStream = eventSubscriber.start(request);
// Cleanup when Flux is cancelled or completed
sink.onDispose(() -> {
try {
- streamRef.get().onCompleted();
+ requestStream.onCompleted();
} catch (Exception e) {
// Ignore cleanup errors
}
diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
index b6c7d47558..fb3eebda88 100644
--- a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
+++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
@@ -17,50 +17,60 @@
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;
import io.dapr.v1.DaprAppCallbackProtos;
+import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.FluxSink;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* StreamObserver implementation for subscribing to Dapr pub/sub events.
- *
- * This class handles the bidirectional gRPC streaming for event subscriptions, including:
- *
- * - Receiving events from Dapr
- * - Deserializing event payloads
- * - Emitting deserialized data to a Reactor Flux
- * - Sending acknowledgments (SUCCESS/RETRY) back to Dapr
- *
- *
+ * Thread Safety: This class relies on gRPC's StreamObserver contract, which guarantees that
+ * onNext(), onError(), and onCompleted() are never called concurrently and always from the
+ * same thread. Therefore, no additional synchronization is needed.
*
* @param The type of the event payload
*/
public class EventSubscriberStreamObserver implements StreamObserver {
+ private final DaprGrpc.DaprStub stub;
private final FluxSink sink;
private final TypeRef type;
private final DaprObjectSerializer objectSerializer;
- private final AtomicReference> requestStreamRef;
+
+ private StreamObserver requestStream;
/**
* Creates a new EventSubscriberStreamObserver.
*
+ * @param stub The gRPC stub for making Dapr service calls
* @param sink The FluxSink to emit deserialized events to
* @param type The TypeRef for deserializing event payloads
* @param objectSerializer The serializer to use for deserialization
- * @param requestStreamRef Reference to the request stream for sending acknowledgments
*/
public EventSubscriberStreamObserver(
+ DaprGrpc.DaprStub stub,
FluxSink sink,
TypeRef type,
- DaprObjectSerializer objectSerializer,
- AtomicReference> requestStreamRef) {
+ DaprObjectSerializer objectSerializer) {
+ this.stub = stub;
this.sink = sink;
this.type = type;
this.objectSerializer = objectSerializer;
- this.requestStreamRef = requestStreamRef;
+ }
+
+ /** Starts the subscription by sending the initial request.
+ *
+ * @param request The subscription request
+ * @return The StreamObserver to send further requests (acknowledgments)
+ */
+ public StreamObserver start(
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 request
+ ) {
+ requestStream = stub.subscribeTopicEventsAlpha1(this);
+
+ requestStream.onNext(request);
+
+ return requestStream;
}
@Override
@@ -98,7 +108,7 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
// Send SUCCESS acknowledgment directly
var ack = buildSuccessAck(id);
- requestStreamRef.get().onNext(ack);
+ requestStream.onNext(ack);
} catch (Exception e) {
// On error during processing, send RETRY acknowledgment
try {
@@ -107,7 +117,7 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
if (id != null && !id.isEmpty()) {
var ack = buildRetryAck(id);
- requestStreamRef.get().onNext(ack);
+ requestStream.onNext(ack);
}
} catch (Exception ex) {
// If we can't send ack, propagate the error
@@ -162,12 +172,7 @@ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildDropAck(String
/**
* Builds an acknowledgment request with the specified status.
- *
- * This method directly uses the protobuf enum instead of depending on
- * {@code SubscriptionListener.Status} to keep this class independent
- * of the older callback-based API.
- *
- *
+ *
* @param eventId The ID of the event to acknowledge
* @param status The acknowledgment status (SUCCESS, RETRY, or DROP)
* @return The acknowledgment request
From f2f7603e0fe55660338a4b7c95462289f7dbb962 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Fri, 28 Nov 2025 18:01:16 -0800
Subject: [PATCH 07/12] Add unit test for event suscriber observer
Signed-off-by: Artur Ciocanu
---
.../EventSubscriberStreamObserverTest.java | 458 ++++++++++++++++++
1 file changed, 458 insertions(+)
create mode 100644 sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
diff --git a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
new file mode 100644
index 0000000000..53ee5c0f28
--- /dev/null
+++ b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
@@ -0,0 +1,458 @@
+/*
+ * Copyright 2024 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package io.dapr.internal.subscription;
+
+import com.google.protobuf.ByteString;
+import io.dapr.exceptions.DaprException;
+import io.dapr.serializer.DaprObjectSerializer;
+import io.dapr.serializer.DefaultObjectSerializer;
+import io.dapr.utils.TypeRef;
+import io.dapr.v1.DaprAppCallbackProtos;
+import io.dapr.v1.DaprGrpc;
+import io.dapr.v1.DaprProtos;
+import io.grpc.stub.StreamObserver;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for EventSubscriberStreamObserver.
+ */
+class EventSubscriberStreamObserverTest {
+
+ private DaprGrpc.DaprStub mockStub;
+ private DaprObjectSerializer objectSerializer;
+ private StreamObserver mockRequestStream;
+
+ @BeforeEach
+ @SuppressWarnings("unchecked")
+ void setUp() {
+ mockStub = mock(DaprGrpc.DaprStub.class);
+ objectSerializer = new DefaultObjectSerializer();
+ mockRequestStream = mock(StreamObserver.class);
+
+ // Setup stub to return mock request stream
+ when(mockStub.subscribeTopicEventsAlpha1(any())).thenReturn(mockRequestStream);
+ }
+
+ @Test
+ void testSuccessfulEventProcessing() {
+ // Create a Flux and capture the sink
+ List emittedEvents = new ArrayList<>();
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ // Start the subscription
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Simulate receiving an event
+ DaprProtos.SubscribeTopicEventsResponseAlpha1 response = buildEventResponse("event-1", "pubsub", "topic", "Hello World");
+ observer.onNext(response);
+
+ // Complete the stream
+ observer.onCompleted();
+ });
+
+ // Verify the flux emits the correct data
+ StepVerifier.create(flux)
+ .expectNext("Hello World")
+ .verifyComplete();
+
+ // Verify the initial request was sent
+ ArgumentCaptor requestCaptor =
+ ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class);
+ verify(mockRequestStream, times(2)).onNext(requestCaptor.capture());
+
+ List requests = requestCaptor.getAllValues();
+ assertEquals(2, requests.size());
+
+ // First request should be the initial request
+ assertTrue(requests.get(0).hasInitialRequest());
+
+ // Second request should be a SUCCESS ack
+ assertTrue(requests.get(1).hasEventProcessed());
+ assertEquals("event-1", requests.get(1).getEventProcessed().getId());
+ assertEquals(
+ DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS,
+ requests.get(1).getEventProcessed().getStatus().getStatus()
+ );
+ }
+
+ @Test
+ void testMultipleEvents() {
+ List emittedEvents = new ArrayList<>();
+
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Send multiple events
+ observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Message 1"));
+ observer.onNext(buildEventResponse("event-2", "pubsub", "topic", "Message 2"));
+ observer.onNext(buildEventResponse("event-3", "pubsub", "topic", "Message 3"));
+
+ observer.onCompleted();
+ });
+
+ StepVerifier.create(flux)
+ .expectNext("Message 1")
+ .expectNext("Message 2")
+ .expectNext("Message 3")
+ .verifyComplete();
+
+ // Verify 4 requests: 1 initial + 3 acks
+ verify(mockRequestStream, times(4)).onNext(any());
+ }
+
+ @Test
+ void testDeserializationError() {
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Send an event with invalid data (can't deserialize to String)
+ DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
+ .setEventMessage(
+ DaprAppCallbackProtos.TopicEventRequest.newBuilder()
+ .setId("event-1")
+ .setPubsubName("pubsub")
+ .setTopic("topic")
+ .setData(ByteString.copyFrom(new byte[]{(byte) 0xFF, (byte) 0xFE})) // Invalid UTF-8
+ .build()
+ )
+ .build();
+
+ observer.onNext(response);
+ });
+
+ // Verify error is propagated
+ StepVerifier.create(flux)
+ .expectError(DaprException.class)
+ .verify();
+
+ // Verify RETRY ack was sent
+ ArgumentCaptor requestCaptor =
+ ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class);
+ verify(mockRequestStream, atLeast(2)).onNext(requestCaptor.capture());
+
+ // Find the ack request (not the initial request)
+ List ackRequests = requestCaptor.getAllValues().stream()
+ .filter(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed)
+ .collect(Collectors.toList());
+
+ assertEquals(1, ackRequests.size());
+ assertEquals("event-1", ackRequests.get(0).getEventProcessed().getId());
+ assertEquals(
+ DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY,
+ ackRequests.get(0).getEventProcessed().getStatus().getStatus()
+ );
+ }
+
+ @Test
+ void testGrpcError() {
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Simulate gRPC error
+ observer.onError(new RuntimeException("gRPC connection failed"));
+ });
+
+ StepVerifier.create(flux)
+ .expectError(DaprException.class)
+ .verify();
+ }
+
+ @Test
+ void testNullEventMessage() {
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Send response with null event message
+ DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
+ .build();
+
+ observer.onNext(response);
+ observer.onCompleted();
+ });
+
+ // Should complete without emitting any events
+ StepVerifier.create(flux)
+ .verifyComplete();
+
+ // Verify only the initial request was sent (no ack for null event)
+ verify(mockRequestStream, times(1)).onNext(any());
+ }
+
+ @Test
+ void testEmptyPubsubName() {
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Send event with empty pubsub name
+ DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
+ .setEventMessage(
+ DaprAppCallbackProtos.TopicEventRequest.newBuilder()
+ .setId("event-1")
+ .setPubsubName("")
+ .setTopic("topic")
+ .setData(ByteString.copyFromUtf8("\"Hello\""))
+ .build()
+ )
+ .build();
+
+ observer.onNext(response);
+ observer.onCompleted();
+ });
+
+ // Should complete without emitting any events
+ StepVerifier.create(flux)
+ .verifyComplete();
+
+ // Verify only the initial request was sent
+ verify(mockRequestStream, times(1)).onNext(any());
+ }
+
+ @Test
+ void testEmptyEventId() {
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Send event with empty ID
+ DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
+ .setEventMessage(
+ DaprAppCallbackProtos.TopicEventRequest.newBuilder()
+ .setId("")
+ .setPubsubName("pubsub")
+ .setTopic("topic")
+ .setData(ByteString.copyFromUtf8("\"Hello\""))
+ .build()
+ )
+ .build();
+
+ observer.onNext(response);
+ observer.onCompleted();
+ });
+
+ // Should complete without emitting any events
+ StepVerifier.create(flux)
+ .verifyComplete();
+
+ // Verify only the initial request was sent
+ verify(mockRequestStream, times(1)).onNext(any());
+ }
+
+ @Test
+ void testNullData() {
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ null, // null type
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Send event with valid structure but null type
+ observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Hello"));
+
+ observer.onCompleted();
+ });
+
+ // Should complete without emitting any events (data is null when type is null)
+ StepVerifier.create(flux)
+ .verifyComplete();
+
+ // Verify initial request + ack were sent
+ verify(mockRequestStream, times(2)).onNext(any());
+ }
+
+ @Test
+ void testComplexObjectSerialization() throws IOException {
+ // Test with a custom object
+ TestEvent testEvent = new TestEvent("test-name", 42);
+ byte[] serializedEvent = objectSerializer.serialize(testEvent);
+
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.get(TestEvent.class),
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
+ .setEventMessage(
+ DaprAppCallbackProtos.TopicEventRequest.newBuilder()
+ .setId("event-1")
+ .setPubsubName("pubsub")
+ .setTopic("topic")
+ .setData(ByteString.copyFrom(serializedEvent))
+ .build()
+ )
+ .build();
+
+ observer.onNext(response);
+ observer.onCompleted();
+ });
+
+ StepVerifier.create(flux)
+ .expectNextMatches(event -> event.name.equals("test-name") && event.value == 42)
+ .verifyComplete();
+ }
+
+ @Test
+ void testErrorDuringSendingAck() {
+ // Mock request stream to throw exception when sending ack
+ doThrow(new RuntimeException("Failed to send ack"))
+ .when(mockRequestStream).onNext(argThat(req -> req.hasEventProcessed()));
+
+ Flux flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ observer.start(initialRequest);
+
+ // Send an event - this should trigger an error when trying to send ack
+ observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Hello"));
+ });
+
+ // The event is emitted successfully, then the error occurs when sending ack
+ StepVerifier.create(flux)
+ .expectNext("Hello") // Event is emitted before ack
+ .expectError(DaprException.class) // Then error when sending ack
+ .verify();
+ }
+
+ // Helper methods
+
+ private DaprProtos.SubscribeTopicEventsRequestAlpha1 buildInitialRequest(String pubsubName, String topic) {
+ return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
+ .setInitialRequest(
+ DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
+ .setPubsubName(pubsubName)
+ .setTopic(topic)
+ .build()
+ )
+ .build();
+ }
+
+ private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse(
+ String eventId,
+ String pubsubName,
+ String topic,
+ String data) {
+ try {
+ byte[] serializedData = objectSerializer.serialize(data);
+ return DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
+ .setEventMessage(
+ DaprAppCallbackProtos.TopicEventRequest.newBuilder()
+ .setId(eventId)
+ .setPubsubName(pubsubName)
+ .setTopic(topic)
+ .setData(ByteString.copyFrom(serializedData))
+ .build()
+ )
+ .build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Test class for complex object serialization
+ public static class TestEvent {
+ public String name;
+ public int value;
+
+ public TestEvent() {
+ }
+
+ public TestEvent(String name, int value) {
+ this.name = name;
+ this.value = value;
+ }
+ }
+}
From d778c7d9412cf496eadfacf1f8ead4409ce1df6b Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Fri, 28 Nov 2025 18:18:58 -0800
Subject: [PATCH 08/12] Improve the tests a little bit
Signed-off-by: Artur Ciocanu
---
.../EventSubscriberStreamObserverTest.java | 132 ++++++++----------
1 file changed, 60 insertions(+), 72 deletions(-)
diff --git a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
index 53ee5c0f28..63981df2ea 100644
--- a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
+++ b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
@@ -23,13 +23,13 @@
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -42,6 +42,8 @@
*/
class EventSubscriberStreamObserverTest {
+ public static final String PUBSUB_NAME = "pubsub";
+ public static final String TOPIC_NAME = "topic";
private DaprGrpc.DaprStub mockStub;
private DaprObjectSerializer objectSerializer;
private StreamObserver mockRequestStream;
@@ -53,14 +55,12 @@ void setUp() {
objectSerializer = new DefaultObjectSerializer();
mockRequestStream = mock(StreamObserver.class);
- // Setup stub to return mock request stream
when(mockStub.subscribeTopicEventsAlpha1(any())).thenReturn(mockRequestStream);
}
@Test
+ @DisplayName("Should successfully process events and send SUCCESS acks")
void testSuccessfulEventProcessing() {
- // Create a Flux and capture the sink
- List emittedEvents = new ArrayList<>();
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
@@ -70,34 +70,34 @@ void testSuccessfulEventProcessing() {
);
// Start the subscription
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(
+ );
observer.start(initialRequest);
// Simulate receiving an event
- DaprProtos.SubscribeTopicEventsResponseAlpha1 response = buildEventResponse("event-1", "pubsub", "topic", "Hello World");
+ DaprProtos.SubscribeTopicEventsResponseAlpha1 response = buildEventResponse(
+ "event-1",
+ "Hello World"
+ );
observer.onNext(response);
// Complete the stream
observer.onCompleted();
});
- // Verify the flux emits the correct data
StepVerifier.create(flux)
.expectNext("Hello World")
.verifyComplete();
- // Verify the initial request was sent
ArgumentCaptor requestCaptor =
ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class);
+
verify(mockRequestStream, times(2)).onNext(requestCaptor.capture());
List requests = requestCaptor.getAllValues();
- assertEquals(2, requests.size());
- // First request should be the initial request
+ assertEquals(2, requests.size());
assertTrue(requests.get(0).hasInitialRequest());
-
- // Second request should be a SUCCESS ack
assertTrue(requests.get(1).hasEventProcessed());
assertEquals("event-1", requests.get(1).getEventProcessed().getId());
assertEquals(
@@ -107,9 +107,8 @@ void testSuccessfulEventProcessing() {
}
@Test
+ @DisplayName("Should handle multiple consecutive events correctly")
void testMultipleEvents() {
- List emittedEvents = new ArrayList<>();
-
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
@@ -118,13 +117,13 @@ void testMultipleEvents() {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(
+ );
observer.start(initialRequest);
- // Send multiple events
- observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Message 1"));
- observer.onNext(buildEventResponse("event-2", "pubsub", "topic", "Message 2"));
- observer.onNext(buildEventResponse("event-3", "pubsub", "topic", "Message 3"));
+ observer.onNext(buildEventResponse("event-1", "Message 1"));
+ observer.onNext(buildEventResponse("event-2", "Message 2"));
+ observer.onNext(buildEventResponse("event-3", "Message 3"));
observer.onCompleted();
});
@@ -135,11 +134,11 @@ void testMultipleEvents() {
.expectNext("Message 3")
.verifyComplete();
- // Verify 4 requests: 1 initial + 3 acks
verify(mockRequestStream, times(4)).onNext(any());
}
@Test
+ @DisplayName("Should send RETRY ack when deserialization fails")
void testDeserializationError() {
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
@@ -149,7 +148,8 @@ void testDeserializationError() {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(
+ );
observer.start(initialRequest);
// Send an event with invalid data (can't deserialize to String)
@@ -157,8 +157,8 @@ void testDeserializationError() {
.setEventMessage(
DaprAppCallbackProtos.TopicEventRequest.newBuilder()
.setId("event-1")
- .setPubsubName("pubsub")
- .setTopic("topic")
+ .setPubsubName(PUBSUB_NAME)
+ .setTopic(TOPIC_NAME)
.setData(ByteString.copyFrom(new byte[]{(byte) 0xFF, (byte) 0xFE})) // Invalid UTF-8
.build()
)
@@ -167,17 +167,15 @@ void testDeserializationError() {
observer.onNext(response);
});
- // Verify error is propagated
StepVerifier.create(flux)
.expectError(DaprException.class)
.verify();
- // Verify RETRY ack was sent
ArgumentCaptor requestCaptor =
ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class);
+
verify(mockRequestStream, atLeast(2)).onNext(requestCaptor.capture());
- // Find the ack request (not the initial request)
List ackRequests = requestCaptor.getAllValues().stream()
.filter(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed)
.collect(Collectors.toList());
@@ -191,6 +189,7 @@ void testDeserializationError() {
}
@Test
+ @DisplayName("Should propagate gRPC errors as DaprException")
void testGrpcError() {
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
@@ -200,7 +199,7 @@ void testGrpcError() {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest();
observer.start(initialRequest);
// Simulate gRPC error
@@ -213,6 +212,7 @@ void testGrpcError() {
}
@Test
+ @DisplayName("Should handle null event messages gracefully without emitting events")
void testNullEventMessage() {
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
@@ -222,10 +222,10 @@ void testNullEventMessage() {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(
+ );
observer.start(initialRequest);
- // Send response with null event message
DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
.build();
@@ -233,15 +233,14 @@ void testNullEventMessage() {
observer.onCompleted();
});
- // Should complete without emitting any events
StepVerifier.create(flux)
.verifyComplete();
- // Verify only the initial request was sent (no ack for null event)
verify(mockRequestStream, times(1)).onNext(any());
}
@Test
+ @DisplayName("Should skip events with empty pubsub name")
void testEmptyPubsubName() {
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
@@ -251,16 +250,16 @@ void testEmptyPubsubName() {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(
+ );
observer.start(initialRequest);
- // Send event with empty pubsub name
DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
.setEventMessage(
DaprAppCallbackProtos.TopicEventRequest.newBuilder()
.setId("event-1")
.setPubsubName("")
- .setTopic("topic")
+ .setTopic(TOPIC_NAME)
.setData(ByteString.copyFromUtf8("\"Hello\""))
.build()
)
@@ -270,15 +269,14 @@ void testEmptyPubsubName() {
observer.onCompleted();
});
- // Should complete without emitting any events
StepVerifier.create(flux)
.verifyComplete();
- // Verify only the initial request was sent
verify(mockRequestStream, times(1)).onNext(any());
}
@Test
+ @DisplayName("Should skip events with empty event ID")
void testEmptyEventId() {
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
@@ -288,16 +286,16 @@ void testEmptyEventId() {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(
+ );
observer.start(initialRequest);
- // Send event with empty ID
DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
.setEventMessage(
DaprAppCallbackProtos.TopicEventRequest.newBuilder()
.setId("")
- .setPubsubName("pubsub")
- .setTopic("topic")
+ .setPubsubName(PUBSUB_NAME)
+ .setTopic(TOPIC_NAME)
.setData(ByteString.copyFromUtf8("\"Hello\""))
.build()
)
@@ -307,15 +305,14 @@ void testEmptyEventId() {
observer.onCompleted();
});
- // Should complete without emitting any events
StepVerifier.create(flux)
.verifyComplete();
- // Verify only the initial request was sent
verify(mockRequestStream, times(1)).onNext(any());
}
@Test
+ @DisplayName("Should handle null type parameter and skip event emission")
void testNullData() {
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
@@ -325,26 +322,23 @@ void testNullData() {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
- observer.start(initialRequest);
-
- // Send event with valid structure but null type
- observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Hello"));
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(
+ );
+ observer.start(initialRequest);
+ observer.onNext(buildEventResponse("event-1", "Hello"));
observer.onCompleted();
});
- // Should complete without emitting any events (data is null when type is null)
StepVerifier.create(flux)
.verifyComplete();
- // Verify initial request + ack were sent
verify(mockRequestStream, times(2)).onNext(any());
}
@Test
+ @DisplayName("Should deserialize and emit complex objects correctly")
void testComplexObjectSerialization() throws IOException {
- // Test with a custom object
TestEvent testEvent = new TestEvent("test-name", 42);
byte[] serializedEvent = objectSerializer.serialize(testEvent);
@@ -356,15 +350,16 @@ void testComplexObjectSerialization() throws IOException {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(
+ );
observer.start(initialRequest);
DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
.setEventMessage(
DaprAppCallbackProtos.TopicEventRequest.newBuilder()
.setId("event-1")
- .setPubsubName("pubsub")
- .setTopic("topic")
+ .setPubsubName(PUBSUB_NAME)
+ .setTopic(TOPIC_NAME)
.setData(ByteString.copyFrom(serializedEvent))
.build()
)
@@ -380,10 +375,11 @@ void testComplexObjectSerialization() throws IOException {
}
@Test
+ @DisplayName("Should propagate errors when ack sending fails")
void testErrorDuringSendingAck() {
- // Mock request stream to throw exception when sending ack
doThrow(new RuntimeException("Failed to send ack"))
- .when(mockRequestStream).onNext(argThat(req -> req.hasEventProcessed()));
+ .when(mockRequestStream)
+ .onNext(argThat(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed));
Flux flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
@@ -393,46 +389,39 @@ void testErrorDuringSendingAck() {
objectSerializer
);
- DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic");
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest();
observer.start(initialRequest);
- // Send an event - this should trigger an error when trying to send ack
- observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Hello"));
+ observer.onNext(buildEventResponse("event-1", "Hello"));
});
- // The event is emitted successfully, then the error occurs when sending ack
StepVerifier.create(flux)
.expectNext("Hello") // Event is emitted before ack
.expectError(DaprException.class) // Then error when sending ack
.verify();
}
- // Helper methods
-
- private DaprProtos.SubscribeTopicEventsRequestAlpha1 buildInitialRequest(String pubsubName, String topic) {
+ private DaprProtos.SubscribeTopicEventsRequestAlpha1 buildInitialRequest() {
return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
.setInitialRequest(
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
- .setPubsubName(pubsubName)
- .setTopic(topic)
+ .setPubsubName(PUBSUB_NAME)
+ .setTopic(TOPIC_NAME)
.build()
)
.build();
}
- private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse(
- String eventId,
- String pubsubName,
- String topic,
- String data) {
+ private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse(String eventId, String data) {
+
try {
byte[] serializedData = objectSerializer.serialize(data);
return DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
.setEventMessage(
DaprAppCallbackProtos.TopicEventRequest.newBuilder()
.setId(eventId)
- .setPubsubName(pubsubName)
- .setTopic(topic)
+ .setPubsubName(PUBSUB_NAME)
+ .setTopic(TOPIC_NAME)
.setData(ByteString.copyFrom(serializedData))
.build()
)
@@ -442,7 +431,6 @@ private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse(
}
}
- // Test class for complex object serialization
public static class TestEvent {
public String name;
public int value;
From 23c822d75a1dcff606da560360d77feaf02178d6 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Sun, 30 Nov 2025 11:32:54 -0800
Subject: [PATCH 09/12] Remove the unnecessary method
Signed-off-by: Artur Ciocanu
---
.../java/io/dapr/client/DaprClientImpl.java | 18 ------------------
1 file changed, 18 deletions(-)
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
index 96e990dd25..d0e5c01b2d 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
@@ -97,7 +97,6 @@
import io.dapr.utils.DefaultContentTypeConverter;
import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
-import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.dapr.v1.DaprProtos.ActiveActorsCount;
@@ -551,23 +550,6 @@ private Subscription buildSubscription(
return subscription;
}
- @Nonnull
- private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(
- String id, SubscriptionListener.Status status) {
- DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed =
- DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder()
- .setId(id)
- .setStatus(
- DaprAppCallbackProtos.TopicEventResponse.newBuilder()
- .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.valueOf(
- status.name()))
- .build())
- .build();
- return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
- .setEventProcessed(eventProcessed)
- .build();
- }
-
@Override
public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) {
try {
From 018a8da9cbb9cd7c5bd73ed66db09b0842abb0c1 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Tue, 2 Dec 2025 23:35:21 -0800
Subject: [PATCH 10/12] Improve error handling and use CloudEvent wrapper
Signed-off-by: Artur Ciocanu
---
.../java/io/dapr/client/DaprClientImpl.java | 4 +-
.../io/dapr/client/DaprPreviewClient.java | 5 +-
.../EventSubscriberStreamObserver.java | 183 ++++++++++--------
.../client/DaprPreviewClientGrpcTest.java | 7 +-
.../EventSubscriberStreamObserverTest.java | 102 ++++++++--
5 files changed, 196 insertions(+), 105 deletions(-)
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
index d0e5c01b2d..0dfb1b644b 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java
@@ -480,7 +480,7 @@ public Subscription subscribeToEvents(
* {@inheritDoc}
*/
@Override
- public Flux subscribeToEvents(String pubsubName, String topic, TypeRef type) {
+ public Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type) {
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
@@ -506,7 +506,7 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef
try {
requestStream.onCompleted();
} catch (Exception e) {
- // Ignore cleanup errors
+ logger.debug("Completing the subscription stream resulted in an error: {}", e.getMessage());
}
});
}, FluxSink.OverflowStrategy.BUFFER);
diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
index ba97931117..545b8e5dc5 100644
--- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
+++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
@@ -17,6 +17,7 @@
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
+import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.ConversationRequest;
import io.dapr.client.domain.ConversationRequestAlpha2;
import io.dapr.client.domain.ConversationResponse;
@@ -285,10 +286,10 @@ Subscription subscribeToEvents(
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param type Type for object deserialization.
- * @return A Flux of deserialized event payloads.
+ * @return A Flux of CloudEvents containing deserialized event payloads and metadata.
* @param Type of the event payload.
*/
- Flux subscribeToEvents(String pubsubName, String topic, TypeRef type);
+ Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type);
/**
* Schedules a job using the provided job request details.
diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
index fb3eebda88..56131882b8 100644
--- a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
+++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2024 The Dapr Authors
+ * Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -13,6 +13,7 @@
package io.dapr.internal.subscription;
+import io.dapr.client.domain.CloudEvent;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;
@@ -20,8 +21,12 @@
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;
+import java.io.IOException;
+
/**
* StreamObserver implementation for subscribing to Dapr pub/sub events.
* Thread Safety: This class relies on gRPC's StreamObserver contract, which guarantees that
@@ -32,8 +37,10 @@
*/
public class EventSubscriberStreamObserver implements StreamObserver {
+ private static final Logger logger = LoggerFactory.getLogger(EventSubscriberStreamObserver.class);
+
private final DaprGrpc.DaprStub stub;
- private final FluxSink sink;
+ private final FluxSink> sink;
private final TypeRef type;
private final DaprObjectSerializer objectSerializer;
@@ -43,13 +50,13 @@ public class EventSubscriberStreamObserver implements StreamObserver sink,
+ FluxSink> sink,
TypeRef type,
DaprObjectSerializer objectSerializer) {
this.stub = stub;
@@ -75,57 +82,23 @@ public StreamObserver start(
@Override
public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
- try {
- if (response.getEventMessage() == null) {
- return;
- }
-
- DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
- String pubsubName = message.getPubsubName();
-
- if (pubsubName == null || pubsubName.isEmpty()) {
- return;
- }
-
- String id = message.getId();
-
- if (id == null || id.isEmpty()) {
- return;
- }
-
- // Deserialize the event data
- T data = null;
-
- if (type != null) {
- data = objectSerializer.deserialize(message.getData().toByteArray(), type);
- }
-
- // Emit the data to the Flux (only if not null)
- if (data != null) {
- sink.next(data);
- }
+ if (!isValidEventMessage(response)) {
+ return;
+ }
- // Send SUCCESS acknowledgment directly
- var ack = buildSuccessAck(id);
+ DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
+ String eventId = message.getId();
- requestStream.onNext(ack);
+ try {
+ T data = deserializeEventData(message);
+ CloudEvent cloudEvent = buildCloudEvent(message, data);
+ emitEventAndAcknowledge(cloudEvent, eventId);
+ } catch (IOException e) {
+ // Deserialization failure - send DROP ack
+ handleDeserializationError(eventId, e);
} catch (Exception e) {
- // On error during processing, send RETRY acknowledgment
- try {
- var id = response.getEventMessage().getId();
-
- if (id != null && !id.isEmpty()) {
- var ack = buildRetryAck(id);
-
- requestStream.onNext(ack);
- }
- } catch (Exception ex) {
- // If we can't send ack, propagate the error
- sink.error(DaprException.propagate(ex));
- return;
- }
-
- sink.error(DaprException.propagate(e));
+ // Processing failure - send RETRY ack
+ handleProcessingError(eventId, e);
}
}
@@ -139,44 +112,98 @@ public void onCompleted() {
sink.complete();
}
- /**
- * Builds a SUCCESS acknowledgment request.
- *
- * @param eventId The ID of the event to acknowledge
- * @return The acknowledgment request
- */
+ private boolean isValidEventMessage(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
+ if (response.getEventMessage() == null) {
+ logger.debug("Received response with null event message, skipping");
+ return false;
+ }
+
+ DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage();
+
+ if (message.getPubsubName() == null || message.getPubsubName().isEmpty()) {
+ logger.debug("Received event with empty pubsub name, skipping");
+ return false;
+ }
+
+ if (message.getId() == null || message.getId().isEmpty()) {
+ logger.debug("Received event with empty ID, skipping");
+ return false;
+ }
+
+ return true;
+ }
+
+ private T deserializeEventData(DaprAppCallbackProtos.TopicEventRequest message) throws IOException {
+ if (type == null) {
+ logger.debug("Type is null, skipping deserialization for event ID: {}", message.getId());
+ return null;
+ }
+
+ return objectSerializer.deserialize(message.getData().toByteArray(), type);
+ }
+
+ private CloudEvent buildCloudEvent(DaprAppCallbackProtos.TopicEventRequest message, T data) {
+ CloudEvent cloudEvent = new CloudEvent<>();
+
+ cloudEvent.setId(message.getId());
+ cloudEvent.setType(message.getType());
+ cloudEvent.setSpecversion(message.getSpecVersion());
+ cloudEvent.setDatacontenttype(message.getDataContentType());
+ cloudEvent.setTopic(message.getTopic());
+ cloudEvent.setPubsubName(message.getPubsubName());
+ cloudEvent.setData(data);
+
+ return cloudEvent;
+ }
+
+ private void emitEventAndAcknowledge(CloudEvent cloudEvent, String eventId) {
+ sink.next(cloudEvent);
+
+ // Send SUCCESS acknowledgment
+ requestStream.onNext(buildSuccessAck(eventId));
+ }
+
+ private void handleDeserializationError(String eventId, IOException cause) {
+ logger.error("Deserialization failed for event ID: {}, sending DROP ack", eventId, cause);
+
+ // Send DROP ack - cannot process malformed data
+ requestStream.onNext(buildDropAck(eventId));
+
+ // Propagate error to sink
+ sink.error(new DaprException("DESERIALIZATION_ERROR",
+ "Failed to deserialize event with ID: " + eventId, cause));
+ }
+
+ private void handleProcessingError(String eventId, Exception cause) {
+ logger.error("Processing error for event ID: {}, attempting to send RETRY ack", eventId, cause);
+
+ try {
+ // Try to send RETRY acknowledgment
+ requestStream.onNext(buildRetryAck(eventId));
+ } catch (Exception ackException) {
+ // Failed to send ack - this is critical
+ logger.error("Failed to send RETRY ack for event ID: {}", eventId, ackException);
+ sink.error(DaprException.propagate(ackException));
+
+ return;
+ }
+
+ // Propagate the original processing error
+ sink.error(DaprException.propagate(cause));
+ }
+
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildSuccessAck(String eventId) {
return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS);
}
- /**
- * Builds a RETRY acknowledgment request.
- *
- * @param eventId The ID of the event to acknowledge
- * @return The acknowledgment request
- */
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildRetryAck(String eventId) {
return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY);
}
- /**
- * Builds a DROP acknowledgment request.
- *
- * @param eventId The ID of the event to acknowledge
- * @return The acknowledgment request
- */
- @SuppressWarnings("unused") // May be used in the future
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildDropAck(String eventId) {
return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP);
}
- /**
- * Builds an acknowledgment request with the specified status.
- *
- * @param eventId The ID of the event to acknowledge
- * @param status The acknowledgment status (SUCCESS, RETRY, or DROP)
- * @return The acknowledgment request
- */
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(
String eventId,
DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus status) {
diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
index 35bc300590..4bb59bb4c0 100644
--- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
@@ -640,8 +640,11 @@ public void onCompleted() {
final Semaphore gotAll = new Semaphore(0);
var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING)
- .doOnNext(eventData -> {
- assertEquals(data, eventData);
+ .doOnNext(cloudEvent -> {
+ assertEquals(data, cloudEvent.getData());
+ assertEquals("pubsubname", cloudEvent.getPubsubName());
+ assertEquals("topic", cloudEvent.getTopic());
+ assertNotNull(cloudEvent.getId());
int count = eventCount.incrementAndGet();
if (count >= numEvents) {
gotAll.release();
diff --git a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
index 63981df2ea..7328f79e51 100644
--- a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
+++ b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2024 The Dapr Authors
+ * Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,6 +14,7 @@
package io.dapr.internal.subscription;
import com.google.protobuf.ByteString;
+import io.dapr.client.domain.CloudEvent;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
@@ -61,7 +62,7 @@ void setUp() {
@Test
@DisplayName("Should successfully process events and send SUCCESS acks")
void testSuccessfulEventProcessing() {
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -86,7 +87,12 @@ void testSuccessfulEventProcessing() {
});
StepVerifier.create(flux)
- .expectNext("Hello World")
+ .assertNext(cloudEvent -> {
+ assertEquals("Hello World", cloudEvent.getData());
+ assertEquals("event-1", cloudEvent.getId());
+ assertEquals(PUBSUB_NAME, cloudEvent.getPubsubName());
+ assertEquals(TOPIC_NAME, cloudEvent.getTopic());
+ })
.verifyComplete();
ArgumentCaptor requestCaptor =
@@ -109,7 +115,7 @@ void testSuccessfulEventProcessing() {
@Test
@DisplayName("Should handle multiple consecutive events correctly")
void testMultipleEvents() {
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -129,18 +135,27 @@ void testMultipleEvents() {
});
StepVerifier.create(flux)
- .expectNext("Message 1")
- .expectNext("Message 2")
- .expectNext("Message 3")
+ .assertNext(cloudEvent -> {
+ assertEquals("Message 1", cloudEvent.getData());
+ assertEquals("event-1", cloudEvent.getId());
+ })
+ .assertNext(cloudEvent -> {
+ assertEquals("Message 2", cloudEvent.getData());
+ assertEquals("event-2", cloudEvent.getId());
+ })
+ .assertNext(cloudEvent -> {
+ assertEquals("Message 3", cloudEvent.getData());
+ assertEquals("event-3", cloudEvent.getId());
+ })
.verifyComplete();
verify(mockRequestStream, times(4)).onNext(any());
}
@Test
- @DisplayName("Should send RETRY ack when deserialization fails")
+ @DisplayName("Should send DROP ack when deserialization fails")
void testDeserializationError() {
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -168,7 +183,10 @@ void testDeserializationError() {
});
StepVerifier.create(flux)
- .expectError(DaprException.class)
+ .expectErrorMatches(error ->
+ error instanceof DaprException
+ && error.getMessage().contains("DESERIALIZATION_ERROR")
+ && error.getMessage().contains("event-1"))
.verify();
ArgumentCaptor requestCaptor =
@@ -183,15 +201,46 @@ void testDeserializationError() {
assertEquals(1, ackRequests.size());
assertEquals("event-1", ackRequests.get(0).getEventProcessed().getId());
assertEquals(
- DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY,
+ DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP,
ackRequests.get(0).getEventProcessed().getStatus().getStatus()
);
}
+ @Test
+ @DisplayName("Should send RETRY ack when non-deserialization error occurs")
+ void testProcessingError() {
+ Flux> flux = Flux.create(sink -> {
+ EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
+ mockStub,
+ sink,
+ TypeRef.STRING,
+ objectSerializer
+ );
+
+ DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest();
+ observer.start(initialRequest);
+
+ // Simulate a processing error by throwing during sink.next()
+ sink.onRequest(n -> {
+ throw new RuntimeException("Processing error");
+ });
+
+ observer.onNext(buildEventResponse("event-1", "Hello"));
+ });
+
+ StepVerifier.create(flux)
+ .expectError(RuntimeException.class)
+ .verify();
+
+ // Note: When error occurs in onRequest callback (before processing),
+ // no ack is sent as the error happens before we can handle the event
+ verify(mockRequestStream, times(1)).onNext(any()); // Only initial request sent
+ }
+
@Test
@DisplayName("Should propagate gRPC errors as DaprException")
void testGrpcError() {
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -214,7 +263,7 @@ void testGrpcError() {
@Test
@DisplayName("Should handle null event messages gracefully without emitting events")
void testNullEventMessage() {
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -242,7 +291,7 @@ void testNullEventMessage() {
@Test
@DisplayName("Should skip events with empty pubsub name")
void testEmptyPubsubName() {
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -278,7 +327,7 @@ void testEmptyPubsubName() {
@Test
@DisplayName("Should skip events with empty event ID")
void testEmptyEventId() {
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -312,9 +361,9 @@ void testEmptyEventId() {
}
@Test
- @DisplayName("Should handle null type parameter and skip event emission")
+ @DisplayName("Should handle null type parameter and emit CloudEvent with null data")
void testNullData() {
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -331,6 +380,12 @@ void testNullData() {
});
StepVerifier.create(flux)
+ .assertNext(cloudEvent -> {
+ assertNull(cloudEvent.getData());
+ assertEquals("event-1", cloudEvent.getId());
+ assertEquals(PUBSUB_NAME, cloudEvent.getPubsubName());
+ assertEquals(TOPIC_NAME, cloudEvent.getTopic());
+ })
.verifyComplete();
verify(mockRequestStream, times(2)).onNext(any());
@@ -342,7 +397,7 @@ void testComplexObjectSerialization() throws IOException {
TestEvent testEvent = new TestEvent("test-name", 42);
byte[] serializedEvent = objectSerializer.serialize(testEvent);
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -370,7 +425,12 @@ void testComplexObjectSerialization() throws IOException {
});
StepVerifier.create(flux)
- .expectNextMatches(event -> event.name.equals("test-name") && event.value == 42)
+ .assertNext(cloudEvent -> {
+ TestEvent event = cloudEvent.getData();
+ assertEquals("test-name", event.name);
+ assertEquals(42, event.value);
+ assertEquals("event-1", cloudEvent.getId());
+ })
.verifyComplete();
}
@@ -381,7 +441,7 @@ void testErrorDuringSendingAck() {
.when(mockRequestStream)
.onNext(argThat(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed));
- Flux flux = Flux.create(sink -> {
+ Flux> flux = Flux.create(sink -> {
EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>(
mockStub,
sink,
@@ -396,7 +456,7 @@ void testErrorDuringSendingAck() {
});
StepVerifier.create(flux)
- .expectNext("Hello") // Event is emitted before ack
+ .assertNext(cloudEvent -> assertEquals("Hello", cloudEvent.getData())) // Event is emitted before ack
.expectError(DaprException.class) // Then error when sending ack
.verify();
}
From 9821f8ee033cbe90f4139da48f79e159b8f8a1a6 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Wed, 3 Dec 2025 10:57:51 -0800
Subject: [PATCH 11/12] Fix unit tests asserts
Signed-off-by: Artur Ciocanu
---
.../client/DaprPreviewClientGrpcTest.java | 37 ++++++++++++-------
1 file changed, 23 insertions(+), 14 deletions(-)
diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
index 4bb59bb4c0..a42c4f946c 100644
--- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
@@ -592,35 +592,41 @@ public void subscribeEventFluxTest() throws Exception {
var pubsubName = "pubsubName";
var topicName = "topicName";
var data = "my message";
-
var started = new Semaphore(0);
doAnswer((Answer>) invocation -> {
StreamObserver observer =
(StreamObserver) invocation.getArguments()[0];
+
var emitterThread = new Thread(() -> {
try {
started.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
+
observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance());
+
for (int i = 0; i < numEvents; i++) {
- observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
+ DaprProtos.SubscribeTopicEventsResponseAlpha1 reponse =
+ DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
.setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder()
- .setId(Integer.toString(i))
- .setPubsubName(pubsubName)
- .setTopic(topicName)
- .setData(ByteString.copyFromUtf8("\"" + data + "\""))
- .setDataContentType("application/json")
- .build())
- .build());
+ .setId(Integer.toString(i))
+ .setPubsubName(pubsubName)
+ .setTopic(topicName)
+ .setData(ByteString.copyFromUtf8("\"" + data + "\""))
+ .setDataContentType("application/json")
+ .build())
+ .build();
+ observer.onNext(reponse);
}
+
observer.onCompleted();
});
+
emitterThread.start();
- return new StreamObserver<>() {
+ return new StreamObserver<>() {
@Override
public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) {
started.release();
@@ -628,24 +634,27 @@ public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEv
@Override
public void onError(Throwable throwable) {
+ // No-op
}
@Override
public void onCompleted() {
+ // No-op
}
};
}).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
final AtomicInteger eventCount = new AtomicInteger(0);
final Semaphore gotAll = new Semaphore(0);
-
- var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING)
+ var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING)
.doOnNext(cloudEvent -> {
assertEquals(data, cloudEvent.getData());
- assertEquals("pubsubname", cloudEvent.getPubsubName());
- assertEquals("topic", cloudEvent.getTopic());
+ assertEquals(pubsubName, cloudEvent.getPubsubName());
+ assertEquals(topicName, cloudEvent.getTopic());
assertNotNull(cloudEvent.getId());
+
int count = eventCount.incrementAndGet();
+
if (count >= numEvents) {
gotAll.release();
}
From 26d215342bf541135c200ba22fd085b8a4e1c010 Mon Sep 17 00:00:00 2001
From: Artur Ciocanu
Date: Thu, 4 Dec 2025 13:13:45 -0800
Subject: [PATCH 12/12] Adjust Java examples for Subscriber
Signed-off-by: Artur Ciocanu
---
.../io/dapr/examples/pubsub/stream/README.md | 30 +++++++-----------
.../examples/pubsub/stream/Subscriber.java | 31 +++++++------------
.../java/io/dapr/client/Subscription.java | 1 +
.../io/dapr/client/SubscriptionListener.java | 1 +
4 files changed, 25 insertions(+), 38 deletions(-)
diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
index d9d41b3759..da3e4e2482 100644
--- a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
+++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
@@ -49,7 +49,7 @@ The subscriber uses the `DaprPreviewClient` interface to use a new feature where
The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
-In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription.
+In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.
```java
public class Subscriber {
@@ -59,25 +59,19 @@ public class Subscriber {
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
- var subscription = client.subscribeToEvents(
+ // Subscribe to events using the Flux-based reactive API
+ // The stream will emit CloudEvent objects as they arrive
+ client.subscribeToEvents(
PUBSUB_NAME,
topicName,
- new SubscriptionListener<>() {
-
- @Override
- public Mono onEvent(CloudEvent event) {
- System.out.println("Subscriber got: " + event.getData());
- return Mono.just(Status.SUCCESS);
- }
-
- @Override
- public void onError(RuntimeException exception) {
- System.out.println("Subscriber got exception: " + exception.getMessage());
- }
- },
- TypeRef.STRING);
-
- subscription.awaitTermination();
+ TypeRef.STRING)
+ .doOnNext(event -> {
+ System.out.println("Subscriber got: " + event.getData());
+ })
+ .doOnError(throwable -> {
+ System.out.println("Subscriber got exception: " + throwable.getMessage());
+ })
+ .blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
}
}
diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java
index 31678dce08..763bb436ce 100644
--- a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java
+++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java
@@ -14,10 +14,7 @@
package io.dapr.examples.pubsub.stream;
import io.dapr.client.DaprClientBuilder;
-import io.dapr.client.SubscriptionListener;
-import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
-import reactor.core.publisher.Mono;
/**
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
@@ -44,25 +41,19 @@ public class Subscriber {
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
- var subscription = client.subscribeToEvents(
+ // Subscribe to events using the Flux-based reactive API
+ // The stream will emit CloudEvent objects as they arrive
+ client.subscribeToEvents(
PUBSUB_NAME,
topicName,
- new SubscriptionListener<>() {
-
- @Override
- public Mono onEvent(CloudEvent event) {
- System.out.println("Subscriber got: " + event.getData());
- return Mono.just(Status.SUCCESS);
- }
-
- @Override
- public void onError(RuntimeException exception) {
- System.out.println("Subscriber got exception: " + exception.getMessage());
- }
- },
- TypeRef.STRING);
-
- subscription.awaitTermination();
+ TypeRef.STRING)
+ .doOnNext(event -> {
+ System.out.println("Subscriber got: " + event.getData());
+ })
+ .doOnError(throwable -> {
+ System.out.println("Subscriber got exception: " + throwable.getMessage());
+ })
+ .blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
}
}
diff --git a/sdk/src/main/java/io/dapr/client/Subscription.java b/sdk/src/main/java/io/dapr/client/Subscription.java
index 53e89e8456..2cbd1e9b30 100644
--- a/sdk/src/main/java/io/dapr/client/Subscription.java
+++ b/sdk/src/main/java/io/dapr/client/Subscription.java
@@ -35,6 +35,7 @@
* Streaming subscription of events for Dapr's pubsub.
* @param Application's object type.
*/
+@Deprecated
public class Subscription implements Closeable {
private final BlockingQueue ackQueue = new LinkedBlockingQueue<>(50);
diff --git a/sdk/src/main/java/io/dapr/client/SubscriptionListener.java b/sdk/src/main/java/io/dapr/client/SubscriptionListener.java
index 5a467d69f4..c5420af602 100644
--- a/sdk/src/main/java/io/dapr/client/SubscriptionListener.java
+++ b/sdk/src/main/java/io/dapr/client/SubscriptionListener.java
@@ -20,6 +20,7 @@
* Callback interface to receive events from a streaming subscription of events.
* @param Object type for deserialization.
*/
+@Deprecated
public interface SubscriptionListener {
/**