Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,36 @@ private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) {
*
* @param downStreamMsgContext Down Stream Message Context
*/
private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
List<CloudEvent> msgExts = new ArrayList<>();
msgExts.add(downStreamMsgContext.event);
log.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.event.getSubject(),
downStreamMsgContext.seq, downStreamMsgContext.event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS));
downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext);
private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
if (downStreamMsgContext.consumer == null
|| downStreamMsgContext.consumeConcurrentlyContext == null
|| downStreamMsgContext.event == null) {

log.warn(
"eventMeshAckMsg skipped, consumer:{}, context:{}, event:{}",
downStreamMsgContext.consumer == null,
downStreamMsgContext.consumeConcurrentlyContext == null,
downStreamMsgContext.event == null
);
return;
}

List<CloudEvent> msgExts = new ArrayList<>();
msgExts.add(downStreamMsgContext.event);

log.warn(
"eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}",
downStreamMsgContext.event.getSubject(),
downStreamMsgContext.seq,
downStreamMsgContext.event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS)
);

downStreamMsgContext.consumer.updateOffset(
msgExts,
downStreamMsgContext.consumeConcurrentlyContext
);
}

@Override
public void doRun() {
retry();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import org.junit.jupiter.api.Test;

import java.net.URI;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

class DownStreamMsgContextTest {

private CloudEvent buildEvent() {
return CloudEventBuilder.v1()
.withId("test-id")
.withSource(URI.create("test://source"))
.withType("test-type")
.withSubject("test-topic")
.build();
}

private SubscriptionItem buildSubscriptionItem() {
SubscriptionItem item = new SubscriptionItem();
item.setMode(SubscriptionMode.CLUSTERING);
return item;
}

@Test
void retry_shouldNotThrowException_whenConsumerOrContextIsNull() {

CloudEvent event = buildEvent();

// Intentionally set to null to simulate edge case
Session session = null;
MQConsumerWrapper consumer = null;
AbstractContext context = null;

DownStreamMsgContext msgContext = new DownStreamMsgContext(
event,
session,
consumer,
context,
false,
buildSubscriptionItem()
);

assertDoesNotThrow(msgContext::retry);
}

@Test
void ackMsg_shouldNotThrowException_whenDependenciesAreNull() {

CloudEvent event = buildEvent();

DownStreamMsgContext msgContext = new DownStreamMsgContext(
event,
null,
null,
null,
false,
buildSubscriptionItem()
);

assertDoesNotThrow(msgContext::ackMsg);
}
}