6363from gridmap .runner import _heart_beat
6464
6565if DRMAA_PRESENT :
66- from drmaa import (InvalidJobException , JobControlAction ,
67- JOB_IDS_SESSION_ALL , Session )
66+ from drmaa import (ExitTimeoutException , InvalidJobException ,
67+ JobControlAction , JOB_IDS_SESSION_ALL , Session ,
68+ TIMEOUT_NO_WAIT )
6869
6970# Python 2.x backward compatibility
7071if sys .version_info < (3 , 0 ):
@@ -274,7 +275,7 @@ def __init__(self, temp_dir='/scratch'):
274275 # uninitialized field (set in check method)
275276 self .jobs = []
276277 self .ids = []
277- self .session_id = - 1
278+ self .session_id = None
278279 self .id_to_job = {}
279280
280281 def __enter__ (self ):
@@ -291,18 +292,27 @@ def __exit__(self, exc_type, exc_value, exc_tb):
291292 # Always close socket
292293 self .socket .close ()
293294
294- # If we encounter an exception, try to kill all jobs
295- if exc_type is not None :
296- self .logger .info ('Encountered %s, so killing all jobs.' ,
297- exc_type .__name__ )
295+ # Clean up if we have a valid session
296+ if self .session_id is not None :
298297 with Session (self .session_id ) as session :
299- # try to kill off all old jobs
298+ # If we encounter an exception, kill all jobs
299+ if exc_type is not None :
300+ self .logger .info ('Encountered %s, so killing all jobs.' ,
301+ exc_type .__name__ )
302+ # try to kill off all old jobs
303+ try :
304+ session .control (JOB_IDS_SESSION_ALL ,
305+ JobControlAction .TERMINATE )
306+ except InvalidJobException :
307+ self .logger .debug ("Could not kill all jobs for " +
308+ "session." , exc_info = True )
309+
310+ # Get rid of job info to prevent memory leak
300311 try :
301- session .control (JOB_IDS_SESSION_ALL ,
302- JobControlAction .TERMINATE )
303- except InvalidJobException :
304- self .logger .debug ("Could not kill all jobs for session." ,
305- exc_info = True )
312+ session .synchronize ([JOB_IDS_SESSION_ALL ], TIMEOUT_NO_WAIT ,
313+ dispose = True )
314+ except ExitTimeoutException :
315+ pass
306316
307317 def check (self , session_id , jobs ):
308318 """
@@ -316,9 +326,10 @@ def check(self, session_id, jobs):
316326 self .session_id = session_id
317327
318328 # determines in which interval to check if jobs are alive
329+ self .logger .debug ('Starting local hearbeat' )
319330 local_heart = multiprocessing .Process (target = _heart_beat ,
320331 args = (- 1 , self .home_address , - 1 ,
321- "" , CHECK_FREQUENCY ))
332+ "" , CHECK_FREQUENCY ))
322333 local_heart .start ()
323334 try :
324335 self .logger .debug ("Starting ZMQ event loop" )
@@ -432,6 +443,8 @@ def check_if_alive(self):
432443
433444 # could have been an exception, we check right away
434445 elif isinstance (job .ret , Exception ):
446+ job .cause_of_death = 'exception'
447+
435448 # Send error email, in addition to raising and logging exception
436449 send_error_mail (job )
437450
0 commit comments