Skip to content

Commit 75e13de

Browse files
[1.16] Flux subscribeToEvents method (#1613)
* Adding a Flux based subscribeToEvents method (#1598) * Adding a Flux based subscribeToEvents method Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Simplify GRPC stream handling Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Simplify Javadoc Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Fix unit tests and simplify implementation Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Adding event subscriber stream observer to simplify subscription logic Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Use start() method to start stream subscription Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Add unit test for event suscriber observer Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Improve the tests a little bit Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Remove the unnecessary method Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Improve error handling and use CloudEvent wrapper Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Fix unit tests asserts Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * Adjust Java examples for Subscriber Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> --------- Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> * rm docker bits Signed-off-by: Cassandra Coyle <cassie@diagrid.io> * try eclipse-temurin image Signed-off-by: Cassandra Coyle <cassie@diagrid.io> --------- Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com> Signed-off-by: Cassandra Coyle <cassie@diagrid.io> Co-authored-by: artur-ciocanu <artur.ciocanu@gmail.com>
1 parent b0594de commit 75e13de

File tree

12 files changed

+894
-91
lines changed

12 files changed

+894
-91
lines changed

.github/workflows/build.yml

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,9 @@ jobs:
7474
DAPR_REF:
7575
TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64
7676
steps:
77-
- name: Install Stable Docker
78-
id: setup_docker
79-
uses: docker/setup-docker-action@v4
80-
- name: Check Docker version
81-
run: docker version
8277
- uses: actions/checkout@v5
78+
- name: Check Docker version
79+
run: docker version
8380
- name: Set up OpenJDK ${{ env.JDK_VER }}
8481
uses: actions/setup-java@v5
8582
with:
@@ -146,8 +143,6 @@ jobs:
146143
- name: Integration tests using spring boot version ${{ matrix.spring-boot-version }}
147144
id: integration_tests
148145
run: PRODUCT_SPRING_BOOT_VERSION=${{ matrix.spring-boot-version }} ./mvnw -B -Pintegration-tests dependency:copy-dependencies verify
149-
env:
150-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
151146
- name: Upload failsafe test report for sdk-tests on failure
152147
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
153148
uses: actions/upload-artifact@v4

.github/workflows/validate.yml

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ jobs:
4949
with:
5050
distribution: 'temurin'
5151
java-version: ${{ env.JDK_VER }}
52-
- name: Install Stable Docker
53-
id: setup_docker
54-
uses: docker/setup-docker-action@v4
5552
- name: Check Docker version
5653
run: docker version
5754
- name: Set up Dapr CLI
@@ -109,114 +106,75 @@ jobs:
109106
run: sleep 30 && docker logs dapr_scheduler && nc -vz localhost 50006
110107
- name: Install jars
111108
run: ./mvnw clean install -DskipTests -q
112-
env:
113-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
114109
- name: Validate workflows example
115110
working-directory: ./examples
116111
run: |
117112
mm.py ./src/main/java/io/dapr/examples/workflows/README.md
118-
env:
119-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
120113
- name: Validate Spring Boot examples
121114
working-directory: ./spring-boot-examples
122115
run: |
123116
mm.py README.md
124-
env:
125-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
126117
- name: Validate Spring Boot Workflow examples
127118
working-directory: ./spring-boot-examples/workflows
128119
run: |
129120
mm.py README.md
130-
env:
131-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
132121
- name: Validate Jobs example
133122
working-directory: ./examples
134123
run: |
135124
mm.py ./src/main/java/io/dapr/examples/jobs/README.md
136-
env:
137-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
138125
- name: Validate conversation ai example
139126
working-directory: ./examples
140127
run: |
141128
mm.py ./src/main/java/io/dapr/examples/conversation/README.md
142-
env:
143-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
144129
- name: Validate invoke http example
145130
working-directory: ./examples
146131
run: |
147132
mm.py ./src/main/java/io/dapr/examples/invoke/http/README.md
148-
env:
149-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
150133
- name: Validate invoke grpc example
151134
working-directory: ./examples
152135
run: |
153136
mm.py ./src/main/java/io/dapr/examples/invoke/grpc/README.md
154-
env:
155-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
156137
- name: Validate tracing example
157138
working-directory: ./examples
158139
run: |
159140
mm.py ./src/main/java/io/dapr/examples/tracing/README.md
160-
env:
161-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
162141
- name: Validate expection handling example
163142
working-directory: ./examples
164143
run: |
165144
mm.py ./src/main/java/io/dapr/examples/exception/README.md
166-
env:
167-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
168145
- name: Validate state example
169146
working-directory: ./examples
170147
run: |
171148
mm.py ./src/main/java/io/dapr/examples/state/README.md
172-
env:
173-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
174149
- name: Validate pubsub example
175150
working-directory: ./examples
176151
run: |
177152
mm.py ./src/main/java/io/dapr/examples/pubsub/README.md
178-
env:
179-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
180153
- name: Validate bindings HTTP example
181154
working-directory: ./examples
182155
run: |
183156
mm.py ./src/main/java/io/dapr/examples/bindings/http/README.md
184-
env:
185-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
186157
- name: Validate secrets example
187158
working-directory: ./examples
188159
run: |
189160
mm.py ./src/main/java/io/dapr/examples/secrets/README.md
190-
env:
191-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
192161
- name: Validate unit testing example
193162
working-directory: ./examples
194163
run: |
195164
mm.py ./src/main/java/io/dapr/examples/unittesting/README.md
196-
env:
197-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
198165
- name: Validate Configuration API example
199166
working-directory: ./examples
200167
run: |
201168
mm.py ./src/main/java/io/dapr/examples/configuration/README.md
202-
env:
203-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
204169
- name: Validate actors example
205170
working-directory: ./examples
206171
run: |
207172
mm.py ./src/main/java/io/dapr/examples/actors/README.md
208-
env:
209-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
210173
- name: Validate query state HTTP example
211174
working-directory: ./examples
212175
run: |
213176
mm.py ./src/main/java/io/dapr/examples/querystate/README.md
214-
env:
215-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
216177
- name: Validate streaming subscription example
217178
working-directory: ./examples
218179
run: |
219180
mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md
220-
env:
221-
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
222-

examples/src/main/java/io/dapr/examples/pubsub/stream/README.md

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ The subscriber uses the `DaprPreviewClient` interface to use a new feature where
4949

5050
The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
5151

52-
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription.
52+
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux<CloudEvent<T>>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.
5353

5454
```java
5555
public class Subscriber {
@@ -59,25 +59,19 @@ public class Subscriber {
5959
public static void main(String[] args) throws Exception {
6060
String topicName = getTopicName(args);
6161
try (var client = new DaprClientBuilder().buildPreviewClient()) {
62-
var subscription = client.subscribeToEvents(
62+
// Subscribe to events using the Flux-based reactive API
63+
// The stream will emit CloudEvent<String> objects as they arrive
64+
client.subscribeToEvents(
6365
PUBSUB_NAME,
6466
topicName,
65-
new SubscriptionListener<>() {
66-
67-
@Override
68-
public Mono<Status> onEvent(CloudEvent<String> event) {
69-
System.out.println("Subscriber got: " + event.getData());
70-
return Mono.just(Status.SUCCESS);
71-
}
72-
73-
@Override
74-
public void onError(RuntimeException exception) {
75-
System.out.println("Subscriber got exception: " + exception.getMessage());
76-
}
77-
},
78-
TypeRef.STRING);
79-
80-
subscription.awaitTermination();
67+
TypeRef.STRING)
68+
.doOnNext(event -> {
69+
System.out.println("Subscriber got: " + event.getData());
70+
})
71+
.doOnError(throwable -> {
72+
System.out.println("Subscriber got exception: " + throwable.getMessage());
73+
})
74+
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
8175
}
8276
}
8377

examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@
1414
package io.dapr.examples.pubsub.stream;
1515

1616
import io.dapr.client.DaprClientBuilder;
17-
import io.dapr.client.SubscriptionListener;
18-
import io.dapr.client.domain.CloudEvent;
1917
import io.dapr.utils.TypeRef;
20-
import reactor.core.publisher.Mono;
2118

2219
/**
2320
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
@@ -44,25 +41,19 @@ public class Subscriber {
4441
public static void main(String[] args) throws Exception {
4542
String topicName = getTopicName(args);
4643
try (var client = new DaprClientBuilder().buildPreviewClient()) {
47-
var subscription = client.subscribeToEvents(
44+
// Subscribe to events using the Flux-based reactive API
45+
// The stream will emit CloudEvent<String> objects as they arrive
46+
client.subscribeToEvents(
4847
PUBSUB_NAME,
4948
topicName,
50-
new SubscriptionListener<>() {
51-
52-
@Override
53-
public Mono<Status> onEvent(CloudEvent<String> event) {
54-
System.out.println("Subscriber got: " + event.getData());
55-
return Mono.just(Status.SUCCESS);
56-
}
57-
58-
@Override
59-
public void onError(RuntimeException exception) {
60-
System.out.println("Subscriber got exception: " + exception.getMessage());
61-
}
62-
},
63-
TypeRef.STRING);
64-
65-
subscription.awaitTermination();
49+
TypeRef.STRING)
50+
.doOnNext(event -> {
51+
System.out.println("Subscriber got: " + event.getData());
52+
})
53+
.doOnError(throwable -> {
54+
System.out.println("Subscriber got exception: " + throwable.getMessage());
55+
})
56+
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
6657
}
6758
}
6859

sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/crossapp/WorkflowsCrossAppCallActivityIT.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.testcontainers.containers.Network;
2929
import org.testcontainers.junit.jupiter.Container;
3030
import org.testcontainers.junit.jupiter.Testcontainers;
31+
import org.testcontainers.utility.DockerImageName;
3132
import org.testcontainers.utility.MountableFile;
3233
import org.testcontainers.containers.GenericContainer;
3334
import org.testcontainers.containers.wait.strategy.Wait;
@@ -55,6 +56,7 @@
5556
public class WorkflowsCrossAppCallActivityIT {
5657

5758
private static final Network DAPR_NETWORK = Network.newNetwork();
59+
private static final DockerImageName JAVA_WORKER_IMAGE = DockerImageName.parse("eclipse-temurin:17-jdk");
5860

5961
@Container
6062
private final static DaprPlacementContainer sharedPlacementContainer = new DaprPlacementContainer(DAPR_PLACEMENT_IMAGE_TAG)
@@ -113,7 +115,7 @@ public class WorkflowsCrossAppCallActivityIT {
113115

114116
// TestContainers for each app
115117
@Container
116-
private static GenericContainer<?> crossappWorker = new GenericContainer<>("openjdk:17-jdk-slim")
118+
private static GenericContainer<?> crossappWorker = new GenericContainer<>(JAVA_WORKER_IMAGE)
117119
.withCopyFileToContainer(MountableFile.forHostPath("target"), "/app")
118120
.withWorkingDirectory("/app")
119121
.withCommand("java", "-cp", "test-classes:classes:dependency/*:*",
@@ -127,7 +129,7 @@ public class WorkflowsCrossAppCallActivityIT {
127129
.withLogConsumer(outputFrame -> System.out.println("CrossAppWorker: " + outputFrame.getUtf8String()));
128130

129131
@Container
130-
private final static GenericContainer<?> app2Worker = new GenericContainer<>("openjdk:17-jdk-slim")
132+
private final static GenericContainer<?> app2Worker = new GenericContainer<>(JAVA_WORKER_IMAGE)
131133
.withCopyFileToContainer(MountableFile.forHostPath("target"), "/app")
132134
.withWorkingDirectory("/app")
133135
.withCommand("java", "-cp", "test-classes:classes:dependency/*:*",
@@ -141,7 +143,7 @@ public class WorkflowsCrossAppCallActivityIT {
141143
.withLogConsumer(outputFrame -> System.out.println("App2Worker: " + outputFrame.getUtf8String()));
142144

143145
@Container
144-
private final static GenericContainer<?> app3Worker = new GenericContainer<>("openjdk:17-jdk-slim")
146+
private final static GenericContainer<?> app3Worker = new GenericContainer<>(JAVA_WORKER_IMAGE)
145147
.withCopyFileToContainer(MountableFile.forHostPath("target"), "/app")
146148
.withWorkingDirectory("/app")
147149
.withCommand("java", "-cp", "test-classes:classes:dependency/*:*",

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
9292
import io.dapr.internal.resiliency.RetryPolicy;
9393
import io.dapr.internal.resiliency.TimeoutPolicy;
94+
import io.dapr.internal.subscription.EventSubscriberStreamObserver;
9495
import io.dapr.serializer.DaprObjectSerializer;
9596
import io.dapr.serializer.DefaultObjectSerializer;
9697
import io.dapr.utils.DefaultContentTypeConverter;
@@ -475,6 +476,42 @@ public <T> Subscription subscribeToEvents(
475476
return buildSubscription(listener, type, request);
476477
}
477478

479+
/**
480+
* {@inheritDoc}
481+
*/
482+
@Override
483+
public <T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
484+
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
485+
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
486+
.setTopic(topic)
487+
.setPubsubName(pubsubName)
488+
.build();
489+
DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
490+
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
491+
.setInitialRequest(initialRequest)
492+
.build();
493+
494+
return Flux.create(sink -> {
495+
DaprGrpc.DaprStub interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
496+
EventSubscriberStreamObserver<T> eventSubscriber = new EventSubscriberStreamObserver<>(
497+
interceptedStub,
498+
sink,
499+
type,
500+
this.objectSerializer
501+
);
502+
StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1> requestStream = eventSubscriber.start(request);
503+
504+
// Cleanup when Flux is cancelled or completed
505+
sink.onDispose(() -> {
506+
try {
507+
requestStream.onCompleted();
508+
} catch (Exception e) {
509+
logger.debug("Completing the subscription stream resulted in an error: {}", e.getMessage());
510+
}
511+
});
512+
}, FluxSink.OverflowStrategy.BUFFER);
513+
}
514+
478515
@Nonnull
479516
private <T> Subscription<T> buildSubscription(
480517
SubscriptionListener<T> listener,

sdk/src/main/java/io/dapr/client/DaprPreviewClient.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.dapr.client.domain.BulkPublishRequest;
1818
import io.dapr.client.domain.BulkPublishResponse;
1919
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
20+
import io.dapr.client.domain.CloudEvent;
2021
import io.dapr.client.domain.ConversationRequest;
2122
import io.dapr.client.domain.ConversationRequestAlpha2;
2223
import io.dapr.client.domain.ConversationResponse;
@@ -32,6 +33,7 @@
3233
import io.dapr.client.domain.UnlockResponseStatus;
3334
import io.dapr.client.domain.query.Query;
3435
import io.dapr.utils.TypeRef;
36+
import reactor.core.publisher.Flux;
3537
import reactor.core.publisher.Mono;
3638

3739
import java.util.List;
@@ -271,12 +273,24 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
271273
* @param topic Name of the topic to subscribe to.
272274
* @param listener Callback methods to process events.
273275
* @param type Type for object deserialization.
274-
* @return An active subscription.
275276
* @param <T> Type of object deserialization.
277+
* @return An active subscription.
278+
* @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach.
276279
*/
280+
@Deprecated
277281
<T> Subscription subscribeToEvents(
278282
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);
279283

284+
/**
285+
* Subscribe to pubsub events via streaming using Project Reactor Flux.
286+
* @param pubsubName Name of the pubsub component.
287+
* @param topic Name of the topic to subscribe to.
288+
* @param type Type for object deserialization.
289+
* @return A Flux of CloudEvents containing deserialized event payloads and metadata.
290+
* @param <T> Type of the event payload.
291+
*/
292+
<T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);
293+
280294
/**
281295
* Schedules a job using the provided job request details.
282296
*

sdk/src/main/java/io/dapr/client/Subscription.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* Streaming subscription of events for Dapr's pubsub.
3636
* @param <T> Application's object type.
3737
*/
38+
@Deprecated
3839
public class Subscription<T> implements Closeable {
3940

4041
private final BlockingQueue<DaprProtos.SubscribeTopicEventsRequestAlpha1> ackQueue = new LinkedBlockingQueue<>(50);

sdk/src/main/java/io/dapr/client/SubscriptionListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* Callback interface to receive events from a streaming subscription of events.
2121
* @param <T> Object type for deserialization.
2222
*/
23+
@Deprecated
2324
public interface SubscriptionListener<T> {
2425

2526
/**

0 commit comments

Comments
 (0)