Skip to content

Commit a881b22

Browse files
authored
Merge pull request #187 from PalNilsson/next
3.11.3.9
2 parents 8d7d1f9 + 958643f commit a881b22

File tree

16 files changed

+258
-38
lines changed

16 files changed

+258
-38
lines changed

PILOTVERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.11.2.19
1+
3.11.3.9

pilot.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -174,19 +174,25 @@ def main() -> int: # noqa: C901
174174
logger.warning(f"failed to update local OIDC token: {exc}")
175175

176176
# create and report the worker node map
177-
if args.update_server and args.pilot_user.lower() == "atlas": # only send info for atlas for now
177+
# note: the worker node map will always be created, but only sent to the server
178+
# if the user plugin specifies it. For ATLAS there is a special case for Nordugrid (args.update_server == False)
179+
# in which the map is not sent to the PanDA server in this function, but later when the jobReport is uploaded
180+
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
181+
user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)
182+
if user.allow_send_workernode_map():
178183
try:
179184
send_workernode_map(infosys.queuedata.site, infosys.queuedata.name, args.url, args.port, "IPv6", logger) # note: assuming IPv6, fallback in place
180185
except Exception as error:
181186
logger.warning(f"exception caught when sending workernode map: {error}")
182-
try:
183-
memory_limits = get_memory_limits(args.url, args.port)
184-
except Exception as error:
185-
logger.warning(f"exception caught when getting resource types: {error}")
186-
else:
187-
logger.debug(f"resource types: {memory_limits}")
188-
if memory_limits:
189-
pilot_cache.resource_types = memory_limits
187+
if args.update_server:
188+
try:
189+
memory_limits = get_memory_limits(args.url, args.port)
190+
except Exception as error:
191+
logger.warning(f"exception caught when getting resource types: {error}")
192+
else:
193+
logger.debug(f"resource types: {memory_limits}")
194+
if memory_limits:
195+
pilot_cache.resource_types = memory_limits
190196

191197
# handle special CRIC variables via params
192198
# internet protocol versions 'IPv4' or 'IPv6' can be set via CRIC PQ.params.internet_protocol_version
@@ -539,26 +545,30 @@ def send_workernode_map(
539545
internet_protocol_version (str): Internet protocol version, IPv4 or IPv6.
540546
logger (Any): Logging object.
541547
"""
548+
# should the worker node map be sent to the server at this point or later when the job report is sent?
549+
send_now = True if args.update_server else False
550+
542551
# worker node structure to be sent to the server
543552
try:
544553
data = get_workernode_map(site, queue)
545554
except Exception as e:
546555
logger.warning(f"exception caught when calling get_workernode_map(): {e}")
547-
try:
548-
send_update("api/v1/pilot/update_worker_node", data, url, port, ipv=internet_protocol_version, max_attempts=1)
549-
except Exception as e:
550-
logger.warning(f"exception caught when sending worker node map to server: {e}")
556+
if send_now:
557+
try:
558+
send_update("api/v1/pilot/update_worker_node", data, url, port, ipv=internet_protocol_version, max_attempts=1)
559+
except Exception as e:
560+
logger.warning(f"exception caught when sending worker node map to server: {e}")
551561

552562
# GPU info
553563
try:
554564
data = get_workernode_gpu_map(site)
555565
except Exception as e:
556566
logger.warning(f"exception caught when calling get_workernode_gpu_map(): {e}")
557-
try:
558-
if data: # only send if data is not empty
567+
if send_now and data: # only send if data is not empty
568+
try:
559569
send_update("api/v1/pilot/update_worker_node_gpu", data, url, port, ipv=internet_protocol_version, max_attempts=1)
560-
except Exception as e:
561-
logger.warning(f"exception caught when sending worker node map to server: {e}")
570+
except Exception as e:
571+
logger.warning(f"exception caught when sending worker node map to server: {e}")
562572

563573

564574
def set_lifetime():

pilot/control/job.py

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,28 @@
2424

2525
"""Job module with functions for job handling."""
2626

27+
from __future__ import annotations
28+
2729
import os
2830
import time
2931
import hashlib
3032
import logging
3133
import queue
3234
import random
3335
from collections import namedtuple
34-
from json import dumps
36+
from json import (
37+
dumps,
38+
loads
39+
)
3540
from glob import glob
36-
from typing import Any
41+
from typing import (
42+
Any,
43+
Callable,
44+
Mapping,
45+
MutableMapping,
46+
Optional,
47+
Tuple
48+
)
3749
from urllib.parse import parse_qsl
3850

3951
from pilot.common.errorcodes import ErrorCodes
@@ -94,6 +106,7 @@
94106
find_text_files,
95107
get_total_input_size,
96108
is_json,
109+
read_json,
97110
remove,
98111
tail,
99112
write_file,
@@ -152,6 +165,9 @@
152165
update_modelstring
153166
)
154167

168+
JsonObject = MutableMapping[str, Any]
169+
ReadJsonFn = Callable[[str], Optional[Mapping[str, Any]]]
170+
155171
errors = ErrorCodes()
156172
logger = logging.getLogger(__name__)
157173
pilot_cache = get_pilot_cache()
@@ -2842,6 +2858,97 @@ def queue_monitor(queues: namedtuple, traces: Any, args: object): # noqa: C901
28422858
logger.info('[job] queue monitor thread has finished')
28432859

28442860

2861+
def load_metadata_dict(metadata: Optional[str]) -> JsonObject:
2862+
"""
2863+
Parse the metadata JSON string into a mutable dictionary.
2864+
2865+
Args:
2866+
metadata: Metadata as a JSON string (or None/empty).
2867+
2868+
Returns:
2869+
A mutable dictionary representation of the metadata. If `metadata` is
2870+
missing or invalid JSON, returns an empty dictionary.
2871+
"""
2872+
if not metadata:
2873+
return {}
2874+
2875+
try:
2876+
parsed = loads(metadata)
2877+
except Exception as error: # pragma: no cover (logger side-effect)
2878+
logger.warning(f"failed to convert metadata string to dictionary: {error}")
2879+
return {}
2880+
2881+
if not isinstance(parsed, dict):
2882+
logger.warning("metadata JSON is not an object; ignoring and starting fresh")
2883+
return {}
2884+
2885+
return parsed
2886+
2887+
2888+
def dump_metadata(metadata_dict: Mapping[str, Any]) -> Optional[str]:
2889+
"""
2890+
Serialize a metadata dictionary into a JSON string.
2891+
2892+
Args:
2893+
metadata_dict: Metadata dictionary to serialize.
2894+
2895+
Returns:
2896+
JSON string on success, otherwise None if serialization fails.
2897+
"""
2898+
try:
2899+
return dumps(metadata_dict)
2900+
except Exception as error: # pragma: no cover (logger side-effect)
2901+
logger.warning(f"failed to convert metadata dictionary to string: {error}")
2902+
return None
2903+
2904+
2905+
def merge_worker_maps(
2906+
metadata_dict: JsonObject,
2907+
pilot_home: str,
2908+
mappings: Tuple[Tuple[str, str], ...],
2909+
read_json: ReadJsonFn,
2910+
) -> bool:
2911+
"""
2912+
Merge worker node maps (worker + GPU) into the metadata dictionary.
2913+
2914+
Reads JSON files from `pilot_home` and merges them into `metadata_dict` under
2915+
the provided metadata keys. Only counts as a change if the new value differs
2916+
from the existing value.
2917+
2918+
Args:
2919+
metadata_dict: Metadata dictionary to update in-place.
2920+
pilot_home: Base directory where pilot map files are located.
2921+
mappings: Tuples of (filename, metadata_key) to read and merge.
2922+
read_json: Function that reads a JSON file and returns a mapping (or None).
2923+
2924+
Returns:
2925+
True if metadata_dict was modified, otherwise False.
2926+
"""
2927+
changed = False
2928+
2929+
for fname, meta_key in mappings:
2930+
path = os.path.join(pilot_home, fname)
2931+
if not os.path.exists(path):
2932+
continue
2933+
2934+
data = read_json(path)
2935+
if not data:
2936+
continue
2937+
2938+
# Ensure JSON-like (mapping) content.
2939+
if not isinstance(data, Mapping):
2940+
logger.warning(f"map file does not contain a JSON object: {path}")
2941+
continue
2942+
2943+
if metadata_dict.get(meta_key) != data:
2944+
metadata_dict[meta_key] = dict(data) # make it JSON-serializable/mutable
2945+
changed = True
2946+
2947+
logger.info(f"added {meta_key} to metadata from {path}")
2948+
2949+
return changed
2950+
2951+
28452952
def update_server(job: Any, args: Any) -> None:
28462953
"""
28472954
Update the server (wrapper for send_state() that also prepares the metadata).
@@ -2856,11 +2963,39 @@ def update_server(job: Any, args: Any) -> None:
28562963
# user specific actions
28572964
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
28582965
user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)
2859-
metadata = user.get_metadata(job.workdir)
28602966
try:
28612967
user.update_server(job)
28622968
except Exception as error:
28632969
logger.warning('exception caught in update_server(): %s', error)
2970+
2971+
# the metadata can now be enhanced with the worker node map + GPU map for the case
2972+
# when the pilot is not sending the maps to the server directly. In this case, the maps
2973+
# are extracted on the server side at a later stage
2974+
# note: if the metadata does not exist, we should create it here
2975+
2976+
metadata: Optional[str] = user.get_metadata(job.workdir)
2977+
2978+
if not args.update_server:
2979+
pilot_home: str = os.environ.get("PILOT_HOME", "")
2980+
mappings: Tuple[Tuple[str, str], ...] = (
2981+
(config.Workernode.map, "worker_node"),
2982+
(config.Workernode.gpu_map, "worker_node_gpus"),
2983+
)
2984+
2985+
metadata_dict: JsonObject = load_metadata_dict(metadata)
2986+
changed: bool = merge_worker_maps(
2987+
metadata_dict=metadata_dict,
2988+
pilot_home=pilot_home,
2989+
mappings=mappings,
2990+
read_json=read_json,
2991+
)
2992+
2993+
# Re-serialize the metadata only if merging the worker node maps actually changed it.
2994+
if changed:
2995+
new_metadata = dump_metadata(metadata_dict)
2996+
if new_metadata is not None:
2997+
metadata = new_metadata
2998+
28642999
if job.fileinfo:
28653000
send_state(job, args, job.state, xml=dumps(job.fileinfo), metadata=metadata)
28663001
else:

pilot/control/monitor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,6 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
199199

200200
if n_iterations % 60 == 0:
201201
logger.info(f"{time_since_start}s have passed since pilot start - server update state is \'{environ['SERVER_UPDATE']}\'")
202-
logger.debug(f"args.update_server={args.update_server}")
203202

204203
# every minute run the following check
205204
if is_pilot_check(check='machinefeatures'):

pilot/info/dataloader.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ def load_url_data(cls, url: str, fname: str = None, cache_time: int = 0, nretry:
9393
:param sleep_time: sleep time (default is 60 s) between retry attempts (int)
9494
:return: data loaded from the url or file content if url passed is a filename (Any).
9595
"""
96-
logger.debug(f'xxx loading data url={url}')
97-
9896
@timeout(seconds=20)
9997
def _readfile(url: str) -> str:
10098
"""

pilot/info/extinfo.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,10 @@ def jsonparser_panda(dat: Any) -> dict:
194194
}
195195
}
196196

197-
logger.debug(f'xxx sources={sources}')
198197
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
199198
user = __import__(f'pilot.user.{pilot_user}.setup', globals(), locals(), [pilot_user], 0)
200199
queuedata_source_priority = user.get_queuedata_priority()
201200
priority = priority or queuedata_source_priority
202-
logger.debug(f'queuedata priority={priority}')
203201

204202
return cls.load_data(sources, priority, cache_time)
205203

pilot/user/atlas/common.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from collections import defaultdict
3232
from functools import reduce
3333
from glob import glob
34+
from json import dumps
3435
from random import randint
3536
from signal import SIGTERM, SIGUSR1
3637
from typing import Any
@@ -2660,18 +2661,25 @@ def update_stagein(job: JobData):
26602661
fspec.status = 'no_transfer'
26612662

26622663

2663-
def get_metadata(workdir: str) -> dict or None:
2664+
def get_metadata(workdir: str) -> str:
26642665
"""
26652666
Return the metadata from file.
26662667
26672668
:param workdir: work directory (str)
2668-
:return: metadata (dict).
2669+
:return: metadata (str).
26692670
"""
26702671
path = os.path.join(workdir, config.Payload.jobreport)
2671-
metadata = read_file(path) if os.path.exists(path) else None
2672-
logger.debug(f'metadata={metadata}')
2672+
logger.info(f"reading metadata from: {path}")
2673+
metadata = read_json(path) if os.path.exists(path) else None
2674+
if not os.path.exists(path):
2675+
logger.warning(f'path does not exist: {path}')
2676+
return ""
2677+
if not metadata:
2678+
logger.warning('empty metadata')
2679+
return ""
26732680

2674-
return metadata
2681+
# convert dictionary to string
2682+
return dumps(metadata)
26752683

26762684

26772685
def should_update_logstash(frequency: int = 10) -> bool:
@@ -2850,3 +2858,12 @@ def get_pilot_id(jobid: str) -> str:
28502858
pass
28512859

28522860
return os.environ.get("GTAG", "unknown")
2861+
2862+
2863+
def allow_send_workernode_map() -> bool:
2864+
"""
2865+
Return True if the workernode map should be sent to the server.
2866+
2867+
:return: always True for ATLAS (bool).
2868+
"""
2869+
return True

pilot/user/darkside/common.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,3 +503,12 @@ def download_command(process: dict, workdir: str, base_urls: list) -> dict:
503503
process['command'] = './' + cmd
504504

505505
return process
506+
507+
508+
def allow_send_workernode_map() -> bool:
509+
"""
510+
Return whether the workernode map should be sent to the server.
511+
512+
:return: False unless requested (bool).
513+
"""
514+
return False

pilot/user/eic/common.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,3 +503,12 @@ def download_command(process: dict, workdir: str, base_urls: list) -> dict:
503503
process['command'] = './' + cmd
504504

505505
return process
506+
507+
508+
def allow_send_workernode_map() -> bool:
509+
"""
510+
Return whether the workernode map should be sent to the server.
511+
512+
:return: False unless requested (bool).
513+
"""
514+
return False

pilot/user/generic/common.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,3 +503,12 @@ def download_command(process: dict, workdir: str, base_urls: list) -> dict:
503503
process['command'] = './' + cmd
504504

505505
return process
506+
507+
508+
def allow_send_workernode_map() -> bool:
509+
"""
510+
Return whether the workernode map should be sent to the server.
511+
512+
:return: False unless requested (bool).
513+
"""
514+
return False

0 commit comments

Comments
 (0)