From 7fdf04375ae157a6a206d8802f124a8a207f1769 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 28 Nov 2025 12:06:55 -0800 Subject: [PATCH 01/12] Adding a Flux based subscribeToEvents method Signed-off-by: Artur Ciocanu --- .../java/io/dapr/client/DaprClientImpl.java | 155 ++++++++++++++++++ .../io/dapr/client/DaprPreviewClient.java | 29 +++- .../client/DaprPreviewClientGrpcTest.java | 69 ++++++++ 3 files changed, 252 insertions(+), 1 deletion(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 012921a89e..9854d96051 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -96,6 +96,7 @@ import io.dapr.utils.DefaultContentTypeConverter; import io.dapr.utils.TypeRef; import io.dapr.v1.CommonProtos; +import io.dapr.v1.DaprAppCallbackProtos; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.dapr.v1.DaprProtos.ActiveActorsCount; @@ -133,6 +134,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -475,6 +480,139 @@ public Subscription subscribeToEvents( return buildSubscription(listener, type, request); } + /** + * {@inheritDoc} + */ + @Override + public Flux subscribeToEvents(String pubsubName, String topic, TypeRef type) { + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest = + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() + .setTopic(topic) + .setPubsubName(pubsubName) + .build(); + DaprProtos.SubscribeTopicEventsRequestAlpha1 request = + DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setInitialRequest(initialRequest) + .build(); + + return Flux.create(sink -> { + var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); + BlockingQueue ackQueue = new LinkedBlockingQueue<>(50); + AtomicReference> streamRef = + new AtomicReference<>(); + AtomicBoolean running = new AtomicBoolean(true); + + // Thread to send acknowledgments back to Dapr + Thread acker = new Thread(() -> { + while (running.get()) { + try { + var ackResponse = ackQueue.take(); + if (ackResponse == null) { + continue; + } + + var stream = streamRef.get(); + if (stream == null) { + Thread.sleep(100); + continue; + } + + stream.onNext(ackResponse); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (Exception e) { + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + } + } + }); + + // Create the gRPC streaming observer + var stream = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { + @Override + public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { + try { + if (response.getEventMessage() == null) { + return; + } + + var message = response.getEventMessage(); + if ((message.getPubsubName() == null) || message.getPubsubName().isEmpty()) { + return; + } + + var id = message.getId(); + if ((id == null) || id.isEmpty()) { + return; + } + + // Deserialize the event data + T data = null; + if (type != null) { + data = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type); + } + + // Emit the data to the Flux (only if not null) + if (data != null) { + sink.next(data); + } + + // Send SUCCESS acknowledgment + var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS); + ackQueue.put(ack); + + } catch (Exception e) { + // On error during processing, send RETRY acknowledgment + try { + var id = response.getEventMessage().getId(); + if (id != null && !id.isEmpty()) { + var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY); + ackQueue.put(ack); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + sink.error(DaprException.propagate(e)); + } + } + + @Override + public void onError(Throwable throwable) { + running.set(false); + sink.error(DaprException.propagate(throwable)); + } + + @Override + public void onCompleted() { + running.set(false); + sink.complete(); + } + }); + + streamRef.set(stream); + acker.start(); + + // Send initial request to start receiving events + stream.onNext(request); + + // Cleanup when Flux is cancelled or completed + sink.onDispose(() -> { + running.set(false); + acker.interrupt(); + try { + stream.onCompleted(); + } catch (Exception e) { + // Ignore cleanup errors + } + }); + }, FluxSink.OverflowStrategy.BUFFER); + } + @Nonnull private Subscription buildSubscription( SubscriptionListener listener, @@ -513,6 +651,23 @@ private Subscription buildSubscription( return subscription; } + @Nonnull + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest( + String id, SubscriptionListener.Status status) { + DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed = + DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder() + .setId(id) + .setStatus( + DaprAppCallbackProtos.TopicEventResponse.newBuilder() + .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.valueOf( + status.name())) + .build()) + .build(); + return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setEventProcessed(eventProcessed) + .build(); + } + @Override public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) { try { diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 92c6a61c3e..8c66979df9 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -32,6 +32,7 @@ import io.dapr.client.domain.UnlockResponseStatus; import io.dapr.client.domain.query.Query; import io.dapr.utils.TypeRef; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.List; @@ -271,12 +272,38 @@ Mono> publishEvents(String pubsubName, String topicNa * @param topic Name of the topic to subscribe to. * @param listener Callback methods to process events. * @param type Type for object deserialization. - * @return An active subscription. * @param Type of object deserialization. + * @return An active subscription. + * @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach. */ + @Deprecated Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type); + /** + * Subscribe to pubsub events via streaming using Project Reactor Flux. + *

+ * This method returns a reactive stream of events that can be processed using standard Reactor operators. + * Events are automatically acknowledged as SUCCESS when processed successfully, or RETRY when an error occurs. + * Users can control retry behavior using standard Flux operators like {@code retry()} and {@code onErrorResume()}. + *

+ *

+ * Example usage: + *

{@code
+   * client.subscribeToEvents("pubsub", "orders", TypeRef.get(Order.class))
+   *     .doOnNext(order -> processOrder(order))
+   *     .retry(3)  // Retry up to 3 times on errors
+   *     .subscribe();
+   * }
+ *

+ * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param type Type for object deserialization. + * @return A Flux of deserialized event payloads. + * @param Type of the event payload. + */ + Flux subscribeToEvents(String pubsubName, String topic, TypeRef type); + /** * Schedules a job using the provided job request details. * diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index f7b5584cc7..e0db8174e2 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -586,6 +586,75 @@ public void onError(RuntimeException exception) { assertEquals(numErrors, errors.size()); } + @Test + public void subscribeEventFluxTest() throws Exception { + var numEvents = 100; + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var data = "my message"; + + var started = new Semaphore(0); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + var emitterThread = new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + for (int i = 0; i < numEvents; i++) { + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(Integer.toString(i)) + .setPubsubName(pubsubName) + .setTopic(topicName) + .setData(ByteString.copyFromUtf8("\"" + data + "\"")) + .setDataContentType("application/json") + .build()) + .build()); + } + observer.onCompleted(); + }); + emitterThread.start(); + return new StreamObserver<>() { + + @Override + public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { + started.release(); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + final Set receivedEvents = Collections.synchronizedSet(new HashSet<>()); + final Semaphore gotAll = new Semaphore(0); + + var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING) + .doOnNext(eventData -> { + assertEquals(data, eventData); + receivedEvents.add(eventData); + if (receivedEvents.size() >= numEvents) { + gotAll.release(); + } + }) + .subscribe(); + + gotAll.acquire(); + disposable.dispose(); + + assertEquals(numEvents, receivedEvents.size()); + } + @Test public void converseShouldThrowIllegalArgumentExceptionWhenComponentNameIsNull() throws Exception { List inputs = new ArrayList<>(); From 4606c8d4d268c62ecb5fa3db44b2b097beee5c88 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 28 Nov 2025 12:25:41 -0800 Subject: [PATCH 02/12] Simplify GRPC stream handling Signed-off-by: Artur Ciocanu --- .../java/io/dapr/client/DaprClientImpl.java | 69 +++++-------------- 1 file changed, 16 insertions(+), 53 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 9854d96051..b2a951ce2e 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -134,10 +134,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -497,43 +493,15 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef return Flux.create(sink -> { var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); - BlockingQueue ackQueue = new LinkedBlockingQueue<>(50); - AtomicReference> streamRef = - new AtomicReference<>(); - AtomicBoolean running = new AtomicBoolean(true); - - // Thread to send acknowledgments back to Dapr - Thread acker = new Thread(() -> { - while (running.get()) { - try { - var ackResponse = ackQueue.take(); - if (ackResponse == null) { - continue; - } - var stream = streamRef.get(); - if (stream == null) { - Thread.sleep(100); - continue; - } + // Use array wrapper to allow assignment within anonymous class (Java's effectively final requirement) + // This is simpler than AtomicReference since we don't need atomicity - just mutability + @SuppressWarnings("unchecked") + StreamObserver[] streamHolder = new StreamObserver[1]; - stream.onNext(ackResponse); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } catch (Exception e) { - try { - Thread.sleep(100); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - } - } - }); - - // Create the gRPC streaming observer - var stream = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { + // Create the gRPC bidirectional streaming observer + // Note: StreamObserver.onNext() is thread-safe, so we can send acks directly + streamHolder[0] = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { @Override public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { try { @@ -562,9 +530,9 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { sink.next(data); } - // Send SUCCESS acknowledgment + // Send SUCCESS acknowledgment directly (no blocking queue or thread needed) var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS); - ackQueue.put(ack); + streamHolder[0].onNext(ack); } catch (Exception e) { // On error during processing, send RETRY acknowledgment @@ -572,10 +540,12 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { var id = response.getEventMessage().getId(); if (id != null && !id.isEmpty()) { var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY); - ackQueue.put(ack); + streamHolder[0].onNext(ack); } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); + } catch (Exception ex) { + // If we can't send ack, propagate the error + sink.error(DaprException.propagate(ex)); + return; } sink.error(DaprException.propagate(e)); } @@ -583,29 +553,22 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { @Override public void onError(Throwable throwable) { - running.set(false); sink.error(DaprException.propagate(throwable)); } @Override public void onCompleted() { - running.set(false); sink.complete(); } }); - streamRef.set(stream); - acker.start(); - // Send initial request to start receiving events - stream.onNext(request); + streamHolder[0].onNext(request); // Cleanup when Flux is cancelled or completed sink.onDispose(() -> { - running.set(false); - acker.interrupt(); try { - stream.onCompleted(); + streamHolder[0].onCompleted(); } catch (Exception e) { // Ignore cleanup errors } From b363e49a76f510b627daa02ff2c8b9aa8b0491a4 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 28 Nov 2025 12:27:24 -0800 Subject: [PATCH 03/12] Simplify Javadoc Signed-off-by: Artur Ciocanu --- .../java/io/dapr/client/DaprPreviewClient.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 8c66979df9..ba97931117 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -282,20 +282,6 @@ Subscription subscribeToEvents( /** * Subscribe to pubsub events via streaming using Project Reactor Flux. - *

- * This method returns a reactive stream of events that can be processed using standard Reactor operators. - * Events are automatically acknowledged as SUCCESS when processed successfully, or RETRY when an error occurs. - * Users can control retry behavior using standard Flux operators like {@code retry()} and {@code onErrorResume()}. - *

- *

- * Example usage: - *

{@code
-   * client.subscribeToEvents("pubsub", "orders", TypeRef.get(Order.class))
-   *     .doOnNext(order -> processOrder(order))
-   *     .retry(3)  // Retry up to 3 times on errors
-   *     .subscribe();
-   * }
- *

* @param pubsubName Name of the pubsub component. * @param topic Name of the topic to subscribe to. * @param type Type for object deserialization. From 305baee24d071a19055083b232f77e8b2f58c238 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 28 Nov 2025 16:47:28 -0800 Subject: [PATCH 04/12] Fix unit tests and simplify implementation Signed-off-by: Artur Ciocanu --- .../java/io/dapr/client/DaprClientImpl.java | 38 ++++++++++++------- .../client/DaprPreviewClientGrpcTest.java | 8 ++-- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index b2a951ce2e..25185306ba 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -134,6 +134,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -494,14 +495,16 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef return Flux.create(sink -> { var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); - // Use array wrapper to allow assignment within anonymous class (Java's effectively final requirement) - // This is simpler than AtomicReference since we don't need atomicity - just mutability - @SuppressWarnings("unchecked") - StreamObserver[] streamHolder = new StreamObserver[1]; + // We need AtomicReference because we're accessing the stream reference from within the anonymous + // StreamObserver implementation (to send acks). Java requires variables used in lambdas/anonymous + // classes to be effectively final, so we can't use a plain variable. AtomicReference provides + // the mutable container we need while keeping the reference itself final. + AtomicReference> streamRef = + new AtomicReference<>(); // Create the gRPC bidirectional streaming observer // Note: StreamObserver.onNext() is thread-safe, so we can send acks directly - streamHolder[0] = interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { + streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { @Override public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { try { @@ -509,18 +512,22 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { return; } - var message = response.getEventMessage(); - if ((message.getPubsubName() == null) || message.getPubsubName().isEmpty()) { + DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage(); + String pubsubName = message.getPubsubName(); + + if (pubsubName == null || pubsubName.isEmpty()) { return; } var id = message.getId(); - if ((id == null) || id.isEmpty()) { + + if (id == null || id.isEmpty()) { return; } // Deserialize the event data T data = null; + if (type != null) { data = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type); } @@ -530,23 +537,26 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { sink.next(data); } - // Send SUCCESS acknowledgment directly (no blocking queue or thread needed) + // Send SUCCESS acknowledgment directly var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS); - streamHolder[0].onNext(ack); + streamRef.get().onNext(ack); } catch (Exception e) { // On error during processing, send RETRY acknowledgment try { var id = response.getEventMessage().getId(); + if (id != null && !id.isEmpty()) { var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY); - streamHolder[0].onNext(ack); + + streamRef.get().onNext(ack); } } catch (Exception ex) { // If we can't send ack, propagate the error sink.error(DaprException.propagate(ex)); return; } + sink.error(DaprException.propagate(e)); } } @@ -560,15 +570,15 @@ public void onError(Throwable throwable) { public void onCompleted() { sink.complete(); } - }); + })); // Send initial request to start receiving events - streamHolder[0].onNext(request); + streamRef.get().onNext(request); // Cleanup when Flux is cancelled or completed sink.onDispose(() -> { try { - streamHolder[0].onCompleted(); + streamRef.get().onCompleted(); } catch (Exception e) { // Ignore cleanup errors } diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index e0db8174e2..35bc300590 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -636,14 +636,14 @@ public void onCompleted() { }; }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); - final Set receivedEvents = Collections.synchronizedSet(new HashSet<>()); + final AtomicInteger eventCount = new AtomicInteger(0); final Semaphore gotAll = new Semaphore(0); var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING) .doOnNext(eventData -> { assertEquals(data, eventData); - receivedEvents.add(eventData); - if (receivedEvents.size() >= numEvents) { + int count = eventCount.incrementAndGet(); + if (count >= numEvents) { gotAll.release(); } }) @@ -652,7 +652,7 @@ public void onCompleted() { gotAll.acquire(); disposable.dispose(); - assertEquals(numEvents, receivedEvents.size()); + assertEquals(numEvents, eventCount.get()); } @Test From 47ed16c9ce3e804fc5d7dea56d8127c660e5e07c Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 28 Nov 2025 17:07:57 -0800 Subject: [PATCH 05/12] Adding event subscriber stream observer to simplify subscription logic Signed-off-by: Artur Ciocanu --- .../java/io/dapr/client/DaprClientImpl.java | 78 +------ .../EventSubscriberStreamObserver.java | 191 ++++++++++++++++++ 2 files changed, 201 insertions(+), 68 deletions(-) create mode 100644 sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 25185306ba..71c4bfc939 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -91,6 +91,7 @@ import io.dapr.internal.grpc.DaprClientGrpcInterceptors; import io.dapr.internal.resiliency.RetryPolicy; import io.dapr.internal.resiliency.TimeoutPolicy; +import io.dapr.internal.subscription.EventSubscriberStreamObserver; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.DefaultContentTypeConverter; @@ -495,8 +496,8 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef return Flux.create(sink -> { var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); - // We need AtomicReference because we're accessing the stream reference from within the anonymous - // StreamObserver implementation (to send acks). Java requires variables used in lambdas/anonymous + // We need AtomicReference because we're accessing the stream reference from within the + // EventSubscriberStreamObserver (to send acks). Java requires variables used in lambdas/anonymous // classes to be effectively final, so we can't use a plain variable. AtomicReference provides // the mutable container we need while keeping the reference itself final. AtomicReference> streamRef = @@ -504,73 +505,14 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef // Create the gRPC bidirectional streaming observer // Note: StreamObserver.onNext() is thread-safe, so we can send acks directly - streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { - @Override - public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { - try { - if (response.getEventMessage() == null) { - return; - } - - DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage(); - String pubsubName = message.getPubsubName(); - - if (pubsubName == null || pubsubName.isEmpty()) { - return; - } - - var id = message.getId(); - - if (id == null || id.isEmpty()) { - return; - } - - // Deserialize the event data - T data = null; - - if (type != null) { - data = DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type); - } - - // Emit the data to the Flux (only if not null) - if (data != null) { - sink.next(data); - } - - // Send SUCCESS acknowledgment directly - var ack = buildAckRequest(id, SubscriptionListener.Status.SUCCESS); - - streamRef.get().onNext(ack); - } catch (Exception e) { - // On error during processing, send RETRY acknowledgment - try { - var id = response.getEventMessage().getId(); - - if (id != null && !id.isEmpty()) { - var ack = buildAckRequest(id, SubscriptionListener.Status.RETRY); - - streamRef.get().onNext(ack); - } - } catch (Exception ex) { - // If we can't send ack, propagate the error - sink.error(DaprException.propagate(ex)); - return; - } - - sink.error(DaprException.propagate(e)); - } - } - - @Override - public void onError(Throwable throwable) { - sink.error(DaprException.propagate(throwable)); - } + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + sink, + type, + this.objectSerializer, + streamRef + ); - @Override - public void onCompleted() { - sink.complete(); - } - })); + streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(observer)); // Send initial request to start receiving events streamRef.get().onNext(request); diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java new file mode 100644 index 0000000000..b6c7d47558 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java @@ -0,0 +1,191 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.internal.subscription; + +import io.dapr.exceptions.DaprException; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.utils.TypeRef; +import io.dapr.v1.DaprAppCallbackProtos; +import io.dapr.v1.DaprProtos; +import io.grpc.stub.StreamObserver; +import reactor.core.publisher.FluxSink; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * StreamObserver implementation for subscribing to Dapr pub/sub events. + *

+ * This class handles the bidirectional gRPC streaming for event subscriptions, including: + *

    + *
  • Receiving events from Dapr
  • + *
  • Deserializing event payloads
  • + *
  • Emitting deserialized data to a Reactor Flux
  • + *
  • Sending acknowledgments (SUCCESS/RETRY) back to Dapr
  • + *
+ *

+ * + * @param The type of the event payload + */ +public class EventSubscriberStreamObserver implements StreamObserver { + + private final FluxSink sink; + private final TypeRef type; + private final DaprObjectSerializer objectSerializer; + private final AtomicReference> requestStreamRef; + + /** + * Creates a new EventSubscriberStreamObserver. + * + * @param sink The FluxSink to emit deserialized events to + * @param type The TypeRef for deserializing event payloads + * @param objectSerializer The serializer to use for deserialization + * @param requestStreamRef Reference to the request stream for sending acknowledgments + */ + public EventSubscriberStreamObserver( + FluxSink sink, + TypeRef type, + DaprObjectSerializer objectSerializer, + AtomicReference> requestStreamRef) { + this.sink = sink; + this.type = type; + this.objectSerializer = objectSerializer; + this.requestStreamRef = requestStreamRef; + } + + @Override + public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { + try { + if (response.getEventMessage() == null) { + return; + } + + DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage(); + String pubsubName = message.getPubsubName(); + + if (pubsubName == null || pubsubName.isEmpty()) { + return; + } + + String id = message.getId(); + + if (id == null || id.isEmpty()) { + return; + } + + // Deserialize the event data + T data = null; + + if (type != null) { + data = objectSerializer.deserialize(message.getData().toByteArray(), type); + } + + // Emit the data to the Flux (only if not null) + if (data != null) { + sink.next(data); + } + + // Send SUCCESS acknowledgment directly + var ack = buildSuccessAck(id); + + requestStreamRef.get().onNext(ack); + } catch (Exception e) { + // On error during processing, send RETRY acknowledgment + try { + var id = response.getEventMessage().getId(); + + if (id != null && !id.isEmpty()) { + var ack = buildRetryAck(id); + + requestStreamRef.get().onNext(ack); + } + } catch (Exception ex) { + // If we can't send ack, propagate the error + sink.error(DaprException.propagate(ex)); + return; + } + + sink.error(DaprException.propagate(e)); + } + } + + @Override + public void onError(Throwable throwable) { + sink.error(DaprException.propagate(throwable)); + } + + @Override + public void onCompleted() { + sink.complete(); + } + + /** + * Builds a SUCCESS acknowledgment request. + * + * @param eventId The ID of the event to acknowledge + * @return The acknowledgment request + */ + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildSuccessAck(String eventId) { + return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS); + } + + /** + * Builds a RETRY acknowledgment request. + * + * @param eventId The ID of the event to acknowledge + * @return The acknowledgment request + */ + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildRetryAck(String eventId) { + return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY); + } + + /** + * Builds a DROP acknowledgment request. + * + * @param eventId The ID of the event to acknowledge + * @return The acknowledgment request + */ + @SuppressWarnings("unused") // May be used in the future + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildDropAck(String eventId) { + return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP); + } + + /** + * Builds an acknowledgment request with the specified status. + *

+ * This method directly uses the protobuf enum instead of depending on + * {@code SubscriptionListener.Status} to keep this class independent + * of the older callback-based API. + *

+ * + * @param eventId The ID of the event to acknowledge + * @param status The acknowledgment status (SUCCESS, RETRY, or DROP) + * @return The acknowledgment request + */ + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest( + String eventId, + DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus status) { + DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed = + DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder() + .setId(eventId) + .setStatus( + DaprAppCallbackProtos.TopicEventResponse.newBuilder() + .setStatus(status) + .build()) + .build(); + + return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setEventProcessed(eventProcessed) + .build(); + } +} From ae0b5330ea4a305d4c97c72407ab6a4c5e1fb311 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 28 Nov 2025 17:49:39 -0800 Subject: [PATCH 06/12] Use start() method to start stream subscription Signed-off-by: Artur Ciocanu --- .../java/io/dapr/client/DaprClientImpl.java | 27 +++------- .../EventSubscriberStreamObserver.java | 53 ++++++++++--------- 2 files changed, 35 insertions(+), 45 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 71c4bfc939..96e990dd25 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -135,7 +135,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -494,33 +493,19 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef .build(); return Flux.create(sink -> { - var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); - - // We need AtomicReference because we're accessing the stream reference from within the - // EventSubscriberStreamObserver (to send acks). Java requires variables used in lambdas/anonymous - // classes to be effectively final, so we can't use a plain variable. AtomicReference provides - // the mutable container we need while keeping the reference itself final. - AtomicReference> streamRef = - new AtomicReference<>(); - - // Create the gRPC bidirectional streaming observer - // Note: StreamObserver.onNext() is thread-safe, so we can send acks directly - EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + DaprGrpc.DaprStub interceptedStub = this.grpcInterceptors.intercept(this.asyncStub); + EventSubscriberStreamObserver eventSubscriber = new EventSubscriberStreamObserver<>( + interceptedStub, sink, type, - this.objectSerializer, - streamRef + this.objectSerializer ); - - streamRef.set(interceptedStub.subscribeTopicEventsAlpha1(observer)); - - // Send initial request to start receiving events - streamRef.get().onNext(request); + StreamObserver requestStream = eventSubscriber.start(request); // Cleanup when Flux is cancelled or completed sink.onDispose(() -> { try { - streamRef.get().onCompleted(); + requestStream.onCompleted(); } catch (Exception e) { // Ignore cleanup errors } diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java index b6c7d47558..fb3eebda88 100644 --- a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java +++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java @@ -17,50 +17,60 @@ import io.dapr.serializer.DaprObjectSerializer; import io.dapr.utils.TypeRef; import io.dapr.v1.DaprAppCallbackProtos; +import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.stub.StreamObserver; import reactor.core.publisher.FluxSink; -import java.util.concurrent.atomic.AtomicReference; - /** * StreamObserver implementation for subscribing to Dapr pub/sub events. - *

- * This class handles the bidirectional gRPC streaming for event subscriptions, including: - *

    - *
  • Receiving events from Dapr
  • - *
  • Deserializing event payloads
  • - *
  • Emitting deserialized data to a Reactor Flux
  • - *
  • Sending acknowledgments (SUCCESS/RETRY) back to Dapr
  • - *
- *

+ * Thread Safety: This class relies on gRPC's StreamObserver contract, which guarantees that + * onNext(), onError(), and onCompleted() are never called concurrently and always from the + * same thread. Therefore, no additional synchronization is needed. * * @param The type of the event payload */ public class EventSubscriberStreamObserver implements StreamObserver { + private final DaprGrpc.DaprStub stub; private final FluxSink sink; private final TypeRef type; private final DaprObjectSerializer objectSerializer; - private final AtomicReference> requestStreamRef; + + private StreamObserver requestStream; /** * Creates a new EventSubscriberStreamObserver. * + * @param stub The gRPC stub for making Dapr service calls * @param sink The FluxSink to emit deserialized events to * @param type The TypeRef for deserializing event payloads * @param objectSerializer The serializer to use for deserialization - * @param requestStreamRef Reference to the request stream for sending acknowledgments */ public EventSubscriberStreamObserver( + DaprGrpc.DaprStub stub, FluxSink sink, TypeRef type, - DaprObjectSerializer objectSerializer, - AtomicReference> requestStreamRef) { + DaprObjectSerializer objectSerializer) { + this.stub = stub; this.sink = sink; this.type = type; this.objectSerializer = objectSerializer; - this.requestStreamRef = requestStreamRef; + } + + /** Starts the subscription by sending the initial request. + * + * @param request The subscription request + * @return The StreamObserver to send further requests (acknowledgments) + */ + public StreamObserver start( + DaprProtos.SubscribeTopicEventsRequestAlpha1 request + ) { + requestStream = stub.subscribeTopicEventsAlpha1(this); + + requestStream.onNext(request); + + return requestStream; } @Override @@ -98,7 +108,7 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { // Send SUCCESS acknowledgment directly var ack = buildSuccessAck(id); - requestStreamRef.get().onNext(ack); + requestStream.onNext(ack); } catch (Exception e) { // On error during processing, send RETRY acknowledgment try { @@ -107,7 +117,7 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { if (id != null && !id.isEmpty()) { var ack = buildRetryAck(id); - requestStreamRef.get().onNext(ack); + requestStream.onNext(ack); } } catch (Exception ex) { // If we can't send ack, propagate the error @@ -162,12 +172,7 @@ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildDropAck(String /** * Builds an acknowledgment request with the specified status. - *

- * This method directly uses the protobuf enum instead of depending on - * {@code SubscriptionListener.Status} to keep this class independent - * of the older callback-based API. - *

- * + * * @param eventId The ID of the event to acknowledge * @param status The acknowledgment status (SUCCESS, RETRY, or DROP) * @return The acknowledgment request From f2f7603e0fe55660338a4b7c95462289f7dbb962 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 28 Nov 2025 18:01:16 -0800 Subject: [PATCH 07/12] Add unit test for event suscriber observer Signed-off-by: Artur Ciocanu --- .../EventSubscriberStreamObserverTest.java | 458 ++++++++++++++++++ 1 file changed, 458 insertions(+) create mode 100644 sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java diff --git a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java new file mode 100644 index 0000000000..53ee5c0f28 --- /dev/null +++ b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java @@ -0,0 +1,458 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package io.dapr.internal.subscription; + +import com.google.protobuf.ByteString; +import io.dapr.exceptions.DaprException; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.utils.TypeRef; +import io.dapr.v1.DaprAppCallbackProtos; +import io.dapr.v1.DaprGrpc; +import io.dapr.v1.DaprProtos; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +/** + * Unit tests for EventSubscriberStreamObserver. + */ +class EventSubscriberStreamObserverTest { + + private DaprGrpc.DaprStub mockStub; + private DaprObjectSerializer objectSerializer; + private StreamObserver mockRequestStream; + + @BeforeEach + @SuppressWarnings("unchecked") + void setUp() { + mockStub = mock(DaprGrpc.DaprStub.class); + objectSerializer = new DefaultObjectSerializer(); + mockRequestStream = mock(StreamObserver.class); + + // Setup stub to return mock request stream + when(mockStub.subscribeTopicEventsAlpha1(any())).thenReturn(mockRequestStream); + } + + @Test + void testSuccessfulEventProcessing() { + // Create a Flux and capture the sink + List emittedEvents = new ArrayList<>(); + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + // Start the subscription + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Simulate receiving an event + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = buildEventResponse("event-1", "pubsub", "topic", "Hello World"); + observer.onNext(response); + + // Complete the stream + observer.onCompleted(); + }); + + // Verify the flux emits the correct data + StepVerifier.create(flux) + .expectNext("Hello World") + .verifyComplete(); + + // Verify the initial request was sent + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class); + verify(mockRequestStream, times(2)).onNext(requestCaptor.capture()); + + List requests = requestCaptor.getAllValues(); + assertEquals(2, requests.size()); + + // First request should be the initial request + assertTrue(requests.get(0).hasInitialRequest()); + + // Second request should be a SUCCESS ack + assertTrue(requests.get(1).hasEventProcessed()); + assertEquals("event-1", requests.get(1).getEventProcessed().getId()); + assertEquals( + DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS, + requests.get(1).getEventProcessed().getStatus().getStatus() + ); + } + + @Test + void testMultipleEvents() { + List emittedEvents = new ArrayList<>(); + + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Send multiple events + observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Message 1")); + observer.onNext(buildEventResponse("event-2", "pubsub", "topic", "Message 2")); + observer.onNext(buildEventResponse("event-3", "pubsub", "topic", "Message 3")); + + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .expectNext("Message 1") + .expectNext("Message 2") + .expectNext("Message 3") + .verifyComplete(); + + // Verify 4 requests: 1 initial + 3 acks + verify(mockRequestStream, times(4)).onNext(any()); + } + + @Test + void testDeserializationError() { + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Send an event with invalid data (can't deserialize to String) + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId("event-1") + .setPubsubName("pubsub") + .setTopic("topic") + .setData(ByteString.copyFrom(new byte[]{(byte) 0xFF, (byte) 0xFE})) // Invalid UTF-8 + .build() + ) + .build(); + + observer.onNext(response); + }); + + // Verify error is propagated + StepVerifier.create(flux) + .expectError(DaprException.class) + .verify(); + + // Verify RETRY ack was sent + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class); + verify(mockRequestStream, atLeast(2)).onNext(requestCaptor.capture()); + + // Find the ack request (not the initial request) + List ackRequests = requestCaptor.getAllValues().stream() + .filter(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed) + .collect(Collectors.toList()); + + assertEquals(1, ackRequests.size()); + assertEquals("event-1", ackRequests.get(0).getEventProcessed().getId()); + assertEquals( + DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY, + ackRequests.get(0).getEventProcessed().getStatus().getStatus() + ); + } + + @Test + void testGrpcError() { + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Simulate gRPC error + observer.onError(new RuntimeException("gRPC connection failed")); + }); + + StepVerifier.create(flux) + .expectError(DaprException.class) + .verify(); + } + + @Test + void testNullEventMessage() { + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Send response with null event message + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .build(); + + observer.onNext(response); + observer.onCompleted(); + }); + + // Should complete without emitting any events + StepVerifier.create(flux) + .verifyComplete(); + + // Verify only the initial request was sent (no ack for null event) + verify(mockRequestStream, times(1)).onNext(any()); + } + + @Test + void testEmptyPubsubName() { + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Send event with empty pubsub name + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId("event-1") + .setPubsubName("") + .setTopic("topic") + .setData(ByteString.copyFromUtf8("\"Hello\"")) + .build() + ) + .build(); + + observer.onNext(response); + observer.onCompleted(); + }); + + // Should complete without emitting any events + StepVerifier.create(flux) + .verifyComplete(); + + // Verify only the initial request was sent + verify(mockRequestStream, times(1)).onNext(any()); + } + + @Test + void testEmptyEventId() { + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Send event with empty ID + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId("") + .setPubsubName("pubsub") + .setTopic("topic") + .setData(ByteString.copyFromUtf8("\"Hello\"")) + .build() + ) + .build(); + + observer.onNext(response); + observer.onCompleted(); + }); + + // Should complete without emitting any events + StepVerifier.create(flux) + .verifyComplete(); + + // Verify only the initial request was sent + verify(mockRequestStream, times(1)).onNext(any()); + } + + @Test + void testNullData() { + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + null, // null type + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Send event with valid structure but null type + observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Hello")); + + observer.onCompleted(); + }); + + // Should complete without emitting any events (data is null when type is null) + StepVerifier.create(flux) + .verifyComplete(); + + // Verify initial request + ack were sent + verify(mockRequestStream, times(2)).onNext(any()); + } + + @Test + void testComplexObjectSerialization() throws IOException { + // Test with a custom object + TestEvent testEvent = new TestEvent("test-name", 42); + byte[] serializedEvent = objectSerializer.serialize(testEvent); + + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.get(TestEvent.class), + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId("event-1") + .setPubsubName("pubsub") + .setTopic("topic") + .setData(ByteString.copyFrom(serializedEvent)) + .build() + ) + .build(); + + observer.onNext(response); + observer.onCompleted(); + }); + + StepVerifier.create(flux) + .expectNextMatches(event -> event.name.equals("test-name") && event.value == 42) + .verifyComplete(); + } + + @Test + void testErrorDuringSendingAck() { + // Mock request stream to throw exception when sending ack + doThrow(new RuntimeException("Failed to send ack")) + .when(mockRequestStream).onNext(argThat(req -> req.hasEventProcessed())); + + Flux flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + observer.start(initialRequest); + + // Send an event - this should trigger an error when trying to send ack + observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Hello")); + }); + + // The event is emitted successfully, then the error occurs when sending ack + StepVerifier.create(flux) + .expectNext("Hello") // Event is emitted before ack + .expectError(DaprException.class) // Then error when sending ack + .verify(); + } + + // Helper methods + + private DaprProtos.SubscribeTopicEventsRequestAlpha1 buildInitialRequest(String pubsubName, String topic) { + return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() + .setInitialRequest( + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() + .setPubsubName(pubsubName) + .setTopic(topic) + .build() + ) + .build(); + } + + private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse( + String eventId, + String pubsubName, + String topic, + String data) { + try { + byte[] serializedData = objectSerializer.serialize(data); + return DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage( + DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(eventId) + .setPubsubName(pubsubName) + .setTopic(topic) + .setData(ByteString.copyFrom(serializedData)) + .build() + ) + .build(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // Test class for complex object serialization + public static class TestEvent { + public String name; + public int value; + + public TestEvent() { + } + + public TestEvent(String name, int value) { + this.name = name; + this.value = value; + } + } +} From d778c7d9412cf496eadfacf1f8ead4409ce1df6b Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 28 Nov 2025 18:18:58 -0800 Subject: [PATCH 08/12] Improve the tests a little bit Signed-off-by: Artur Ciocanu --- .../EventSubscriberStreamObserverTest.java | 132 ++++++++---------- 1 file changed, 60 insertions(+), 72 deletions(-) diff --git a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java index 53ee5c0f28..63981df2ea 100644 --- a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java +++ b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java @@ -23,13 +23,13 @@ import io.dapr.v1.DaprProtos; import io.grpc.stub.StreamObserver; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -42,6 +42,8 @@ */ class EventSubscriberStreamObserverTest { + public static final String PUBSUB_NAME = "pubsub"; + public static final String TOPIC_NAME = "topic"; private DaprGrpc.DaprStub mockStub; private DaprObjectSerializer objectSerializer; private StreamObserver mockRequestStream; @@ -53,14 +55,12 @@ void setUp() { objectSerializer = new DefaultObjectSerializer(); mockRequestStream = mock(StreamObserver.class); - // Setup stub to return mock request stream when(mockStub.subscribeTopicEventsAlpha1(any())).thenReturn(mockRequestStream); } @Test + @DisplayName("Should successfully process events and send SUCCESS acks") void testSuccessfulEventProcessing() { - // Create a Flux and capture the sink - List emittedEvents = new ArrayList<>(); Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, @@ -70,34 +70,34 @@ void testSuccessfulEventProcessing() { ); // Start the subscription - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); observer.start(initialRequest); // Simulate receiving an event - DaprProtos.SubscribeTopicEventsResponseAlpha1 response = buildEventResponse("event-1", "pubsub", "topic", "Hello World"); + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = buildEventResponse( + "event-1", + "Hello World" + ); observer.onNext(response); // Complete the stream observer.onCompleted(); }); - // Verify the flux emits the correct data StepVerifier.create(flux) .expectNext("Hello World") .verifyComplete(); - // Verify the initial request was sent ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class); + verify(mockRequestStream, times(2)).onNext(requestCaptor.capture()); List requests = requestCaptor.getAllValues(); - assertEquals(2, requests.size()); - // First request should be the initial request + assertEquals(2, requests.size()); assertTrue(requests.get(0).hasInitialRequest()); - - // Second request should be a SUCCESS ack assertTrue(requests.get(1).hasEventProcessed()); assertEquals("event-1", requests.get(1).getEventProcessed().getId()); assertEquals( @@ -107,9 +107,8 @@ void testSuccessfulEventProcessing() { } @Test + @DisplayName("Should handle multiple consecutive events correctly") void testMultipleEvents() { - List emittedEvents = new ArrayList<>(); - Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, @@ -118,13 +117,13 @@ void testMultipleEvents() { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); observer.start(initialRequest); - // Send multiple events - observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Message 1")); - observer.onNext(buildEventResponse("event-2", "pubsub", "topic", "Message 2")); - observer.onNext(buildEventResponse("event-3", "pubsub", "topic", "Message 3")); + observer.onNext(buildEventResponse("event-1", "Message 1")); + observer.onNext(buildEventResponse("event-2", "Message 2")); + observer.onNext(buildEventResponse("event-3", "Message 3")); observer.onCompleted(); }); @@ -135,11 +134,11 @@ void testMultipleEvents() { .expectNext("Message 3") .verifyComplete(); - // Verify 4 requests: 1 initial + 3 acks verify(mockRequestStream, times(4)).onNext(any()); } @Test + @DisplayName("Should send RETRY ack when deserialization fails") void testDeserializationError() { Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( @@ -149,7 +148,8 @@ void testDeserializationError() { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); observer.start(initialRequest); // Send an event with invalid data (can't deserialize to String) @@ -157,8 +157,8 @@ void testDeserializationError() { .setEventMessage( DaprAppCallbackProtos.TopicEventRequest.newBuilder() .setId("event-1") - .setPubsubName("pubsub") - .setTopic("topic") + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) .setData(ByteString.copyFrom(new byte[]{(byte) 0xFF, (byte) 0xFE})) // Invalid UTF-8 .build() ) @@ -167,17 +167,15 @@ void testDeserializationError() { observer.onNext(response); }); - // Verify error is propagated StepVerifier.create(flux) .expectError(DaprException.class) .verify(); - // Verify RETRY ack was sent ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(DaprProtos.SubscribeTopicEventsRequestAlpha1.class); + verify(mockRequestStream, atLeast(2)).onNext(requestCaptor.capture()); - // Find the ack request (not the initial request) List ackRequests = requestCaptor.getAllValues().stream() .filter(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed) .collect(Collectors.toList()); @@ -191,6 +189,7 @@ void testDeserializationError() { } @Test + @DisplayName("Should propagate gRPC errors as DaprException") void testGrpcError() { Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( @@ -200,7 +199,7 @@ void testGrpcError() { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(); observer.start(initialRequest); // Simulate gRPC error @@ -213,6 +212,7 @@ void testGrpcError() { } @Test + @DisplayName("Should handle null event messages gracefully without emitting events") void testNullEventMessage() { Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( @@ -222,10 +222,10 @@ void testNullEventMessage() { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); observer.start(initialRequest); - // Send response with null event message DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() .build(); @@ -233,15 +233,14 @@ void testNullEventMessage() { observer.onCompleted(); }); - // Should complete without emitting any events StepVerifier.create(flux) .verifyComplete(); - // Verify only the initial request was sent (no ack for null event) verify(mockRequestStream, times(1)).onNext(any()); } @Test + @DisplayName("Should skip events with empty pubsub name") void testEmptyPubsubName() { Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( @@ -251,16 +250,16 @@ void testEmptyPubsubName() { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); observer.start(initialRequest); - // Send event with empty pubsub name DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() .setEventMessage( DaprAppCallbackProtos.TopicEventRequest.newBuilder() .setId("event-1") .setPubsubName("") - .setTopic("topic") + .setTopic(TOPIC_NAME) .setData(ByteString.copyFromUtf8("\"Hello\"")) .build() ) @@ -270,15 +269,14 @@ void testEmptyPubsubName() { observer.onCompleted(); }); - // Should complete without emitting any events StepVerifier.create(flux) .verifyComplete(); - // Verify only the initial request was sent verify(mockRequestStream, times(1)).onNext(any()); } @Test + @DisplayName("Should skip events with empty event ID") void testEmptyEventId() { Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( @@ -288,16 +286,16 @@ void testEmptyEventId() { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); observer.start(initialRequest); - // Send event with empty ID DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() .setEventMessage( DaprAppCallbackProtos.TopicEventRequest.newBuilder() .setId("") - .setPubsubName("pubsub") - .setTopic("topic") + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) .setData(ByteString.copyFromUtf8("\"Hello\"")) .build() ) @@ -307,15 +305,14 @@ void testEmptyEventId() { observer.onCompleted(); }); - // Should complete without emitting any events StepVerifier.create(flux) .verifyComplete(); - // Verify only the initial request was sent verify(mockRequestStream, times(1)).onNext(any()); } @Test + @DisplayName("Should handle null type parameter and skip event emission") void testNullData() { Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( @@ -325,26 +322,23 @@ void testNullData() { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); - observer.start(initialRequest); - - // Send event with valid structure but null type - observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Hello")); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); + observer.start(initialRequest); + observer.onNext(buildEventResponse("event-1", "Hello")); observer.onCompleted(); }); - // Should complete without emitting any events (data is null when type is null) StepVerifier.create(flux) .verifyComplete(); - // Verify initial request + ack were sent verify(mockRequestStream, times(2)).onNext(any()); } @Test + @DisplayName("Should deserialize and emit complex objects correctly") void testComplexObjectSerialization() throws IOException { - // Test with a custom object TestEvent testEvent = new TestEvent("test-name", 42); byte[] serializedEvent = objectSerializer.serialize(testEvent); @@ -356,15 +350,16 @@ void testComplexObjectSerialization() throws IOException { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest( + ); observer.start(initialRequest); DaprProtos.SubscribeTopicEventsResponseAlpha1 response = DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() .setEventMessage( DaprAppCallbackProtos.TopicEventRequest.newBuilder() .setId("event-1") - .setPubsubName("pubsub") - .setTopic("topic") + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) .setData(ByteString.copyFrom(serializedEvent)) .build() ) @@ -380,10 +375,11 @@ void testComplexObjectSerialization() throws IOException { } @Test + @DisplayName("Should propagate errors when ack sending fails") void testErrorDuringSendingAck() { - // Mock request stream to throw exception when sending ack doThrow(new RuntimeException("Failed to send ack")) - .when(mockRequestStream).onNext(argThat(req -> req.hasEventProcessed())); + .when(mockRequestStream) + .onNext(argThat(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed)); Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( @@ -393,46 +389,39 @@ void testErrorDuringSendingAck() { objectSerializer ); - DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest("pubsub", "topic"); + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(); observer.start(initialRequest); - // Send an event - this should trigger an error when trying to send ack - observer.onNext(buildEventResponse("event-1", "pubsub", "topic", "Hello")); + observer.onNext(buildEventResponse("event-1", "Hello")); }); - // The event is emitted successfully, then the error occurs when sending ack StepVerifier.create(flux) .expectNext("Hello") // Event is emitted before ack .expectError(DaprException.class) // Then error when sending ack .verify(); } - // Helper methods - - private DaprProtos.SubscribeTopicEventsRequestAlpha1 buildInitialRequest(String pubsubName, String topic) { + private DaprProtos.SubscribeTopicEventsRequestAlpha1 buildInitialRequest() { return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() .setInitialRequest( DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() - .setPubsubName(pubsubName) - .setTopic(topic) + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) .build() ) .build(); } - private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse( - String eventId, - String pubsubName, - String topic, - String data) { + private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse(String eventId, String data) { + try { byte[] serializedData = objectSerializer.serialize(data); return DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() .setEventMessage( DaprAppCallbackProtos.TopicEventRequest.newBuilder() .setId(eventId) - .setPubsubName(pubsubName) - .setTopic(topic) + .setPubsubName(PUBSUB_NAME) + .setTopic(TOPIC_NAME) .setData(ByteString.copyFrom(serializedData)) .build() ) @@ -442,7 +431,6 @@ private DaprProtos.SubscribeTopicEventsResponseAlpha1 buildEventResponse( } } - // Test class for complex object serialization public static class TestEvent { public String name; public int value; From 23c822d75a1dcff606da560360d77feaf02178d6 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Sun, 30 Nov 2025 11:32:54 -0800 Subject: [PATCH 09/12] Remove the unnecessary method Signed-off-by: Artur Ciocanu --- .../java/io/dapr/client/DaprClientImpl.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 96e990dd25..d0e5c01b2d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -97,7 +97,6 @@ import io.dapr.utils.DefaultContentTypeConverter; import io.dapr.utils.TypeRef; import io.dapr.v1.CommonProtos; -import io.dapr.v1.DaprAppCallbackProtos; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.dapr.v1.DaprProtos.ActiveActorsCount; @@ -551,23 +550,6 @@ private Subscription buildSubscription( return subscription; } - @Nonnull - private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest( - String id, SubscriptionListener.Status status) { - DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed = - DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder() - .setId(id) - .setStatus( - DaprAppCallbackProtos.TopicEventResponse.newBuilder() - .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.valueOf( - status.name())) - .build()) - .build(); - return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() - .setEventProcessed(eventProcessed) - .build(); - } - @Override public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) { try { From 018a8da9cbb9cd7c5bd73ed66db09b0842abb0c1 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Tue, 2 Dec 2025 23:35:21 -0800 Subject: [PATCH 10/12] Improve error handling and use CloudEvent wrapper Signed-off-by: Artur Ciocanu --- .../java/io/dapr/client/DaprClientImpl.java | 4 +- .../io/dapr/client/DaprPreviewClient.java | 5 +- .../EventSubscriberStreamObserver.java | 183 ++++++++++-------- .../client/DaprPreviewClientGrpcTest.java | 7 +- .../EventSubscriberStreamObserverTest.java | 102 ++++++++-- 5 files changed, 196 insertions(+), 105 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index d0e5c01b2d..0dfb1b644b 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -480,7 +480,7 @@ public Subscription subscribeToEvents( * {@inheritDoc} */ @Override - public Flux subscribeToEvents(String pubsubName, String topic, TypeRef type) { + public Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type) { DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest = DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() .setTopic(topic) @@ -506,7 +506,7 @@ public Flux subscribeToEvents(String pubsubName, String topic, TypeRef try { requestStream.onCompleted(); } catch (Exception e) { - // Ignore cleanup errors + logger.debug("Completing the subscription stream resulted in an error: {}", e.getMessage()); } }); }, FluxSink.OverflowStrategy.BUFFER); diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index ba97931117..545b8e5dc5 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -17,6 +17,7 @@ import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponseFailedEntry; +import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.ConversationRequest; import io.dapr.client.domain.ConversationRequestAlpha2; import io.dapr.client.domain.ConversationResponse; @@ -285,10 +286,10 @@ Subscription subscribeToEvents( * @param pubsubName Name of the pubsub component. * @param topic Name of the topic to subscribe to. * @param type Type for object deserialization. - * @return A Flux of deserialized event payloads. + * @return A Flux of CloudEvents containing deserialized event payloads and metadata. * @param Type of the event payload. */ - Flux subscribeToEvents(String pubsubName, String topic, TypeRef type); + Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type); /** * Schedules a job using the provided job request details. diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java index fb3eebda88..56131882b8 100644 --- a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java +++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 The Dapr Authors + * Copyright 2025 The Dapr Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,6 +13,7 @@ package io.dapr.internal.subscription; +import io.dapr.client.domain.CloudEvent; import io.dapr.exceptions.DaprException; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.utils.TypeRef; @@ -20,8 +21,12 @@ import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.FluxSink; +import java.io.IOException; + /** * StreamObserver implementation for subscribing to Dapr pub/sub events. * Thread Safety: This class relies on gRPC's StreamObserver contract, which guarantees that @@ -32,8 +37,10 @@ */ public class EventSubscriberStreamObserver implements StreamObserver { + private static final Logger logger = LoggerFactory.getLogger(EventSubscriberStreamObserver.class); + private final DaprGrpc.DaprStub stub; - private final FluxSink sink; + private final FluxSink> sink; private final TypeRef type; private final DaprObjectSerializer objectSerializer; @@ -43,13 +50,13 @@ public class EventSubscriberStreamObserver implements StreamObserver sink, + FluxSink> sink, TypeRef type, DaprObjectSerializer objectSerializer) { this.stub = stub; @@ -75,57 +82,23 @@ public StreamObserver start( @Override public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { - try { - if (response.getEventMessage() == null) { - return; - } - - DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage(); - String pubsubName = message.getPubsubName(); - - if (pubsubName == null || pubsubName.isEmpty()) { - return; - } - - String id = message.getId(); - - if (id == null || id.isEmpty()) { - return; - } - - // Deserialize the event data - T data = null; - - if (type != null) { - data = objectSerializer.deserialize(message.getData().toByteArray(), type); - } - - // Emit the data to the Flux (only if not null) - if (data != null) { - sink.next(data); - } + if (!isValidEventMessage(response)) { + return; + } - // Send SUCCESS acknowledgment directly - var ack = buildSuccessAck(id); + DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage(); + String eventId = message.getId(); - requestStream.onNext(ack); + try { + T data = deserializeEventData(message); + CloudEvent cloudEvent = buildCloudEvent(message, data); + emitEventAndAcknowledge(cloudEvent, eventId); + } catch (IOException e) { + // Deserialization failure - send DROP ack + handleDeserializationError(eventId, e); } catch (Exception e) { - // On error during processing, send RETRY acknowledgment - try { - var id = response.getEventMessage().getId(); - - if (id != null && !id.isEmpty()) { - var ack = buildRetryAck(id); - - requestStream.onNext(ack); - } - } catch (Exception ex) { - // If we can't send ack, propagate the error - sink.error(DaprException.propagate(ex)); - return; - } - - sink.error(DaprException.propagate(e)); + // Processing failure - send RETRY ack + handleProcessingError(eventId, e); } } @@ -139,44 +112,98 @@ public void onCompleted() { sink.complete(); } - /** - * Builds a SUCCESS acknowledgment request. - * - * @param eventId The ID of the event to acknowledge - * @return The acknowledgment request - */ + private boolean isValidEventMessage(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { + if (response.getEventMessage() == null) { + logger.debug("Received response with null event message, skipping"); + return false; + } + + DaprAppCallbackProtos.TopicEventRequest message = response.getEventMessage(); + + if (message.getPubsubName() == null || message.getPubsubName().isEmpty()) { + logger.debug("Received event with empty pubsub name, skipping"); + return false; + } + + if (message.getId() == null || message.getId().isEmpty()) { + logger.debug("Received event with empty ID, skipping"); + return false; + } + + return true; + } + + private T deserializeEventData(DaprAppCallbackProtos.TopicEventRequest message) throws IOException { + if (type == null) { + logger.debug("Type is null, skipping deserialization for event ID: {}", message.getId()); + return null; + } + + return objectSerializer.deserialize(message.getData().toByteArray(), type); + } + + private CloudEvent buildCloudEvent(DaprAppCallbackProtos.TopicEventRequest message, T data) { + CloudEvent cloudEvent = new CloudEvent<>(); + + cloudEvent.setId(message.getId()); + cloudEvent.setType(message.getType()); + cloudEvent.setSpecversion(message.getSpecVersion()); + cloudEvent.setDatacontenttype(message.getDataContentType()); + cloudEvent.setTopic(message.getTopic()); + cloudEvent.setPubsubName(message.getPubsubName()); + cloudEvent.setData(data); + + return cloudEvent; + } + + private void emitEventAndAcknowledge(CloudEvent cloudEvent, String eventId) { + sink.next(cloudEvent); + + // Send SUCCESS acknowledgment + requestStream.onNext(buildSuccessAck(eventId)); + } + + private void handleDeserializationError(String eventId, IOException cause) { + logger.error("Deserialization failed for event ID: {}, sending DROP ack", eventId, cause); + + // Send DROP ack - cannot process malformed data + requestStream.onNext(buildDropAck(eventId)); + + // Propagate error to sink + sink.error(new DaprException("DESERIALIZATION_ERROR", + "Failed to deserialize event with ID: " + eventId, cause)); + } + + private void handleProcessingError(String eventId, Exception cause) { + logger.error("Processing error for event ID: {}, attempting to send RETRY ack", eventId, cause); + + try { + // Try to send RETRY acknowledgment + requestStream.onNext(buildRetryAck(eventId)); + } catch (Exception ackException) { + // Failed to send ack - this is critical + logger.error("Failed to send RETRY ack for event ID: {}", eventId, ackException); + sink.error(DaprException.propagate(ackException)); + + return; + } + + // Propagate the original processing error + sink.error(DaprException.propagate(cause)); + } + private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildSuccessAck(String eventId) { return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS); } - /** - * Builds a RETRY acknowledgment request. - * - * @param eventId The ID of the event to acknowledge - * @return The acknowledgment request - */ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildRetryAck(String eventId) { return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY); } - /** - * Builds a DROP acknowledgment request. - * - * @param eventId The ID of the event to acknowledge - * @return The acknowledgment request - */ - @SuppressWarnings("unused") // May be used in the future private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildDropAck(String eventId) { return buildAckRequest(eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP); } - /** - * Builds an acknowledgment request with the specified status. - * - * @param eventId The ID of the event to acknowledge - * @param status The acknowledgment status (SUCCESS, RETRY, or DROP) - * @return The acknowledgment request - */ private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest( String eventId, DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus status) { diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index 35bc300590..4bb59bb4c0 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -640,8 +640,11 @@ public void onCompleted() { final Semaphore gotAll = new Semaphore(0); var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING) - .doOnNext(eventData -> { - assertEquals(data, eventData); + .doOnNext(cloudEvent -> { + assertEquals(data, cloudEvent.getData()); + assertEquals("pubsubname", cloudEvent.getPubsubName()); + assertEquals("topic", cloudEvent.getTopic()); + assertNotNull(cloudEvent.getId()); int count = eventCount.incrementAndGet(); if (count >= numEvents) { gotAll.release(); diff --git a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java index 63981df2ea..7328f79e51 100644 --- a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java +++ b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 The Dapr Authors + * Copyright 2025 The Dapr Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -14,6 +14,7 @@ package io.dapr.internal.subscription; import com.google.protobuf.ByteString; +import io.dapr.client.domain.CloudEvent; import io.dapr.exceptions.DaprException; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; @@ -61,7 +62,7 @@ void setUp() { @Test @DisplayName("Should successfully process events and send SUCCESS acks") void testSuccessfulEventProcessing() { - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -86,7 +87,12 @@ void testSuccessfulEventProcessing() { }); StepVerifier.create(flux) - .expectNext("Hello World") + .assertNext(cloudEvent -> { + assertEquals("Hello World", cloudEvent.getData()); + assertEquals("event-1", cloudEvent.getId()); + assertEquals(PUBSUB_NAME, cloudEvent.getPubsubName()); + assertEquals(TOPIC_NAME, cloudEvent.getTopic()); + }) .verifyComplete(); ArgumentCaptor requestCaptor = @@ -109,7 +115,7 @@ void testSuccessfulEventProcessing() { @Test @DisplayName("Should handle multiple consecutive events correctly") void testMultipleEvents() { - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -129,18 +135,27 @@ void testMultipleEvents() { }); StepVerifier.create(flux) - .expectNext("Message 1") - .expectNext("Message 2") - .expectNext("Message 3") + .assertNext(cloudEvent -> { + assertEquals("Message 1", cloudEvent.getData()); + assertEquals("event-1", cloudEvent.getId()); + }) + .assertNext(cloudEvent -> { + assertEquals("Message 2", cloudEvent.getData()); + assertEquals("event-2", cloudEvent.getId()); + }) + .assertNext(cloudEvent -> { + assertEquals("Message 3", cloudEvent.getData()); + assertEquals("event-3", cloudEvent.getId()); + }) .verifyComplete(); verify(mockRequestStream, times(4)).onNext(any()); } @Test - @DisplayName("Should send RETRY ack when deserialization fails") + @DisplayName("Should send DROP ack when deserialization fails") void testDeserializationError() { - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -168,7 +183,10 @@ void testDeserializationError() { }); StepVerifier.create(flux) - .expectError(DaprException.class) + .expectErrorMatches(error -> + error instanceof DaprException + && error.getMessage().contains("DESERIALIZATION_ERROR") + && error.getMessage().contains("event-1")) .verify(); ArgumentCaptor requestCaptor = @@ -183,15 +201,46 @@ void testDeserializationError() { assertEquals(1, ackRequests.size()); assertEquals("event-1", ackRequests.get(0).getEventProcessed().getId()); assertEquals( - DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.RETRY, + DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.DROP, ackRequests.get(0).getEventProcessed().getStatus().getStatus() ); } + @Test + @DisplayName("Should send RETRY ack when non-deserialization error occurs") + void testProcessingError() { + Flux> flux = Flux.create(sink -> { + EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( + mockStub, + sink, + TypeRef.STRING, + objectSerializer + ); + + DaprProtos.SubscribeTopicEventsRequestAlpha1 initialRequest = buildInitialRequest(); + observer.start(initialRequest); + + // Simulate a processing error by throwing during sink.next() + sink.onRequest(n -> { + throw new RuntimeException("Processing error"); + }); + + observer.onNext(buildEventResponse("event-1", "Hello")); + }); + + StepVerifier.create(flux) + .expectError(RuntimeException.class) + .verify(); + + // Note: When error occurs in onRequest callback (before processing), + // no ack is sent as the error happens before we can handle the event + verify(mockRequestStream, times(1)).onNext(any()); // Only initial request sent + } + @Test @DisplayName("Should propagate gRPC errors as DaprException") void testGrpcError() { - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -214,7 +263,7 @@ void testGrpcError() { @Test @DisplayName("Should handle null event messages gracefully without emitting events") void testNullEventMessage() { - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -242,7 +291,7 @@ void testNullEventMessage() { @Test @DisplayName("Should skip events with empty pubsub name") void testEmptyPubsubName() { - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -278,7 +327,7 @@ void testEmptyPubsubName() { @Test @DisplayName("Should skip events with empty event ID") void testEmptyEventId() { - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -312,9 +361,9 @@ void testEmptyEventId() { } @Test - @DisplayName("Should handle null type parameter and skip event emission") + @DisplayName("Should handle null type parameter and emit CloudEvent with null data") void testNullData() { - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -331,6 +380,12 @@ void testNullData() { }); StepVerifier.create(flux) + .assertNext(cloudEvent -> { + assertNull(cloudEvent.getData()); + assertEquals("event-1", cloudEvent.getId()); + assertEquals(PUBSUB_NAME, cloudEvent.getPubsubName()); + assertEquals(TOPIC_NAME, cloudEvent.getTopic()); + }) .verifyComplete(); verify(mockRequestStream, times(2)).onNext(any()); @@ -342,7 +397,7 @@ void testComplexObjectSerialization() throws IOException { TestEvent testEvent = new TestEvent("test-name", 42); byte[] serializedEvent = objectSerializer.serialize(testEvent); - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -370,7 +425,12 @@ void testComplexObjectSerialization() throws IOException { }); StepVerifier.create(flux) - .expectNextMatches(event -> event.name.equals("test-name") && event.value == 42) + .assertNext(cloudEvent -> { + TestEvent event = cloudEvent.getData(); + assertEquals("test-name", event.name); + assertEquals(42, event.value); + assertEquals("event-1", cloudEvent.getId()); + }) .verifyComplete(); } @@ -381,7 +441,7 @@ void testErrorDuringSendingAck() { .when(mockRequestStream) .onNext(argThat(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed)); - Flux flux = Flux.create(sink -> { + Flux> flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -396,7 +456,7 @@ void testErrorDuringSendingAck() { }); StepVerifier.create(flux) - .expectNext("Hello") // Event is emitted before ack + .assertNext(cloudEvent -> assertEquals("Hello", cloudEvent.getData())) // Event is emitted before ack .expectError(DaprException.class) // Then error when sending ack .verify(); } From 9821f8ee033cbe90f4139da48f79e159b8f8a1a6 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Wed, 3 Dec 2025 10:57:51 -0800 Subject: [PATCH 11/12] Fix unit tests asserts Signed-off-by: Artur Ciocanu --- .../client/DaprPreviewClientGrpcTest.java | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index 4bb59bb4c0..a42c4f946c 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -592,35 +592,41 @@ public void subscribeEventFluxTest() throws Exception { var pubsubName = "pubsubName"; var topicName = "topicName"; var data = "my message"; - var started = new Semaphore(0); doAnswer((Answer>) invocation -> { StreamObserver observer = (StreamObserver) invocation.getArguments()[0]; + var emitterThread = new Thread(() -> { try { started.acquire(); } catch (InterruptedException e) { throw new RuntimeException(e); } + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + for (int i = 0; i < numEvents; i++) { - observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + DaprProtos.SubscribeTopicEventsResponseAlpha1 reponse = + DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() - .setId(Integer.toString(i)) - .setPubsubName(pubsubName) - .setTopic(topicName) - .setData(ByteString.copyFromUtf8("\"" + data + "\"")) - .setDataContentType("application/json") - .build()) - .build()); + .setId(Integer.toString(i)) + .setPubsubName(pubsubName) + .setTopic(topicName) + .setData(ByteString.copyFromUtf8("\"" + data + "\"")) + .setDataContentType("application/json") + .build()) + .build(); + observer.onNext(reponse); } + observer.onCompleted(); }); + emitterThread.start(); - return new StreamObserver<>() { + return new StreamObserver<>() { @Override public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { started.release(); @@ -628,24 +634,27 @@ public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEv @Override public void onError(Throwable throwable) { + // No-op } @Override public void onCompleted() { + // No-op } }; }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); final AtomicInteger eventCount = new AtomicInteger(0); final Semaphore gotAll = new Semaphore(0); - - var disposable = previewClient.subscribeToEvents("pubsubname", "topic", TypeRef.STRING) + var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING) .doOnNext(cloudEvent -> { assertEquals(data, cloudEvent.getData()); - assertEquals("pubsubname", cloudEvent.getPubsubName()); - assertEquals("topic", cloudEvent.getTopic()); + assertEquals(pubsubName, cloudEvent.getPubsubName()); + assertEquals(topicName, cloudEvent.getTopic()); assertNotNull(cloudEvent.getId()); + int count = eventCount.incrementAndGet(); + if (count >= numEvents) { gotAll.release(); } From 26d215342bf541135c200ba22fd085b8a4e1c010 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Thu, 4 Dec 2025 13:13:45 -0800 Subject: [PATCH 12/12] Adjust Java examples for Subscriber Signed-off-by: Artur Ciocanu --- .../io/dapr/examples/pubsub/stream/README.md | 30 +++++++----------- .../examples/pubsub/stream/Subscriber.java | 31 +++++++------------ .../java/io/dapr/client/Subscription.java | 1 + .../io/dapr/client/SubscriptionListener.java | 1 + 4 files changed, 25 insertions(+), 38 deletions(-) diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md index d9d41b3759..da3e4e2482 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md @@ -49,7 +49,7 @@ The subscriber uses the `DaprPreviewClient` interface to use a new feature where The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. -In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription. +In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`. ```java public class Subscriber { @@ -59,25 +59,19 @@ public class Subscriber { public static void main(String[] args) throws Exception { String topicName = getTopicName(args); try (var client = new DaprClientBuilder().buildPreviewClient()) { - var subscription = client.subscribeToEvents( + // Subscribe to events using the Flux-based reactive API + // The stream will emit CloudEvent objects as they arrive + client.subscribeToEvents( PUBSUB_NAME, topicName, - new SubscriptionListener<>() { - - @Override - public Mono onEvent(CloudEvent event) { - System.out.println("Subscriber got: " + event.getData()); - return Mono.just(Status.SUCCESS); - } - - @Override - public void onError(RuntimeException exception) { - System.out.println("Subscriber got exception: " + exception.getMessage()); - } - }, - TypeRef.STRING); - - subscription.awaitTermination(); + TypeRef.STRING) + .doOnNext(event -> { + System.out.println("Subscriber got: " + event.getData()); + }) + .doOnError(throwable -> { + System.out.println("Subscriber got exception: " + throwable.getMessage()); + }) + .blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running) } } diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java index 31678dce08..763bb436ce 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java @@ -14,10 +14,7 @@ package io.dapr.examples.pubsub.stream; import io.dapr.client.DaprClientBuilder; -import io.dapr.client.SubscriptionListener; -import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; -import reactor.core.publisher.Mono; /** * Subscriber using bi-directional gRPC streaming, which does not require an app port. @@ -44,25 +41,19 @@ public class Subscriber { public static void main(String[] args) throws Exception { String topicName = getTopicName(args); try (var client = new DaprClientBuilder().buildPreviewClient()) { - var subscription = client.subscribeToEvents( + // Subscribe to events using the Flux-based reactive API + // The stream will emit CloudEvent objects as they arrive + client.subscribeToEvents( PUBSUB_NAME, topicName, - new SubscriptionListener<>() { - - @Override - public Mono onEvent(CloudEvent event) { - System.out.println("Subscriber got: " + event.getData()); - return Mono.just(Status.SUCCESS); - } - - @Override - public void onError(RuntimeException exception) { - System.out.println("Subscriber got exception: " + exception.getMessage()); - } - }, - TypeRef.STRING); - - subscription.awaitTermination(); + TypeRef.STRING) + .doOnNext(event -> { + System.out.println("Subscriber got: " + event.getData()); + }) + .doOnError(throwable -> { + System.out.println("Subscriber got exception: " + throwable.getMessage()); + }) + .blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running) } } diff --git a/sdk/src/main/java/io/dapr/client/Subscription.java b/sdk/src/main/java/io/dapr/client/Subscription.java index 53e89e8456..2cbd1e9b30 100644 --- a/sdk/src/main/java/io/dapr/client/Subscription.java +++ b/sdk/src/main/java/io/dapr/client/Subscription.java @@ -35,6 +35,7 @@ * Streaming subscription of events for Dapr's pubsub. * @param Application's object type. */ +@Deprecated public class Subscription implements Closeable { private final BlockingQueue ackQueue = new LinkedBlockingQueue<>(50); diff --git a/sdk/src/main/java/io/dapr/client/SubscriptionListener.java b/sdk/src/main/java/io/dapr/client/SubscriptionListener.java index 5a467d69f4..c5420af602 100644 --- a/sdk/src/main/java/io/dapr/client/SubscriptionListener.java +++ b/sdk/src/main/java/io/dapr/client/SubscriptionListener.java @@ -20,6 +20,7 @@ * Callback interface to receive events from a streaming subscription of events. * @param Object type for deserialization. */ +@Deprecated public interface SubscriptionListener { /**