Ack consumed messages only after receiving ack for produced messages #23

@laaksma

Consumed and produced Pulsar messages have a one-to-one relationship: each input message results in exactly one output message. Currently, when we call sendAsync on the producer, the call returns immediately and we then ack the original message to the Pulsar broker without waiting for the send to complete. If the send later fails, the original message has already been acked, so the broker will not redeliver it and we cannot reprocess it.

A correct solution could be something along the lines of:

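producer.newMessage()
        .sendAsync()
        // sendAsync() completes with the MessageId of the produced message,
        // so the ack below only runs after the broker has confirmed the send
        .thenCompose(msgId -> consumer.acknowledgeAsync(inMsg))
        .thenRun(() -> log.info("Message produced"));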
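
The chain above says nothing about the failure path, so here is a minimal, self-contained sketch of the same ordering that runs without a broker. Everything in it is a hypothetical stand-in: `AckAfterSend`, `forward`, `ack`, and the `acked`/`nacked` lists are made up for illustration, the `send` function plays the role of `sendAsync()`, and `ack` plays the role of `consumer.acknowledgeAsync(inMsg)`. Recording a failed input in `nacked` is an assumption about what we would want; with the real client that could be something like `consumer.negativeAcknowledge(inMsg)` so the broker redelivers the message.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

class AckAfterSend {
    // Hypothetical bookkeeping standing in for the broker's view of the input messages.
    static final List<String> acked = new CopyOnWriteArrayList<>();
    static final List<String> nacked = new CopyOnWriteArrayList<>();

    // Stand-in for consumer.acknowledgeAsync(inMsg).
    static CompletableFuture<Void> ack(String inMsg) {
        acked.add(inMsg);
        return CompletableFuture.completedFuture(null);
    }

    // Sends the output first and acks the input only after the send future
    // has completed successfully. `send` stands in for sendAsync() and
    // completes with a message id.
    static CompletableFuture<Void> forward(
            String inMsg, Function<String, CompletableFuture<String>> send) {
        return send.apply(inMsg)
                .thenCompose(msgId -> ack(inMsg))
                // Reached if the send (or the ack) failed: the input is
                // recorded as not acknowledged so it can be reprocessed.
                .exceptionally(t -> {
                    nacked.add(inMsg);
                    return null;
                });
    }

    public static void main(String[] args) {
        // Successful send: the input is acked.
        forward("ok", m -> CompletableFuture.completedFuture("id-1")).join();
        // Failed send: the input is not acked.
        forward("bad", m -> CompletableFuture.failedFuture(
                new RuntimeException("send failed"))).join();
        System.out.println("acked=" + acked + " nacked=" + nacked);
    }
}
```

The point of the sketch is only the ordering: the ack is composed onto the send future, so an input whose send failed never ends up in `acked`.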
