Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions examples/src/main/java/io/dapr/examples/pubsub/stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The subscriber uses the `DaprPreviewClient` interface to use a new feature where

The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.

In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription.
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux<CloudEvent<T>>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.

```java
public class Subscriber {
Expand All @@ -59,25 +59,19 @@ public class Subscriber {
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
var subscription = client.subscribeToEvents(
// Subscribe to events using the Flux-based reactive API
// The stream will emit CloudEvent<String> objects as they arrive
client.subscribeToEvents(
PUBSUB_NAME,
topicName,
new SubscriptionListener<>() {

@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
},
TypeRef.STRING);

subscription.awaitTermination();
TypeRef.STRING)
.doOnNext(event -> {
System.out.println("Subscriber got: " + event.getData());
})
.doOnError(throwable -> {
System.out.println("Subscriber got exception: " + throwable.getMessage());
})
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
package io.dapr.examples.pubsub.stream;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;

/**
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
Expand All @@ -44,25 +41,19 @@ public class Subscriber {
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
var subscription = client.subscribeToEvents(
// Subscribe to events using the Flux-based reactive API
// The stream will emit CloudEvent<String> objects as they arrive
client.subscribeToEvents(
PUBSUB_NAME,
topicName,
new SubscriptionListener<>() {

@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}

@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
},
TypeRef.STRING);

subscription.awaitTermination();
TypeRef.STRING)
.doOnNext(event -> {
System.out.println("Subscriber got: " + event.getData());
})
.doOnError(throwable -> {
System.out.println("Subscriber got exception: " + throwable.getMessage());
})
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
}
}

Expand Down
37 changes: 37 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.internal.subscription.EventSubscriberStreamObserver;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
Expand Down Expand Up @@ -475,6 +476,42 @@ public <T> Subscription subscribeToEvents(
return buildSubscription(listener, type, request);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.build();
DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
.setInitialRequest(initialRequest)
.build();

return Flux.create(sink -> {
DaprGrpc.DaprStub interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
EventSubscriberStreamObserver<T> eventSubscriber = new EventSubscriberStreamObserver<>(
interceptedStub,
sink,
type,
this.objectSerializer
);
StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1> requestStream = eventSubscriber.start(request);

// Cleanup when Flux is cancelled or completed
sink.onDispose(() -> {
try {
requestStream.onCompleted();
} catch (Exception e) {
logger.debug("Completing the subscription stream resulted in an error: {}", e.getMessage());
}
});
}, FluxSink.OverflowStrategy.BUFFER);
}

@Nonnull
private <T> Subscription<T> buildSubscription(
SubscriptionListener<T> listener,
Expand Down
16 changes: 15 additions & 1 deletion sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.ConversationRequest;
import io.dapr.client.domain.ConversationRequestAlpha2;
import io.dapr.client.domain.ConversationResponse;
Expand All @@ -32,6 +33,7 @@
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.query.Query;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
Expand Down Expand Up @@ -271,12 +273,24 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
* @param topic Name of the topic to subscribe to.
* @param listener Callback methods to process events.
* @param type Type for object deserialization.
* @return An active subscription.
* @param <T> Type of object deserialization.
* @return An active subscription.
* @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach.
*/
@Deprecated
<T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);

/**
* Subscribe to pubsub events via streaming using Project Reactor Flux.
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param type Type for object deserialization.
* @return A Flux of CloudEvents containing deserialized event payloads and metadata.
* @param <T> Type of the event payload.
*/
<T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);

/**
* Schedules a job using the provided job request details.
*
Expand Down
1 change: 1 addition & 0 deletions sdk/src/main/java/io/dapr/client/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Streaming subscription of events for Dapr's pubsub.
* @param <T> Application's object type.
*/
@Deprecated
public class Subscription<T> implements Closeable {

private final BlockingQueue<DaprProtos.SubscribeTopicEventsRequestAlpha1> ackQueue = new LinkedBlockingQueue<>(50);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Callback interface to receive events from a streaming subscription of events.
* @param <T> Object type for deserialization.
*/
@Deprecated
public interface SubscriptionListener<T> {

/**
Expand Down
Loading
Loading