@@ -2117,6 +2117,7 @@ async def run(self):
21172117 pass
21182118 except Exception as cancellation_exception :
21192119 self ._logger .error (f"Uncaught error while cancelling entity batch work item: { cancellation_exception } " )
2120+ self .shutdown ()
21202121
21212122 async def _consume_queue (self , queue : asyncio .Queue , semaphore : asyncio .Semaphore ):
21222123 # List to track running tasks
@@ -2139,16 +2140,18 @@ async def _consume_queue(self, queue: asyncio.Queue, semaphore: asyncio.Semaphor
21392140 func , cancellation_func , args , kwargs = work
21402141 # Create a concurrent task for processing
21412142 task = asyncio .create_task (
2142- self ._process_work_item (semaphore , queue , func , args , kwargs )
2143+ self ._process_work_item (semaphore , queue , func , cancellation_func , args , kwargs )
21432144 )
21442145 running_tasks .add (task )
21452146
21462147 async def _process_work_item (
2147- self , semaphore : asyncio .Semaphore , queue : asyncio .Queue , func , args , kwargs
2148+ self , semaphore : asyncio .Semaphore , queue : asyncio .Queue , func , cancellation_func , args , kwargs
21482149 ):
21492150 async with semaphore :
21502151 try :
21512152 await self ._run_func (func , * args , ** kwargs )
2153+ except Exception as work_exception :
2154+ await self ._run_func (cancellation_func , * args , ** kwargs )
21522155 finally :
21532156 queue .task_done ()
21542157
@@ -2168,6 +2171,8 @@ async def _run_func(self, func, *args, **kwargs):
21682171 )
21692172
21702173 def submit_activity (self , func , cancellation_func , * args , ** kwargs ):
2174+ if self ._shutdown :
2175+ raise RuntimeError ("Cannot submit new work items after shutdown has been initiated." )
21712176 work_item = (func , cancellation_func , args , kwargs )
21722177 self ._ensure_queues_for_current_loop ()
21732178 if self .activity_queue is not None :
@@ -2177,6 +2182,8 @@ def submit_activity(self, func, cancellation_func, *args, **kwargs):
21772182 self ._pending_activity_work .append (work_item )
21782183
21792184 def submit_orchestration (self , func , cancellation_func , * args , ** kwargs ):
2185+ if self ._shutdown :
2186+ raise RuntimeError ("Cannot submit new work items after shutdown has been initiated." )
21802187 work_item = (func , cancellation_func , args , kwargs )
21812188 self ._ensure_queues_for_current_loop ()
21822189 if self .orchestration_queue is not None :
@@ -2186,6 +2193,8 @@ def submit_orchestration(self, func, cancellation_func, *args, **kwargs):
21862193 self ._pending_orchestration_work .append (work_item )
21872194
21882195 def submit_entity_batch (self , func , cancellation_func , * args , ** kwargs ):
2196+ if self ._shutdown :
2197+ raise RuntimeError ("Cannot submit new work items after shutdown has been initiated." )
21892198 work_item = (func , cancellation_func , args , kwargs )
21902199 self ._ensure_queues_for_current_loop ()
21912200 if self .entity_batch_queue is not None :
0 commit comments