There is a one to one relationship between consumed Pulsar messages and produced Pulsar messages. One message of input results in one message of output. Currently when we use sendAsync for the producer, we return immediately and then ack the original message to the Pulsar broker. This way, if the sending fails, we have already acked the original message and are unable to reprocess it.
Correct solution could be something along the lines of:
producer.newMessage()
.sendAsync()
.thenCompose((msg) -> consumer.acknowledgeAsync(inMsg))
.thenRun(() -> log.info("Message produced"));