2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
-3.11.2.19
+3.11.3.9
44 changes: 27 additions & 17 deletions pilot.py
@@ -174,19 +174,25 @@ def main() -> int:  # noqa: C901
logger.warning(f"failed to update local OIDC token: {exc}")

# create and report the worker node map
-    if args.update_server and args.pilot_user.lower() == "atlas":  # only send info for atlas for now
+    # note: the worker node map will always be created, but only sent to the server
+    # if the user plugin specifies it. For ATLAS there is a special case for Nordugrid (args.update_server == False)
+    # in which the map is not sent to the PanDA server in this function, but later when the jobReport is uploaded
+    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
+    user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)
+    if user.allow_send_workernode_map():
try:
send_workernode_map(infosys.queuedata.site, infosys.queuedata.name, args.url, args.port, "IPv6", logger) # note: assuming IPv6, fallback in place
except Exception as error:
logger.warning(f"exception caught when sending workernode map: {error}")
-    try:
-        memory_limits = get_memory_limits(args.url, args.port)
-    except Exception as error:
-        logger.warning(f"exception caught when getting resource types: {error}")
-    else:
-        logger.debug(f"resource types: {memory_limits}")
-        if memory_limits:
-            pilot_cache.resource_types = memory_limits
+    if args.update_server:
+        try:
+            memory_limits = get_memory_limits(args.url, args.port)
+        except Exception as error:
+            logger.warning(f"exception caught when getting resource types: {error}")
+        else:
+            logger.debug(f"resource types: {memory_limits}")
+            if memory_limits:
+                pilot_cache.resource_types = memory_limits

# handle special CRIC variables via params
# internet protocol versions 'IPv4' or 'IPv6' can be set via CRIC PQ.params.internet_protocol_version
@@ -539,26 +545,30 @@ def send_workernode_map(
internet_protocol_version (str): Internet protocol version, IPv4 or IPv6.
logger (Any): Logging object.
"""
+    # should the worker node map be sent to the server at this point or later when the job report is sent?
+    send_now = True if args.update_server else False

# worker node structure to be sent to the server
try:
data = get_workernode_map(site, queue)
except Exception as e:
logger.warning(f"exception caught when calling get_workernode_map(): {e}")
-    try:
-        send_update("api/v1/pilot/update_worker_node", data, url, port, ipv=internet_protocol_version, max_attempts=1)
-    except Exception as e:
-        logger.warning(f"exception caught when sending worker node map to server: {e}")
+    if send_now:
+        try:
+            send_update("api/v1/pilot/update_worker_node", data, url, port, ipv=internet_protocol_version, max_attempts=1)
+        except Exception as e:
+            logger.warning(f"exception caught when sending worker node map to server: {e}")

# GPU info
try:
data = get_workernode_gpu_map(site)
except Exception as e:
logger.warning(f"exception caught when calling get_workernode_gpu_map(): {e}")
try:
-        if data:  # only send if data is not empty
+        if send_now and data:  # only send if data is not empty
try:
send_update("api/v1/pilot/update_worker_node_gpu", data, url, port, ipv=internet_protocol_version, max_attempts=1)
except Exception as e:
logger.warning(f"exception caught when sending worker node map to server: {e}")
except Exception as e:
logger.warning(f"exception caught when sending worker node map to server: {e}")


def set_lifetime():
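The hunks above establish a build-always, send-conditionally pattern: the worker node map is constructed unconditionally, transmitted right away only when args.update_server is set, and otherwise left to travel with the job report. A minimal, self-contained sketch of that pattern (report_workernode_map and transmit are illustrative stand-ins, not the pilot's actual helpers):

import logging

logger = logging.getLogger(__name__)


def transmit(endpoint: str, payload: dict) -> None:
    """Stand-in for the pilot's send_update(); here it only logs."""
    logger.info(f"would POST {payload} to {endpoint}")


def report_workernode_map(site: str, queue: str, send_now: bool) -> dict:
    """Always build the map; transmit it immediately only when send_now is True."""
    data = {"site": site, "queue": queue}  # stand-in for get_workernode_map(site, queue)
    if send_now:
        try:
            transmit("api/v1/pilot/update_worker_node", data)
        except Exception as exc:
            logger.warning(f"exception caught when sending worker node map: {exc}")
    return data  # returned so a caller can attach it to the job report later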
141 changes: 138 additions & 3 deletions pilot/control/job.py
@@ -24,16 +24,28 @@

"""Job module with functions for job handling."""

+from __future__ import annotations
+
import os
import time
import hashlib
import logging
import queue
import random
from collections import namedtuple
-from json import dumps
+from json import (
+    dumps,
+    loads
+)
from glob import glob
-from typing import Any
+from typing import (
+    Any,
+    Callable,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Tuple
+)
from urllib.parse import parse_qsl

from pilot.common.errorcodes import ErrorCodes
@@ -94,6 +106,7 @@
find_text_files,
get_total_input_size,
is_json,
+    read_json,
remove,
tail,
write_file,
@@ -152,6 +165,9 @@
update_modelstring
)

+JsonObject = MutableMapping[str, Any]
+ReadJsonFn = Callable[[str], Optional[Mapping[str, Any]]]
+
errors = ErrorCodes()
logger = logging.getLogger(__name__)
pilot_cache = get_pilot_cache()
@@ -2842,6 +2858,97 @@ def queue_monitor(queues: namedtuple, traces: Any, args: object):  # noqa: C901
logger.info('[job] queue monitor thread has finished')


+def load_metadata_dict(metadata: Optional[str]) -> JsonObject:
+    """
+    Parse the metadata JSON string into a mutable dictionary.
+
+    Args:
+        metadata: Metadata as a JSON string (or None/empty).
+
+    Returns:
+        A mutable dictionary representation of the metadata. If `metadata` is
+        missing or invalid JSON, returns an empty dictionary.
+    """
+    if not metadata:
+        return {}
+
+    try:
+        parsed = loads(metadata)
+    except Exception as error:  # pragma: no cover (logger side-effect)
+        logger.warning(f"failed to convert metadata string to dictionary: {error}")
+        return {}
+
+    if not isinstance(parsed, dict):
+        logger.warning("metadata JSON is not an object; ignoring and starting fresh")
+        return {}
+
+    return parsed
+
+
+def dump_metadata(metadata_dict: Mapping[str, Any]) -> Optional[str]:
+    """
+    Serialize a metadata dictionary into a JSON string.
+
+    Args:
+        metadata_dict: Metadata dictionary to serialize.
+
+    Returns:
+        JSON string on success, otherwise None if serialization fails.
+    """
+    try:
+        return dumps(metadata_dict)
+    except Exception as error:  # pragma: no cover (logger side-effect)
+        logger.warning(f"failed to convert metadata dictionary to string: {error}")
+        return None
+
+
+def merge_worker_maps(
+    metadata_dict: JsonObject,
+    pilot_home: str,
+    mappings: Tuple[Tuple[str, str], ...],
+    read_json: ReadJsonFn,
+) -> bool:
+    """
+    Merge worker node maps (worker + GPU) into the metadata dictionary.
+
+    Reads JSON files from `pilot_home` and merges them into `metadata_dict` under
+    the provided metadata keys. Only counts as a change if the new value differs
+    from the existing value.
+
+    Args:
+        metadata_dict: Metadata dictionary to update in-place.
+        pilot_home: Base directory where pilot map files are located.
+        mappings: Tuples of (filename, metadata_key) to read and merge.
+        read_json: Function that reads a JSON file and returns a mapping (or None).
+
+    Returns:
+        True if metadata_dict was modified, otherwise False.
+    """
+    changed = False
+
+    for fname, meta_key in mappings:
+        path = os.path.join(pilot_home, fname)
+        if not os.path.exists(path):
+            continue
+
+        data = read_json(path)
+        if not data:
+            continue
+
+        # Ensure JSON-like (mapping) content.
+        if not isinstance(data, Mapping):
+            logger.warning(f"map file does not contain a JSON object: {path}")
+            continue
+
+        if metadata_dict.get(meta_key) != data:
+            metadata_dict[meta_key] = dict(data)  # make it JSON-serializable/mutable
+            changed = True
+
+        logger.info(f"added {meta_key} to metadata from {path}")
+
+    return changed
+
+
def update_server(job: Any, args: Any) -> None:
"""
Update the server (wrapper for send_state() that also prepares the metadata).
@@ -2856,11 +2963,39 @@ def update_server(job: Any, args: Any) -> None:
# user specific actions
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)
-    metadata = user.get_metadata(job.workdir)
try:
user.update_server(job)
except Exception as error:
logger.warning('exception caught in update_server(): %s', error)

+    # the metadata can now be enhanced with the worker node map + GPU map for the case
+    # when the pilot is not sending the maps to the server directly. In this case, the maps
+    # are extracted on the server side at a later stage
+    # note: if the metadata does not exist, we should create it here
+
+    metadata: Optional[str] = user.get_metadata(job.workdir)
+
+    if not args.update_server:
+        pilot_home: str = os.environ.get("PILOT_HOME", "")
+        mappings: Tuple[Tuple[str, str], ...] = (
+            (config.Workernode.map, "worker_node"),
+            (config.Workernode.gpu_map, "worker_node_gpus"),
+        )
+
+        metadata_dict: JsonObject = load_metadata_dict(metadata)
+        changed: bool = merge_worker_maps(
+            metadata_dict=metadata_dict,
+            pilot_home=pilot_home,
+            mappings=mappings,
+            read_json=read_json,
+        )
+
+        # Only dump if something changed, OR if metadata was missing and we added something.
+        if changed:
+            new_metadata = dump_metadata(metadata_dict)
+            if new_metadata is not None:
+                metadata = new_metadata
+
if job.fileinfo:
send_state(job, args, job.state, xml=dumps(job.fileinfo), metadata=metadata)
else:
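Taken together, load_metadata_dict(), merge_worker_maps() and dump_metadata() form a load/merge/dump round trip on the metadata string. The following self-contained sketch mirrors that flow with the standard library; stub_read_json and the workernode_map.json file name are invented for the example (the real names come from read_json() and config.Workernode):

import json
import os
import tempfile


def stub_read_json(path: str) -> dict:
    """Stand-in for the pilot's read_json() helper."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


# Fake a pilot home containing a worker node map.
pilot_home = tempfile.mkdtemp()
map_path = os.path.join(pilot_home, "workernode_map.json")
with open(map_path, "w", encoding="utf-8") as fh:
    json.dump({"total_cores": 64}, fh)

metadata_dict = json.loads('{"jobReport": {}}')  # load_metadata_dict() on valid input
data = stub_read_json(map_path)
if metadata_dict.get("worker_node") != data:  # merge step: only a change if the value differs
    metadata_dict["worker_node"] = dict(data)
metadata = json.dumps(metadata_dict)  # dump_metadata() on success
print(metadata)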
1 change: 0 additions & 1 deletion pilot/control/monitor.py
@@ -199,7 +199,6 @@ def control(queues: namedtuple, traces: Any, args: object):  # noqa: C901

if n_iterations % 60 == 0:
logger.info(f"{time_since_start}s have passed since pilot start - server update state is \'{environ['SERVER_UPDATE']}\'")
logger.debug(f"args.update_server={args.update_server}")

# every minute run the following check
if is_pilot_check(check='machinefeatures'):
2 changes: 0 additions & 2 deletions pilot/info/dataloader.py
@@ -93,8 +93,6 @@ def load_url_data(cls, url: str, fname: str = None, cache_time: int = 0, nretry:
:param sleep_time: sleep time (default is 60 s) between retry attempts (int)
:return: data loaded from the url or file content if url passed is a filename (Any).
"""
-        logger.debug(f'xxx loading data url={url}')
-
@timeout(seconds=20)
def _readfile(url: str) -> str:
"""
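The _readfile() helper above is guarded by the pilot's own @timeout(seconds=20) decorator. The SIGALRM-based sketch below illustrates the general technique only; it is Unix-only and not the pilot's implementation:

import signal
from functools import wraps


def timeout(seconds: int):
    """Raise TimeoutError if the wrapped call runs longer than `seconds`."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            def _raise(signum, frame):
                raise TimeoutError(f"{func.__name__} exceeded {seconds}s")
            old_handler = signal.signal(signal.SIGALRM, _raise)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)  # cancel the pending alarm
                signal.signal(signal.SIGALRM, old_handler)
        return wrapper
    return decorator


@timeout(seconds=20)
def read_slow_source(url: str) -> str:
    ...  # e.g. a network read that may hang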
2 changes: 0 additions & 2 deletions pilot/info/extinfo.py
@@ -194,12 +194,10 @@ def jsonparser_panda(dat: Any) -> dict:
}
}

-        logger.debug(f'xxx sources={sources}')
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__(f'pilot.user.{pilot_user}.setup', globals(), locals(), [pilot_user], 0)
queuedata_source_priority = user.get_queuedata_priority()
priority = priority or queuedata_source_priority
-        logger.debug(f'queuedata priority={priority}')

return cls.load_data(sources, priority, cache_time)

27 changes: 22 additions & 5 deletions pilot/user/atlas/common.py
@@ -31,6 +31,7 @@
from collections import defaultdict
from functools import reduce
from glob import glob
+from json import dumps
from random import randint
from signal import SIGTERM, SIGUSR1
from typing import Any
@@ -2660,18 +2661,25 @@ def update_stagein(job: JobData):
fspec.status = 'no_transfer'


-def get_metadata(workdir: str) -> dict or None:
+def get_metadata(workdir: str) -> str:
"""
Return the metadata from file.

:param workdir: work directory (str)
-    :return: metadata (dict).
+    :return: metadata (str).
"""
path = os.path.join(workdir, config.Payload.jobreport)
-    metadata = read_file(path) if os.path.exists(path) else None
-    logger.debug(f'metadata={metadata}')
+    logger.info(f"reading metadata from: {path}")
+    metadata = read_json(path) if os.path.exists(path) else None
+    if not os.path.exists(path):
+        logger.warning(f'path does not exist: {path}')
+        return ""
+    if not metadata:
+        logger.warning('empty metadata')
+        return ""

-    return metadata
+    # convert dictionary to string
+    return dumps(metadata)


def should_update_logstash(frequency: int = 10) -> bool:
@@ -2850,3 +2858,12 @@ def get_pilot_id(jobid: str) -> str:
pass

return os.environ.get("GTAG", "unknown")


+def allow_send_workernode_map() -> bool:
+    """
+    Return True if the workernode map should be sent to the server.
+
+    :return: always True for ATLAS (bool).
+    """
+    return True
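With this change get_metadata() always returns a JSON string: the serialized job report on success, or an empty string when the report is missing or empty, so callers such as update_server() can hand the value to send_state() or re-parse it with load_metadata_dict(). A condensed sketch of the contract, using the standard library in place of the pilot's read_json() and config.Payload.jobreport (the report_name default here is an assumption):

import json
import logging
import os

logger = logging.getLogger(__name__)


def get_metadata_sketch(workdir: str, report_name: str = "jobReport.json") -> str:
    """Return the job report as a JSON string, or '' when missing/empty."""
    path = os.path.join(workdir, report_name)
    if not os.path.exists(path):
        logger.warning(f"path does not exist: {path}")
        return ""
    with open(path, encoding="utf-8") as fh:  # stand-in for read_json(path)
        metadata = json.load(fh)
    if not metadata:
        logger.warning("empty metadata")
        return ""
    return json.dumps(metadata)  # convert dictionary to string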
9 changes: 9 additions & 0 deletions pilot/user/darkside/common.py
@@ -503,3 +503,12 @@ def download_command(process: dict, workdir: str, base_urls: list) -> dict:
process['command'] = './' + cmd

return process


+def allow_send_workernode_map() -> bool:
+    """
+    Return True if the workernode map should be sent to the server.
+
+    :return: False unless requested (bool).
+    """
+    return False
9 changes: 9 additions & 0 deletions pilot/user/eic/common.py
@@ -503,3 +503,12 @@ def download_command(process: dict, workdir: str, base_urls: list) -> dict:
process['command'] = './' + cmd

return process


+def allow_send_workernode_map() -> bool:
+    """
+    Return True if the workernode map should be sent to the server.
+
+    :return: False unless requested (bool).
+    """
+    return False
9 changes: 9 additions & 0 deletions pilot/user/generic/common.py
@@ -503,3 +503,12 @@ def download_command(process: dict, workdir: str, base_urls: list) -> dict:
process['command'] = './' + cmd

return process


+def allow_send_workernode_map() -> bool:
+    """
+    Return True if the workernode map should be sent to the server.
+
+    :return: False unless requested (bool).
+    """
+    return False
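The identical allow_send_workernode_map() stubs added to the darkside, eic and generic plugins exist so that the dynamic plugin lookup in pilot.py (first hunk above) can call the hook on any experiment module without risking an AttributeError. Reduced to its essentials, and assuming the pilot package is importable, the dispatch looks like this:

import os

# PILOT_USER selects the experiment plugin; 'generic' is the fallback.
pilot_user = os.environ.get("PILOT_USER", "generic").lower()
user = __import__(f"pilot.user.{pilot_user}.common", globals(), locals(), [pilot_user], 0)

if user.allow_send_workernode_map():
    ...  # build and transmit the worker node map (see pilot.py above)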