diff --git a/dev-tools/airavata-python-sdk/airavata_auth/device_auth.py b/dev-tools/airavata-python-sdk/airavata_auth/device_auth.py index 3037ab1702..944055ad26 100644 --- a/dev-tools/airavata-python-sdk/airavata_auth/device_auth.py +++ b/dev-tools/airavata-python-sdk/airavata_auth/device_auth.py @@ -10,6 +10,13 @@ class AuthContext: + + @staticmethod + def get_access_token(): + if os.environ.get("CS_ACCESS_TOKEN", None) is None: + context = AuthContext() + context.login() + return os.environ["CS_ACCESS_TOKEN"] def __init__(self): self.settings = Settings() @@ -21,6 +28,8 @@ def __init__(self): self.console = Console() def login(self): + if os.environ.get('CS_ACCESS_TOKEN', None) is not None: + return # Step 1: Request device and user code auth_device_url = f"{self.settings.AUTH_SERVER_URL}/realms/{self.settings.AUTH_REALM}/protocol/openid-connect/auth/device" response = requests.post(auth_device_url, data={ diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/__init__.py b/dev-tools/airavata-python-sdk/airavata_experiments/__init__.py index dd391c07c4..bb5e33a0dd 100644 --- a/dev-tools/airavata-python-sdk/airavata_experiments/__init__.py +++ b/dev-tools/airavata-python-sdk/airavata_experiments/__init__.py @@ -18,8 +18,9 @@ from . import base, plan from airavata_auth.device_auth import AuthContext -from .runtime import list_runtimes, Runtime +from .runtime import find_runtimes, Runtime from typing import Any +from . import md, neuro context = AuthContext() @@ -27,7 +28,7 @@ def login(): context.login() -__all__ = ["list_runtimes", "base", "plan", "login"] +__all__ = ["find_runtimes", "base", "plan", "login", "md", "neuro"] def display_runtimes(runtimes: list[Runtime]) -> None: diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py index e6d7b10385..f5ca757de1 100644 --- a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py +++ b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py @@ -35,7 +35,9 @@ from airavata.model.experiment.ttypes import ExperimentModel, ExperimentType, UserConfigurationDataModel from airavata.model.scheduling.ttypes import ComputationalResourceSchedulingModel from airavata.model.data.replica.ttypes import DataProductModel, DataProductType, DataReplicaLocationModel, ReplicaLocationCategory -from airavata.model.appcatalog.groupresourceprofile.ttypes import GroupResourceProfile +from airavata.model.appcatalog.groupresourceprofile.ttypes import GroupResourceProfile, ResourceType +from airavata.model.appcatalog.computeresource.ttypes import ComputeResourceDescription +from airavata.model.status.ttypes import JobStatus, JobState, ExperimentStatus, ExperimentState warnings.filterwarnings("ignore", category=DeprecationWarning) logger = logging.getLogger("airavata_sdk.clients") @@ -100,8 +102,10 @@ def create_experiment_model( ) def get_resource_host_id(self, resource_name): - resources: dict = self.api_server_client.get_all_compute_resource_names(self.airavata_token) # type: ignore - return next((str(k) for k, v in resources.items() if v == resource_name)) + resources = self.api_server_client.get_all_compute_resource_names(self.airavata_token) + resource_id = next((k for k in resources if k.startswith(resource_name)), None) + assert resource_id is not None, f"Compute resource {resource_name} not found" + return resource_id def configure_computation_resource_scheduling( self, @@ -188,7 +192,7 @@ def get_process_id(self, experiment_id: str) -> str: """ tree = self.api_server_client.get_detailed_experiment_tree(self.airavata_token, experiment_id) # type: ignore processModels = tree.processes - assert processModels is not None + assert processModels is not None, f"No process models found for experiment {experiment_id}" assert len(processModels) == 1, f"Expected 1 process model, got {len(processModels)}" return processModels[0].processId @@ -213,7 +217,8 @@ def get_preferred_storage(self, gateway_id: str | None = None, sr_hostname: str sr_hostname = sr_hostname or self.default_sr_hostname() # logic sr_names: dict[str, str] = self.api_server_client.get_all_storage_resource_names(self.airavata_token) # type: ignore - sr_id = next((str(k) for k, v in sr_names.items() if v == sr_hostname)) + sr_id = next((str(k) for k, v in sr_names.items() if v == sr_hostname), None) + assert sr_id is not None, f"Storage resource {sr_hostname} not found" return self.api_server_client.get_gateway_storage_preference(self.airavata_token, gateway_id, sr_id) def get_storage(self, storage_name: str | None = None) -> any: # type: ignore @@ -225,7 +230,8 @@ def get_storage(self, storage_name: str | None = None) -> any: # type: ignore storage_name = storage_name or self.default_sr_hostname() # logic sr_names: dict[str, str] = self.api_server_client.get_all_storage_resource_names(self.airavata_token) # type: ignore - sr_id = next((str(k) for k, v in sr_names.items() if v == storage_name)) + sr_id = next((str(k) for k, v in sr_names.items() if v == storage_name), None) + assert sr_id is not None, f"Storage resource {storage_name} not found" storage = self.api_server_client.get_storage_resource(self.airavata_token, sr_id) return storage @@ -236,11 +242,9 @@ def get_group_resource_profile_id(self, group: str) -> str: """ # logic grps: list[GroupResourceProfile] = self.api_server_client.get_group_resource_list(self.airavata_token, self.default_gateway_id()) # type: ignore - try: - grp_id = next((grp.groupResourceProfileId for grp in grps if grp.groupResourceProfileName == group)) - return str(grp_id) - except StopIteration: - raise Exception(f"Group resource profile {group} not found") + grp_id = next((grp.groupResourceProfileId for grp in grps if grp.groupResourceProfileName == group), None) + assert grp_id is not None, f"Group resource profile {group} not found" + return str(grp_id) def get_group_resource_profile(self, group_id: str): grp = self.api_server_client.get_group_resource_profile(self.airavata_token, group_id) # type: ignore @@ -253,7 +257,8 @@ def get_compatible_deployments(self, app_interface_id: str, group: str): """ # logic grps: list = self.api_server_client.get_group_resource_list(self.airavata_token, self.default_gateway_id()) # type: ignore - grp_id = next((grp.groupResourceProfileId for grp in grps if grp.groupResourceProfileName == group)) + grp_id = next((grp.groupResourceProfileId for grp in grps if grp.groupResourceProfileName == group), None) + assert grp_id is not None, f"Group resource profile {group} not found" deployments = self.api_server_client.get_application_deployments_for_app_module_and_group_resource_profile(self.airavata_token, app_interface_id, grp_id) return deployments @@ -264,13 +269,15 @@ def get_app_interface_id(self, app_name: str, gateway_id: str | None = None): """ gateway_id = str(gateway_id or self.default_gateway_id()) apps: list = self.api_server_client.get_all_application_interfaces(self.airavata_token, gateway_id) # type: ignore - app_id = next((app.applicationInterfaceId for app in apps if app.applicationName == app_name)) + app_id = next((app.applicationInterfaceId for app in apps if app.applicationName == app_name), None) + assert app_id is not None, f"Application interface {app_name} not found" return str(app_id) def get_project_id(self, project_name: str, gateway_id: str | None = None): gateway_id = str(gateway_id or self.default_gateway_id()) projects: list = self.api_server_client.get_user_projects(self.airavata_token, gateway_id, self.user_id, 10, 0) # type: ignore - project_id = next((p.projectID for p in projects if p.name == project_name and p.owner == self.user_id)) + project_id = next((p.projectID for p in projects if p.name == project_name and p.owner == self.user_id), None) + assert project_id is not None, f"Project {project_name} not found" return str(project_id) def get_application_inputs(self, app_interface_id: str) -> list: @@ -323,14 +330,14 @@ def upload_files(self, process_id: str | None, agent_ref: str | None, sr_host: s elif process_id is not None and agent_ref is not None: assert len(local_files) == 1, f"Expected 1 file, got {len(local_files)}" file = local_files[0] - fp = os.path.join("/data", file.name) + fp = os.path.join(".", file.name) rawdata = file.read_bytes() b64data = base64.b64encode(rawdata).decode() res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", json={ "agentId": agent_ref, "envName": agent_ref, "workingDir": ".", - "arguments": ["sh", "-c", f"echo {b64data} | base64 -d > {fp}"] + "arguments": [f"echo {b64data} | base64 -d > {fp}"] }) data = res.json() if data["error"] is not None: @@ -372,7 +379,7 @@ def list_files(self, process_id: str, agent_ref: str, sr_host: str, remote_dir: "agentId": agent_ref, "envName": agent_ref, "workingDir": ".", - "arguments": ["sh", "-c", r"find /data -type d -name 'venv' -prune -o -type f -printf '%P\n' | sort"] + "arguments": [r"find . -type f -printf '%P\n' | sort"] }) data = res.json() if data["error"] is not None: @@ -405,12 +412,12 @@ def download_file(self, process_id: str, agent_ref: str, sr_host: str, remote_fi """ import os - fp = os.path.join("/data", remote_file) + fp = os.path.join(".", remote_file) res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", json={ "agentId": agent_ref, "envName": agent_ref, "workingDir": ".", - "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"] + "arguments": [f"cat {fp} | base64 -w0"] }) data = res.json() if data["error"] is not None: @@ -441,7 +448,36 @@ def download_file(self, process_id: str, agent_ref: str, sr_host: str, remote_fi assert process_id is not None, f"Expected process_id, got {process_id}" url_path = os.path.join(process_id, remote_file) filemgr_svc_download_url = f"{self.filemgr_svc_url()}/download/live/{url_path}" - + + def execute_cmd(self, agent_ref: str, cmd: str) -> bytes: + """ + Execute a command on a remote directory of a storage resource + TODO add data_svc fallback + + Return Path: /{project_name}/{experiment_name} + + """ + res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", json={ + "agentId": agent_ref, + "envName": agent_ref, + "workingDir": ".", + "arguments": [f"{cmd} | base64 -w0"] + }) + data = res.json() + if data["error"] is not None: + raise Exception(data["error"]) + else: + exc_id = data["executionId"] + while True: + res = requests.get(f"{self.connection_svc_url()}/agent/execute/shell/{exc_id}") + data = res.json() + if data["executed"]: + content = data["responseString"] + import base64 + content = base64.b64decode(content) + return content + time.sleep(1) + def cat_file(self, process_id: str, agent_ref: str, sr_host: str, remote_file: str, remote_dir: str) -> bytes: """ Download files from a remote directory of a storage resource to a local directory @@ -451,12 +487,12 @@ def cat_file(self, process_id: str, agent_ref: str, sr_host: str, remote_file: s """ import os - fp = os.path.join("/data", remote_file) + fp = os.path.join(".", remote_file) res = requests.post(f"{self.connection_svc_url()}/agent/execute/shell", json={ "agentId": agent_ref, "envName": agent_ref, "workingDir": ".", - "arguments": ["sh", "-c", f"cat {fp} | base64 -w0"] + "arguments": [f"cat {fp} | base64 -w0"] }) data = res.json() if data["error"] is not None: @@ -596,8 +632,8 @@ def launch_experiment( def register_input_file(file: Path) -> str: return str(self.register_input_file(file.name, sr_host, sr_id, gateway_id, file.name, abs_path)) - # set up file inputs - print("[AV] Setting up file inputs...") + # set up experiment inputs + print("[AV] Setting up experiment inputs...") files_to_upload = list[Path]() file_refs = dict[str, str | list[str]]() for key, value in file_inputs.items(): @@ -610,11 +646,9 @@ def register_input_file(file: Path) -> str: file_refs[key] = [*map(register_input_file, value)] else: raise ValueError("Invalid file input type") - - # configure experiment inputs experiment_inputs = [] for exp_input in self.api_server_client.get_application_inputs(self.airavata_token, app_interface_id): # type: ignore - assert exp_input.type is not None + assert exp_input.type is not None, f"Invalid exp_input type for {exp_input.name}: {exp_input.type}" if exp_input.type < 3 and exp_input.name in data_inputs: value = data_inputs[exp_input.name] if exp_input.type == 0: @@ -623,11 +657,12 @@ def register_input_file(file: Path) -> str: exp_input.value = repr(value) elif exp_input.type == 3 and exp_input.name in file_refs: ref = file_refs[exp_input.name] - assert isinstance(ref, str) + assert isinstance(ref, str), f"Invalid file ref: {ref}" exp_input.value = ref elif exp_input.type == 4 and exp_input.name in file_refs: exp_input.value = ','.join(file_refs[exp_input.name]) experiment_inputs.append(exp_input) + print(f"[AV] * {exp_input.name}={exp_input.value}") experiment.experimentInputs = experiment_inputs # configure experiment outputs @@ -670,14 +705,15 @@ def register_input_file(file: Path) -> str: # wait until task begins, then get job id print(f"[AV] Experiment {experiment_name} WAITING until task begins...") job_id = job_state = None - while job_state is None: + while job_id in [None, "N/A"]: try: job_id, job_state = self.get_task_status(ex_id) except: time.sleep(2) else: time.sleep(2) - print(f"[AV] Experiment {experiment_name} - Task {job_state} with id: {job_id}") + assert job_state is not None, f"Job state is None for job id: {job_id}" + print(f"[AV] Experiment {experiment_name} - Task {job_state.name} with id: {job_id}") return LaunchState( experiment_id=ex_id, @@ -688,14 +724,12 @@ def register_input_file(file: Path) -> str: sr_host=storage.hostName, ) - def get_experiment_status(self, experiment_id: str) -> Literal['CREATED', 'VALIDATED', 'SCHEDULED', 'LAUNCHED', 'EXECUTING', 'CANCELING', 'CANCELED', 'COMPLETED', 'FAILED']: - states = ["CREATED", "VALIDATED", "SCHEDULED", "LAUNCHED", "EXECUTING", "CANCELING", "CANCELED", "COMPLETED", "FAILED"] + def get_experiment_status(self, experiment_id: str) -> ExperimentState: status = self.api_server_client.get_experiment_status(self.airavata_token, experiment_id) - state = status.state.name - if state in states: - return state - else: - return "FAILED" + if status is None: + return ExperimentState.CREATED + assert isinstance(status, ExperimentStatus) + return status.state def stop_experiment(self, experiment_id: str): status = self.api_server_client.terminate_experiment( @@ -781,23 +815,48 @@ def execute_py(self, project: str, libraries: list[str], code: str, agent_id: st print(f"[av] Remote execution failed! {e}") return None - def get_available_runtimes(self): + def get_available_groups(self, gateway_id: str = "default"): + grps: list[GroupResourceProfile] = self.api_server_client.get_group_resource_list(self.airavata_token, gatewayId=gateway_id) + return grps + + def get_available_runtimes(self, group: str, gateway_id: str = "default"): + grps = self.get_available_groups(gateway_id) + grp_id, gcr_prefs, gcr_policies = next(((x.groupResourceProfileId, x.computePreferences, x.computeResourcePolicies) for x in grps if str(x.groupResourceProfileName).strip() == group.strip()), (None, None, None)) + assert grp_id is not None, f"Group {group} was not found" + assert gcr_prefs is not None, f"Compute preferences for group={grp_id} were not found" + assert gcr_policies is not None, f"Compute policies for group={grp_id} were not found" # type: ignore from .runtime import Remote - return [ - Remote(cluster="login.expanse.sdsc.edu", category="gpu", queue_name="gpu-shared", node_count=1, cpu_count=10, gpu_count=1, walltime=30, group="Default"), - Remote(cluster="login.expanse.sdsc.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=10, gpu_count=0, walltime=30, group="Default"), - Remote(cluster="anvil.rcac.purdue.edu", category="cpu", queue_name="shared", node_count=1, cpu_count=24, gpu_count=0, walltime=30, group="Default"), - ] + runtimes = [] + for pref in gcr_prefs: + cr = self.api_server_client.get_compute_resource(self.airavata_token, pref.computeResourceId) + assert cr is not None, "Compute resource not found" + assert isinstance(cr, ComputeResourceDescription), "Compute resource is not a ComputeResourceDescription" + assert cr.batchQueues is not None, "Compute resource has no batch queues" + for queue in cr.batchQueues: + if pref.resourceType == ResourceType.SLURM: + policy = next((p for p in gcr_policies if p.computeResourceId == pref.computeResourceId), None) + assert policy is not None, f"Compute resource policy not found for {pref.computeResourceId}" + if queue.queueName not in (policy.allowedBatchQueues or []): + continue + runtime = Remote( + cluster=pref.computeResourceId.split("_")[0], + category="GPU" if "gpu" in queue.queueName.lower() else "CPU", + queue_name=queue.queueName, + node_count=queue.maxNodes or 1, + cpu_count=queue.cpuPerNode or 1, + gpu_count=1 if "gpu" in queue.queueName.lower() else 0, + walltime=queue.maxRunTime or 30, + group=group, + ) + runtimes.append(runtime) + return runtimes - def get_task_status(self, experiment_id: str) -> tuple[str, Literal["SUBMITTED", "UN_SUBMITTED", "SETUP", "QUEUED", "ACTIVE", "COMPLETE", "CANCELING", "CANCELED", "FAILED", "HELD", "SUSPENDED", "UNKNOWN"] | None]: - states = ["SUBMITTED", "UN_SUBMITTED", "SETUP", "QUEUED", "ACTIVE", "COMPLETE", "CANCELING", "CANCELED", "FAILED", "HELD", "SUSPENDED", "UNKNOWN"] - job_details: dict = self.api_server_client.get_job_statuses(self.airavata_token, experiment_id) # type: ignore + def get_task_status(self, experiment_id: str) -> tuple[str, JobState]: + job_details: dict[str, JobStatus] = self.api_server_client.get_job_statuses(self.airavata_token, experiment_id) # type: ignore job_id = job_state = None - # get the most recent job id and state for job_id, v in job_details.items(): - if v.reason in states: - job_state = v.reason - else: - job_state = states[int(v.jobState)] - return job_id or "N/A", job_state # type: ignore + job_state = v.jobState + return job_id or "N/A", job_state or JobState.UNKNOWN + + JobState = JobState diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/base.py b/dev-tools/airavata-python-sdk/airavata_experiments/base.py index e9ad36b68e..2ba99a3bcb 100644 --- a/dev-tools/airavata-python-sdk/airavata_experiments/base.py +++ b/dev-tools/airavata-python-sdk/airavata_experiments/base.py @@ -85,49 +85,64 @@ def with_resource(self, resource: Runtime) -> Experiment[T]: self.resource = resource return self - def create_task(self, *allowed_runtimes: Runtime, name: str | None = None) -> None: + def add_run(self, use: list[Runtime], cpus: int, nodes: int, walltime: int, name: str | None = None, **extra_params) -> None: """ Create a task to run the experiment on a given runtime. """ - runtime = random.choice(allowed_runtimes) if len(allowed_runtimes) > 0 else self.resource + runtime = random.choice(use) if len(use) > 0 else self.resource uuid_str = str(uuid.uuid4())[:4].upper() - + # override runtime args with given values + runtime = runtime.model_copy() + runtime.args["cpu_count"] = cpus + runtime.args["node_count"] = nodes + runtime.args["walltime"] = walltime + # add extra inputs to task inputs + task_inputs = {**self.inputs, **extra_params} + # create a task with the given runtime and inputs self.tasks.append( Task( - name=name or f"{self.name}_{uuid_str}", + name=f"{name or self.name}_{uuid_str}", app_id=self.application.app_id, - inputs={**self.inputs}, + inputs=task_inputs, runtime=runtime, ) ) print(f"Task created. ({len(self.tasks)} tasks in total)") - def add_sweep(self, *allowed_runtimes: Runtime, **space: list) -> None: + def add_sweep(self, use: list[Runtime], cpus: int, nodes: int, walltime: int, name: str | None = None, **space: list) -> None: """ Add a sweep to the experiment. """ for values in product(space.values()): - runtime = random.choice(allowed_runtimes) if len(allowed_runtimes) > 0 else self.resource + runtime = random.choice(use) if len(use) > 0 else self.resource uuid_str = str(uuid.uuid4())[:4].upper() - + # override runtime args with given values + runtime = runtime.model_copy() + runtime.args["cpu_count"] = cpus + runtime.args["node_count"] = nodes + runtime.args["walltime"] = walltime + # add sweep params to task inputs task_specific_params = dict(zip(space.keys(), values)) agg_inputs = {**self.inputs, **task_specific_params} task_inputs = {k: {"value": agg_inputs[v[0]], "type": v[1]} for k, v in self.input_mapping.items()} - + # create a task with the given runtime and inputs self.tasks.append(Task( - name=f"{self.name}_{uuid_str}", + name=f"{name or self.name}_{uuid_str}", app_id=self.application.app_id, inputs=task_inputs, runtime=runtime or self.resource, )) - def plan(self, **kwargs) -> Plan: - if len(self.tasks) == 0: - self.create_task(self.resource) + def plan(self) -> Plan: + assert len(self.tasks) > 0, "add_run() must be called before plan() to define runtimes and resources." tasks = [] for t in self.tasks: agg_inputs = {**self.inputs, **t.inputs} task_inputs = {k: {"value": agg_inputs[v[0]], "type": v[1]} for k, v in self.input_mapping.items()} - tasks.append(Task(name=t.name, app_id=self.application.app_id, inputs=task_inputs, runtime=t.runtime)) - return Plan(tasks=tasks) + task = Task(name=t.name, app_id=self.application.app_id, inputs=task_inputs, runtime=t.runtime) + # task.freeze() # TODO upload the task-related data and freeze the task + tasks.append(task) + plan = Plan(tasks=tasks) + plan.save() + return plan diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/plan.py b/dev-tools/airavata-python-sdk/airavata_experiments/plan.py index f231e2583d..4f41135610 100644 --- a/dev-tools/airavata-python-sdk/airavata_experiments/plan.py +++ b/dev-tools/airavata-python-sdk/airavata_experiments/plan.py @@ -25,6 +25,7 @@ from .runtime import is_terminal_state from .task import Task import uuid +from airavata_auth.device_auth import AuthContext from .airavata import AiravataOperator @@ -66,11 +67,13 @@ def __stage_status__(self) -> list: statuses.append(task.status()) return statuses - def __stage_stop__(self) -> None: - print("Stopping task(s)...") - for task in self.tasks: - task.stop() - print("Task(s) stopped.") + def __stage_stop__(self, runs: list[int] = []) -> None: + runs = runs if len(runs) > 0 else list(range(len(self.tasks))) + print(f"Stopping task(s): {runs}") + for i, task in enumerate(self.tasks): + if i in runs: + task.stop() + print(f"Task(s) stopped: {runs}") def __stage_fetch__(self, local_dir: str) -> list[list[str]]: print("Fetching results...") @@ -78,7 +81,7 @@ def __stage_fetch__(self, local_dir: str) -> list[list[str]]: for task in self.tasks: fps.append(task.download_all(local_dir)) print("Results fetched.") - self.save_json(os.path.join(local_dir, "plan.json")) + self.export(os.path.join(local_dir, "plan.json")) return fps def launch(self, silent: bool = True) -> None: @@ -119,17 +122,17 @@ def download(self, local_dir: str): assert os.path.isdir(local_dir) self.__stage_fetch__(local_dir) - def stop(self) -> None: - self.__stage_stop__() + def stop(self, runs: list[int] = []) -> None: + self.__stage_stop__(runs) self.save() - def save_json(self, filename: str) -> None: + def export(self, filename: str) -> None: with open(filename, "w") as f: json.dump(self.model_dump(), f, indent=2) def save(self) -> None: settings = Settings() - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) az = av.__airavata_token__(av.access_token, av.default_gateway_id()) assert az.accessToken is not None assert az.claimsMap is not None @@ -162,7 +165,7 @@ def load_json(filename: str) -> Plan: def load(id: str | None) -> Plan: settings = Settings() assert id is not None - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) az = av.__airavata_token__(av.access_token, av.default_gateway_id()) assert az.accessToken is not None assert az.claimsMap is not None @@ -183,7 +186,7 @@ def load(id: str | None) -> Plan: def query() -> list[Plan]: settings = Settings() - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) az = av.__airavata_token__(av.access_token, av.default_gateway_id()) assert az.accessToken is not None assert az.claimsMap is not None diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/runtime.py b/dev-tools/airavata-python-sdk/airavata_experiments/runtime.py index e843135843..f5c40e3c86 100644 --- a/dev-tools/airavata-python-sdk/airavata_experiments/runtime.py +++ b/dev-tools/airavata-python-sdk/airavata_experiments/runtime.py @@ -16,14 +16,37 @@ from __future__ import annotations import abc -from typing import Any +from typing import Any, Literal from pathlib import Path -import os import pydantic +from airavata_auth.device_auth import AuthContext + # from .task import Task Task = Any +States = Literal[ + # Experiment States + 'CREATED', + 'VALIDATED', + 'SCHEDULED', + 'LAUNCHED', + 'EXECUTING', + 'CANCELING', + 'CANCELED', + 'COMPLETED', + 'FAILED', + # Job States + 'SUBMITTED', + 'QUEUED', + 'ACTIVE', + 'COMPLETE', + 'CANCELED', + 'FAILED', + 'SUSPENDED', + 'UNKNOWN', + 'NON_CRITICAL_FAIL', +] class Runtime(abc.ABC, pydantic.BaseModel): @@ -36,6 +59,9 @@ def execute(self, task: Task) -> None: ... @abc.abstractmethod def execute_py(self, libraries: list[str], code: str, task: Task) -> None: ... + @abc.abstractmethod + def execute_cmd(self, cmd: str, task: Task) -> bytes: ... + @abc.abstractmethod def status(self, task: Task) -> tuple[str, str]: ... @@ -87,6 +113,9 @@ def execute(self, task: Task) -> None: task.agent_ref = str(uuid.uuid4()) task.ref = str(uuid.uuid4()) + def execute_cmd(self, cmd: str, task: Task) -> bytes: + return b"" + def execute_py(self, libraries: list[str], code: str, task: Task) -> None: pass @@ -135,7 +164,7 @@ def execute(self, task: Task) -> None: print(f"[Remote] Creating Experiment: name={task.name}") from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) try: launch_state = av.launch_experiment( experiment_name=task.name, @@ -158,6 +187,23 @@ def execute(self, task: Task) -> None: except Exception as e: print(f"[Remote] Failed to launch experiment: {repr(e)}") raise e + + def execute_cmd(self, cmd: str, task: Task) -> bytes: + assert task.ref is not None + assert task.agent_ref is not None + assert task.pid is not None + assert task.sr_host is not None + assert task.workdir is not None + + from .airavata import AiravataOperator + av = AiravataOperator(AuthContext.get_access_token()) + try: + result = av.execute_cmd(task.agent_ref, cmd) + return result + except Exception as e: + print(f"[Remote] Failed to execute command: {repr(e)}") + return b"" + def execute_py(self, libraries: list[str], code: str, task: Task) -> None: assert task.ref is not None @@ -165,29 +211,29 @@ def execute_py(self, libraries: list[str], code: str, task: Task) -> None: assert task.pid is not None from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) result = av.execute_py(task.project, libraries, code, task.agent_ref, task.pid, task.runtime.args) print(result) - def status(self, task: Task) -> tuple[str, str]: + def status(self, task: Task) -> tuple[str, States]: assert task.ref is not None assert task.agent_ref is not None from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) # prioritize job state, fallback to experiment state job_id, job_state = av.get_task_status(task.ref) - if not job_state or job_state == "UN_SUBMITTED": - return job_id, av.get_experiment_status(task.ref) + if job_state in [AiravataOperator.JobState.UNKNOWN, AiravataOperator.JobState.NON_CRITICAL_FAIL]: + return job_id, av.get_experiment_status(task.ref).name else: - return job_id, job_state + return job_id, job_state.name def signal(self, signal: str, task: Task) -> None: assert task.ref is not None assert task.agent_ref is not None from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) av.stop_experiment(task.ref) def ls(self, task: Task) -> list[str]: @@ -198,7 +244,7 @@ def ls(self, task: Task) -> list[str]: assert task.workdir is not None from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) files = av.list_files(task.pid, task.agent_ref, task.sr_host, task.workdir) return files @@ -210,7 +256,7 @@ def upload(self, file: Path, task: Task) -> str: assert task.workdir is not None from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) result = av.upload_files(task.pid, task.agent_ref, task.sr_host, [file], task.workdir).pop() return result @@ -222,7 +268,7 @@ def download(self, file: str, local_dir: str, task: Task) -> str: assert task.workdir is not None from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) result = av.download_file(task.pid, task.agent_ref, task.sr_host, file, task.workdir, local_dir) return result @@ -234,30 +280,44 @@ def cat(self, file: str, task: Task) -> bytes: assert task.workdir is not None from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) + av = AiravataOperator(AuthContext.get_access_token()) content = av.cat_file(task.pid, task.agent_ref, task.sr_host, file, task.workdir) return content -def list_runtimes( +def find_runtimes( cluster: str | None = None, category: str | None = None, - group: str | None = None, node_count: int | None = None, cpu_count: int | None = None, - walltime: int | None = None, + group: str | None = None, ) -> list[Runtime]: from .airavata import AiravataOperator - av = AiravataOperator(os.environ['CS_ACCESS_TOKEN']) - all_runtimes = av.get_available_runtimes() - out_runtimes = [] - for r in all_runtimes: - if (cluster in [None, r.args["cluster"]]) and (category in [None, r.args["category"]]) and (group in [None, r.args["group"]]): - r.args["node_count"] = node_count or r.args["node_count"] - r.args["cpu_count"] = cpu_count or r.args["cpu_count"] - r.args["walltime"] = walltime or r.args["walltime"] - out_runtimes.append(r) - return out_runtimes - -def is_terminal_state(x): - return x in ["CANCELED", "COMPLETED", "FAILED"] \ No newline at end of file + av = AiravataOperator(AuthContext.get_access_token()) + grps = av.get_available_groups() + grp_names = [str(x.groupResourceProfileName) for x in grps] + if group is not None: + assert group in grp_names, f"Group {group} was not found. Available groups: {repr(grp_names)}" + groups = [g for g in grps if str(g.groupResourceProfileName) == group] + else: + groups = grps + runtimes = [] + for g in groups: + matched_runtimes = [] + assert g.groupResourceProfileName is not None, f"Group {g} has no name" + r: Runtime + for r in av.get_available_runtimes(group=g.groupResourceProfileName): + if (node_count or 1) > int(r.args["node_count"]): + continue + if (cpu_count or 1) > int(r.args["cpu_count"]): + continue + if (cluster or r.args["cluster"]) != r.args["cluster"]: + continue + if (category or r.args["category"]) != r.args["category"]: + continue + matched_runtimes.append(r) + runtimes.extend(matched_runtimes) + return runtimes + +def is_terminal_state(x: States) -> bool: + return x in ["CANCELED", "COMPLETE", "COMPLETED", "FAILED"] \ No newline at end of file diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/task.py b/dev-tools/airavata-python-sdk/airavata_experiments/task.py index bcda796518..fea221b1cf 100644 --- a/dev-tools/airavata-python-sdk/airavata_experiments/task.py +++ b/dev-tools/airavata-python-sdk/airavata_experiments/task.py @@ -72,7 +72,13 @@ def download(self, file: str, local_dir: str) -> str: assert self.ref is not None from pathlib import Path Path(local_dir).mkdir(parents=True, exist_ok=True) - return self.runtime.download(file, local_dir, self) + try: + saved_path = self.runtime.download(file, local_dir, self) + print(f"[Remote] Downloaded {file} -> {saved_path}") + return saved_path + except Exception as e: + print(f"[Remote] Failed to download file: {repr(e)}") + return "" def download_all(self, local_dir: str) -> list[str]: assert self.ref is not None @@ -92,6 +98,10 @@ def download_all(self, local_dir: str) -> list[str]: def cat(self, file: str) -> bytes: assert self.ref is not None return self.runtime.cat(file, self) + + def exec(self, cmd: str) -> bytes: + assert self.ref is not None + return self.runtime.execute_cmd(cmd, self) def stop(self) -> None: assert self.ref is not None diff --git a/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py b/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py index 1528304462..c445e654b2 100644 --- a/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py +++ b/dev-tools/airavata-python-sdk/airavata_jupyter_magic/__init__.py @@ -43,6 +43,7 @@ from jupyter_client.blocking.client import BlockingKernelClient from airavata_auth.device_auth import AuthContext +from airavata_experiments.plan import Plan from airavata_sdk import Settings # ======================================================================== @@ -62,6 +63,7 @@ class RequestedRuntime: group: str file: str | None use: str | None + plan: str | None class ProcessState(IntEnum): @@ -101,6 +103,7 @@ class ProcessState(IntEnum): ('envName', str), ('pids', list[int]), ('tunnels', dict[str, tuple[str, int]]), + ('links', dict[str, str]), ]) PENDING_STATES = [ @@ -139,23 +142,6 @@ class State: # HELPER FUNCTIONS -def get_access_token(envar_name: str = "CS_ACCESS_TOKEN", state_path: str = "/tmp/av.json") -> str | None: - """ - Get access token from environment or file - - @param None: - @returns: access token if present, None otherwise - - """ - token = os.getenv(envar_name) - if not token: - try: - token = json.load(Path(state_path).open("r")).get("access_token") - except (FileNotFoundError, json.JSONDecodeError): - pass - return token - - def is_runtime_ready(access_token: str, rt: RuntimeInfo, rt_name: str): """ Check if the runtime (i.e., agent job) is ready to receive requests @@ -470,7 +456,8 @@ def submit_agent_job( memory: int | None = None, gpus: int | None = None, gpu_memory: int | None = None, - file: str | None = None, + spec_file: str | None = None, + plan_file: str | None = None, ) -> None: """ Submit an agent job to the given runtime @@ -487,7 +474,8 @@ def submit_agent_job( @param memory: the memory for cpu (MB) @param gpus: the number of gpus (int) @param gpu_memory: the memory for gpu (MB) - @param file: environment file (path) + @param spec_file: environment file (path) + @param plan_file: experiment plan file (path) @returns: None """ @@ -506,14 +494,14 @@ def submit_agent_job( pip: list[str] = [] # if file is provided, validate it and use the given values as defaults - if file is not None: - fp = Path(file) + if spec_file is not None: + fp = Path(spec_file) # validation - assert fp.exists(), f"File {file} does not exist" + assert fp.exists(), f"File {spec_file} does not exist" with open(fp, "r") as f: - content = yaml.safe_load(f) + spec = yaml.safe_load(f) # validation: /workspace - assert (workspace := content.get("workspace", None)) is not None, "missing section: /workspace" + assert (workspace := spec.get("workspace", None)) is not None, "missing section: /workspace" assert (resources := workspace.get("resources", None)) is not None, "missing section: /workspace/resources" assert (min_cpu := resources.get("min_cpu", None)) is not None, "missing section: /workspace/resources/min_cpu" assert (min_mem := resources.get("min_mem", None)) is not None, "missing section: /workspace/resources/min_mem" @@ -523,12 +511,29 @@ def submit_agent_job( assert (datasets := workspace.get("data_collection", None)) is not None, "missing section: /workspace/data_collection" collection = models + datasets # validation: /additional_dependencies - assert (additional_dependencies := content.get("additional_dependencies", None)) is not None, "missing section: /additional_dependencies" + assert (additional_dependencies := spec.get("additional_dependencies", None)) is not None, "missing section: /additional_dependencies" assert (modules := additional_dependencies.get("modules", None)) is not None, "missing /additional_dependencies/modules section" assert (conda := additional_dependencies.get("conda", None)) is not None, "missing /additional_dependencies/conda section" assert (pip := additional_dependencies.get("pip", None)) is not None, "missing /additional_dependencies/pip section" mounts = [f"{i['identifier']}:{i['mount_point']}" for i in collection] + # if plan file is provided, link its runs into the workspace + links = {} + if plan_file is not None: + assert Path(plan_file).exists(), f"Plan {plan_file} does not exist" + assert Path(plan_file).is_file(), f"Plan {plan_file} is not a file" + with open(Path(plan_file), "r") as f: + from airavata_experiments.base import Plan + plan = Plan(**json.load(f)) + for task in plan.tasks: + plan_cluster = str(task.runtime.args.get("cluster", "N/A")) + if plan_cluster == "N/A": + print(f"[av] skipping plan task {task.name}: cluster not specified in the plan") + continue + assert plan_cluster == cluster, f"[av] cluster mismatched: {plan_cluster} in plan, {cluster} in request" + assert task.pid is not None, f"[av] plan task {task.name} has no pid" + links[task.pid] = task.name + # payload data = { 'experimentName': app_name, @@ -554,6 +559,7 @@ def submit_agent_job( print(f"* libraries={data['libraries']}", flush=True) print(f"* pip={data['pip']}", flush=True) print(f"* mounts={data['mounts']}", flush=True) + print(f"* links={links}", flush=True) # Send the POST request headers = generate_headers(access_token, gateway_id) @@ -584,6 +590,7 @@ def submit_agent_job( envName=obj['envName'], pids=[], tunnels={}, + links=links, ) state.all_runtimes[rt_name] = rt print(f'Requested runtime={rt_name}', flush=True) @@ -1114,7 +1121,7 @@ def request_runtime(line: str): Request a runtime with given capabilities """ - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None [rt_name, *cmd_args] = line.strip().split() @@ -1151,12 +1158,32 @@ def request_runtime(line: str): p.add_argument("--group", type=str, help="resource group", required=False, default="Default") p.add_argument("--file", type=str, help="yml file", required=False) p.add_argument("--use", type=str, help="allowed resources", required=False) + p.add_argument("--plan", type=str, help="experiment plan file", required=False) args = p.parse_args(cmd_args, namespace=RequestedRuntime()) if args.file is not None: - assert args.use is not None - cluster, queue = meta_scheduler(args.use.split(",")) + assert (args.use or args.plan) is not None + if args.use: + cluster, queue = meta_scheduler(args.use.split(",")) + else: + assert args.plan is not None, "--plan is required when --use is not provided" + assert os.path.exists(args.plan), f"--plan={args.plan} file does not exist" + assert os.path.isfile(args.plan), f"--plan={args.plan} is not a file" + with open(args.plan, "r") as f: + plan: Plan = Plan(**json.load(f)) + clusters = [] + queues = [] + for task in plan.tasks: + c, q = task.runtime.args.get("cluster"), task.runtime.args.get("queue_name") + clusters.append(c) + queues.append(q) + assert len(set(clusters)) == 1, "all tasks must be on the same cluster" + assert len(set(queues)) == 1, "all tasks must be on the same queue" + cluster, queue = clusters[0], queues[0] + assert cluster is not None, "cluster is required" + assert queue is not None, "queue is required" + submit_agent_job( rt_name=rt_name, access_token=access_token, @@ -1166,7 +1193,8 @@ def request_runtime(line: str): cluster=cluster, queue=queue, group=args.group, - file=args.file, + spec_file=args.file, + plan_file=args.plan, ) else: assert args.cluster is not None @@ -1194,7 +1222,7 @@ def stat_runtime(line: str): Show the status of the runtime """ - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt_name = line.strip() @@ -1226,7 +1254,7 @@ def wait_for_runtime(line: str): rt_name, render_live_logs = parts[0], True else: raise ValueError("Usage: %wait_for_runtime [--live]") - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt = state.all_runtimes.get(rt_name, None) @@ -1238,8 +1266,14 @@ def wait_for_runtime(line: str): random_port = random.randint(2000, 6000) * 5 launch_remote_kernel(rt_name, random_port, hostname="127.0.0.1") print(f"Remote Jupyter kernel launched and connected for runtime={rt_name}.") - return - + + # create symlinks + for pid, name in rt.links.items(): + try: + state.kernel_clients[rt_name].execute(f"!ln -s ../{pid} {name}", silent=True, store_history=False) + print(f"[av] linked ../{pid} -> {name}") + except Exception as e: + print(f"[av] failed to link ../{pid} -> {name}: {e}") @register_line_magic def run_subprocess(line: str): @@ -1247,7 +1281,7 @@ def run_subprocess(line: str): Run a subprocess asynchronously """ - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt_name = state.current_runtime @@ -1279,7 +1313,7 @@ def kill_subprocess(line: str): Kill a running subprocess asynchronously """ - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt_name = state.current_runtime @@ -1308,7 +1342,7 @@ def open_tunnels(line: str): Open tunnels to the runtime """ - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt_name = state.current_runtime @@ -1348,7 +1382,7 @@ def close_tunnels(line: str): Close tunnels to the runtime """ - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt_name = state.current_runtime @@ -1374,7 +1408,7 @@ def restart_runtime(rt_name: str): Restart the runtime """ - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt = state.all_runtimes.get(rt_name, None) @@ -1389,7 +1423,7 @@ def stop_runtime(rt_name: str): Stop the runtime """ - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt = state.all_runtimes.get(rt_name, None) @@ -1457,7 +1491,7 @@ def launch_remote_kernel(rt_name: str, base_port: int, hostname: str): Launch a remote Jupyter kernel, open tunnels, and connect a local Jupyter client. """ assert ipython is not None - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None # launch kernel and tunnel ports @@ -1535,7 +1569,7 @@ def open_web_terminal(line: str): cmd = f"ttyd -p {random_port} -i 0.0.0.0 --writable bash" # Get access token - access_token = get_access_token() + access_token = AuthContext.get_access_token() if access_token is None: print("Not authenticated. Please run %authenticate first.") return @@ -1667,7 +1701,7 @@ async def run_cell_async( return await orig_run_code(raw_cell, store_history, silent, shell_futures, transformed_cell=transformed_cell, preprocessing_exc_tuple=preprocessing_exc_tuple, cell_id=cell_id) else: # Validation: check runtime is ready and kernel is started - access_token = get_access_token() + access_token = AuthContext.get_access_token() assert access_token is not None rt_info = state.all_runtimes.get(rt, None) if rt_info is None: diff --git a/dev-tools/airavata-python-sdk/airavata_sdk/__init__.py b/dev-tools/airavata-python-sdk/airavata_sdk/__init__.py index 546c3bcd67..0a559918e8 100644 --- a/dev-tools/airavata-python-sdk/airavata_sdk/__init__.py +++ b/dev-tools/airavata-python-sdk/airavata_sdk/__init__.py @@ -164,7 +164,7 @@ def GATEWAY_DATA_STORE_DIR(self): @property def STORAGE_RESOURCE_HOST(self): - return str(os.getenv("STORAGE_RESOURCE_HOST", "cybershuttle.org")) + return str(os.getenv("STORAGE_RESOURCE_HOST", "gateway.cybershuttle.org")) @property def SFTP_PORT(self): diff --git a/dev-tools/airavata-python-sdk/pyproject.toml b/dev-tools/airavata-python-sdk/pyproject.toml index d6fa91b41e..d995930f1b 100644 --- a/dev-tools/airavata-python-sdk/pyproject.toml +++ b/dev-tools/airavata-python-sdk/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "airavata-python-sdk" -version = "2.1.7" +version = "2.2.3" description = "Apache Airavata Python SDK" readme = "README.md" license = "Apache-2.0" diff --git a/modules/agent-framework/airavata-agent/application/README.md b/modules/agent-framework/airavata-agent/application/README.md new file mode 100644 index 0000000000..f57bb9b204 --- /dev/null +++ b/modules/agent-framework/airavata-agent/application/README.md @@ -0,0 +1,19 @@ +# Applications Scripts - Agent + +## NAMD + +### Expanse +```sh +NAMD_CPU_MODULES="cpu/0.17.3b gcc/10.2.0 openmpi/4.1.1" +NAMD_CPU_PATH=/home/scigap/applications/NAMD_3.1alpha2_Linux-x86_64-multicore +NAMD_GPU_MODULES="gpu/0.17.3b" +NAMD_GPU_PATH=/home/scigap/applications/NAMD_3.0.1_Linux-x86_64-multicore-CUDA +``` + +### Delta +```sh +NAMD_CPU_MODULES="openmpi/4.1.6 fftw/3.3.10" +NAMD_CPU_PATH=/sw/namd/NAMD_3.0b3_Linux-x86_64-multicore +NAMD_GPU_MODULES="namd3/2024.02.mulitcore_cuda.s11" +NAMD_GPU_PATH=/sw/namd/NAMD_3.0b3_Linux-x86_64-multicore-CUDA +``` diff --git a/modules/agent-framework/airavata-agent/application/alphafold2.sh b/modules/agent-framework/airavata-agent/application/alphafold2.sh new file mode 100644 index 0000000000..2d1318ffe0 --- /dev/null +++ b/modules/agent-framework/airavata-agent/application/alphafold2.sh @@ -0,0 +1,91 @@ +#!/bin/bash -x +set -euo pipefail + +while getopts t:p:m: option + do + case $option in + t ) MaxDate=$OPTARG ;; + p ) MODEL_PRESET=$OPTARG ;; + m ) Num_Multi=$OPTARG ;; + \? ) cat << ENDCAT1 +>! Usage: $0 [-t Maximum Template Date ] !< +>! [-p Model Preset ] !< +>! [-m Number of Multimers per Model ] !< +ENDCAT1 +# exit 1 ;; + esac +done + +if [ $Num_Multi = "" ]; then + export Num_Multi=1 +fi +#set the environment PATH +export PYTHONNOUSERSITE=True +module reset +module load singularitypro +ALPHAFOLD_HOME=/expanse/projects/qstore/data/alphafold-v2.3.2 +ALPHAFOLD_MODELS=$ALPHAFOLD_HOME/params + +pdb70="" +uniprot="" +pdbseqres="" +nummulti="" + +# check_flags + if [ "monomer" = "${MODEL_PRESET%_*}" ];then + export pdb70="--pdb70_database_path=/data/pdb70/pdb70" + else + export uniprot="--uniprot_database_path=/data/uniprot/uniprot.fasta" + export pdbseqres="--pdb_seqres_database_path=/data/pdb_seqres/pdb_seqres.txt" + export nummulti="--num_multimer_predictions_per_model=$Num_Multi" + fi + +## Copy input to node local scratch +cp input.fasta /scratch/$USER/job_$SLURM_JOBID +#cp -r /expanse/projects/qstore/data/alphafold/uniclust30/uniclust30_2018_08 /scratch/$USER/job_$SLURM_JOBID/ +cd /scratch/$USER/job_$SLURM_JOBID +ln -s /expanse/projects/qstore/data/alphafold/uniclust30/uniclust30_2018_08 +mkdir bfd +cp /expanse/projects/qstore/data/alphafold/bfd/*index bfd/ +#cp /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_hhm.ffdata bfd/ +#cp /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_cs219.ffdata bfd/ +cd bfd +ln -s /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_hhm.ffdata +ln -s /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_cs219.ffdata +ln -s /expanse/projects/qstore/data/alphafold/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt_a3m.ffdata +cd ../ +mkdir alphafold_output +# Create soft links ro rundir form submitdir + +ln -s /scratch/$USER/job_$SLURM_JOBID $SLURM_SUBMIT_DIR/rundir + +#Run the command +singularity run --nv \ + -B /expanse/lustre \ + -B /expanse/projects \ + -B /scratch \ + -B $ALPHAFOLD_HOME:/data \ + -B $ALPHAFOLD_MODELS \ + /cm/shared/apps/containers/singularity/alphafold/alphafold_aria2_v2.3.2.simg \ + --fasta_paths=/scratch/$USER/job_$SLURM_JOBID/input.fasta \ + --uniref90_database_path=/data/uniref90/uniref90.fasta \ + --data_dir=/data \ + --mgnify_database_path=/data/mgnify/mgy_clusters_2022_05.fa \ + --bfd_database_path=/scratch/$USER/job_$SLURM_JOBID/bfd/bfd_metaclust_clu_complete_id30_c90_final_seq.sorted_opt \ + --uniref30_database_path=/data/uniref30/UniRef30_2021_03 \ + $pdbseqres \ + $pdb70 \ + $uniprot \ + --template_mmcif_dir=/data/pdb_mmcif/mmcif_files \ + --obsolete_pdbs_path=/data/pdb_mmcif/obsolete.dat \ + --output_dir=/scratch/$USER/job_$SLURM_JOBID/alphafold_output \ + --max_template_date=$MaxDate \ + --model_preset=$MODEL_PRESET \ + --use_gpu_relax=true \ + --models_to_relax=best \ + $nummulti + +unlink $SLURM_SUBMIT_DIR/rundir + +### Copy back results +tar -cvf $SLURM_SUBMIT_DIR/alphafold_output.tar alphafold_output diff --git a/modules/agent-framework/airavata-agent/application/gaussian16.sh b/modules/agent-framework/airavata-agent/application/gaussian16.sh new file mode 100644 index 0000000000..f4089d116a --- /dev/null +++ b/modules/agent-framework/airavata-agent/application/gaussian16.sh @@ -0,0 +1,256 @@ +#!/bin/sh -x +# $Id: run_g09_chk_recovery.sh,v 1.0 2017/04/22 14:15:00 Sudhakar Exp $ + +if [ $# -lt 1 -o $# -gt 3 ]; then + echo 1>&2 "Usage: $0 gaussian_input_gjf [SEAGrid_UserName] [GPU] " + #echo 1>&2 "Usage: $0 subdir gaussian_input_gjf clobber [gcvars]" + exit 127 +fi + +# subdir depends on whether we're doing freq, water or PES. For freq and water, +# it should be hardcoded in the Xbaya workflow. For PES, it should be an +# additional array generated by the frontend. The contents of this array are +# trivial, but creating an extra Xbaya service to generate it would add +# unnecessary extra complexity. Besides, the frontend cannot avoid having to +# pass at least one array: the array with gjf files. +#subdir=$1 +subdir="$PWD" +#export GAUSS_SCRDIR=/oasis/scratch/comet/$USER/temp_project/$SLURM_JOBID +#export GAUSS_SCRDIR=/expanse/lustre/scratch/$USER/temp_project/$SLURM_JOBID +scratch_subid=$(id -u $user | tail -c2) +scrdir="/storage/scratch1/$scratch_subid/$USER" +export GAUSS_SCRDIR=$scrdir/$SLURM_JOBID +mkdir -p $GAUSS_SCRDIR +gaussian_input_full=$1 +if [ $AIRAVATA_USERNAME ]; then + echo " The Airavata Gateway User is $AIRAVATA_USERNAME" + SG_UserName="$AIRAVATA_USERNAME" +#elif [ $2 ]; then +# SG_UserName=$2 +else + echo " The Airavata Gateway User is missing " +exit +fi + + #export PATH="/storage/home/hcoda1/7/spamidig6/ideas_storage/apps/g16:$PATH" + #export GAUSS_EXEDIR="/storage/home/hcoda1/7/spamidig6/ideas_storage/apps/g16" + #. ~/.bash_profile +#if [ "$2" = "GPU" ]; then + echo "Using GPU version of Gaussian 16" + #module reset; module load gpu/0.15.4 gaussian/16.C.01-cuda + #module load gaussian/16.C.02 + #export PATH="/storage/home/hcoda1/7/spamidig6/ideas_storage/apps/g16:$PATH" + #export GAUSS_EXEDIR="/storage/home/hcoda1/7/spamidig6/ideas_storage/apps/g16" + #. ~/.bash_profile +#fi + +if [ $AIRAVATA_ExptDataDir ]; then + echo "The Airavata Storage Directory for this job is $AIRAVATA_ExptDataDir" + echo "Preparing Cron to push log data to storage periodically" + # Get Slurm total time +# flds=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' | awk -F: '{print NF}'` +# flds=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' | awk -F- '{print NF}'` + $flds +# # if flds 3 $1 is hrs $2 is min and $3 is sec +# if [ $flds = 4 ]; then +# jdys=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F- '{print $1}'` +# hrmnse=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F- '{print $2}'` +# jhrs=`echo $hrmnse | awk -F: '{print $1}'`+24*$jdys +# jmin=`echo $hrmnse | awk -F: '{print $2}'`+60*$jhrs +# elif [ $flds = 3 ]; then +# jhrs=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F: '{print $1}'` +# jmin=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F: '{print $2}'`+60*$jhrs +# elif [ $flds = 2 ]; then +# jmin=`squeue -j $SLURM_JOBID -l | awk 'END { print $7}' |awk -F: '{print $1}'` +# fi +# if [ $jhrs .gt. 5 ]; then +# upd=30 +# else +# upd=$jmin/10 +# fi + # For 5hrs and above uperiod is 30 min and for less than 5 hrs it is 10% of total time + # compute uperiod + # preapre or use prepared updateatorage script + # i(crontab -l 2>/dev/null; echo "*/$uperiod * * * * /path/to/job -with args") | crontab - */30 * * * * /path/to/command + # Use crontab -r to remove it after the job is finished or ended ( under trap) +# mycron=jcron$$ +# #write out current crontab +# crontab -l > $mycron +# #echo new cron into cron file +# #echo "*/$upd * * * * /home/gridchem/bin/joblogscp.sh >/dev/null 2>&1" >> $mycron +# echo "*/$upd * * * * scp *.log pga@gf4.ucs.indiana.edu:$AIRAVATA_ExptDataDir >/dev/null 2>&1" >> $mycron +# #echo "* * * * * sleep 20; /Users/spamidig1/bin/testscp.sh >/dev/null 2>&1" >> $mycron +##install new cron file +# crontab $mycron +# crontab -l +# # delete this entry at the end of the job or trap... +fi +inputs=`echo $AIRAVATA_INPUTS | sed 's/[//'` +inputs=`echo $inputs | sed 's/]//'` +echo "Airavata Inputs: $inputs" +cd $subdir +dos2unix -k $gaussian_input_full +gaussian_input=${gaussian_input_full##*/} +gaussian_output=${gaussian_input%.*}.log +clobber="$3" # set to "" for rerun or debug; otherwise set to 1 +gcvars=$4 +null="" + +# Next line will ususally return "cp: `GCVARS' and `GCVARS' are the same file" +#if [ "$gcvars" ] ; then cp -p $gcvars GCVARS 2>/dev/null ; fi + +#if [ ! "$LOCAL_LOCATION" ] ; then +# if [ -s ~/.paramchemlocation ] ; then +# read LOCAL_LOCATION < ~/.paramchemlocation +# . $LOCAL_LOCATION/environment +# fi +# fi +#if [ ! -d "$LOCAL_LOCATION" ] ; then +# echo "Warning: no valid LOCAL_LOCATION found" >&2 +#gauss_mem=56GB +#gauss_nproc=24 +## Escaping spaces rather than quoting because quoting prevents tilde expansion +##charmm_location=~gridchem/workflow_script/sys_exec/local/c36a6-132-serial\ -chsize\ 25140 +#SCRIPT_LOCATION=~gridchem/workflow_script/sys_exec/tools +#LOCAL_LOCATION=~gridchem/workflow_script/sys_exec/local-comet +#export CONPATH=$SCRIPT_LOCATION +localarc="$HOME/scratch" +##globalarc="ccguser@gridchem.uits.iu.edu:/home/ccguser/mss/internal/$SGUserName" +## exit 1 +# fi +#. $LOCAL_LOCATION/environment + +#read GC_UserName GC_ProjectName GC_WorkflowName TIMESTAMP < GCVARS +usrchkdir=$localarc/${SG_UserName}/ +echo " The Airavata Gateway User Directory is $usrchkdir" +copysubdir="./" +mkdir -p $usrchkdir +#copysubdir=$localarc/${GC_UserName}/${GC_ProjectName}/${GC_WorkflowName}/$subdir +# The way a "false" boolean variable is passed seems to be unstable; it's +# been "", "0" and "false", so we try to cover all reasonable possibilities. +if [ ! "$clobber" ] ; then + clobber=0 + fi +if [ "$clobber" = "0" -o "$clobber" = "false" -o "$clobber" = "no" ] ; then + if [ -s $copysubdir/$gaussian_output ] ; then + echo gaussian_output_log=$copysubdir/$gaussian_output + exit 0 + fi + gaussian_output_full=${gaussian_input_full%.*}.log + if [ -s $gaussian_output_full ] ; then + #mkdir -p $copysubdir + rm -f $copysubdir/$gaussian_output # clean up symlink if something went wrong earlier + if [ $gaussian_input_full -nt $copysubdir/$gaussian_input ] ; then sed 's/\r$//' $gaussian_input_full > $copysubdir/$gaussian_input ; fi + cp -up $gaussian_output_full $copysubdir + echo gaussian_output_log=$gaussian_output_full + exit 0 + fi + fi + +#Process inputfile for run files and other job requirements + # PROCESS CHECKPOINT FILE + # Check to see if the checkpoint file is given a name in input deck + # Input file to look into + dummy="$gaussian_input_full" + #dummy="$Diskinputdir/$Diskinputfile" + checkpt="no" + junk=`/usr/bin/head -5 $dummy | /bin/grep -i "%chk"` + if [ "$junk" != "" ]; then + junk=`echo $junk | /bin/sed 's/=/@/'` + junk=`echo $junk | /bin/sed 's/ //'` + # + # Be careful: Don't lose the subdirectory information for CHKPT file + # Also, add .chk if there is no extension to the Checkpoint file + # + Chkfile=`expr $junk : '.*@\(.*\)'` + Chkdir="$Diskinputdir" + Chkfile=`/bin/basename $Chkfile` + dummy=`expr $Chkfile : '.*\(\..*\)'` + Chkfile=`/bin/basename $Chkfile $dummy` + ChkfileWNE="$Chkfile" + Chkfile="$Chkfile${dummy:-.chk}" +//"`Chkfile=`echo $Chkfile | sed "s/ + # 2005/12/08 create name for $formated_chkfile + formated_chkfile="$ChkfileWNE.fchk" + Chkfile_with_arch="${Chkfile}_$architecture" + echo "DEBUG: checkfile = $Chkfile and formated_chkfile = $formated_chkfile "; + checkpt="yes" +#Retrieve the checkpoint file from the user archive directory + if [ -f "$usrchkdir/$Chkfile" ]; then + cp $usrchkdir/$Chkfile . + fi +//"` export PJobID=`grep -i localjobid $gaussian_input_full | awk -F= '{print $2}' | sed "s/ +# /bin/cat >> $qsubin << HERE2 + #export PJobID=`grep -i localjobid $Diskinputfile | awk -F= '{print $2}' | sed "s/^M//"` + #cd /oasis/scratch/comet/gridchem/temp_project/$PJobID + #cd \$SCRATCH_BATCH + ##if [ ${PJobID:-null} != "$null" ]; then + if [ "${PJobID}" != "" ]; then + #cp -r /work/ccguser/batch_scratch/$PJobID*/* . + #cp -r /oasis/scratch/comet/gridchem/temp_project/$PJobID*/* . + cp -r $HOME/scratch/$PJobID*/* . + ls -l + fi + else + echo "******** NO CHECKPOINT FILE IDENTIFIED ******** " + fi + +mkdir -p $copysubdir +mkdir -p $subdir +cd $subdir +cwd=`pwd` +if [ $gaussian_input_full -nt $copysubdir/$gaussian_input ] ; then sed 's/\r$//' $gaussian_input_full > $copysubdir/$gaussian_input ; fi +cd $copysubdir +rm -f $gaussian_output +if [ "$cwd" != "$subdir" ]; then + ln -s $cwd/$gaussian_output $subdir/$gaussian_output +fi +cd $cwd +if [ $gaussian_input_full -nt $gaussian_input ] ; then sed 's/\r$//' $gaussian_input_full > $gaussian_input ; fi +signals_to_trap="XCPU INT TERM CHLD" +#trap "| grep -v $AIRAVATA_ExptDataDir | crontab -; rm -rf $mycron; cp -p $gaussian_output $copysubdir; cp -p $Chkfile $copysubdir; exit 99" $signals_to_trap +#trap "crontab -l | grep -v $AIRAVATA_ExptDataDir | crontab -; rm -rf $mycron; cp -p $gaussian_output $copysubdir; cp -p $Chkfile $copysubdir; exit 99" $signals_to_trap +cd $HOME/scratch +ln -s $subdir $SLURM_JOBID +ls -l $SLURM_JOBID/ +cd $cwd +#$LOCAL_LOCATION/run_gaussian_local.sh $gaussian_input $gaussian_output +which g16 +g16 $gaussian_input $gaussian_output +#BEGIN{while(getline < infile) if ($0 ~ "^ *--[lL][iI][nN][kK]1--") nlink++} + +#if awk -v infile=$gaussian_input ' +# BEGIN{while(getline < infile) if ($0 ~ "^ *[lL][iI][nN][kK]1") nlink++} +# /^ *Normal termination of Gaussian/{nnormal++} +# END{if (nnormal == nlink+1) exit 1}' $gaussian_output ; then +# echo "Gaussian terminated abnormally." >&2 +# exit 1 +#fi +# Remove the cron entry to periodically stage the data to storage +#crontab -l | grep -v "$AIRAVATA_ExptDataDir"" | crontab - +#crontab -l +#rm $mycron + +#rm $copysubdir/$gaussian_output +cp -p $gaussian_output $copysubdir + if [ -f "$Chkfile" ]; then + cp -p $Chkfile $copysubdir + fi + if [ -f "$GAUSS_SCRDIR/$Chkfile" ]; then + cp -p $GAUSS_SCRDIR/$Chkfile . + fi + # Save checkpoint file to usrchkdir + #mkdir -p $usrchkdir + if [ -f "$Chkfile" ]; then + formchk $Chkfile + cp -f $Chkfile $usrchkdir + cp -f *.fchk $usrchkdir + cp -f *.fchk $copysubdir + fi +#remove rwf files +rm *.rwf* +cd $HOME/scratch +#ln -s $subdir $PBS_JOBID +ls -l $SLURM_JOBID +rm $SLURM_JOBID/*.rwf* +echo gaussian_output_log=$cwd/$gaussian_output +cat: S: No such file or directory diff --git a/modules/agent-framework/airavata-agent/application/gromacs.sh b/modules/agent-framework/airavata-agent/application/gromacs.sh new file mode 100644 index 0000000000..dacb52f24e --- /dev/null +++ b/modules/agent-framework/airavata-agent/application/gromacs.sh @@ -0,0 +1,165 @@ +#!/bin/sh -x +# $Id: run_Gromacs_data_recovery.sh,v 1.0 2017/11/23 12:15:00 Sudhakar Exp $ + +if [ $# -lt 1 -o $# -gt 11 ]; then + echo 1>&2 "Usage: $0 -c coord_file -s tpr_file -g log_file -e ener_file [SEAGrid_UserName] [Gromacs_restart_input] " + #echo 1>&2 "Usage: $0 subdir Gromacs_restart_input" + exit 127 +fi + +# subdir depends on whether we're doing freq, water or PES. For freq and water, +# it should be hardcoded in the Xbaya workflow. For PES, it should be an +# additional array generated by the frontend. The contents of this array are +# trivial, but creating an extra Xbaya service to generate it would add +# unnecessary extra complexity. Besides, the frontend cannot avoid having to +# pass at least one array: the array with gjf files. +#subdir=$1 +subdir="$PWD" +#Gromacs_res_input_full=$1 +Coord_file=$2 +Tpr_file=$4 +Log_file=$6 +Ener_file=$8 +Rest_file=$10 +if [ $AIRAVATA_USERNAME ]; then + echo " The Airavata Gateway User is $AIRAVATA_USERNAME" + SG_UserName="$AIRAVATA_USERNAME" +elif [ $9 ]; then + SG_UserName=$9 +else + echo " The Airavata Gateway User is missing " +exit +fi +dos2unix -k $Gromacs_res_input_full +Gromacs_input=${Gromacs_res_input_full##*/} +Gromacs_output=${Gromacs_input%.*}.log +#clobber="$3" # set to "" for rerun or debug; otherwise set to 1 +clobber="1" # set to "" for rerun or debug; otherwise set to 1 +#gcvars=$4 +gcvars="GCVARS" +null="" +localarc="$HOME/scratch" +# Next line will ususally return "cp: `GCVARS' and `GCVARS' are the same file" +if [ "$gcvars" ] ; then cp -p $gcvars GCVARS 2>/dev/null ; fi +## +#if [ ! "$LOCAL_LOCATION" ] ; then +# if [ -s ~/.paramchemlocation ] ; then +# read LOCAL_LOCATION < ~/.paramchemlocation +# fi +# fi +#if [ ! -d "$LOCAL_LOCATION" ] ; then +# echo "Error: no valid LOCAL_LOCATION found" >&2 +# exit 1 +# fi +#. $LOCAL_LOCATION/environment + +#read GC_UserName GC_ProjectName GC_WorkflowName TIMESTAMP < GCVARS +usrchkdir=$localarc/${SG_UserName}/ +echo " The Airavata Gateway User Directory is $usrchkdir" +copysubdir="./" +#copysubdir=$localarc/${GC_UserName}/${GC_ProjectName}/${GC_WorkflowName}/$subdir +# The way a "false" boolean variable is passed seems to be unstable; it's +# been "", "0" and "false", so we try to cover all reasonable possibilities. +if [ ! "$clobber" ] ; then + clobber=0 + fi +if [ "$clobber" = "0" -o "$clobber" = "false" -o "$clobber" = "no" ] ; then + if [ -s $copysubdir/$Gromacs_output ] ; then + echo Gromacs_output_log=$copysubdir/$Gromacs_output + exit 0 + fi + Gromacs_output_full=${Gromacs_input_full%.*}.log + if [ -s $Gromacs_output_full ] ; then + mkdir -p $copysubdir + rm -f $copysubdir/$Gromacs_output # clean up symlink if something went wrong earlier + if [ $Gromacs_input_full -nt $copysubdir/$Gromacs_input ] ; then sed 's/\r$//' $Gromacs_input_full > $copysubdir/$Gromacs_input ; fi + cp -up $Gromacs_output_full $copysubdir + echo Gromacs_output_log=$Gromacs_output_full + exit 0 + fi + fi + +#Process inputfile for run files and other job requirements +//"`export PJobID=`grep -i localjobid restart.txt| awk -F= '{print $2}' | sed "s/ + if [ ${PJobID:-null} != "$null" ]; then + cp -r /home/scigap/scratch/${SG_UserName}/$PJobID*/* . + fi +# # PROCESS CHECKPOINT FILE +# # Check to see if the checkpoint file is given a name in input deck +# # Input file to look into +# dummy="$Gromacs_input_full" +# #dummy="$Diskinputdir/$Diskinputfile" +# checkpt="no" +# junk=`/usr/bin/head -5 $dummy | /bin/grep -i "%chk"` +# if [ "$junk" != "" ]; then +# junk=`echo $junk | /bin/sed 's/=/@/'` +# junk=`echo $junk | /bin/sed 's/ //'` +# # +# # Be careful: Don't lose the subdirectory information for CHKPT file +# # Also, add .chk if there is no extension to the Checkpoint file +# # +# Chkfile=`expr $junk : '.*@\(.*\)'` +# Chkdir="$Diskinputdir" +# Chkfile=`/bin/basename $Chkfile` +# dummy=`expr $Chkfile : '.*\(\..*\)'` +# Chkfile=`/bin/basename $Chkfile $dummy` +# ChkfileWNE="$Chkfile" +# Chkfile="$Chkfile${dummy:-.chk}" +//"` Chkfile=`echo $Chkfile | sed "s/ +# # 2005/12/08 create name for $formated_chkfile +# formated_chkfile="$ChkfileWNE.fchk" +# Chkfile_with_arch="${Chkfile}_$architecture" +# echo "DEBUG: checkfile = $Chkfile and formated_chkfile = $formated_chkfile "; +# checkpt="yes" +##Retrieve the checkpoint file from the user archive directory +# if [ -f "$usrchkdir/$Chkfile" ]; then +# cp $usrchkdir/$Chkfile . +# fi +# else +# echo "******** NO CHECKPOINT FILE IDENTIFIED ******** " +# fi + +mkdir -p $copysubdir +mkdir -p $subdir +cd $subdir +cwd=`pwd` +if [ $Gromacs_input_full -nt $copysubdir/$Gromacs_input ] ; then sed 's/\r$//' $Gromacs_input_full > $copysubdir/$Gromacs_input ; fi +cd $copysubdir +rm -f $Gromacs_output +if [ "$cwd" != "$subdir" ]; then + ln -s $cwd/$Gromacs_output $subdir/$Gromacs_output +fi +cd $cwd +if [ $Gromacs_input_full -nt $Gromacs_input ] ; then sed 's/\r$//' $Gromacs_input_full > $Gromacs_input ; fi +module unload intel; module load gromacs +if [ ${PJobID:-null} != "$null" ]; then + cp -r /home/scigap/scratch/${SG_UserName}/$PJobID*/* . + #mpiexec -genv I_MPI_FABRICS shm:ofa gmx_mpi mdrun -s $Tpr_file -cpi state.cpt + mpirun -np $SLURM_NTASKS -genv I_MPI_FABRICS shm:ofa gmx_mpi mdrun -v -deffnm em -s $Tpr_file -cpi state.cpt +else + mpirun -np $SLURM_NTASKS -genv I_MPI_FABRICS shm:ofa gmx_mpi mdrun -v -deffnm em -s $Tpr_file -c $Coord_file -g $Log_file -e $Ener_file +fi + +mpiexec gmx_mpi mdrun -s $Tpr_file -cpi state.cpt +##$LOCAL_LOCATION/run_Gromacs_local.sh $Gromacs_input $Gromacs_output +##$LOCAL_LOCATION/run_Gromacs_local.sh $Gromacs_input $Gromacs_output +#BEGIN{while(getline < infile) if ($0 ~ "^ *--[lL][iI][nN][kK]1--") nlink++} + +#if awk -v infile=$Gromacs_input ' +# BEGIN{while(getline < infile) if ($0 ~ "^ *[lL][iI][nN][kK]1") nlink++} +# /^ *Normal termination of Gaussian/{nnormal++} +# END{if (nnormal == nlink+1) exit 1}' $Gromacs_output ; then +# echo "Gaussian terminated abnormally." >&2 +# exit 1 +#fi +#rm $copysubdir/$Gromacs_output +cp -p $Gromacs_output $copysubdir + # Save checkpoint file to usrchkdir +# mkdir -p $usrchkdir +# if [ -f "$Chkfile" ]; then +# cp -f $Chkfile $usrchkdir +# fi +# # Create a link directory for this job with jobID in user scratch + ln -s $PWD ~/scratch/${SG_USERNAME}/$PBS_JOBID +echo Gromacs_output_log=$cwd/$Gromacs_output +#rm *.wfc* diff --git a/modules/agent-framework/airavata-agent/application/namd.sh b/modules/agent-framework/airavata-agent/application/namd.sh new file mode 100644 index 0000000000..240cc2ad10 --- /dev/null +++ b/modules/agent-framework/airavata-agent/application/namd.sh @@ -0,0 +1,130 @@ +#!/bin/bash -x +set -euo pipefail + +# ---------------------------------------------------------------------- +# SETUP +# ---------------------------------------------------------------------- +export PATH=$PWD:$PATH +export WORKDIR=$PWD +export CS_HOME=$HOME/cybershuttle +export MAMBA_ROOT_PREFIX=$CS_HOME/scratch +export TMPDIR=$CS_HOME/scratch/tmp + +# initialize scratch/tmp and scratch/envs (node-local) +CS_TEMP=$(readlink $CS_HOME/scratch/tmp) +CS_ENVS=$(readlink $CS_HOME/scratch/envs) +[ -n "$CS_TEMP" ] && mkdir -p $CS_TEMP +[ -n "$CS_ENVS" ] && mkdir -p $CS_ENVS +NAMD_EXTRA_ARGS=() +FIFO=$(mktemp -u) +mkfifo $FIFO + +# ---------------------------------------------------------------------- +# PARSE COMMAND LINE ARGUMENTS +# ---------------------------------------------------------------------- + +required_vars=("NAMD_CPU_PATH" "NAMD_GPU_PATH" "NAMD_CPU_MODULES" "NAMD_GPU_MODULES") +for var in "${required_vars[@]}"; do + if [ -z "${!var}" ]; then + echo "$var is not set" + exit 2 + fi +done + +while getopts t:n:i:a:s: option; do + case $option in + t) + if [[ "$OPTARG" != "CPU" && "$OPTARG" != "GPU" ]]; then + echo "invalid argument -t $OPTARG: must be CPU|GPU." + exit 2 + fi + EXECUTION_TYPE=$OPTARG + echo "EXECUTION_TYPE=$EXECUTION_TYPE" + module reset + if [ $EXECUTION_TYPE = "CPU" ]; then + # one replica at a time + echo 0 > $FIFO & + NAMD_PATH=$NAMD_CPU_PATH + module load $NAMD_CPU_MODULES + elif [ $EXECUTION_TYPE = "GPU" ]; then + # one replica per GPU + for ((i=0; i<${SLURM_GPUS_ON_NODE:-0}; i++)); do echo "$i" > $FIFO & done + NAMD_PATH=$NAMD_GPU_PATH + NAMD_EXTRA_ARGS+=("--CUDASOAintegrate" "on") + module load $NAMD_GPU_MODULES + fi + module list + ;; + n) + NUM_REPLICAS=$OPTARG + echo "NUM_REPLICAS=$NUM_REPLICAS" + ;; + i) + NAMD_INPUT_FILES=$(find $WORKDIR -maxdepth 1 -type f ! -name "*slurm*" ! -name "*.stdout" ! -name "*.stderr") + NAMD_CONF_FILE=$OPTARG + echo "NAMD_INPUT_FILES=$NAMD_INPUT_FILES" + echo "NAMD_CONF_FILE=$NAMD_CONF_FILE" + ;; + a) + AGENT_ID=$OPTARG + echo "AGENT_ID=$AGENT_ID" + ;; + s) + AGENT_SERVER=$OPTARG + echo "AGENT_SERVER=$AGENT_SERVER" + ;; + \?) + echo 1>&2 "Usage: $0" + echo 1>&2 " -t [CPU|GPU]" + echo 1>&2 " -n [NUM_REPLICAS]" + echo 1>&2 " -i [NAMD_CONF_FILE]" + echo 1>&2 " -a [AGENT_ID]" + echo 1>&2 " -s [AGENT_SERVER]" + exit 2 + ;; + esac +done +shift $((OPTIND - 1)) + +# ---------------------------------------------------------------------- +# RUN AGENT +# ---------------------------------------------------------------------- + +wget -q https://github.com/cyber-shuttle/binaries/releases/download/1.0.1/airavata-agent-linux-amd64 -O $WORKDIR/airavata-agent +wget -q https://github.com/cyber-shuttle/binaries/releases/download/1.0.1/kernel.py -O $WORKDIR/kernel.py +wget -q https://github.com/mamba-org/micromamba-releases/releases/download/2.3.0-1/micromamba-linux-64 -O $WORKDIR/micromamba +chmod +x $WORKDIR/airavata-agent $WORKDIR/micromamba +$WORKDIR/airavata-agent --server "$AGENT_SERVER:19900" --agent "$AGENT_ID" --environ "$AGENT_ID" --lib "" --pip "" & +AGENT_PID=$! +trap 'kill -TERM $AGENT_PID' EXIT +echo "Agent started with PID $AGENT_PID" + + +# ---------------------------------------------------------------------- +# RUN NAMD3 +# ---------------------------------------------------------------------- +PIDS=() +for REPLICA_ID in $(seq 1 $NUM_REPLICAS); do + ( + read TOKEN <$FIFO + + REPLICA_DIR=$WORKDIR/$REPLICA_ID + mkdir $REPLICA_DIR + cp $NAMD_INPUT_FILES $REPLICA_DIR/ + + [[ $EXECUTION_TYPE == "GPU" ]] && export CUDA_VISIBLE_DEVICES=$TOKEN + $NAMD_PATH/namd3 +setcpuaffinity +p $SLURM_CPUS_ON_NODE --cwd $REPLICA_DIR "${NAMD_EXTRA_ARGS[@]}" \ + $REPLICA_DIR/$NAMD_CONF_FILE >$REPLICA_DIR/$NAMD_CONF_FILE.out 2>$REPLICA_DIR/$NAMD_CONF_FILE.err + [[ $EXECUTION_TYPE == "GPU" ]] && unset CUDA_VISIBLE_DEVICES + + echo $TOKEN > $FIFO & + + for FILE in $(ls $REPLICA_DIR/*.*); do + mv $FILE $REPLICA_ID"_"$(basename $FILE) + done + rm -rf $REPLICA_DIR/ + + ) & + PIDS+=($!) +done +wait "${PIDS[@]}" diff --git a/modules/agent-framework/airavata-agent/application/pmemd_cuda.sh b/modules/agent-framework/airavata-agent/application/pmemd_cuda.sh new file mode 100644 index 0000000000..59378da8bb --- /dev/null +++ b/modules/agent-framework/airavata-agent/application/pmemd_cuda.sh @@ -0,0 +1,4 @@ +#!/bin/bash -x +set -euo pipefail + +srun pmemd.cuda "$@" diff --git a/modules/agent-framework/airavata-agent/application/pmemd_mpi.sh b/modules/agent-framework/airavata-agent/application/pmemd_mpi.sh new file mode 100644 index 0000000000..b00bd75162 --- /dev/null +++ b/modules/agent-framework/airavata-agent/application/pmemd_mpi.sh @@ -0,0 +1,4 @@ +#!/bin/bash -x +set -euo pipefail + +srun pmemd.MPI "$@" diff --git a/modules/agent-framework/airavata-agent/application/psi4.sh b/modules/agent-framework/airavata-agent/application/psi4.sh new file mode 100644 index 0000000000..404c713d33 --- /dev/null +++ b/modules/agent-framework/airavata-agent/application/psi4.sh @@ -0,0 +1,4 @@ +#!/bin/bash -x +set -euo pipefail + +psi4 "$@"