Skip to content

Commit e5dc0ec

Browse files
committed
Use proper method overloads for subscribeToEvents()
Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com>
1 parent 2e9e35b commit e5dc0ec

File tree

6 files changed

+189
-100
lines changed

6 files changed

+189
-100
lines changed

sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java

Lines changed: 98 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Collections;
2929
import java.util.HashSet;
3030
import java.util.List;
31+
import java.util.Map;
3132
import java.util.Random;
3233
import java.util.Set;
3334
import java.util.UUID;
@@ -126,9 +127,9 @@ public void onError(RuntimeException exception) {
126127
}
127128

128129
@Test
129-
public void testPubSubRawData() throws Exception {
130+
public void testPubSubFlux() throws Exception {
130131
final DaprRun daprRun = closeLater(startDaprApp(
131-
this.getClass().getSimpleName() + "-rawdata",
132+
this.getClass().getSimpleName() + "-flux",
132133
60000));
133134

134135
var runId = UUID.randomUUID().toString();
@@ -137,18 +138,18 @@ public void testPubSubRawData() throws Exception {
137138

138139
// Publish messages
139140
for (int i = 0; i < NUM_MESSAGES; i++) {
140-
String message = String.format("Raw message #%d for run %s", i, runId);
141+
String message = String.format("Flux message #%d for run %s", i, runId);
141142
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
142143
System.out.println(
143-
String.format("Published raw message: '%s' to topic '%s'", message, TOPIC_NAME));
144+
String.format("Published flux message: '%s' to topic '%s'", message, TOPIC_NAME));
144145
}
145146

146-
System.out.println("Starting raw data subscription for " + TOPIC_NAME);
147+
System.out.println("Starting Flux subscription for " + TOPIC_NAME);
147148

148149
Set<String> messages = Collections.synchronizedSet(new HashSet<>());
149150

150-
// Use new subscribeToEventsData - receives String directly, not CloudEvent<String>
151-
var disposable = previewClient.subscribeToEventsData(PUBSUB_NAME, TOPIC_NAME, TypeRef.STRING)
151+
// subscribeToEvents now returns Flux<T> directly (raw data)
152+
var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, TypeRef.STRING)
152153
.doOnNext(rawMessage -> {
153154
// rawMessage is String directly
154155
if (rawMessage.contains(runId)) {
@@ -161,7 +162,96 @@ public void testPubSubRawData() throws Exception {
161162
callWithRetry(() -> {
162163
var messageCount = messages.size();
163164
System.out.println(
164-
String.format("Got %d raw messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME));
165+
String.format("Got %d flux messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME));
166+
assertEquals(NUM_MESSAGES, messages.size());
167+
}, 60000);
168+
169+
disposable.dispose();
170+
}
171+
}
172+
173+
@Test
174+
public void testPubSubCloudEvent() throws Exception {
175+
final DaprRun daprRun = closeLater(startDaprApp(
176+
this.getClass().getSimpleName() + "-cloudevent",
177+
60000));
178+
179+
var runId = UUID.randomUUID().toString();
180+
try (DaprClient client = daprRun.newDaprClient();
181+
DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) {
182+
183+
// Publish messages
184+
for (int i = 0; i < NUM_MESSAGES; i++) {
185+
String message = String.format("CloudEvent message #%d for run %s", i, runId);
186+
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
187+
System.out.println(
188+
String.format("Published CloudEvent message: '%s' to topic '%s'", message, TOPIC_NAME));
189+
}
190+
191+
System.out.println("Starting CloudEvent subscription for " + TOPIC_NAME);
192+
193+
Set<String> messageIds = Collections.synchronizedSet(new HashSet<>());
194+
195+
// Use TypeRef<CloudEvent<String>> to receive full CloudEvent with metadata
196+
var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, new TypeRef<CloudEvent<String>>(){})
197+
.doOnNext(cloudEvent -> {
198+
if (cloudEvent.getData() != null && cloudEvent.getData().contains(runId)) {
199+
messageIds.add(cloudEvent.getId());
200+
System.out.println("Received CloudEvent with ID: " + cloudEvent.getId()
201+
+ ", topic: " + cloudEvent.getTopic()
202+
+ ", data: " + cloudEvent.getData());
203+
}
204+
})
205+
.subscribe();
206+
207+
callWithRetry(() -> {
208+
var messageCount = messageIds.size();
209+
System.out.println(
210+
String.format("Got %d CloudEvent messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME));
211+
assertEquals(NUM_MESSAGES, messageIds.size());
212+
}, 60000);
213+
214+
disposable.dispose();
215+
}
216+
}
217+
218+
@Test
219+
public void testPubSubRawPayload() throws Exception {
220+
final DaprRun daprRun = closeLater(startDaprApp(
221+
this.getClass().getSimpleName() + "-rawpayload",
222+
60000));
223+
224+
var runId = UUID.randomUUID().toString();
225+
try (DaprClient client = daprRun.newDaprClient();
226+
DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) {
227+
228+
// Publish messages with rawPayload metadata
229+
for (int i = 0; i < NUM_MESSAGES; i++) {
230+
String message = String.format("RawPayload message #%d for run %s", i, runId);
231+
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message, Map.of("rawPayload", "true")).block();
232+
System.out.println(
233+
String.format("Published raw payload message: '%s' to topic '%s'", message, TOPIC_NAME));
234+
}
235+
236+
System.out.println("Starting raw payload subscription for " + TOPIC_NAME);
237+
238+
Set<String> messages = Collections.synchronizedSet(new HashSet<>());
239+
Map<String, String> metadata = Map.of("rawPayload", "true");
240+
241+
// Use subscribeToEvents with rawPayload metadata
242+
var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, TypeRef.STRING, metadata)
243+
.doOnNext(rawMessage -> {
244+
if (rawMessage.contains(runId)) {
245+
messages.add(rawMessage);
246+
System.out.println("Received raw payload message: " + rawMessage);
247+
}
248+
})
249+
.subscribe();
250+
251+
callWithRetry(() -> {
252+
var messageCount = messages.size();
253+
System.out.println(
254+
String.format("Got %d raw payload messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME));
165255
assertEquals(NUM_MESSAGES, messages.size());
166256
}, 60000);
167257

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -482,15 +482,27 @@ public <T> Subscription subscribeToEvents(
482482
* {@inheritDoc}
483483
*/
484484
@Override
485-
public <T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
486-
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
485+
public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
486+
return subscribeToEvents(pubsubName, topic, type, null);
487+
}
488+
489+
/**
490+
* {@inheritDoc}
491+
*/
492+
@Override
493+
public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata) {
494+
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder =
487495
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
488496
.setTopic(topic)
489-
.setPubsubName(pubsubName)
490-
.build();
497+
.setPubsubName(pubsubName);
498+
499+
if (metadata != null && !metadata.isEmpty()) {
500+
initialRequestBuilder.putAllMetadata(metadata);
501+
}
502+
491503
DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
492504
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
493-
.setInitialRequest(initialRequest)
505+
.setInitialRequest(initialRequestBuilder.build())
494506
.build();
495507

496508
return Flux.create(sink -> {
@@ -514,15 +526,6 @@ public <T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic
514526
}, FluxSink.OverflowStrategy.BUFFER);
515527
}
516528

517-
/**
518-
* {@inheritDoc}
519-
*/
520-
@Override
521-
public <T> Flux<T> subscribeToEventsData(String pubsubName, String topic, TypeRef<T> type) {
522-
return subscribeToEvents(pubsubName, topic, type)
523-
.map(CloudEvent::getData);
524-
}
525-
526529
@Nonnull
527530
private <T> Subscription<T> buildSubscription(
528531
SubscriptionListener<T> listener,

sdk/src/main/java/io/dapr/client/DaprPreviewClient.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -285,25 +285,36 @@ <T> Subscription subscribeToEvents(
285285

286286
/**
287287
* Subscribe to pubsub events via streaming using Project Reactor Flux.
288+
*
289+
* <p>The type parameter determines what is deserialized from the event data:
290+
* <ul>
291+
* <li>Use {@code TypeRef.STRING} or similar for raw payload data</li>
292+
* <li>Use {@code new TypeRef<CloudEvent<String>>(){}} to receive CloudEvent with metadata</li>
293+
* </ul>
294+
*
288295
* @param pubsubName Name of the pubsub component.
289296
* @param topic Name of the topic to subscribe to.
290297
* @param type Type for object deserialization.
291-
* @return A Flux of CloudEvents containing deserialized event payloads and metadata.
298+
* @return A Flux of deserialized event payloads.
292299
* @param <T> Type of the event payload.
293300
*/
294-
<T> Flux<CloudEvent<T>> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);
301+
<T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);
295302

296303
/**
297-
* Subscribe to pubsub events via streaming using Project Reactor Flux.
298-
* Returns only the deserialized event data without CloudEvent metadata wrapper.
304+
* Subscribe to pubsub events via streaming using Project Reactor Flux with metadata support.
305+
*
306+
* <p>If metadata is null or empty, this method delegates to {@link #subscribeToEvents(String, String, TypeRef)}.
307+
* Use metadata {@code {"rawPayload": "true"}} for raw payload subscriptions where Dapr
308+
* delivers messages without CloudEvent wrapping.
299309
*
300310
* @param pubsubName Name of the pubsub component.
301311
* @param topic Name of the topic to subscribe to.
302312
* @param type Type for object deserialization.
303-
* @return A Flux of deserialized event payloads (no CloudEvent wrapper).
313+
* @param metadata Subscription metadata (e.g., {"rawPayload": "true"}).
314+
* @return A Flux of deserialized event payloads.
304315
* @param <T> Type of the event payload.
305316
*/
306-
<T> Flux<T> subscribeToEventsData(String pubsubName, String topic, TypeRef<T> type);
317+
<T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata);
307318

308319
/*
309320
* Converse with an LLM.

sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
package io.dapr.internal.subscription;
1515

16-
import io.dapr.client.domain.CloudEvent;
1716
import io.dapr.exceptions.DaprException;
1817
import io.dapr.serializer.DaprObjectSerializer;
1918
import io.dapr.utils.TypeRef;
@@ -40,7 +39,7 @@ public class EventSubscriberStreamObserver<T> implements StreamObserver<DaprProt
4039
private static final Logger logger = LoggerFactory.getLogger(EventSubscriberStreamObserver.class);
4140

4241
private final DaprGrpc.DaprStub stub;
43-
private final FluxSink<CloudEvent<T>> sink;
42+
private final FluxSink<T> sink;
4443
private final TypeRef<T> type;
4544
private final DaprObjectSerializer objectSerializer;
4645

@@ -50,13 +49,13 @@ public class EventSubscriberStreamObserver<T> implements StreamObserver<DaprProt
5049
* Creates a new EventSubscriberStreamObserver.
5150
*
5251
* @param stub The gRPC stub for making Dapr service calls
53-
* @param sink The FluxSink to emit CloudEvents to
52+
* @param sink The FluxSink to emit deserialized event data to
5453
* @param type The TypeRef for deserializing event payloads
5554
* @param objectSerializer The serializer to use for deserialization
5655
*/
5756
public EventSubscriberStreamObserver(
5857
DaprGrpc.DaprStub stub,
59-
FluxSink<CloudEvent<T>> sink,
58+
FluxSink<T> sink,
6059
TypeRef<T> type,
6160
DaprObjectSerializer objectSerializer) {
6261
this.stub = stub;
@@ -91,8 +90,7 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) {
9190

9291
try {
9392
T data = deserializeEventData(message);
94-
CloudEvent<T> cloudEvent = buildCloudEvent(message, data);
95-
emitEventAndAcknowledge(cloudEvent, eventId);
93+
emitDataAndAcknowledge(data, eventId);
9694
} catch (IOException e) {
9795
// Deserialization failure - send DROP ack
9896
handleDeserializationError(eventId, e);
@@ -142,22 +140,11 @@ private T deserializeEventData(DaprAppCallbackProtos.TopicEventRequest message)
142140
return objectSerializer.deserialize(message.getData().toByteArray(), type);
143141
}
144142

145-
private CloudEvent<T> buildCloudEvent(DaprAppCallbackProtos.TopicEventRequest message, T data) {
146-
CloudEvent<T> cloudEvent = new CloudEvent<>();
147-
148-
cloudEvent.setId(message.getId());
149-
cloudEvent.setType(message.getType());
150-
cloudEvent.setSpecversion(message.getSpecVersion());
151-
cloudEvent.setDatacontenttype(message.getDataContentType());
152-
cloudEvent.setTopic(message.getTopic());
153-
cloudEvent.setPubsubName(message.getPubsubName());
154-
cloudEvent.setData(data);
155-
156-
return cloudEvent;
157-
}
158-
159-
private void emitEventAndAcknowledge(CloudEvent<T> cloudEvent, String eventId) {
160-
sink.next(cloudEvent);
143+
private void emitDataAndAcknowledge(T data, String eventId) {
144+
// Only emit if data is not null (Reactor doesn't allow null values in Flux)
145+
if (data != null) {
146+
sink.next(data);
147+
}
161148

162149
// Send SUCCESS acknowledgment
163150
requestStream.onNext(buildSuccessAck(eventId));

sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import java.util.concurrent.ExecutionException;
8989
import java.util.concurrent.Semaphore;
9090
import java.util.concurrent.atomic.AtomicInteger;
91+
import java.util.concurrent.atomic.AtomicReference;
9192

9293
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
9394
import static org.junit.Assert.assertTrue;
@@ -638,12 +639,13 @@ public void onCompleted() {
638639

639640
final AtomicInteger eventCount = new AtomicInteger(0);
640641
final Semaphore gotAll = new Semaphore(0);
642+
643+
// subscribeToEvents now returns Flux<T> directly (raw data)
641644
var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING)
642-
.doOnNext(cloudEvent -> {
643-
assertEquals(data, cloudEvent.getData());
644-
assertEquals(pubsubName, cloudEvent.getPubsubName());
645-
assertEquals(topicName, cloudEvent.getTopic());
646-
assertNotNull(cloudEvent.getId());
645+
.doOnNext(rawData -> {
646+
// rawData is String directly, not CloudEvent
647+
assertEquals(data, rawData);
648+
assertTrue(rawData instanceof String);
647649

648650
int count = eventCount.incrementAndGet();
649651

@@ -660,12 +662,13 @@ public void onCompleted() {
660662
}
661663

662664
@Test
663-
public void subscribeToEventsDataTest() throws Exception {
664-
var numEvents = 100;
665+
public void subscribeEventsWithMetadataTest() throws Exception {
666+
var numEvents = 10;
665667
var pubsubName = "pubsubName";
666668
var topicName = "topicName";
667669
var data = "my message";
668670
var started = new Semaphore(0);
671+
var capturedMetadata = new AtomicReference<java.util.Map<String, String>>();
669672

670673
doAnswer((Answer<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>>) invocation -> {
671674
StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1> observer =
@@ -701,7 +704,11 @@ public void subscribeToEventsDataTest() throws Exception {
701704

702705
return new StreamObserver<>() {
703706
@Override
704-
public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) {
707+
public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 request) {
708+
// Capture metadata from initial request
709+
if (request.hasInitialRequest()) {
710+
capturedMetadata.set(request.getInitialRequest().getMetadataMap());
711+
}
705712
started.release();
706713
}
707714

@@ -719,11 +726,11 @@ public void onCompleted() {
719726

720727
final AtomicInteger eventCount = new AtomicInteger(0);
721728
final Semaphore gotAll = new Semaphore(0);
729+
Map<String, String> metadata = Map.of("rawPayload", "true");
722730

723-
// Use new subscribeToEventsData - receives raw String, not CloudEvent<String>
724-
var disposable = previewClient.subscribeToEventsData(pubsubName, topicName, TypeRef.STRING)
731+
// Use subscribeToEvents with rawPayload metadata
732+
var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING, metadata)
725733
.doOnNext(rawData -> {
726-
// rawData is String directly, not CloudEvent
727734
assertEquals(data, rawData);
728735
assertTrue(rawData instanceof String);
729736

@@ -739,6 +746,10 @@ public void onCompleted() {
739746
disposable.dispose();
740747

741748
assertEquals(numEvents, eventCount.get());
749+
750+
// Verify metadata was passed to gRPC request
751+
assertNotNull(capturedMetadata.get());
752+
assertEquals("true", capturedMetadata.get().get("rawPayload"));
742753
}
743754

744755
@Test

0 commit comments

Comments
 (0)