@ -18,6 +18,7 @@ package org.springframework.messaging.core;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicReference ;
import org.junit.Before ;
import org.junit.Test ;
@ -56,6 +57,7 @@ public class GenericMessagingTemplateTests {
@Test
public void sendAndReceive ( ) {
SubscribableChannel channel = new ExecutorSubscribableChannel ( this . executor ) ;
channel . subscribe ( new MessageHandler ( ) {
@Override
@ -66,12 +68,13 @@ public class GenericMessagingTemplateTests {
} ) ;
String actual = this . template . convertSendAndReceive ( channel , "request" , String . class ) ;
assertEquals ( "response" , actual ) ;
}
@Test
public void sendAndReceiveTimeout ( ) throws InterruptedException {
final AtomicReference < Throwable > failure = new AtomicReference < Throwable > ( ) ;
final CountDownLatch latch = new CountDownLatch ( 1 ) ;
this . template . setReceiveTimeout ( 1 ) ;
@ -85,14 +88,17 @@ public class GenericMessagingTemplateTests {
Thread . sleep ( 500 ) ;
MessageChannel replyChannel = ( MessageChannel ) message . getHeaders ( ) . getReplyChannel ( ) ;
replyChannel . send ( new GenericMessage < String > ( "response" ) ) ;
fail ( "Expected exception" ) ;
failure . set ( new IllegalStateException ( "Expected exception" ) ) ;
}
catch ( InterruptedException e ) {
fail ( "Unexpected exception " + e . getMessage ( ) ) ;
failure . set ( e ) ;
}
catch ( MessageDeliveryException ex ) {
assertEquals ( "Reply message received but the receiving thread has already received a reply" ,
ex . getMessage ( ) ) ;
String expected = "Reply message received but the receiving thread has exited due to a timeout" ;
String actual = ex . getMessage ( ) ;
if ( ! expected . equals ( actual ) ) {
failure . set ( new IllegalStateException ( "Unexpected error: '" + actual + "'" ) ) ;
}
}
finally {
latch . countDown ( ) ;
@ -101,8 +107,11 @@ public class GenericMessagingTemplateTests {
} ) ;
assertNull ( this . template . convertSendAndReceive ( channel , "request" , String . class ) ) ;
assertTrue ( latch . await ( 1000 , TimeUnit . MILLISECONDS ) ) ;
if ( failure . get ( ) ! = null ) {
throw new AssertionError ( failure . get ( ) ) ;
}
}
}