Skip to content

Commit 4a033d8

Browse files
committed
Updating the examples to use the latest changes.
Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com>
1 parent e5dc0ec commit 4a033d8

File tree

3 files changed

+160
-45
lines changed

3 files changed

+160
-45
lines changed

examples/src/main/java/io/dapr/examples/pubsub/stream/README.md

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while a subscriber will listen for messages of a specific topic via a bi-directional stream. All is abstracted by the SDK. See the [Dapr Pub-Sub docs](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) to understand when this pattern might be a good choice for your software architecture.
44

55
Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub.
6-
6+
77
## Pub-Sub Sample using the Java-SDK
88

99
This sample shows how the subscription to events no longer requires the application to listen to an HTTP or gRPC port. This example uses Redis Streams (enabled in Redis versions => 5).
@@ -41,45 +41,80 @@ cd examples
4141

4242
Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.
4343

44-
### Running the subscriber
45-
46-
The subscriber uses the `DaprPreviewClient` interface to use a new feature where events are subscribed via a streaming and processed via a callback interface.
44+
## Running the Subscriber
4745

46+
The subscriber uses the `DaprPreviewClient` interface to subscribe to events via streaming and process them using reactive operators.
4847

48+
The SDK provides two ways to subscribe to events:
4949

50-
The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
50+
### Option 1: Raw Data Subscription
5151

52-
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux<CloudEvent<T>>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.
52+
Use `TypeRef.STRING` (or any other type) to receive the deserialized message data directly:
5353

5454
```java
5555
public class Subscriber {
5656

57-
// ...
57+
public static void main(String[] args) throws Exception {
58+
try (var client = new DaprClientBuilder().buildPreviewClient()) {
59+
// Subscribe to events - receives raw String data directly
60+
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING)
61+
.doOnNext(message -> {
62+
System.out.println("Subscriber got: " + message);
63+
})
64+
.doOnError(throwable -> {
65+
System.out.println("Subscriber got exception: " + throwable.getMessage());
66+
})
67+
.blockLast();
68+
}
69+
}
70+
}
71+
```
72+
73+
### Option 2: CloudEvent Subscription
74+
75+
Use `TypeRef<CloudEvent<String>>` to receive the full CloudEvent with metadata (ID, source, type, etc.):
76+
77+
```java
78+
public class SubscriberCloudEvent {
5879

5980
public static void main(String[] args) throws Exception {
60-
String topicName = getTopicName(args);
6181
try (var client = new DaprClientBuilder().buildPreviewClient()) {
62-
// Subscribe to events using the Flux-based reactive API
63-
// The stream will emit CloudEvent<String> objects as they arrive
64-
client.subscribeToEvents(
65-
PUBSUB_NAME,
66-
topicName,
67-
TypeRef.STRING)
68-
.doOnNext(event -> {
69-
System.out.println("Subscriber got: " + event.getData());
82+
// Subscribe to events - receives CloudEvent<String> with full metadata
83+
client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef<CloudEvent<String>>() {})
84+
.doOnNext(cloudEvent -> {
85+
System.out.println("Received CloudEvent:");
86+
System.out.println(" ID: " + cloudEvent.getId());
87+
System.out.println(" Type: " + cloudEvent.getType());
88+
System.out.println(" Data: " + cloudEvent.getData());
7089
})
7190
.doOnError(throwable -> {
7291
System.out.println("Subscriber got exception: " + throwable.getMessage());
7392
})
74-
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
93+
.blockLast();
7594
}
7695
}
77-
78-
// ...
7996
}
8097
```
8198

82-
Execute the following command to run the Subscriber example:
99+
### Subscription with Metadata
100+
101+
You can also pass metadata to the subscription, for example to enable raw payload mode:
102+
103+
```java
104+
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPayload", "true"))
105+
.doOnNext(message -> {
106+
System.out.println("Subscriber got: " + message);
107+
})
108+
.blockLast();
109+
```
110+
111+
### Subscription Lifecycle
112+
113+
The examples use `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`.
114+
115+
## Running the Examples
116+
117+
Execute the following command to run the raw data Subscriber example:
83118

84119
<!-- STEP
85120
name: Run Subscriber
@@ -97,6 +132,12 @@ dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar t
97132

98133
<!-- END_STEP -->
99134

135+
Or run the CloudEvent Subscriber example:
136+
137+
```bash
138+
dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberCloudEvent
139+
```
140+
100141
Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side:
101142

102143
<!-- STEP

examples/src/main/java/io/dapr/examples/pubsub/stream/Subscriber.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,51 +18,47 @@
1818

1919
/**
2020
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
21-
* 1. Build and install jars:
22-
* mvn clean install
23-
* 2. cd [repo root]/examples
24-
* 3. Run the subscriber:
25-
* dapr run --resources-path ./components/pubsub --app-id subscriber -- \
26-
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber
21+
*
22+
* <p>This example demonstrates subscribing to raw message data directly.
23+
* For CloudEvent subscription with metadata, see {@link SubscriberCloudEvent}.
24+
*
25+
* <p>Usage:
26+
* <ol>
27+
* <li>Build and install jars: {@code mvn clean install}
28+
* <li>cd [repo root]/examples
29+
* <li>Run the subscriber:
30+
* {@code dapr run --resources-path ./components/pubsub --app-id subscriber -- \
31+
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber}
32+
* </ol>
2733
*/
2834
public class Subscriber {
2935

30-
//The title of the topic to be used for publishing
3136
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
32-
33-
//The name of the pubsub
3437
private static final String PUBSUB_NAME = "messagebus";
3538

3639
/**
37-
* This is the entry point for this example app, which subscribes to a topic.
40+
* Main entry point for the raw data subscriber example.
41+
*
3842
* @param args Used to optionally pass a topic name.
3943
* @throws Exception An Exception on startup.
4044
*/
4145
public static void main(String[] args) throws Exception {
4246
String topicName = getTopicName(args);
4347
try (var client = new DaprClientBuilder().buildPreviewClient()) {
44-
// Subscribe to events using the Flux-based reactive API
45-
// The stream will emit CloudEvent<String> objects as they arrive
46-
client.subscribeToEvents(
47-
PUBSUB_NAME,
48-
topicName,
49-
TypeRef.STRING)
50-
.doOnNext(event -> {
51-
System.out.println("Subscriber got: " + event.getData());
48+
System.out.println("Subscribing to topic: " + topicName);
49+
50+
// Subscribe to events - receives raw String data directly
51+
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING)
52+
.doOnNext(message -> {
53+
System.out.println("Subscriber got: " + message);
5254
})
5355
.doOnError(throwable -> {
5456
System.out.println("Subscriber got exception: " + throwable.getMessage());
5557
})
56-
.blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running)
58+
.blockLast();
5759
}
5860
}
5961

60-
/**
61-
* If a topic is specified in args, use that.
62-
* Else, fallback to the default topic.
63-
* @param args program arguments
64-
* @return name of the topic to publish messages to.
65-
*/
6662
private static String getTopicName(String[] args) {
6763
if (args.length >= 1) {
6864
return args[0];
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.examples.pubsub.stream;
15+
16+
import io.dapr.client.DaprClientBuilder;
17+
import io.dapr.client.domain.CloudEvent;
18+
import io.dapr.utils.TypeRef;
19+
20+
/**
21+
* Subscriber using bi-directional gRPC streaming with CloudEvent metadata.
22+
*
23+
* <p>This example demonstrates subscribing to CloudEvent objects which include
24+
* metadata like event ID, source, type, and other CloudEvents specification fields.
25+
* For raw message data subscription, see {@link Subscriber}.
26+
*
27+
* <p>Usage:
28+
* <ol>
29+
* <li>Build and install jars: {@code mvn clean install}
30+
* <li>cd [repo root]/examples
31+
* <li>Run the subscriber:
32+
* {@code dapr run --resources-path ./components/pubsub --app-id subscriber -- \
33+
* java -jar target/dapr-java-sdk-examples-exec.jar \
34+
* io.dapr.examples.pubsub.stream.SubscriberCloudEvent}
35+
* </ol>
36+
*/
37+
public class SubscriberCloudEvent {
38+
39+
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
40+
private static final String PUBSUB_NAME = "messagebus";
41+
42+
/**
43+
* Main entry point for the CloudEvent subscriber example.
44+
*
45+
* @param args Used to optionally pass a topic name.
46+
* @throws Exception An Exception on startup.
47+
*/
48+
public static void main(String[] args) throws Exception {
49+
String topicName = getTopicName(args);
50+
try (var client = new DaprClientBuilder().buildPreviewClient()) {
51+
System.out.println("Subscribing to topic: " + topicName + " (CloudEvent mode)");
52+
53+
// Subscribe to events - receives CloudEvent<String> with full metadata
54+
// Use TypeRef<CloudEvent<String>> to get CloudEvent wrapper with metadata
55+
client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef<CloudEvent<String>>() {})
56+
.doOnNext(cloudEvent -> {
57+
System.out.println("Received CloudEvent:");
58+
System.out.println(" ID: " + cloudEvent.getId());
59+
System.out.println(" Source: " + cloudEvent.getSource());
60+
System.out.println(" Type: " + cloudEvent.getType());
61+
System.out.println(" Topic: " + cloudEvent.getTopic());
62+
System.out.println(" PubSub: " + cloudEvent.getPubsubName());
63+
System.out.println(" Data: " + cloudEvent.getData());
64+
})
65+
.doOnError(throwable -> {
66+
System.out.println("Subscriber got exception: " + throwable.getMessage());
67+
})
68+
.blockLast();
69+
}
70+
}
71+
72+
private static String getTopicName(String[] args) {
73+
if (args.length >= 1) {
74+
return args[0];
75+
}
76+
return DEFAULT_TOPIC_NAME;
77+
}
78+
}

0 commit comments

Comments
 (0)