diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 58b0912a..bd28187f 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -14,6 +14,7 @@ """ import inspect +import time from functools import wraps from typing import Optional, Sequence, TypeVar, Union @@ -56,6 +57,7 @@ def __init__( maximum_thread_pool_workers: Optional[int] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) + metadata = tuple() if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) @@ -86,10 +88,19 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): """Responsible to call Workflow function in orchestrationWrapper""" - daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) - if inp is None: - return fn(daprWfContext) - return fn(daprWfContext, inp) + instance_id = getattr(ctx, 'instance_id', 'unknown') + + try: + daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) + if inp is None: + result = fn(daprWfContext) + else: + result = fn(daprWfContext, inp) + return result + except Exception as e: + self._logger.exception(f"Workflow execution failed - " + f"instance_id: {instance_id}, error: {e}") + raise if hasattr(fn, '_workflow_registered'): # whenever a workflow is registered, it has a _dapr_alternate_name attribute @@ -152,10 +163,19 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None): def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): """Responsible to call Activity function in activityWrapper""" - wfActivityContext = WorkflowActivityContext(ctx) - if inp is None: - return fn(wfActivityContext) - return fn(wfActivityContext, inp) + activity_id = getattr(ctx, 'task_id', 'unknown') + + try: + wfActivityContext = WorkflowActivityContext(ctx) + if inp is None: + result = fn(wfActivityContext) + else: + result = fn(wfActivityContext, inp) + return result + except Exception as e: + self._logger.exception(f"Activity execution failed - " + f"task_id: {activity_id}, error: {e}") + raise if hasattr(fn, '_activity_registered'): # whenever an activity is registered, it has a _dapr_alternate_name attribute @@ -174,13 +194,80 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): ) fn.__dict__['_activity_registered'] = True + def wait_for_worker_ready(self, timeout: float = 30.0) -> bool: + """ + Wait for the worker's gRPC stream to become ready to receive work items. + + This method polls the worker's is_worker_ready() method until it returns True + or the timeout is reached. + + Args: + timeout: Maximum time in seconds to wait for the worker to be ready. + Defaults to 30 seconds. + + Returns: + True if the worker's gRPC stream is ready to receive work items, False if timeout. + """ + if not hasattr(self.__worker, 'is_worker_ready'): + return False + + elapsed = 0.0 + poll_interval = 0.1 # 100ms + + while elapsed < timeout: + if self.__worker.is_worker_ready(): + return True + time.sleep(poll_interval) + elapsed += poll_interval + + self._logger.warning( + f"WorkflowRuntime worker readiness check timed out after {timeout} seconds" + ) + return False + def start(self): - """Starts the listening for work items on a background thread.""" - self.__worker.start() + """Starts the listening for work items on a background thread. + + This method waits for the worker's gRPC stream to be fully initialized + before returning, ensuring that workflows can be scheduled immediately + after start() completes. + """ + try: + try: + self.__worker.start() + except Exception as start_error: + self._logger.exception( + f"WorkflowRuntime worker did not start: {start_error}" + ) + raise + + # Verify the worker and its stream reader are ready + if hasattr(self.__worker, 'is_worker_ready'): + try: + is_ready = self.wait_for_worker_ready(timeout=5.0) + if not is_ready: + raise RuntimeError("WorkflowRuntime worker and its stream are not ready") + else: + self._logger.debug("WorkflowRuntime worker is ready and its stream can receive work items") + except Exception as ready_error: + self._logger.exception( + f"WorkflowRuntime wait_for_worker_ready() raised exception: {ready_error}" + ) + raise ready_error + else: + self._logger.warning( + "Unable to verify stream readiness. Workflows scheduled immediately may not be received." + ) + except Exception: + raise + def shutdown(self): """Stops the listening for work items on a background thread.""" - self.__worker.stop() + try: + self.__worker.stop() + except Exception: + raise def versioned_workflow( self,