@@ -43,6 +43,9 @@ public class PubSubStreamIT extends BaseIT {
4343 // sent by the runtime. When this was first added, the batch size in runtime was set to 10.
4444 private static final int NUM_MESSAGES = 100 ;
4545 private static final String TOPIC_NAME = "stream-topic" ;
46+ private static final String TOPIC_NAME_FLUX = "stream-topic-flux" ;
47+ private static final String TOPIC_NAME_CLOUDEVENT = "stream-topic-cloudevent" ;
48+ private static final String TOPIC_NAME_RAWPAYLOAD = "stream-topic-rawpayload" ;
4649 private static final String PUBSUB_NAME = "messagebus" ;
4750
4851 private final List <DaprRun > runs = new ArrayList <>();
@@ -139,17 +142,17 @@ public void testPubSubFlux() throws Exception {
139142 // Publish messages
140143 for (int i = 0 ; i < NUM_MESSAGES ; i ++) {
141144 String message = String .format ("Flux message #%d for run %s" , i , runId );
142- client .publishEvent (PUBSUB_NAME , TOPIC_NAME , message ).block ();
145+ client .publishEvent (PUBSUB_NAME , TOPIC_NAME_FLUX , message ).block ();
143146 System .out .println (
144- String .format ("Published flux message: '%s' to topic '%s'" , message , TOPIC_NAME ));
147+ String .format ("Published flux message: '%s' to topic '%s'" , message , TOPIC_NAME_FLUX ));
145148 }
146149
147- System .out .println ("Starting Flux subscription for " + TOPIC_NAME );
150+ System .out .println ("Starting Flux subscription for " + TOPIC_NAME_FLUX );
148151
149152 Set <String > messages = Collections .synchronizedSet (new HashSet <>());
150153
151154 // subscribeToEvents now returns Flux<T> directly (raw data)
152- var disposable = previewClient .subscribeToEvents (PUBSUB_NAME , TOPIC_NAME , TypeRef .STRING )
155+ var disposable = previewClient .subscribeToEvents (PUBSUB_NAME , TOPIC_NAME_FLUX , TypeRef .STRING )
153156 .doOnNext (rawMessage -> {
154157 // rawMessage is String directly
155158 if (rawMessage .contains (runId )) {
@@ -162,7 +165,7 @@ public void testPubSubFlux() throws Exception {
162165 callWithRetry (() -> {
163166 var messageCount = messages .size ();
164167 System .out .println (
165- String .format ("Got %d flux messages out of %d for topic %s." , messageCount , NUM_MESSAGES , TOPIC_NAME ));
168+ String .format ("Got %d flux messages out of %d for topic %s." , messageCount , NUM_MESSAGES , TOPIC_NAME_FLUX ));
166169 assertEquals (NUM_MESSAGES , messages .size ());
167170 }, 60000 );
168171
@@ -183,17 +186,17 @@ public void testPubSubCloudEvent() throws Exception {
183186 // Publish messages
184187 for (int i = 0 ; i < NUM_MESSAGES ; i ++) {
185188 String message = String .format ("CloudEvent message #%d for run %s" , i , runId );
186- client .publishEvent (PUBSUB_NAME , TOPIC_NAME , message ).block ();
189+ client .publishEvent (PUBSUB_NAME , TOPIC_NAME_CLOUDEVENT , message ).block ();
187190 System .out .println (
188- String .format ("Published CloudEvent message: '%s' to topic '%s'" , message , TOPIC_NAME ));
191+ String .format ("Published CloudEvent message: '%s' to topic '%s'" , message , TOPIC_NAME_CLOUDEVENT ));
189192 }
190193
191- System .out .println ("Starting CloudEvent subscription for " + TOPIC_NAME );
194+ System .out .println ("Starting CloudEvent subscription for " + TOPIC_NAME_CLOUDEVENT );
192195
193196 Set <String > messageIds = Collections .synchronizedSet (new HashSet <>());
194197
195198 // Use TypeRef<CloudEvent<String>> to receive full CloudEvent with metadata
196- var disposable = previewClient .subscribeToEvents (PUBSUB_NAME , TOPIC_NAME , new TypeRef <CloudEvent <String >>(){})
199+ var disposable = previewClient .subscribeToEvents (PUBSUB_NAME , TOPIC_NAME_CLOUDEVENT , new TypeRef <CloudEvent <String >>(){})
197200 .doOnNext (cloudEvent -> {
198201 if (cloudEvent .getData () != null && cloudEvent .getData ().contains (runId )) {
199202 messageIds .add (cloudEvent .getId ());
@@ -207,7 +210,7 @@ public void testPubSubCloudEvent() throws Exception {
207210 callWithRetry (() -> {
208211 var messageCount = messageIds .size ();
209212 System .out .println (
210- String .format ("Got %d CloudEvent messages out of %d for topic %s." , messageCount , NUM_MESSAGES , TOPIC_NAME ));
213+ String .format ("Got %d CloudEvent messages out of %d for topic %s." , messageCount , NUM_MESSAGES , TOPIC_NAME_CLOUDEVENT ));
211214 assertEquals (NUM_MESSAGES , messageIds .size ());
212215 }, 60000 );
213216
@@ -228,18 +231,18 @@ public void testPubSubRawPayload() throws Exception {
228231 // Publish messages with rawPayload metadata
229232 for (int i = 0 ; i < NUM_MESSAGES ; i ++) {
230233 String message = String .format ("RawPayload message #%d for run %s" , i , runId );
231- client .publishEvent (PUBSUB_NAME , TOPIC_NAME , message , Map .of ("rawPayload" , "true" )).block ();
234+ client .publishEvent (PUBSUB_NAME , TOPIC_NAME_RAWPAYLOAD , message , Map .of ("rawPayload" , "true" )).block ();
232235 System .out .println (
233- String .format ("Published raw payload message: '%s' to topic '%s'" , message , TOPIC_NAME ));
236+ String .format ("Published raw payload message: '%s' to topic '%s'" , message , TOPIC_NAME_RAWPAYLOAD ));
234237 }
235238
236- System .out .println ("Starting raw payload subscription for " + TOPIC_NAME );
239+ System .out .println ("Starting raw payload subscription for " + TOPIC_NAME_RAWPAYLOAD );
237240
238241 Set <String > messages = Collections .synchronizedSet (new HashSet <>());
239242 Map <String , String > metadata = Map .of ("rawPayload" , "true" );
240243
241244 // Use subscribeToEvents with rawPayload metadata
242- var disposable = previewClient .subscribeToEvents (PUBSUB_NAME , TOPIC_NAME , TypeRef .STRING , metadata )
245+ var disposable = previewClient .subscribeToEvents (PUBSUB_NAME , TOPIC_NAME_RAWPAYLOAD , TypeRef .STRING , metadata )
243246 .doOnNext (rawMessage -> {
244247 if (rawMessage .contains (runId )) {
245248 messages .add (rawMessage );
@@ -251,7 +254,7 @@ public void testPubSubRawPayload() throws Exception {
251254 callWithRetry (() -> {
252255 var messageCount = messages .size ();
253256 System .out .println (
254- String .format ("Got %d raw payload messages out of %d for topic %s." , messageCount , NUM_MESSAGES , TOPIC_NAME ));
257+ String .format ("Got %d raw payload messages out of %d for topic %s." , messageCount , NUM_MESSAGES , TOPIC_NAME_RAWPAYLOAD ));
255258 assertEquals (NUM_MESSAGES , messages .size ());
256259 }, 60000 );
257260
0 commit comments