PILOTVERSION (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
-3.11.1.15
+3.11.2.19
pilot.py (17 changes: 15 additions & 2 deletions)
@@ -176,7 +176,7 @@ def main() -> int:  # noqa: C901
# create and report the worker node map
if args.update_server and args.pilot_user.lower() == "atlas": # only send info for atlas for now
try:
-send_workernode_map(infosys.queuedata.site, args.url, args.port, "IPv6", logger)  # note: assuming IPv6, fallback in place
+send_workernode_map(infosys.queuedata.site, infosys.queuedata.name, args.url, args.port, "IPv6", logger)  # note: assuming IPv6, fallback in place
except Exception as error:
logger.warning(f"exception caught when sending workernode map: {error}")
try:
@@ -500,8 +500,19 @@ def send_worker_status(
data["site"] = queue
data["node_id"] = get_node_name()

data_new = {}
data_new["worker_id"] = os.environ.get("HARVESTER_WORKER_ID", None)
data_new["harvester_id"] = os.environ.get("HARVESTER_ID", None)
data_new["status"] = status
# data_new["site"] = queue
data_new["node_id"] = get_node_name()

# attempt to send the worker info to the server
# if data_new["worker_id"] and data_new["harvester_id"]:
if data["workerID"] and data["harvesterID"]:
# send_update(
# "update_worker_status", data_new, url, port, ipv=internet_protocol_version, max_attempts=2
# )
send_update(
"updateWorkerPilotStatus", data, url, port, ipv=internet_protocol_version, max_attempts=2
)
@@ -511,6 +522,7 @@ def send_worker_status(

def send_workernode_map(
site: str,
queue: str,
url: str,
port: int,
internet_protocol_version: str,
@@ -521,14 +533,15 @@

Args:
site (str): ATLAS site name.
queue (str): PanDA queue name.
url (str): Server URL.
port (int): Server port.
internet_protocol_version (str): Internet protocol version, IPv4 or IPv6.
logger (Any): Logging object.
"""
# worker node structure to be sent to the server
try:
-data = get_workernode_map(site)
+data = get_workernode_map(site, queue)
except Exception as e:
logger.warning(f"exception caught when calling get_workernode_map(): {e}")
try:
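Before moving on, a short sketch (not part of the diff) of the two payload shapes involved in the send_worker_status() change earlier in this file: the camelCase dictionary still sent via updateWorkerPilotStatus, and the snake_case data_new prepared for a future update_worker_status command. All concrete values are invented for illustration.

# --- illustrative sketch, not part of the diff ---
import os

status = "running"  # hypothetical worker status

# current payload (camelCase keys), still the one actually sent
data = {
    "workerID": os.environ.get("HARVESTER_WORKER_ID"),
    "harvesterID": os.environ.get("HARVESTER_ID"),
    "site": "CERN-PROD",             # hypothetical PanDA queue name
    "node_id": "wn001.example.org",  # hypothetical node name
}

# prepared payload (snake_case keys); note that "site" is commented out in the diff
data_new = {
    "worker_id": os.environ.get("HARVESTER_WORKER_ID"),
    "harvester_id": os.environ.get("HARVESTER_ID"),
    "status": status,
    "node_id": "wn001.example.org",
}

# the update is only attempted when both IDs are known
if data["workerID"] and data["harvesterID"]:
    pass  # send_update("updateWorkerPilotStatus", data, url, port, ...) in the real code
# --- end sketch ---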
pilot/control/job.py (8 changes: 6 additions & 2 deletions)
@@ -56,6 +56,7 @@
get_batchsystem_jobid,
# get_display_info,
get_job_scheduler_id,
get_pilot_id,
get_pilot_state,
has_instruction_sets,
is_virtual_machine,
@@ -1110,7 +1111,8 @@ def remove_pilot_logs_from_list(list_of_files: list, jobid: str) -> list:
config.Container.container_script, config.Container.release_setup,
config.Container.stagein_status_dictionary, config.Container.stagein_replica_dictionary,
'eventLoopHeartBeat.txt', 'memory_monitor_output.txt', 'memory_monitor_summary.json_snapshot',
-f'curl_updateJob_{jobid}.config', config.Pilot.pilot_heartbeat_file]
+f'curl_updateJob_{jobid}.config', config.Pilot.pilot_heartbeat_file,
+'./panda_token', 'panda_token']
except Exception as error:
logger.warning(f'exception caught: {error}')
to_be_removed = []
@@ -2369,7 +2371,9 @@ def retrieve(queues: namedtuple, traces: Any, args: object):  # noqa: C901
# (only proceed if there is a condor class ad)
if os.environ.get('_CONDOR_JOB_AD', None):
htcondor_envvar(job.jobid)
-update_condor_classad(pandaid=job.jobid, state='retrieved')
+# update_condor_classad(pandaid=job.jobid, state='retrieved')
+pilotid = get_pilot_id(args.version_tag)
+update_condor_classad(pandaid=job.jobid, pilotid=pilotid)

# add the job definition to the jobs queue and increase the job counter,
# and wait until the job has finished
pilot/info/dataloader.py (2 changes: 2 additions & 0 deletions)
@@ -93,6 +93,8 @@ def load_url_data(cls, url: str, fname: str = None, cache_time: int = 0, nretry:
:param sleep_time: sleep time (default is 60 s) between retry attempts (int)
:return: data loaded from the url or file content if url passed is a filename (Any).
"""
logger.debug(f'xxx loading data url={url}')

@timeout(seconds=20)
def _readfile(url: str) -> str:
"""
pilot/info/extinfo.py (5 changes: 4 additions & 1 deletion)
@@ -164,7 +164,9 @@ def jsonparser_panda(dat: Any) -> dict:

return {pandaqueue: _dat}

-queuedata_url = (os.environ.get('QUEUEDATA_SERVER_URL') or getattr(config.Information, 'queuedata_url', '')).format(**{'pandaqueue': pandaqueues[0]})
+_url = os.environ.get('QUEUEDATA_SERVER_URL')
+queuedata_url = (_url or getattr(config.Information, 'queuedata_url', '')).format(**{'pandaqueue': pandaqueues[0]})
+_inf = getattr(config.Information, 'queuedata_url', '')
cric_url = getattr(config.Information, 'queues_url', None)
cric_url = cric_url.format(pandaqueue=pandaqueues[0] if len(pandaqueues) == 1 else 'pandaqueues')
cvmfs_path = cls.get_cvmfs_path(getattr(config.Information, 'queuedata_cvmfs', None), 'cric_pandaqueues.json')
@@ -192,6 +194,7 @@ def jsonparser_panda(dat: Any) -> dict:
}
}

logger.debug(f'xxx sources={sources}')
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__(f'pilot.user.{pilot_user}.setup', globals(), locals(), [pilot_user], 0)
queuedata_source_priority = user.get_queuedata_priority()
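The queuedata URL resolution above follows an env-var-overrides-config pattern with str.format() substitution; a minimal sketch, where the template URL and queue name are invented for illustration:

# --- illustrative sketch, not part of the diff ---
import os

pandaqueues = ['CERN-PROD']  # hypothetical queue list
config_template = 'https://pandaserver.example.org/queuedata/{pandaqueue}.json'  # invented template

# QUEUEDATA_SERVER_URL, when set, takes precedence over the config template
_url = os.environ.get('QUEUEDATA_SERVER_URL')
queuedata_url = (_url or config_template).format(**{'pandaqueue': pandaqueues[0]})
print(queuedata_url)  # https://pandaserver.example.org/queuedata/CERN-PROD.json
# --- end sketch ---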
pilot/info/jobdata.py (6 changes: 4 additions & 2 deletions)
@@ -89,6 +89,7 @@ class JobData(BaseData):
pandasecrets = "" # User defined secrets
pilotsecrets = {} # Real-time logging secrets
requestid = None # Request ID
resourcetype = None  # resource type (SCORE, MCORE, etc.)

# set by the pilot (not from job definition)
workdir = "" # working directory for this job
@@ -199,7 +200,7 @@ class JobData(BaseData):
'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype',
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata)
'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets', 'prodproxy', 'alrbuserplatform',
-'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus'],
+'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus', 'resourcetype'],
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts', 'subprocesses',
'logdata', 'outdata', 'indata', 'cpufrequencies'],
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
@@ -553,7 +554,8 @@ def load(self, data: dict, use_kmap: bool = True):
'dask_scheduler_ip': 'scheduler_ip',
'jupyter_session_ip': 'session_ip',
'minramcount': 'minRamCount',
-'altstageout': 'altStageOut'
+'altstageout': 'altStageOut',
+'resourcetype': 'resource_type'
} if use_kmap else {}

self._load_data(data, kmap)
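To make the new 'resourcetype' mapping concrete, here is a simplified sketch of the key-map lookup applied when a job definition is loaded; _load_data() is reduced to its essentials and the sample job definition is invented:

# --- illustrative sketch, not part of the diff ---
kmap = {'resourcetype': 'resource_type'}  # attribute name -> job definition key

class JobDataSketch:
    resourcetype = None  # resource type (SCORE, MCORE, etc.)

    def _load_data(self, data: dict, kmap: dict):
        # reduced version of the real loader: copy each mapped key, if present
        for attr, key in kmap.items():
            if key in data:
                setattr(self, attr, data[key])

job = JobDataSketch()
job._load_data({'resource_type': 'MCORE'}, kmap)  # hypothetical job definition
print(job.resourcetype)  # MCORE
# --- end sketch ---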
pilot/info/queuedata.py (16 changes: 14 additions & 2 deletions)
@@ -100,6 +100,7 @@ class QueueData(BaseData):
altstageout = None # allow altstageout: force (True) or disable (False) or no preferences (None)
pilot_walltime_grace = 1.0 # pilot walltime grace factor
pilot_rss_grace = 2.0 # pilot rss grace factor
pilot_maxwdir_grace = 1.0 # pilot maxwdir grace factor

# specify the type of attributes for proper data validation and casting
_keys = {int: ['timefloor', 'maxwdir', 'pledgedcpu', 'es_stageout_gap',
@@ -189,7 +190,8 @@ def allow_altstageout(self):
def set_pilot_walltime_grace(self):
"""Set pilot walltime grace factor based on the queuedata settings."""
try:
-_pilot_walltime_grace = float(self.params.get('pilot_walltime_grace', 0))
+# using a 1% grace by default, which corresponds to ~14 minutes for a 24-hour limit
+_pilot_walltime_grace = float(self.params.get('pilot_walltime_grace', 1))
self.pilot_walltime_grace = 1.0 + _pilot_walltime_grace / 100.0
except (ValueError, TypeError) as e:
logger.warning(f"failed to set pilot_walltime_grace: {e}")
@@ -204,6 +206,15 @@ def set_pilot_rss_grace(self):
logger.warning(f"failed to set pilot_rss_grace: {e}")
self.pilot_rss_grace = 2.0

def set_pilot_maxwdir_grace(self):
"""Set pilot maxwdir grace factor based on the queuedata settings."""
try:
_pilot_maxwdir_grace = float(self.params.get('pilot_maxwdir_grace', 20))
self.pilot_maxwdir_grace = 1.0 + _pilot_maxwdir_grace / 100.0
except (ValueError, TypeError) as e:
logger.warning(f"failed to set pilot_maxwdir_grace: {e}")
self.pilot_maxwdir_grace = 1.0

def clean(self):
"""Validate and finally clean up required data values (required object properties) if needed."""
# validate es_stageout_gap value
@@ -233,9 +244,10 @@ def clean(self):
# set altstageout settings
self.altstageout = self.allow_altstageout()

-# set pilot walltime and rss grace factors
+# set pilot grace factors
self.set_pilot_walltime_grace()
self.set_pilot_rss_grace()
self.set_pilot_maxwdir_grace()

## custom function pattern to apply extra validation to the key values
##def clean__keyname(self, raw, value):
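All three grace factors follow the same percentage-to-multiplier pattern; a standalone sketch of the arithmetic, using the defaults visible in the diff:

# --- illustrative sketch, not part of the diff ---
def grace_factor(pct: float) -> float:
    """Convert a queuedata percentage into a multiplicative grace factor."""
    return 1.0 + pct / 100.0

print(grace_factor(1))   # 1.01: walltime default, ~14 min extra on a 24-hour limit
print(grace_factor(20))  # 1.20: maxwdir default, allows 20% above maxwdir
# --- end sketch ---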
pilot/scripts/open_remote_file.py (56 changes: 55 additions & 1 deletion)
@@ -121,7 +121,7 @@ def get_file_lists(turls_string: str) -> dict:


# pylint: disable=useless-param-doc
-def try_open_file(turl_str: str, _queues: namedtuple):
+def try_open_file_old(turl_str: str, _queues: namedtuple):
"""
Attempt to open a remote file.

@@ -153,6 +153,60 @@ def try_open_file(turl_str: str, _queues: namedtuple):
_queues.result.put(turl_str)


# pylint: disable=useless-param-doc
def try_open_file(turl_str: str, _queues: namedtuple):
"""
Attempt to open a remote file.

Successfully opened turls will be put in the queues.opened queue.
Unsuccessful turls will be placed in the queues.unopened queue.

:param turl_str: turl (str)
:param _queues: Namedtuple with 'opened', 'unopened', 'result' queues.
"""

def attempt_open(path: str) -> bool:
"""Return True if ROOT successfully opens the file."""
try:
message(f'opening {path}')
_ = ROOT.TFile.SetOpenTimeout(30 * 1000)  # timeout in ms (30 s)
in_file = ROOT.TFile.Open(path)
except Exception as exc:
message(f'caught exception: {exc}')
return False

if in_file and in_file.IsOpen():
in_file.Close()
message(f'closed {path}')
return True

return False

# --- First attempt (original TURL) ---
opened = attempt_open(turl_str)

# --- Retry logic for davs failures ---
if not opened:
# the failure may be the known davs issue, so retry once with '?filetype=raw' appended
retry_path = turl_str + "?filetype=raw"
message("Retrying with ?filetype=raw appended")

opened = attempt_open(retry_path)

# If the retry succeeds, report success with the modified TURL
if opened:
turl_str = retry_path

# --- Queue results ---
if opened:
_queues.opened.put(turl_str)
else:
_queues.unopened.put(turl_str)

_queues.result.put(turl_str)


# pylint: disable=useless-param-doc
def spawn_file_open_thread(_queues: Any, file_list: list) -> threading.Thread:
"""
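A usage sketch for the rewritten try_open_file(), assuming the pilot package and ROOT are importable; the TURL is invented and the queues namedtuple mirrors the one the script builds:

# --- illustrative sketch, not part of the diff ---
import queue
from collections import namedtuple

from pilot.scripts.open_remote_file import try_open_file  # requires ROOT at import time

FileQueues = namedtuple('FileQueues', ['opened', 'unopened', 'result'])
queues = FileQueues(queue.Queue(), queue.Queue(), queue.Queue())

turl = 'davs://storage.example.org:443/atlas/evnt.root'  # invented TURL
try_open_file(turl, queues)

# on a failed open the function retries once with '?filetype=raw' appended and,
# if that retry succeeds, reports the modified TURL in the opened/result queues
print(queues.result.get())
# --- end sketch ---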
pilot/util/auxiliary.py (21 changes: 19 additions & 2 deletions)
@@ -45,7 +45,7 @@
from pilot.common.errorcodes import ErrorCodes
from pilot.util.condor import (
get_globaljobid,
-update_condor_classad
+# update_condor_classad
)
from pilot.util.container import execute
from pilot.util.filehandling import (
@@ -74,6 +74,23 @@ def pilot_version_banner() -> None:
logger.info('*' * len(version))


def get_pilot_id(version_tag: str) -> str:
"""
Return a unique pilot id.

Used by the HTCondor ClassAd.

Args:
version_tag (str): pilot version tag.

Returns:
str: pilot id.
"""
unique_id = os.environ.get("GTAG", "unknown")
pilotversion = os.environ.get('PILOT_VERSION')
return f'{pilotversion}-{version_tag}-{unique_id}'


def is_virtual_machine() -> bool:
"""
Determine if we are running in a virtual machine.
@@ -346,7 +363,7 @@ def set_pilot_state(job: Any = None, state: str = '') -> None:

if job and job.state != 'failed':
job.state = state
-update_condor_classad(state=state)
+# update_condor_classad(state=state)


def check_for_final_server_update(update_server: bool) -> None:
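For illustration, the identifier built by the new get_pilot_id() is just the three components joined with hyphens; the environment values and the 'PR' version tag below are invented:

# --- illustrative sketch, not part of the diff ---
import os

from pilot.util.auxiliary import get_pilot_id

os.environ['PILOT_VERSION'] = '3.11.2.19'
os.environ['GTAG'] = 'https://logs.example.org/pilot/12345'

# -> '3.11.2.19-PR-https://logs.example.org/pilot/12345'
print(get_pilot_id('PR'))
# --- end sketch ---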