From 8f6a3e2ff4b59f7e1c49a81cdf4d8d6d385c1a49 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Thu, 29 Jan 2026 16:55:49 -0600 Subject: [PATCH 1/2] fix: signal when dt reader stream is ready within wf client start call Signed-off-by: Samantha Coyle --- .../dapr/ext/workflow/workflow_runtime.py | 109 ++++++++++++++++-- 1 file changed, 98 insertions(+), 11 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 593e55c68..4b28310ff 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -14,6 +14,7 @@ """ import inspect +import time from functools import wraps from typing import Optional, Sequence, TypeVar, Union @@ -56,6 +57,7 @@ def __init__( maximum_thread_pool_workers: Optional[int] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) + metadata = tuple() if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) @@ -86,10 +88,19 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): """Responsible to call Workflow function in orchestrationWrapper""" - daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) - if inp is None: - return fn(daprWfContext) - return fn(daprWfContext, inp) + instance_id = getattr(ctx, 'instance_id', 'unknown') + + try: + daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) + if inp is None: + result = fn(daprWfContext) + else: + result = fn(daprWfContext, inp) + return result + except Exception as e: + self._logger.exception(f"Workflow execution failed - " + f"instance_id: {instance_id}, error: {e}") + raise if hasattr(fn, '_workflow_registered'): # whenever a workflow is registered, it has a _dapr_alternate_name attribute @@ -116,10 +127,19 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None): def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): """Responsible to call Activity function in activityWrapper""" - wfActivityContext = WorkflowActivityContext(ctx) - if inp is None: - return fn(wfActivityContext) - return fn(wfActivityContext, inp) + activity_id = getattr(ctx, 'task_id', 'unknown') + + try: + wfActivityContext = WorkflowActivityContext(ctx) + if inp is None: + result = fn(wfActivityContext) + else: + result = fn(wfActivityContext, inp) + return result + except Exception as e: + self._logger.exception(f"Activity execution failed - " + f"task_id: {activity_id}, error: {e}") + raise if hasattr(fn, '_activity_registered'): # whenever an activity is registered, it has a _dapr_alternate_name attribute @@ -138,13 +158,80 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): ) fn.__dict__['_activity_registered'] = True + def wait_for_worker_ready(self, timeout: float = 30.0) -> bool: + """ + Wait for the worker's gRPC stream to become ready to receive work items. + + This method polls the worker's is_worker_ready() method until it returns True + or the timeout is reached. + + Args: + timeout: Maximum time in seconds to wait for the worker to be ready. + Defaults to 30 seconds. + + Returns: + True if the worker's gRPC stream is ready to receive work items, False if timeout. + """ + if not hasattr(self.__worker, 'is_worker_ready'): + return False + + elapsed = 0.0 + poll_interval = 0.1 # 100ms + + while elapsed < timeout: + if self.__worker.is_worker_ready(): + return True + time.sleep(poll_interval) + elapsed += poll_interval + + self._logger.warning( + f"WorkflowRuntime worker readiness check timed out after {timeout} seconds" + ) + return False + def start(self): - """Starts the listening for work items on a background thread.""" - self.__worker.start() + """Starts the listening for work items on a background thread. + + This method waits for the worker's gRPC stream to be fully initialized + before returning, ensuring that workflows can be scheduled immediately + after start() completes. + """ + try: + try: + self.__worker.start() + except Exception as start_error: + self._logger.exception( + f"WorkflowRuntime worker did not start: {start_error}" + ) + raise + + # Verify the worker and its stream reader are ready + if hasattr(self.__worker, 'is_worker_ready'): + try: + is_ready = self.wait_for_worker_ready(timeout=5.0) + if not is_ready: + raise RuntimeError("WorkflowRuntime worker and its stream are not ready") + else: + self._logger.debug("WorkflowRuntime worker is ready and its stream can receive work items") + except Exception as ready_error: + self._logger.exception( + f"WorkflowRuntime wait_for_worker_ready() raised exception: {ready_error}" + ) + raise ready_error + else: + self._logger.warning( + "Unable to verify stream readiness. Workflows scheduled immediately may not be received." + ) + except Exception: + raise + def shutdown(self): """Stops the listening for work items on a background thread.""" - self.__worker.stop() + try: + self.__worker.stop() + except Exception: + raise def workflow(self, __fn: Workflow = None, *, name: Optional[str] = None): """Decorator to register a workflow function. From bace1e478c83a723b17ae71cc7257229bb499132 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Thu, 29 Jan 2026 17:02:13 -0600 Subject: [PATCH 2/2] style: appease linter Signed-off-by: Samantha Coyle --- .../dapr/ext/workflow/workflow_runtime.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 9def0900b..bd28187f4 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -57,7 +57,7 @@ def __init__( maximum_thread_pool_workers: Optional[int] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) - + metadata = tuple() if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) @@ -89,7 +89,7 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): """Responsible to call Workflow function in orchestrationWrapper""" instance_id = getattr(ctx, 'instance_id', 'unknown') - + try: daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) if inp is None: @@ -164,7 +164,7 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None): def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): """Responsible to call Activity function in activityWrapper""" activity_id = getattr(ctx, 'task_id', 'unknown') - + try: wfActivityContext = WorkflowActivityContext(ctx) if inp is None: @@ -210,16 +210,16 @@ def wait_for_worker_ready(self, timeout: float = 30.0) -> bool: """ if not hasattr(self.__worker, 'is_worker_ready'): return False - + elapsed = 0.0 poll_interval = 0.1 # 100ms - + while elapsed < timeout: if self.__worker.is_worker_ready(): return True time.sleep(poll_interval) elapsed += poll_interval - + self._logger.warning( f"WorkflowRuntime worker readiness check timed out after {timeout} seconds" ) @@ -240,7 +240,7 @@ def start(self): f"WorkflowRuntime worker did not start: {start_error}" ) raise - + # Verify the worker and its stream reader are ready if hasattr(self.__worker, 'is_worker_ready'): try: @@ -260,7 +260,7 @@ def start(self): ) except Exception: raise - + def shutdown(self): """Stops the listening for work items on a background thread."""