1818
1919from workerfacing_api import settings
2020from workerfacing_api .crud import job_tracking
21+ from workerfacing_api .exceptions import JobDeletedException , JobNotAssignedException
2122from workerfacing_api .schemas .queue_jobs import (
2223 EnvironmentTypes ,
2324 JobFilter ,
@@ -457,7 +458,7 @@ def pop(self, environment: EnvironmentTypes, receipt_handle: str) -> bool:
457458 job .workers = ";" .join (job .workers .split (";" ) + [hostname ])
458459 try :
459460 self ._update_job_status (session , job , status = JobStates .pulled )
460- except ValueError :
461+ except JobDeletedException :
461462 # job probably deleted by user
462463 return False
463464 return True
@@ -484,7 +485,7 @@ def get_job(
484485 if hostname :
485486 workers = job .workers .split (";" )
486487 if not workers or hostname != workers [- 1 ]:
487- raise ValueError (
488+ raise JobNotAssignedException (
488489 f"Job with id { job_id } is not assigned to worker { hostname } "
489490 )
490491 return job
@@ -505,11 +506,11 @@ def _update_job_status(
505506 job_id = job .job ["meta" ]["job_id" ]
506507 assert isinstance (job_id , int )
507508 job_tracking .update_job (job_id , status , runtime_details )
508- except ValueError as e :
509+ except JobDeletedException as e :
509510 # job probably deleted by user
510511 session .delete (job )
511512 session .commit ()
512- raise ValueError ( f"Could not update job, probably deleted by user: { e } " )
513+ raise e from e
513514
514515 def update_job_status (
515516 self ,
@@ -545,19 +546,27 @@ def handle_timeouts(
545546 # TODO: increase priority?
546547 job .num_retries += 1
547548 session .add (job )
548- self .update_job_status (
549- job .id ,
550- JobStates .queued ,
551- f"timeout { job .num_retries } (workers tried: { job .workers } )" ,
552- )
553- n_retry += 1
549+ try :
550+ self .update_job_status (
551+ job .id ,
552+ JobStates .queued ,
553+ f"timeout { job .num_retries } (workers tried: { job .workers } )" ,
554+ )
555+ n_retry += 1
556+ except JobDeletedException :
557+ # job probably deleted by user, skip updating status
558+ pass
554559 jobs_failed = jobs_timeout .filter (QueuedJob .num_retries >= max_retries )
555560 for job in jobs_failed :
556- self .update_job_status (
557- job .id ,
558- JobStates .error ,
559- "max retries reached" ,
560- )
561- n_failed += 1
561+ try :
562+ self .update_job_status (
563+ job .id ,
564+ JobStates .error ,
565+ "max retries reached" ,
566+ )
567+ n_failed += 1
568+ except JobDeletedException :
569+ # job probably deleted by user, skip updating status
570+ pass
562571 session .commit ()
563572 return n_retry , n_failed
0 commit comments