Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 52 additions & 72 deletions README.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions docker-compose-c4.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ services:
- TRADER_INGESTION_MODE=AERON
- PAYARA_INSTANCE_NAME=c4-instance-2
- HAZELCAST_MEMBER_NAME=trader-stream-c4-2
- ENABLE_PUBLISHER=false
- ENABLE_PUBLISHER=true
- JVM_TYPE=azul-c4
- JAVA_OPTS=-Xms4g -Xmx4g
-Xlog:gc*:file=/opt/payara/gc.log:time,uptime,level,tags:filecount=5,filesize=10M
Expand Down Expand Up @@ -113,7 +113,7 @@ services:
- TRADER_INGESTION_MODE=AERON
- PAYARA_INSTANCE_NAME=c4-instance-3
- HAZELCAST_MEMBER_NAME=trader-stream-c4-3
- ENABLE_PUBLISHER=false
- ENABLE_PUBLISHER=true
- JVM_TYPE=azul-c4
- JAVA_OPTS=-Xms4g -Xmx4g
-Xlog:gc*:file=/opt/payara/gc.log:time,uptime,level,tags:filecount=5,filesize=10M
Expand Down
4 changes: 2 additions & 2 deletions docker-compose-g1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ services:
- TRADER_INGESTION_MODE=AERON
- PAYARA_INSTANCE_NAME=g1-instance-2
- HAZELCAST_MEMBER_NAME=trader-stream-g1-2
- ENABLE_PUBLISHER=false
- ENABLE_PUBLISHER=true
- JVM_TYPE=eclipse-temurin-g1
- JAVA_OPTS=-Xms4g -Xmx4g
-XX:+UseG1GC
Expand Down Expand Up @@ -115,7 +115,7 @@ services:
- TRADER_INGESTION_MODE=AERON
- PAYARA_INSTANCE_NAME=g1-instance-3
- HAZELCAST_MEMBER_NAME=trader-stream-g1-3
- ENABLE_PUBLISHER=false
- ENABLE_PUBLISHER=true
- JVM_TYPE=eclipse-temurin-g1
- JAVA_OPTS=-Xms4g -Xmx4g
-XX:+UseG1GC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,7 @@ private void offer(UnsafeBuffer buffer, int offset, int length, String messageTy
consecutiveFailures.set(0);
return;
} else if (result == Publication.BACK_PRESSURED) {
// Back pressure is normal at high throughput, don't log
retries--;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
continue;
} else if (result == Publication.NOT_CONNECTED) {
logWarningRateLimited("Publication not connected");
handlePublishFailure(messageType);
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/fish/payara/trader/pressure/AllocationMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

public enum AllocationMode {
OFF(0, 0, "No additional allocation"),
LOW(20, 10240, "2 MB/sec - Light HFT pressure"),
MEDIUM(195, 10240, "20 MB/sec - Moderate tick ingestion"),
HIGH(10000, 10240, "1 GB/sec - Heavy burst trading"),
EXTREME(40000, 10240, "4 GB/sec - Extreme flash crash simulation");
LOW(20, 10240, "2 MB/sec - Light pressure"),
MEDIUM(200, 10240, "20 MB/sec - Moderate pressure"),
HIGH(10000, 10240, "1 GB/sec - Heavy pressure"),
EXTREME(40000, 10240, "4 GB/sec - Extreme pressure");

private final int allocationsPerIteration;
private final int bytesPerAllocation;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package fish.payara.trader.pressure;

import fish.payara.trader.concurrency.VirtualThreadExecutor;
import fish.payara.trader.pressure.patterns.AllocationPattern;
import fish.payara.trader.pressure.patterns.HFTPatternRegistry;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.concurrent.ManagedExecutorService;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -36,16 +35,12 @@ public class MemoryPressureService {
private static final int TENURED_TARGET_MB = 1024;
private final AtomicLong tenuredBytesAllocated = new AtomicLong(0);

// Burst coordination for simulating market events (flash crashes, news releases, etc.)
private volatile long lastBurstTime = 0;
private static final long BURST_INTERVAL_MS = 5000; // 5 seconds
private static final int BURST_MULTIPLIER = 3; // 3x normal allocation rate during bursts
private static final long BURST_INTERVAL_MS = 5000;
private static final int BURST_MULTIPLIER = 3;

@Inject @VirtualThreadExecutor private ManagedExecutorService executorService;

// HFT allocation patterns (order books, ticks, depth snapshots)
@Inject private HFTPatternRegistry patternRegistry;

@PostConstruct
public void init() {
LOGGER.info("MemoryPressureService initialized");
Expand Down Expand Up @@ -123,7 +118,6 @@ private void generateGarbage(AllocationMode mode) {
int baseAllocations = mode.getAllocationsPerIteration();
int bytesPerAlloc = mode.getBytesPerAllocation();

// Burst detection for HIGH and EXTREME modes (simulates market events)
boolean inBurst = false;
if (mode != AllocationMode.OFF) {
long now = System.currentTimeMillis();
Expand All @@ -139,7 +133,6 @@ private void generateGarbage(AllocationMode mode) {

int totalAllocations = inBurst ? baseAllocations * BURST_MULTIPLIER : baseAllocations;

// Parallelize allocation using virtual threads
int numTasks = 4;
int allocationsPerTask = totalAllocations / numTasks;
CompletableFuture<?>[] futures = new CompletableFuture[numTasks];
Expand All @@ -154,11 +147,21 @@ private void generateGarbage(AllocationMode mode) {
CompletableFuture.runAsync(
() -> {
for (int i = 0; i < taskAllocations; i++) {
try {
AllocationPattern pattern = patternRegistry.selectPattern();
pattern.allocate(bytesPerAlloc);
} catch (Exception e) {
// Fail silently in parallel task to avoid flooding logs
int pattern = i % 4;

switch (pattern) {
case 0:
generateStringGarbage(bytesPerAlloc);
break;
case 1:
generateByteArrayGarbage(bytesPerAlloc);
break;
case 2:
generateObjectGarbage(bytesPerAlloc / 64);
break;
case 3:
generateCollectionGarbage(bytesPerAlloc / 100);
break;
}
totalBytesAllocated.addAndGet(bytesPerAlloc);
}
Expand All @@ -178,7 +181,6 @@ private void generateGarbage(AllocationMode mode) {
}
}

// Tenured cleanup logic (unchanged - maintains 1GB cap)
if (mode == AllocationMode.HIGH || mode == AllocationMode.EXTREME) {
while (tenuredBytesAllocated.get() > TENURED_TARGET_MB * 1024L * 1024L) {
if (!tenuredObjects.isEmpty()) {
Expand All @@ -196,6 +198,33 @@ private void generateGarbage(AllocationMode mode) {
}
}

private void generateStringGarbage(int bytes) {
StringBuilder sb = new StringBuilder(bytes);
for (int i = 0; i < bytes / 10; i++) {
sb.append("GARBAGE");
}
String garbage = sb.toString();
}

private void generateByteArrayGarbage(int bytes) {
byte[] garbage = new byte[bytes];
ThreadLocalRandom.current().nextBytes(garbage);
}

private void generateObjectGarbage(int count) {
List<DummyObject> garbage = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
garbage.add(new DummyObject(i, "data-" + i, System.nanoTime()));
}
}

private void generateCollectionGarbage(int count) {
List<Integer> garbage = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
garbage.add(ThreadLocalRandom.current().nextInt());
}
}

private void logStats() {
long now = System.currentTimeMillis();
if (now - lastStatsTime >= 5000) {
Expand Down Expand Up @@ -233,4 +262,16 @@ public AllocationMode getCurrentMode() {
public boolean isRunning() {
return running;
}

private static class DummyObject {
private final int id;
private final String data;
private final long timestamp;

DummyObject(int id, String data, long timestamp) {
this.id = id;
this.data = data;
this.timestamp = timestamp;
}
}
}
3 changes: 2 additions & 1 deletion src/main/webapp/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,8 @@ <h3>System Log</h3>
.catch(error => console.error('Error fetching pressure status:', error));
}

const API_BASE = '/' + window.location.pathname.split('/')[1] + '/api';
const contextRoot = window.location.pathname.split('/')[1];
const API_BASE = (contextRoot ? '/' + contextRoot : '') + '/api';

const ALERT_THRESHOLDS = {
p99Ms: { warning: 10, critical: 50 },
Expand Down
20 changes: 10 additions & 10 deletions src/test/java/fish/payara/trader/pressure/AllocationModeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void shouldHaveCorrectAllocationPerIterationValues(AllocationMode mode) {
switch (mode) {
case OFF -> 0;
case LOW -> 20;
case MEDIUM -> 195;
case MEDIUM -> 200;
case HIGH -> 10000;
case EXTREME -> 40000;
};
Expand Down Expand Up @@ -117,11 +117,11 @@ void shouldCalculateBytesPerSecondCorrectlyForLowMode() {
@Test
@DisplayName("Should calculate bytes per second correctly for MEDIUM mode")
void shouldCalculateBytesPerSecondCorrectlyForMediumMode() {
// 195 allocations * 10240 bytes * 10 iterations/sec = 19,968,000 bytes/sec
// 200 allocations * 10240 bytes * 10 iterations/sec = 20,480,000 bytes/sec
assertEquals(
19_968_000L,
20_480_000L,
AllocationMode.MEDIUM.getBytesPerSecond(),
"MEDIUM mode should allocate ~20 MB/sec");
"MEDIUM mode should allocate 20 MB/sec");
}

@Test
Expand Down Expand Up @@ -194,16 +194,16 @@ void shouldHaveSignificantGapsBetweenModes() {
"LOW should be significantly higher than OFF");

assertTrue(
AllocationMode.MEDIUM.getBytesPerSecond() > AllocationMode.LOW.getBytesPerSecond() * 9,
"MEDIUM should be at least 9x higher than LOW");
AllocationMode.MEDIUM.getBytesPerSecond() > AllocationMode.LOW.getBytesPerSecond() * 5,
"MEDIUM should be at least 5x higher than LOW");

assertTrue(
AllocationMode.HIGH.getBytesPerSecond() > AllocationMode.MEDIUM.getBytesPerSecond() * 50,
"HIGH should be at least 50x higher than MEDIUM");
AllocationMode.HIGH.getBytesPerSecond() > AllocationMode.MEDIUM.getBytesPerSecond() * 40,
"HIGH should be at least 40x higher than MEDIUM");

assertTrue(
AllocationMode.EXTREME.getBytesPerSecond() >= AllocationMode.HIGH.getBytesPerSecond() * 4,
"EXTREME should be at least 4x higher than HIGH");
AllocationMode.EXTREME.getBytesPerSecond() > AllocationMode.HIGH.getBytesPerSecond() * 2,
"EXTREME should be at least 2x higher than HIGH");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void shouldReturnAllModes() {
(Map<String, Map<String, Object>>) response.getEntity();
assertTrue(entity.containsKey("EXTREME"));
assertEquals(
"4 GB/sec - Extreme flash crash simulation",
"4 GB/sec - Extreme pressure",
((Map<String, Object>) entity.get("EXTREME")).get("description"));
}
}
13 changes: 10 additions & 3 deletions start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ case "$ACTION" in
echo " > Dockerfile (Azul) + MODE=AERON"
echo ""
MODE=AERON run_compose -f docker-compose.yml up -d --build --force-recreate
echo ""
echo "🌐 Access: http://localhost:8080/trader-stream-ee/"
;;

azul-direct)
Expand All @@ -55,6 +57,8 @@ case "$ACTION" in
echo " ℹ️ Observe how C4 handles high-allocation legacy code."
echo ""
MODE=DIRECT run_compose -f docker-compose.yml up -d --build --force-recreate
echo ""
echo "🌐 Access: http://localhost:8080/trader-stream-ee/"
;;

# --- Standard OpenJDK (G1 GC) Scenarios ---
Expand All @@ -65,6 +69,8 @@ case "$ACTION" in
echo " ℹ️ Baseline performance: High allocation on G1GC."
echo ""
MODE=DIRECT run_compose -f docker-compose-standard.yml up -d --build --force-recreate
echo ""
echo "🌐 Access: http://localhost:8080/trader-stream-ee/"
;;

standard-aeron)
Expand All @@ -73,6 +79,8 @@ case "$ACTION" in
echo " ℹ️ Observe if off-heap transport helps G1GC."
echo ""
MODE=AERON run_compose -f docker-compose-standard.yml up -d --build --force-recreate
echo ""
echo "🌐 Access: http://localhost:8080/trader-stream-ee/"
;;

# --- Clustered Scenarios ---
Expand Down Expand Up @@ -115,6 +123,8 @@ case "$ACTION" in
echo " ℹ️ Test cluster with high-allocation legacy mode."
echo ""
MODE=DIRECT DOCKERFILE=Dockerfile.scale run_compose -f docker-compose-scale.yml up -d --build --force-recreate
echo ""
echo "🌐 Access: http://localhost:8080/trader-stream-ee/"
;;

cluster-dynamic)
Expand Down Expand Up @@ -245,9 +255,6 @@ case "$ACTION" in
echo " stop - Stop the application"
echo " clean - Stop and clean all containers/volumes"
echo ""
echo "Documentation:"
echo " See SCALABILITY.md for clustering details"
echo ""
exit 1
;;
esac