diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/HistoryEventProcessor.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/HistoryEventProcessor.java index 73d9f0aa..557ca476 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/HistoryEventProcessor.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/HistoryEventProcessor.java @@ -53,9 +53,21 @@ public TestResult processEvents(List events, TypeToken outputTy } switch (eventType) { - case EXECUTION_STARTED, INVOCATION_COMPLETED -> { + case EXECUTION_STARTED -> { // Execution started - no action needed, just track the event } + case INVOCATION_COMPLETED -> { + var details = event.invocationCompletedDetails(); + if (details != null + && details.error() != null + && details.error().payload() != null) { + // This will get overridden by the execution events but + // the test cases will still be able to see the error + // if the execution succeeds. + status = ExecutionStatus.FAILED; + error = details.error().payload(); + } + } case EXECUTION_SUCCEEDED -> { status = ExecutionStatus.SUCCEEDED; var details = event.executionSucceededDetails(); diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestResult.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestResult.java index 537f4e60..c8d8202b 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestResult.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/TestResult.java @@ -51,6 +51,10 @@ public class TestResult { /** Returns the execution status (SUCCEEDED, FAILED, or PENDING). */ public ExecutionStatus getStatus() { + if (status == ExecutionStatus.SUCCEEDED && error != null) { + throw new IllegalStateException( + "Execution succeeded while invocation failed with: " + error.errorMessage()); + } return status; } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java index 3c84b43f..f4ac3cf2 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java @@ -223,11 +223,25 @@ public void deregisterActiveThread(String threadId) { if (activeThreads.isEmpty()) { logger.info("No active threads remaining - suspending execution"); + preSuspendCheck(); suspendExecution(); } } } + private void preSuspendCheck() { + var hasAnyPendingOperation = operationStorage.values().stream().anyMatch(o -> switch (o.type()) { + case STEP -> o.status() == OperationStatus.PENDING; + case WAIT, CALLBACK -> o.status() == OperationStatus.STARTED; + case CHAINED_INVOKE -> o.status() == OperationStatus.PENDING || o.status() == OperationStatus.STARTED; + default -> false; + }); + + if (!hasAnyPendingOperation) { + logger.warn("Invalid suspension. No operation is pending"); + } + } + // ===== Checkpointing ===== // This method will checkpoint the operation updates to the durable backend and return a future which completes diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index 3acd00bc..4312a5ab 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -224,9 +224,11 @@ protected Operation waitForOperationCompletion() { return op; } - protected void runUserHandler(Runnable runnable, String contextId, ThreadType threadType) { + protected void runUserHandler(Runnable runnable, ThreadType threadType) { + String operationId = getOperationId(); + logger.debug("Starting user handler for operation {} ({})", operationId, threadType); Runnable wrapped = () -> { - executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType)); + executionManager.setCurrentThreadContext(new ThreadContext(operationId, threadType)); try { runnable.run(); } catch (Throwable throwable) { @@ -239,11 +241,11 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th "An unhandled exception is thrown from user function: " + throwable); } } finally { - if (contextId != null) { + if (operationId != null) { try { // if this is a child context or a step context, we need to // deregister the context's thread from the execution manager - executionManager.deregisterActiveThread(contextId); + executionManager.deregisterActiveThread(operationId); } catch (SuspendExecutionException e) { // Expected when this is the last active thread. Must catch here because: // 1/ This runs in a worker thread detached from handlerFuture @@ -257,8 +259,10 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th }; // runUserHandler is used to ensure that only one user handler is running at a time - if (runningUserHandler.get() != null) { - throw new IllegalStateException("User handler already running"); + if (runningUserHandler.get() != null && !runningUserHandler.get().isDone()) { + logger.error("User handler already running for operation {} ({})", getOperationId(), threadType); + throw terminateExecutionWithIllegalDurableOperationException( + "User handler already running: " + getOperationId()); } // Thread registration is intentionally split across two threads: @@ -267,14 +271,10 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th // 2. setCurrentContext on the CHILD thread — sets the ThreadLocal so operations inside // the child context know which context they belong to. // registerActiveThread is idempotent (no-op if already registered). - registerActiveThread(contextId); + registerActiveThread(operationId); - if (!runningUserHandler.compareAndSet( - null, - CompletableFuture.runAsync( - wrapped, getContext().getDurableConfig().getExecutorService()))) { - throw new IllegalStateException("User handler already running"); - } + runningUserHandler.set(CompletableFuture.runAsync( + wrapped, getContext().getDurableConfig().getExecutorService())); } /** diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index defc5efd..bc17f13f 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -129,7 +129,7 @@ private void executeChildContext() { }; // Execute user provided child context code in user-configured executor - runUserHandler(userHandler, contextId, ThreadType.CONTEXT); + runUserHandler(userHandler, ThreadType.CONTEXT); } private void handleChildContextSuccess(T result) { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java index 31f9d43c..d018da2f 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java @@ -210,7 +210,7 @@ protected void executeItems() { } }; // run consumer in the user thread pool, although it's not a real user thread - runUserHandler(consumer, getOperationId(), ThreadType.CONTEXT); + runUserHandler(consumer, ThreadType.CONTEXT); } private void handleException(Throwable ex) { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index b7d72d34..26b97f5f 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -112,7 +112,7 @@ private void executeStepLogic(int attempt) { }; // Execute user provided step code in user-configured executor - runUserHandler(userHandler, getOperationId(), ThreadType.STEP); + runUserHandler(userHandler, ThreadType.STEP); } private void checkpointStarted() { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java index 64a10f6f..fa8bb7ff 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -167,7 +167,7 @@ private void executeCheckLogic(T currentState, int attempt) { } }; - runUserHandler(userHandler, getOperationId(), ThreadType.STEP); + runUserHandler(userHandler, ThreadType.STEP); } private void handleCheckFailure(Throwable exception) {