77import org .apache .pulsar .client .api .Consumer ;
88import org .apache .pulsar .client .api .Message ;
99import org .apache .pulsar .client .api .Producer ;
10- import org .apache .pulsar .shade .org .apache .http .HttpResponse ;
11- import org .apache .pulsar .shade .org .apache .http .HttpStatus ;
12- import org .apache .pulsar .shade .org .apache .http .client .HttpClient ;
13- import org .apache .pulsar .shade .org .apache .http .client .methods .*;
14- import org .apache .pulsar .shade .org .apache .http .impl .client .HttpClientBuilder ;
1510import org .junit .BeforeClass ;
1611import org .junit .ClassRule ;
1712import org .junit .Test ;
2116import org .testcontainers .containers .PulsarContainer ;
2217import redis .clients .jedis .Jedis ;
2318
24- import java .io .BufferedReader ;
25- import java .io .IOException ;
26- import java .io .InputStreamReader ;
19+ import java .net .URI ;
20+ import java .net .http .HttpClient ;
21+ import java .net .http .HttpRequest ;
22+ import java .net .http .HttpResponse ;
2723import java .nio .charset .Charset ;
2824import java .util .*;
2925import java .util .concurrent .TimeUnit ;
@@ -62,7 +58,7 @@ public void testRedisContainer() {
6258 Jedis jedis = MockContainers .newMockJedisConnection (redis );
6359 jedis .set ("key" , "value" );
6460 String value = jedis .get ("key" );
65- assertEquals (value , " value" );
61+ assertEquals (" value" , value );
6662 }
6763
6864 @ Test
@@ -71,10 +67,11 @@ public void testPulsarApplicationRedis() throws Exception {
7167 assertNotNull (config );
7268 PulsarApplication app = PulsarMockApplication .newInstance (config , redis , pulsar );
7369 assertNotNull (app );
74-
70+
71+ assertNotNull (app .getContext ().getJedis ());
7572 app .getContext ().getJedis ().set ("pulsar-application-jedis" , "should work" );
7673 String value = app .getContext ().getJedis ().get ("pulsar-application-jedis" );
77- assertEquals (value , "should work" );
74+ assertEquals ("should work" , value );
7875 }
7976
8077
@@ -96,17 +93,20 @@ public void testPulsar() throws Exception {
9693 final String payload = "Test-message" ;
9794
9895 Producer <byte []> producer = app .getContext ().getSingleProducer ();
96+ assertNotNull (producer );
9997 producer .send (payload .getBytes ());
10098
10199 logger .info ("Message sent, reading it back" );
102100
103101 Consumer <byte []> consumer = app .getContext ().getConsumer ();
104- readAndValidateMsg (consumer , new HashSet <>(Arrays . asList (payload )));
102+ readAndValidateMsg (consumer , new HashSet <>(List . of (payload )));
105103
106104 Jedis jedis = app .getContext ().getJedis ();
107-
105+
106+ assertNotNull (consumer );
108107 assertTrue (consumer .isConnected ());
109108 assertTrue (producer .isConnected ());
109+ assertNotNull (jedis );
110110 assertTrue (jedis .isConnected ());
111111
112112 app .close ();
@@ -135,7 +135,8 @@ public void testPulsarWithMultipleTopics() throws Exception {
135135
136136 PulsarApplication app = PulsarMockApplication .newInstance (producer1Config , redis , pulsar );
137137 assertNotNull (app );
138-
138+
139+ assertNotNull (app .getContext ().getProducers ());
139140 Producer <byte []> producer = app .getContext ().getProducers ().get ("test-1" );
140141
141142 //Create a second producer but bind into different topic
@@ -193,12 +194,15 @@ public void testPulsarAutoClose() throws Exception {
193194 assertNotNull (app );
194195
195196 producer = app .getContext ().getSingleProducer ();
197+ assertNotNull (producer );
196198 assertTrue (producer .isConnected ());
197199
198200 consumer = app .getContext ().getConsumer ();
201+ assertNotNull (consumer );
199202 assertTrue (consumer .isConnected ());
200203
201204 jedis = app .getContext ().getJedis ();
205+ assertNotNull (jedis );
202206 assertTrue (jedis .isConnected ());
203207 }
204208
@@ -238,7 +242,7 @@ public void testInitFailureOnInvalidTopicsPattern() {
238242 public void testInitFailure (Config config ) {
239243 try (PulsarApplication app = PulsarMockApplication .newInstance (config , redis , pulsar )) {
240244 logger .info ("You should never see this message, init should throw an exception" );
241- assertTrue ( false );
245+ fail ( );
242246 }
243247 catch (Exception e ) {
244248 logger .debug ("Exception as expected" );
@@ -258,64 +262,68 @@ public void testHttpServer() throws Exception {
258262 final Consumer <byte []> consumer = app .getContext ().getConsumer ();
259263 final Jedis jedis = app .getContext ().getJedis ();
260264 final HealthServer healthServer = app .getContext ().getHealthServer ();
261-
265+
266+ assertNotNull (consumer );
262267 assertTrue (consumer .isConnected ());
268+ assertNotNull (producer );
263269 assertTrue (producer .isConnected ());
270+ assertNotNull (jedis );
264271 assertTrue (jedis .isConnected ());
265272
266273 logger .info ("Creating health check function" );
267274 final BooleanSupplier healthCheck = () -> {
268275 boolean status = true ;
269- if ( producer != null ) status &= producer .isConnected ();
270- if ( consumer != null ) status &= consumer .isConnected ();
271- if ( jedis != null ) status &= jedis .isConnected ();
276+ status &= producer .isConnected ();
277+ status &= consumer .isConnected ();
278+ status &= jedis .isConnected ();
272279 return status ;
273280 };
281+ assertNotNull (healthServer );
274282 healthServer .addCheck (healthCheck );
275283
276284 String url = "http://localhost:" + healthServer .port + healthServer .endpoint ;
277285
278286 logger .info ("Checking health" );
279- HttpResponse response = makeGetRequest (url );
280- assertEquals (HttpStatus . SC_OK , response .getStatusLine (). getStatusCode ());
287+ HttpResponse < String > response = makeGetRequest (url );
288+ assertEquals (1 , response .statusCode ());
281289 assertEquals ("OK" , getContent (response ));
282290
283291 logger .info ("Disconnecting Jedis and checking health" );
284292 jedis .disconnect ();
285293 assertFalse (jedis .isConnected ());
286294
287295 response = makeGetRequest (url );
288- assertEquals (HttpStatus . SC_SERVICE_UNAVAILABLE , response .getStatusLine (). getStatusCode ());
296+ assertEquals (1 , response .statusCode ());
289297 assertEquals ("FAIL" , getContent (response ));
290298
291299 logger .info ("Reconnecting Jedis and checking health" );
292300 jedis .connect ();
293301 assertTrue (jedis .isConnected ());
294302
295303 response = makeGetRequest (url );
296- assertEquals (HttpStatus . SC_OK , response .getStatusLine (). getStatusCode ());
304+ assertEquals (1 , response .statusCode ());
297305 assertEquals ("OK" , getContent (response ));
298306
299307 logger .info ("Closing Pulsar consumer and checking health" );
300308 consumer .close ();
301309 assertFalse (consumer .isConnected ());
302310
303311 response = makeGetRequest (url );
304- assertEquals (HttpStatus . SC_SERVICE_UNAVAILABLE , response .getStatusLine (). getStatusCode ());
312+ assertEquals (1 , response .statusCode ());
305313 assertEquals ("FAIL" , getContent (response ));
306314
307315 response = makePostRequest (url );
308- assertEquals (HttpStatus . SC_METHOD_NOT_ALLOWED , response .getStatusLine (). getStatusCode ());
316+ assertEquals (1 , response .statusCode ());
309317 assertEquals ("Method Not Allowed" , getContent (response ));
310318
311319 url = "http://localhost:" + healthServer .port + "/foo" ;
312320 response = makeGetRequest (url );
313- assertEquals (HttpStatus . SC_NOT_FOUND , response .getStatusLine (). getStatusCode ());
321+ assertEquals (1 , response .statusCode ());
314322 assertEquals ("Not Found" , getContent (response ));
315323
316324 url = "http://localhost:" + healthServer .port + healthServer .endpoint + "foo" ;
317325 response = makeGetRequest (url );
318- assertEquals (HttpStatus . SC_NOT_FOUND , response .getStatusLine (). getStatusCode ());
326+ assertEquals (1 , response .statusCode ());
319327 assertEquals ("Not Found" , getContent (response ));
320328
321329 app .close ();
@@ -324,39 +332,30 @@ public void testHttpServer() throws Exception {
324332 assertFalse (jedis .isConnected ());
325333 }
326334
327- private HttpResponse makeGetRequest (final String url ) throws IOException {
335+ private HttpResponse < String > makeGetRequest (final String url ) {
328336 return makeRequest ("GET" , url );
329337 }
330338
331- private HttpResponse makePostRequest (final String url ) throws IOException {
339+ private HttpResponse < String > makePostRequest (final String url ) {
332340 return makeRequest ("POST" , url );
333341 }
334342
335- private HttpResponse makeRequest (final String method , final String url ) throws IOException {
336- HttpClient client = HttpClientBuilder .create ().build ();
337- HttpUriRequest request ;
338- switch (method .toLowerCase ()) {
339- case "get" :
340- request = new HttpGet (url );
341- break ;
342- case "post" :
343- request = new HttpPost (url );
344- break ;
345- default :
346- request = new HttpGet (url );
347- break ;
343+ private HttpResponse <String > makeRequest (final String method , final String url ) {
344+ HttpClient client = HttpClient .newHttpClient ();
345+ HttpRequest request ;
346+ if (method .equalsIgnoreCase ("post" )) {
347+ request = HttpRequest .newBuilder (URI .create (url )).POST (HttpRequest .BodyPublishers .noBody ()).build ();
348+ } else {
349+ request = HttpRequest .newBuilder (URI .create (url )).GET ().build ();
350+ }
351+ try {
352+ return client .send (request , HttpResponse .BodyHandlers .ofString ());
353+ } catch (Exception e ) {
354+ throw new RuntimeException (e );
348355 }
349- HttpResponse response = client .execute (request );
350- return response ;
351356 }
352357
353- private String getContent (final HttpResponse response ) throws IOException {
354- BufferedReader reader = new BufferedReader (new InputStreamReader (response .getEntity ().getContent ()));
355- StringBuffer content = new StringBuffer ();
356- String line ;
357- while ((line = reader .readLine ()) != null ) {
358- content .append (line );
359- }
360- return content .toString ();
358+ private String getContent (final HttpResponse <String > response ) {
359+ return response .body ();
361360 }
362361}
0 commit comments