Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,21 @@ public <O> TestResult<O> processEvents(List<Event> events, TypeToken<O> outputTy
}

switch (eventType) {
case EXECUTION_STARTED, INVOCATION_COMPLETED -> {
case EXECUTION_STARTED -> {
// Execution started - no action needed, just track the event
}
case INVOCATION_COMPLETED -> {
var details = event.invocationCompletedDetails();
if (details != null
&& details.error() != null
&& details.error().payload() != null) {
// This will get overridden by the execution events but
// the test cases will still be able to see the error
// if the execution succeeds.
status = ExecutionStatus.FAILED;
error = details.error().payload();
}
}
case EXECUTION_SUCCEEDED -> {
status = ExecutionStatus.SUCCEEDED;
var details = event.executionSucceededDetails();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public class TestResult<O> {

/** Returns the execution status (SUCCEEDED, FAILED, or PENDING). */
public ExecutionStatus getStatus() {
if (status == ExecutionStatus.SUCCEEDED && error != null) {
throw new IllegalStateException(
"Execution succeeded while invocation failed with: " + error.errorMessage());
}
return status;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,25 @@ public void deregisterActiveThread(String threadId) {

if (activeThreads.isEmpty()) {
logger.info("No active threads remaining - suspending execution");
preSuspendCheck();
suspendExecution();
}
}
}

private void preSuspendCheck() {
var hasAnyPendingOperation = operationStorage.values().stream().anyMatch(o -> switch (o.type()) {
case STEP -> o.status() == OperationStatus.PENDING;
case WAIT, CALLBACK -> o.status() == OperationStatus.STARTED;
case CHAINED_INVOKE -> o.status() == OperationStatus.PENDING || o.status() == OperationStatus.STARTED;
default -> false;
});

if (!hasAnyPendingOperation) {
logger.warn("Invalid suspension. No operation is pending");
}
}

// ===== Checkpointing =====

// This method will checkpoint the operation updates to the durable backend and return a future which completes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,11 @@ protected Operation waitForOperationCompletion() {
return op;
}

protected void runUserHandler(Runnable runnable, String contextId, ThreadType threadType) {
protected void runUserHandler(Runnable runnable, ThreadType threadType) {
String operationId = getOperationId();
logger.debug("Starting user handler for operation {} ({})", operationId, threadType);
Runnable wrapped = () -> {
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
executionManager.setCurrentThreadContext(new ThreadContext(operationId, threadType));
try {
runnable.run();
} catch (Throwable throwable) {
Expand All @@ -239,11 +241,11 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
"An unhandled exception is thrown from user function: " + throwable);
}
} finally {
if (contextId != null) {
if (operationId != null) {
try {
// if this is a child context or a step context, we need to
// deregister the context's thread from the execution manager
executionManager.deregisterActiveThread(contextId);
executionManager.deregisterActiveThread(operationId);
} catch (SuspendExecutionException e) {
// Expected when this is the last active thread. Must catch here because:
// 1/ This runs in a worker thread detached from handlerFuture
Expand All @@ -257,8 +259,10 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
};

// runUserHandler is used to ensure that only one user handler is running at a time
if (runningUserHandler.get() != null) {
throw new IllegalStateException("User handler already running");
if (runningUserHandler.get() != null && !runningUserHandler.get().isDone()) {
logger.error("User handler already running for operation {} ({})", getOperationId(), threadType);
throw terminateExecutionWithIllegalDurableOperationException(
"User handler already running: " + getOperationId());
}

// Thread registration is intentionally split across two threads:
Expand All @@ -267,14 +271,10 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
// 2. setCurrentContext on the CHILD thread — sets the ThreadLocal so operations inside
// the child context know which context they belong to.
// registerActiveThread is idempotent (no-op if already registered).
registerActiveThread(contextId);
registerActiveThread(operationId);

if (!runningUserHandler.compareAndSet(
null,
CompletableFuture.runAsync(
wrapped, getContext().getDurableConfig().getExecutorService()))) {
throw new IllegalStateException("User handler already running");
}
runningUserHandler.set(CompletableFuture.runAsync(
wrapped, getContext().getDurableConfig().getExecutorService()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private void executeChildContext() {
};

// Execute user provided child context code in user-configured executor
runUserHandler(userHandler, contextId, ThreadType.CONTEXT);
runUserHandler(userHandler, ThreadType.CONTEXT);
}

private void handleChildContextSuccess(T result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ protected void executeItems() {
}
};
// run consumer in the user thread pool, although it's not a real user thread
runUserHandler(consumer, getOperationId(), ThreadType.CONTEXT);
runUserHandler(consumer, ThreadType.CONTEXT);
}

private void handleException(Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private void executeStepLogic(int attempt) {
};

// Execute user provided step code in user-configured executor
runUserHandler(userHandler, getOperationId(), ThreadType.STEP);
runUserHandler(userHandler, ThreadType.STEP);
}

private void checkpointStarted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private void executeCheckLogic(T currentState, int attempt) {
}
};

runUserHandler(userHandler, getOperationId(), ThreadType.STEP);
runUserHandler(userHandler, ThreadType.STEP);
}

private void handleCheckFailure(Throwable exception) {
Expand Down
Loading