Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,36 @@ void convertAndSend(String destinationName, Object message, MessagePostProcessor
*/
@Nullable Message sendAndReceive(String destinationName, MessageCreator messageCreator) throws JmsException;

/**
* Send a message and receive the reply from the specified destination. The
* {@link MessageCreator} callback creates the message given a Session. A given
* responseQueue is set in the {@code JMSReplyTO} header of the message.
* @param destination the destination to send this message to
* @param responseQueue the destination to receive the reply from
* @param messageCreator callback to create a message
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
* @throws JmsException checked JMSException converted to unchecked
* @since 7.0.4
*/
@Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator) throws JmsException;

/**
* Send a message and receive the reply from the specified destination. The
* {@link MessageCreator} callback creates the message given a Session. A given
* responseQueue is set in the {@code JMSReplyTO} header of the message.
* @param destinationName the name of the destination to send this message to
* (to be resolved to an actual destination by a DestinationResolver)
* @param responseQueueName the name of the destination to receive the reply from
* (to be resolved to an actual destination by a DestinationResolver)
* @param messageCreator callback to create a message
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
* @throws JmsException checked JMSException converted to unchecked
* @since 7.0.4
*/
@Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator) throws JmsException;


//---------------------------------------------------------------------------------------
// Convenience methods for browsing messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,11 @@ else if (isClientAcknowledge(session)) {
return executeLocal(session -> doSendAndReceive(session, destination, messageCreator), true);
}

@Override
public @Nullable Message sendAndReceive(Destination destination, Destination responseQueue, MessageCreator messageCreator) throws JmsException {
return executeLocal(session -> doSendAndReceive(session, destination, responseQueue, messageCreator), true);
}

@Override
public @Nullable Message sendAndReceive(String destinationName, MessageCreator messageCreator) throws JmsException {
return executeLocal(session -> {
Expand All @@ -906,22 +911,50 @@ else if (isClientAcknowledge(session)) {
}, true);
}

@Override
public @Nullable Message sendAndReceive(String destinationName, String responseQueueName, MessageCreator messageCreator) throws JmsException {
return executeLocal(session -> {
Destination destination = resolveDestinationName(session, destinationName);
Destination responseQueue = resolveDestinationName(session, responseQueueName);
return doSendAndReceive(session, destination, responseQueue, messageCreator);
}, true);
}

/**
* Send a request message to the given {@link Destination} and block until
* Send a request message to the given {@link Destination destination} and block until
* a reply has been received on a temporary queue created on-the-fly.
* <p>Return the response message or {@code null} if no message has
* @throws JMSException if thrown by JMS API methods
*/
protected @Nullable Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException {

Assert.notNull(messageCreator, "MessageCreator must not be null");
TemporaryQueue responseQueue = null;
try {
responseQueue = session.createTemporaryQueue();
return doSendAndReceive(session, destination, responseQueue, messageCreator);
}
finally {
if (responseQueue != null) {
responseQueue.delete();
}
}
}

/**
* Send a request message to the given {@link Destination destination} and block until
* a reply has been received on a {@link Destination responseQueue} queue.
* <p>Return the response message or {@code null} if no message has
* @throws JMSException if thrown by JMS API methods
*/
protected @Nullable Message doSendAndReceive(Session session, Destination destination, Destination responseQueue, MessageCreator messageCreator)
throws JMSException {

Assert.notNull(messageCreator, "MessageCreator must not be null");
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
Message requestMessage = messageCreator.createMessage(session);
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(destination);
consumer = session.createConsumer(responseQueue);
requestMessage.setJMSReplyTo(responseQueue);
Expand All @@ -934,9 +967,6 @@ else if (isClientAcknowledge(session)) {
finally {
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,69 @@ else if (explicitDestination) {
verify(messageProducer).close();
}

@Test
void testSendAndReceiveDestinationWithResponseQueue() throws Exception {
doTestSendAndReceiveWithResponseQueue(true, 1000L);
}

@Test
void testSendAndReceiveDestinationNameWithResponseQueueName() throws Exception {
doTestSendAndReceiveWithResponseQueue(false, 1000L);
}

private void doTestSendAndReceiveWithResponseQueue(boolean explicitDestination, long timeout)
throws Exception {

JmsTemplate template = createTemplate();
template.setConnectionFactory(this.connectionFactory);
template.setReceiveTimeout(timeout);

String destinationName = "testDestination";
String responseQueueName = "responseQueue";

Queue responseQueue = mock();
given(this.jndiContext.lookup(responseQueueName)).willReturn(responseQueue);

Session localSession = getLocalSession();
MessageProducer messageProducer = mock();
given(localSession.createProducer(this.queue)).willReturn(messageProducer);

MessageConsumer messageConsumer = mock();
given(localSession.createConsumer(responseQueue)).willReturn(messageConsumer);

TextMessage request = mock();
MessageCreator messageCreator = mock();
given(messageCreator.createMessage(localSession)).willReturn(request);

TextMessage reply = mock();
if (timeout == JmsTemplate.RECEIVE_TIMEOUT_NO_WAIT) {
given(messageConsumer.receiveNoWait()).willReturn(reply);
}
else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
given(messageConsumer.receive()).willReturn(reply);
}
else {
given(messageConsumer.receive(timeout)).willReturn(reply);
}

Message message;
if (explicitDestination) {
message = template.sendAndReceive(this.queue, responseQueue, messageCreator);
}
else {
message = template.sendAndReceive(destinationName, responseQueueName, messageCreator);
}

// replyTO set on the request
verify(request).setJMSReplyTo(responseQueue);
assertThat(message).as("Reply message not received").isSameAs(reply);
verify(this.connection).start();
verify(this.connection).close();
verify(localSession).close();
verify(messageConsumer).close();
verify(messageProducer).close();
}

@Test
void testIllegalStateException() throws Exception {
doTestJmsException(new jakarta.jms.IllegalStateException(""), org.springframework.jms.IllegalStateException.class);
Expand Down