33import com .typesafe .config .Config ;
44import fi .hsl .common .config .ConfigParser ;
55import fi .hsl .common .config .ConfigUtils ;
6+ import fi .hsl .common .health .HealthServer ;
67import org .apache .pulsar .client .api .Consumer ;
78import org .apache .pulsar .client .api .Message ;
89import org .apache .pulsar .client .api .Producer ;
10+ import org .junit .BeforeClass ;
911import org .junit .ClassRule ;
1012import org .junit .Test ;
1113import org .slf4j .Logger ;
1416import org .testcontainers .containers .PulsarContainer ;
1517import redis .clients .jedis .Jedis ;
1618
19+ import java .net .HttpURLConnection ;
20+ import java .net .URI ;
21+ import java .net .http .HttpClient ;
22+ import java .net .http .HttpRequest ;
23+ import java .net .http .HttpResponse ;
1724import java .nio .charset .Charset ;
1825import java .util .*;
1926import java .util .concurrent .TimeUnit ;
27+ import java .util .function .BooleanSupplier ;
2028
2129import static org .junit .Assert .*;
2230
@@ -36,13 +44,22 @@ public class ITPulsarApplication {
3644
3745 @ ClassRule
3846 public static PulsarContainer pulsar = MockContainers .newPulsarContainer ();
39-
47+
48+ @ BeforeClass
49+ public static void setUp () throws Exception {
50+ MockContainers .configurePulsarContainer (pulsar , TENANT , NAMESPACE );
51+
52+ if (PRINT_PULSAR_LOG ) {
53+ MockContainers .tail (pulsar , logger );
54+ }
55+ }
56+
4057 @ Test
4158 public void testRedisContainer () {
4259 Jedis jedis = MockContainers .newMockJedisConnection (redis );
4360 jedis .set ("key" , "value" );
4461 String value = jedis .get ("key" );
45- assertEquals (value , " value" );
62+ assertEquals (" value" , value );
4663 }
4764
4865 @ Test
@@ -51,10 +68,11 @@ public void testPulsarApplicationRedis() throws Exception {
5168 assertNotNull (config );
5269 PulsarApplication app = PulsarMockApplication .newInstance (config , redis , pulsar );
5370 assertNotNull (app );
54-
71+
72+ assertNotNull (app .getContext ().getJedis ());
5573 app .getContext ().getJedis ().set ("pulsar-application-jedis" , "should work" );
5674 String value = app .getContext ().getJedis ().get ("pulsar-application-jedis" );
57- assertEquals (value , "should work" );
75+ assertEquals ("should work" , value );
5876 }
5977
6078
@@ -76,17 +94,20 @@ public void testPulsar() throws Exception {
7694 final String payload = "Test-message" ;
7795
7896 Producer <byte []> producer = app .getContext ().getSingleProducer ();
97+ assertNotNull (producer );
7998 producer .send (payload .getBytes ());
8099
81100 logger .info ("Message sent, reading it back" );
82101
83102 Consumer <byte []> consumer = app .getContext ().getConsumer ();
84- readAndValidateMsg (consumer , new HashSet <>(Arrays . asList (payload )));
103+ readAndValidateMsg (consumer , new HashSet <>(List . of (payload )));
85104
86105 Jedis jedis = app .getContext ().getJedis ();
87-
106+
107+ assertNotNull (consumer );
88108 assertTrue (consumer .isConnected ());
89109 assertTrue (producer .isConnected ());
110+ assertNotNull (jedis );
90111 assertTrue (jedis .isConnected ());
91112
92113 app .close ();
@@ -115,7 +136,8 @@ public void testPulsarWithMultipleTopics() throws Exception {
115136
116137 PulsarApplication app = PulsarMockApplication .newInstance (producer1Config , redis , pulsar );
117138 assertNotNull (app );
118-
139+
140+ assertNotNull (app .getContext ().getProducers ());
119141 Producer <byte []> producer = app .getContext ().getProducers ().get ("test-1" );
120142
121143 //Create a second producer but bind into different topic
@@ -173,12 +195,15 @@ public void testPulsarAutoClose() throws Exception {
173195 assertNotNull (app );
174196
175197 producer = app .getContext ().getSingleProducer ();
198+ assertNotNull (producer );
176199 assertTrue (producer .isConnected ());
177200
178201 consumer = app .getContext ().getConsumer ();
202+ assertNotNull (consumer );
179203 assertTrue (consumer .isConnected ());
180204
181205 jedis = app .getContext ().getJedis ();
206+ assertNotNull (jedis );
182207 assertTrue (jedis .isConnected ());
183208 }
184209
@@ -218,10 +243,120 @@ public void testInitFailureOnInvalidTopicsPattern() {
218243 public void testInitFailure (Config config ) {
219244 try (PulsarApplication app = PulsarMockApplication .newInstance (config , redis , pulsar )) {
220245 logger .info ("You should never see this message, init should throw an exception" );
221- assertTrue ( false );
246+ fail ( );
222247 }
223248 catch (Exception e ) {
224249 logger .debug ("Exception as expected" );
225250 }
226251 }
252+
253+ @ Test
254+ public void testHttpServer () throws Exception {
255+ Config base = PulsarMockApplication .readConfig (CONFIG_FILE );
256+
257+ PulsarApplication app = PulsarMockApplication .newInstance (base , redis , pulsar );
258+ assertNotNull (app );
259+
260+ logger .info ("Pulsar Application created, testing HealthServer" );
261+
262+ final Producer <byte []> producer = app .getContext ().getSingleProducer ();
263+ final Consumer <byte []> consumer = app .getContext ().getConsumer ();
264+ final Jedis jedis = app .getContext ().getJedis ();
265+ final HealthServer healthServer = app .getContext ().getHealthServer ();
266+
267+ assertNotNull (consumer );
268+ assertTrue (consumer .isConnected ());
269+ assertNotNull (producer );
270+ assertTrue (producer .isConnected ());
271+ assertNotNull (jedis );
272+ assertTrue (jedis .isConnected ());
273+
274+ logger .info ("Creating health check function" );
275+ final BooleanSupplier healthCheck = () -> {
276+ boolean status = true ;
277+ status &= producer .isConnected ();
278+ status &= consumer .isConnected ();
279+ status &= jedis .isConnected ();
280+ return status ;
281+ };
282+ assertNotNull (healthServer );
283+ healthServer .addCheck (healthCheck );
284+
285+ String url = "http://localhost:" + healthServer .port + healthServer .endpoint ;
286+
287+ logger .info ("Checking health" );
288+ HttpResponse <String > response = makeGetRequest (url );
289+ assertEquals (HttpURLConnection .HTTP_OK , response .statusCode ());
290+ assertEquals ("OK" , getContent (response ));
291+
292+ logger .info ("Disconnecting Jedis and checking health" );
293+ jedis .disconnect ();
294+ assertFalse (jedis .isConnected ());
295+
296+ response = makeGetRequest (url );
297+ assertEquals (HttpURLConnection .HTTP_UNAVAILABLE , response .statusCode ());
298+ assertEquals ("FAIL" , getContent (response ));
299+
300+ logger .info ("Reconnecting Jedis and checking health" );
301+ jedis .connect ();
302+ assertTrue (jedis .isConnected ());
303+
304+ response = makeGetRequest (url );
305+ assertEquals (HttpURLConnection .HTTP_OK , response .statusCode ());
306+ assertEquals ("OK" , getContent (response ));
307+
308+ logger .info ("Closing Pulsar consumer and checking health" );
309+ consumer .close ();
310+ assertFalse (consumer .isConnected ());
311+
312+ response = makeGetRequest (url );
313+ assertEquals (HttpURLConnection .HTTP_UNAVAILABLE , response .statusCode ());
314+ assertEquals ("FAIL" , getContent (response ));
315+
316+ response = makePostRequest (url );
317+ assertEquals (HttpURLConnection .HTTP_BAD_METHOD , response .statusCode ());
318+ assertEquals ("Method Not Allowed" , getContent (response ));
319+
320+ url = "http://localhost:" + healthServer .port + "/foo" ;
321+ response = makeGetRequest (url );
322+ assertEquals (HttpURLConnection .HTTP_NOT_FOUND , response .statusCode ());
323+ assertEquals ("Not Found" , getContent (response ));
324+
325+ url = "http://localhost:" + healthServer .port + healthServer .endpoint + "foo" ;
326+ response = makeGetRequest (url );
327+ assertEquals (HttpURLConnection .HTTP_NOT_FOUND , response .statusCode ());
328+ assertEquals ("Not Found" , getContent (response ));
329+
330+ app .close ();
331+ assertFalse (consumer .isConnected ());
332+ assertFalse (producer .isConnected ());
333+ assertFalse (jedis .isConnected ());
334+ }
335+
336+ private HttpResponse <String > makeGetRequest (final String url ) {
337+ return makeRequest ("GET" , url );
338+ }
339+
340+ private HttpResponse <String > makePostRequest (final String url ) {
341+ return makeRequest ("POST" , url );
342+ }
343+
344+ private HttpResponse <String > makeRequest (final String method , final String url ) {
345+ HttpClient client = HttpClient .newHttpClient ();
346+ HttpRequest request ;
347+ if (method .equalsIgnoreCase ("post" )) {
348+ request = HttpRequest .newBuilder (URI .create (url )).POST (HttpRequest .BodyPublishers .noBody ()).build ();
349+ } else {
350+ request = HttpRequest .newBuilder (URI .create (url )).GET ().build ();
351+ }
352+ try {
353+ return client .send (request , HttpResponse .BodyHandlers .ofString ());
354+ } catch (Exception e ) {
355+ throw new RuntimeException (e );
356+ }
357+ }
358+
359+ private String getContent (final HttpResponse <String > response ) {
360+ return response .body ();
361+ }
227362}
0 commit comments