Skip to content
Merged
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>fi.hsl</groupId>
<artifactId>transitdata-common</artifactId>
<version>2.0.3-RC4</version>
<version>2.0.3-RC5</version>
<packaging>jar</packaging>
<name>Common utilities for Transitdata projects</name>
<properties>
Expand Down
72 changes: 63 additions & 9 deletions src/main/java/fi/hsl/common/health/HealthServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class HealthServer {
Expand All @@ -21,7 +22,9 @@ public class HealthServer {
public final int port;
public final String endpoint;
public final HttpServer httpServer;
private List<BooleanSupplier> checks = new ArrayList<>();
private final ExecutorService healthCheckExecutor =
Executors.newCachedThreadPool();
private final List<BooleanSupplier> checks = new CopyOnWriteArrayList<>();

public HealthServer(final int port, @NotNull final String endpoint) throws IOException {
this.port = port;
Expand All @@ -30,14 +33,14 @@ public HealthServer(final int port, @NotNull final String endpoint) throws IOExc
httpServer = HttpServer.create(new InetSocketAddress(port), 0);
httpServer.createContext("/", createDefaultHandler());
httpServer.createContext(endpoint, createHandler());
httpServer.setExecutor(Executors.newSingleThreadExecutor());
httpServer.setExecutor(healthCheckExecutor);
httpServer.start();
log.info("HealthServer started");
}

private void writeResponse(@NotNull final HttpExchange httpExchange, @NotNull final int responseCode, @NotNull final String responseBody) throws IOException {
final byte[] response = responseBody.getBytes("UTF-8");
httpExchange.getResponseHeaders().add("Content-Type", "text/plain; charset=UTF-8");
final byte[] response = responseBody.getBytes(StandardCharsets.UTF_8);
httpExchange.getResponseHeaders().add("Content-Type", "text/plain; charset=" + StandardCharsets.UTF_8.name());
httpExchange.sendResponseHeaders(responseCode, response.length);
final OutputStream out = httpExchange.getResponseBody();
out.write(response);
Expand Down Expand Up @@ -91,16 +94,67 @@ public void clearChecks() {
}

public boolean checkHealth() {
boolean isHealthy = true;
for (final BooleanSupplier check : checks) {
isHealthy &= check.getAsBoolean();
try {
CompletionService<Boolean> executorCompletionService
= new ExecutorCompletionService<>(healthCheckExecutor);
int n = checks.size();
List<Future<Boolean>> futures = new ArrayList<>(n);
try {
for (BooleanSupplier check : checks) {
futures.add(executorCompletionService.submit(checkToCallable(check)));
}
for (int i = 0; i < n; ++i) {
try {
Boolean result = executorCompletionService.take().get();
if (result == null || !result) {
return false; // Return false immediately if any check fails
}
} catch (ExecutionException e) {
log.error("A health check task execution failed. Marking unhealthy.", e.getCause() != null ? e.getCause() : e);
return false;
} catch (InterruptedException e) {
log.error("Health check interrupted. Marking unhealthy.", e);
Thread.currentThread().interrupt();
return false;
}
}
} finally {
for (Future<Boolean> f : futures) {
f.cancel(true);
}
}
return true; // Return true only if all checks pass
} catch (Exception e) {
log.error("Exception during health checks", e);
return false;
}
return isHealthy;
}

public void close() {
if (httpServer != null) {
httpServer.stop(0);
}
if (healthCheckExecutor != null) {
healthCheckExecutor.shutdown();
try {
if (!healthCheckExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
healthCheckExecutor.shutdownNow();
}
} catch (InterruptedException ie) {
healthCheckExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

private static Callable<Boolean> checkToCallable(BooleanSupplier check) {
return () -> {
try {
return check.getAsBoolean();
} catch (Exception e) {
log.error("Exception during health check", e);
return false;
}
};
}
}
111 changes: 111 additions & 0 deletions src/test/java/fi/hsl/common/health/HealthServerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package fi.hsl.common.health;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HealthServerTest {

private HealthServer healthServer;
private final int testPort = 0;
private final String testEndpoint = "/healthz";

private static class CountingWrapper implements BooleanSupplier {

private final BooleanSupplier delegate;
private final AtomicInteger callCount = new AtomicInteger(0);

public CountingWrapper(BooleanSupplier delegate) {
this.delegate = delegate;
}

@Override
public boolean getAsBoolean() {
callCount.incrementAndGet();
return delegate.getAsBoolean();
}

public int getCallCount() {
return callCount.get();
}
}

@Before
public void setUp() throws IOException {
healthServer = new HealthServer(testPort, testEndpoint);
}

@After
public void tearDown() {
if (healthServer != null) {
healthServer.close();
}
}

@Test
public void singleUnhealthyCheckReturnsFalse() {
CountingWrapper unhealthyCheck = new CountingWrapper(() -> false);
healthServer.addCheck(unhealthyCheck);
boolean healthStatus = healthServer.checkHealth();
assertFalse(
"Health status should be false when one check is unhealthy.",
healthStatus
);
assertEquals(
"UnhealthyCheck should have been called once.",
1,
unhealthyCheck.getCallCount()
);
}

@Test
public void allHealthyChecksReturnsTrue() {
CountingWrapper healthyCheck1 = new CountingWrapper(() -> true);
CountingWrapper healthyCheck2 = new CountingWrapper(() -> true);
healthServer.addCheck(healthyCheck1);
healthServer.addCheck(healthyCheck2);
boolean healthStatus = healthServer.checkHealth();
assertTrue(
"Health status should be true when all checks are healthy.",
healthStatus
);
assertEquals(
"HealthyCheck1 should have been called once.",
1,
healthyCheck1.getCallCount()
);
assertEquals(
"HealthyCheck2 should have been called once.",
1,
healthyCheck2.getCallCount()
);
}

@Test
public void testCheckHealth_CheckThrowsException_ReturnsFalse() {
final RuntimeException testException = new RuntimeException(
"Simulated check failure"
);
CountingWrapper exceptionThrowingCheck = new CountingWrapper(() -> {
throw testException;
});
healthServer.addCheck(exceptionThrowingCheck);
boolean healthStatus = healthServer.checkHealth();
assertFalse(
"Health status should be false when a check throws an exception.",
healthStatus
);
assertEquals(
"ExceptionThrowingCheck should have been called once.",
1,
exceptionThrowingCheck.getCallCount()
);
}
}