From a58d43b62eba7ecdb1c29c3bb58e98c645e0d6ae Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Thu, 29 Jan 2026 16:14:52 -0600 Subject: [PATCH 1/3] fix: signal when stream reader thread is ready + logs + try/catch blocks Signed-off-by: Samantha Coyle --- durabletask/worker.py | 63 ++++++++++++++++++++++++++++++------------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 2d87d6e..b960331 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -282,7 +282,7 @@ def __init__( self._channel_options = channel_options self._stop_timeout = stop_timeout self._current_channel: Optional[grpc.Channel] = None # Store channel reference for cleanup - + self._stream_ready = threading.Event() # Use provided concurrency options or create default ones self._concurrency_options = ( concurrency_options if concurrency_options is not None else ConcurrencyOptions() @@ -298,7 +298,7 @@ def __init__( else: self._interceptors = None - self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options) + self._async_worker_manager = _AsyncWorkerManager(self._concurrency_options, self._logger) @property def concurrency_options(self) -> ConcurrencyOptions: @@ -323,6 +323,9 @@ def add_activity(self, fn: task.Activity) -> str: raise RuntimeError("Activities cannot be added while the worker is running.") return self._registry.add_activity(fn) + def is_worker_ready(self) -> bool: + return self._stream_ready.is_set() and self._is_running + def start(self): """Starts the worker on a background thread and begins listening for work items.""" if self._is_running: @@ -336,6 +339,8 @@ def run_loop(): self._logger.info(f"Starting gRPC worker that connects to {self._host_address}") self._runLoop = Thread(target=run_loop, name="WorkerRunLoop") self._runLoop.start() + if not self._stream_ready.wait(timeout=10): + raise RuntimeError("Failed to establish work item stream connection within 10 seconds") self._is_running = True # TODO: refactor this to be more readable and maintainable. @@ -446,26 +451,31 @@ def should_invalidate_connection(rpc_error): maxConcurrentOrchestrationWorkItems=self._concurrency_options.maximum_concurrent_orchestration_work_items, maxConcurrentActivityWorkItems=self._concurrency_options.maximum_concurrent_activity_work_items, ) - self._response_stream = stub.GetWorkItems(get_work_items_request) - self._logger.info( - f"Successfully connected to {self._host_address}. Waiting for work items..." - ) + try: + self._response_stream = stub.GetWorkItems(get_work_items_request) + self._logger.info( + f"Successfully connected to {self._host_address}. Waiting for work items..." 
+ ) + except Exception as e: + raise # Use a thread to read from the blocking gRPC stream and forward to asyncio import queue - work_item_queue = queue.Queue() SHUTDOWN_SENTINEL = None # NOTE: This is equivalent to the Durabletask Go goroutine calling stream.Recv() in worker_grpc.go StartWorkItemListener() def stream_reader(): try: + if self._response_stream is None: + return stream = self._response_stream # Use next() to allow shutdown check between items # This matches Go's pattern: check ctx.Err() after each stream.Recv() while True: if self._shutdown.is_set(): + self._logger.debug("Stream reader: shutdown detected, exiting loop") break try: @@ -502,25 +512,32 @@ def stream_reader(): self._logger.debug( f"Stream reader: exception during shutdown: {type(stream_error).__name__}: {stream_error}" ) + break # Other stream errors - put in queue for async loop to handle - self._logger.warning( - f"Stream reader: unexpected error: {stream_error}" + self._logger.error( + f"Stream reader: unexpected error: {type(stream_error).__name__}: {stream_error}", + exc_info=True ) raise except Exception as e: + self._logger.exception( + f"Stream reader: fatal exception in stream_reader: {type(e).__name__}: {e}", + exc_info=True + ) if not self._shutdown.is_set(): - work_item_queue.put(e) + try: + work_item_queue.put(e) + except Exception as queue_error: + self._logger.error(f"Stream reader: failed to put exception in queue: {queue_error}") finally: # signal that the stream reader is done (ie matching Go's context cancellation) try: work_item_queue.put(SHUTDOWN_SENTINEL) - except Exception: + except Exception as queue_error: # queue might be closed so ignore this pass - import threading - # Use non-daemon thread (daemon=False) to ensure proper resource cleanup. # Daemon threads exit immediately when the main program exits, which prevents # cleanup of gRPC channel resources and OTel interceptors. 
Non-daemon threads @@ -528,7 +545,13 @@ def stream_reader(): current_reader_thread = threading.Thread( target=stream_reader, daemon=False, name="StreamReader" ) - current_reader_thread.start() + + try: + current_reader_thread.start() + self._stream_ready.set() + except Exception: + raise + loop = asyncio.get_running_loop() # NOTE: This is a blocking call that will wait for a work item to become available or the shutdown sentinel @@ -1693,7 +1716,7 @@ def _is_suspendable(event: pb.HistoryEvent) -> bool: class _AsyncWorkerManager: - def __init__(self, concurrency_options: ConcurrencyOptions): + def __init__(self, concurrency_options: ConcurrencyOptions, logger: logging.Logger): self.concurrency_options = concurrency_options self.activity_semaphore = None self.orchestration_semaphore = None @@ -1709,6 +1732,7 @@ def __init__(self, concurrency_options: ConcurrencyOptions): thread_name_prefix="DurableTask", ) self._shutdown = False + self._logger = logger def _ensure_queues_for_current_loop(self): """Ensure queues are bound to the current event loop.""" @@ -1716,7 +1740,8 @@ def _ensure_queues_for_current_loop(self): current_loop = asyncio.get_running_loop() if current_loop.is_closed(): return - except RuntimeError: + except RuntimeError as e: + self._logger.exception(f"Failed to get event loop {e}") # No event loop running, can't create queues return @@ -1735,14 +1760,16 @@ def _ensure_queues_for_current_loop(self): try: while not self.activity_queue.empty(): existing_activity_items.append(self.activity_queue.get_nowait()) - except Exception: + except Exception as e: + self._logger.debug(f"Failed to append to the activity queue {e}") pass if self.orchestration_queue is not None: try: while not self.orchestration_queue.empty(): existing_orchestration_items.append(self.orchestration_queue.get_nowait()) - except Exception: + except Exception as e: + self._logger.debug(f"Failed to append to the orchestration queue {e}") pass # Create fresh queues for the current event loop From 8250aa9f9aade3d77afa70b56d3cdbb4cb745e31 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Thu, 29 Jan 2026 16:18:27 -0600 Subject: [PATCH 2/3] style: clean up Signed-off-by: Samantha Coyle --- durabletask/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index b960331..5cedb04 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -534,7 +534,7 @@ def stream_reader(): # signal that the stream reader is done (ie matching Go's context cancellation) try: work_item_queue.put(SHUTDOWN_SENTINEL) - except Exception as queue_error: + except Exception: # queue might be closed so ignore this pass From 3414d5a1426914b29380666f9c288915475c24f3 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Thu, 29 Jan 2026 16:19:50 -0600 Subject: [PATCH 3/3] style: appease linter Signed-off-by: Samantha Coyle --- durabletask/worker.py | 58 +++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 5cedb04..8e42048 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -26,9 +26,11 @@ TInput = TypeVar("TInput") TOutput = TypeVar("TOutput") + class VersionNotRegisteredException(Exception): pass + def _log_all_threads(logger: logging.Logger, context: str = ""): """Helper function to log all currently active threads for debugging.""" active_threads = threading.enumerate() @@ -100,7 +102,9 @@ def __init__(self): 
self.latest_versioned_orchestrators_version_name = {} self.activities = {} - def add_orchestrator(self, fn: task.Orchestrator, version_name: Optional[str] = None, is_latest: bool = False) -> str: + def add_orchestrator( + self, fn: task.Orchestrator, version_name: Optional[str] = None, is_latest: bool = False + ) -> str: if fn is None: raise ValueError("An orchestrator function argument is required.") @@ -108,7 +112,13 @@ def add_orchestrator(self, fn: task.Orchestrator, version_name: Optional[str] = self.add_named_orchestrator(name, fn, version_name, is_latest) return name - def add_named_orchestrator(self, name: str, fn: task.Orchestrator, version_name: Optional[str] = None, is_latest: bool = False) -> None: + def add_named_orchestrator( + self, + name: str, + fn: task.Orchestrator, + version_name: Optional[str] = None, + is_latest: bool = False, + ) -> None: if not name: raise ValueError("A non-empty orchestrator name is required.") @@ -120,12 +130,16 @@ def add_named_orchestrator(self, name: str, fn: task.Orchestrator, version_name: if name not in self.versioned_orchestrators: self.versioned_orchestrators[name] = {} if version_name in self.versioned_orchestrators[name]: - raise ValueError(f"The version '{version_name}' of '{name}' orchestrator already exists.") + raise ValueError( + f"The version '{version_name}' of '{name}' orchestrator already exists." + ) self.versioned_orchestrators[name][version_name] = fn if is_latest: self.latest_versioned_orchestrators_version_name[name] = version_name - def get_orchestrator(self, name: str, version_name: Optional[str] = None) -> Optional[tuple[task.Orchestrator, str]]: + def get_orchestrator( + self, name: str, version_name: Optional[str] = None + ) -> Optional[tuple[task.Orchestrator, str]]: if name in self.orchestrators: return self.orchestrators.get(name), None @@ -325,7 +339,7 @@ def add_activity(self, fn: task.Activity) -> str: def is_worker_ready(self) -> bool: return self._stream_ready.is_set() and self._is_running - + def start(self): """Starts the worker on a background thread and begins listening for work items.""" if self._is_running: @@ -456,11 +470,12 @@ def should_invalidate_connection(rpc_error): self._logger.info( f"Successfully connected to {self._host_address}. Waiting for work items..." 
) - except Exception as e: + except Exception: raise # Use a thread to read from the blocking gRPC stream and forward to asyncio import queue + work_item_queue = queue.Queue() SHUTDOWN_SENTINEL = None @@ -516,20 +531,22 @@ def stream_reader(): # Other stream errors - put in queue for async loop to handle self._logger.error( f"Stream reader: unexpected error: {type(stream_error).__name__}: {stream_error}", - exc_info=True + exc_info=True, ) raise except Exception as e: self._logger.exception( f"Stream reader: fatal exception in stream_reader: {type(e).__name__}: {e}", - exc_info=True + exc_info=True, ) if not self._shutdown.is_set(): try: work_item_queue.put(e) except Exception as queue_error: - self._logger.error(f"Stream reader: failed to put exception in queue: {queue_error}") + self._logger.error( + f"Stream reader: failed to put exception in queue: {queue_error}" + ) finally: # signal that the stream reader is done (ie matching Go's context cancellation) try: @@ -551,7 +568,7 @@ def stream_reader(): self._stream_ready.set() except Exception: raise - + loop = asyncio.get_running_loop() # NOTE: This is a blocking call that will wait for a work item to become available or the shutdown sentinel @@ -783,7 +800,6 @@ def _execute_orchestrator( version = version or pb.OrchestrationVersion() version.patches.extend(result.patches) - res = pb.OrchestratorResponse( instanceId=req.instanceId, actions=result.actions, @@ -955,14 +971,12 @@ def set_failed(self, ex: Exception): ) self._pending_actions[action.id] = action - def set_version_not_registered(self): self._pending_actions.clear() self._completion_status = pb.ORCHESTRATION_STATUS_STALLED action = ph.new_orchestrator_version_not_available_action(self.next_sequence_number()) self._pending_actions[action.id] = action - def set_continued_as_new(self, new_input: Any, save_events: bool): if self._is_complete: return @@ -1173,7 +1187,6 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None: self.set_continued_as_new(new_input, save_events) - def is_patched(self, patch_name: str) -> bool: is_patched = self._is_patched(patch_name) if is_patched: @@ -1201,7 +1214,13 @@ class ExecutionResults: version_name: Optional[str] patches: Optional[list[str]] - def __init__(self, actions: list[pb.OrchestratorAction], encoded_custom_status: Optional[str], version_name: Optional[str] = None, patches: Optional[list[str]] = None): + def __init__( + self, + actions: list[pb.OrchestratorAction], + encoded_custom_status: Optional[str], + version_name: Optional[str] = None, + patches: Optional[list[str]] = None, + ): self.actions = actions self.encoded_custom_status = encoded_custom_status self.version_name = version_name @@ -1277,8 +1296,8 @@ def execute( return ExecutionResults( actions=actions, encoded_custom_status=ctx._encoded_custom_status, - version_name=getattr(ctx, '_version_name', None), - patches=ctx._encountered_patches + version_name=getattr(ctx, "_version_name", None), + patches=ctx._encountered_patches, ) def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None: @@ -1306,9 +1325,10 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven if ctx._orchestrator_version_name: version_name = ctx._orchestrator_version_name - # TODO: Check if we already started the orchestration - fn, version_used = self._registry.get_orchestrator(event.executionStarted.name, version_name=version_name) + fn, version_used = self._registry.get_orchestrator( + event.executionStarted.name, 
version_name=version_name + ) if fn is None: raise OrchestratorNotRegisteredError(
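
Note (not part of the patch): the core change in PATCH 1/3 is a readiness handshake — the run loop signals a threading.Event once the StreamReader thread has started, and start() blocks on that event so callers no longer race the stream connection. Below is a minimal, self-contained sketch of that pattern under assumed names; ReadySignalWorker is hypothetical and only mirrors the shape of TaskHubGrpcWorker.start(), is_worker_ready(), and the _stream_ready event added in this patch, not the real gRPC worker.

import threading
import time


class ReadySignalWorker:
    """Illustration only: mirrors the Event-based readiness handshake, not the real TaskHubGrpcWorker."""

    def __init__(self) -> None:
        self._stream_ready = threading.Event()
        self._is_running = False

    def _run_loop(self) -> None:
        # Stand-in for connecting to the sidecar and launching the StreamReader
        # thread; the real patch calls self._stream_ready.set() right after
        # current_reader_thread.start().
        time.sleep(0.1)
        self._stream_ready.set()

    def start(self) -> None:
        threading.Thread(target=self._run_loop, name="WorkerRunLoop", daemon=True).start()
        # Mirrors the wait added to start(): block until the stream reader
        # signals readiness, or fail fast after 10 seconds.
        if not self._stream_ready.wait(timeout=10):
            raise RuntimeError("Failed to establish work item stream connection within 10 seconds")
        self._is_running = True

    def is_worker_ready(self) -> bool:
        return self._stream_ready.is_set() and self._is_running


worker = ReadySignalWorker()
worker.start()
print(worker.is_worker_ready())  # True once start() returns

The same handshake is why start() can now raise instead of silently returning before the work item stream exists: readiness is observable (is_worker_ready) and bounded by the 10-second timeout.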