109 changes: 98 additions & 11 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
@@ -14,6 +14,7 @@
"""

import inspect
import time
from functools import wraps
from typing import Optional, Sequence, TypeVar, Union

@@ -56,6 +57,7 @@ def __init__(
maximum_thread_pool_workers: Optional[int] = None,
):
self._logger = Logger('WorkflowRuntime', logger_options)

metadata = tuple()
if settings.DAPR_API_TOKEN:
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
@@ -86,10 +88,19 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None):

def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
"""Responsible to call Workflow function in orchestrationWrapper"""
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
if inp is None:
return fn(daprWfContext)
return fn(daprWfContext, inp)
instance_id = getattr(ctx, 'instance_id', 'unknown')

try:
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
if inp is None:
result = fn(daprWfContext)
else:
result = fn(daprWfContext, inp)
return result
except Exception as e:
self._logger.exception(f"Workflow execution failed - "
f"instance_id: {instance_id}, error: {e}")
raise

if hasattr(fn, '_workflow_registered'):
# whenever a workflow is registered, it has a _dapr_alternate_name attribute
@@ -152,10 +163,19 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None):

def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
"""Responsible to call Activity function in activityWrapper"""
wfActivityContext = WorkflowActivityContext(ctx)
if inp is None:
return fn(wfActivityContext)
return fn(wfActivityContext, inp)
activity_id = getattr(ctx, 'task_id', 'unknown')

try:
wfActivityContext = WorkflowActivityContext(ctx)
if inp is None:
result = fn(wfActivityContext)
else:
result = fn(wfActivityContext, inp)
return result
except Exception as e:
self._logger.exception(f"Activity execution failed - "
f"task_id: {activity_id}, error: {e}")
raise

if hasattr(fn, '_activity_registered'):
# whenever an activity is registered, it has a _dapr_alternate_name attribute
@@ -174,13 +194,80 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
)
fn.__dict__['_activity_registered'] = True

def wait_for_worker_ready(self, timeout: float = 30.0) -> bool:
"""
Wait for the worker's gRPC stream to become ready to receive work items.

This method polls the worker's is_worker_ready() method until it returns True
or the timeout is reached.

Args:
timeout: Maximum time in seconds to wait for the worker to be ready.
Defaults to 30 seconds.

Returns:
True if the worker's gRPC stream is ready to receive work items, False if the timeout is reached.
"""
if not hasattr(self.__worker, 'is_worker_ready'):
return False

elapsed = 0.0
poll_interval = 0.1 # 100ms

while elapsed < timeout:
if self.__worker.is_worker_ready():
return True
time.sleep(poll_interval)
elapsed += poll_interval

self._logger.warning(
f"WorkflowRuntime worker readiness check timed out after {timeout} seconds"
)
return False

def start(self):
"""Starts the listening for work items on a background thread."""
self.__worker.start()
"""Starts the listening for work items on a background thread.

This method waits for the worker's gRPC stream to be fully initialized
before returning, ensuring that workflows can be scheduled immediately
after start() completes.
"""
try:
try:
self.__worker.start()
except Exception as start_error:
self._logger.exception(
f"WorkflowRuntime worker did not start: {start_error}"
)
raise

# Verify the worker and its stream reader are ready
if hasattr(self.__worker, 'is_worker_ready'):
try:
is_ready = self.wait_for_worker_ready(timeout=5.0)
if not is_ready:
raise RuntimeError("WorkflowRuntime worker and its stream are not ready")
else:
self._logger.debug("WorkflowRuntime worker is ready and its stream can receive work items")
except Exception as ready_error:
self._logger.exception(
f"WorkflowRuntime wait_for_worker_ready() raised exception: {ready_error}"
)
raise ready_error
else:
self._logger.warning(
"Unable to verify stream readiness. Workflows scheduled immediately may not be received."
)
except Exception:
raise


def shutdown(self):
"""Stops the listening for work items on a background thread."""
self.__worker.stop()
try:
self.__worker.stop()
except Exception:
raise

def versioned_workflow(
self,
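Below is a minimal usage sketch (not part of the diff) illustrating how the readiness check introduced here is meant to be exercised. It assumes the existing dapr.ext.workflow public API — the WorkflowRuntime workflow/activity decorators, DaprWorkflowClient.schedule_new_workflow, and wait_for_workflow_completion — together with the wait_for_worker_ready() method added above; the workflow and activity names are illustrative.

```python
# Usage sketch only: assumes the dapr.ext.workflow public API plus the
# wait_for_worker_ready() method added in this PR. Names are illustrative.
from dapr.ext.workflow import (
    DaprWorkflowClient,
    DaprWorkflowContext,
    WorkflowActivityContext,
    WorkflowRuntime,
)

runtime = WorkflowRuntime()


@runtime.workflow(name='hello_workflow')
def hello_workflow(ctx: DaprWorkflowContext, name: str):
    # Call a single activity and return its result.
    greeting = yield ctx.call_activity(hello_activity, input=name)
    return greeting


@runtime.activity(name='hello_activity')
def hello_activity(ctx: WorkflowActivityContext, name: str) -> str:
    return f'Hello, {name}!'


if __name__ == '__main__':
    # With this change, start() waits (up to 5 seconds) for the worker's gRPC
    # stream before returning, so scheduling right afterwards should no longer
    # race the stream initialization.
    runtime.start()

    # Optional: wait longer explicitly before scheduling work.
    if not runtime.wait_for_worker_ready(timeout=10.0):
        raise RuntimeError('workflow worker did not become ready in time')

    client = DaprWorkflowClient()
    instance_id = client.schedule_new_workflow(workflow=hello_workflow, input='Dapr')
    print(f'Started workflow instance: {instance_id}')

    client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
    runtime.shutdown()
```

Since start() already blocks for up to five seconds on the worker stream, the explicit wait_for_worker_ready() call is only needed when a longer or custom timeout is desired.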