Skip to content

Commit c4e498d

Browse files
authored
Merge pull request #100 from NERSC/globus
2 parents cb86180 + 58dce6d commit c4e498d

File tree

19 files changed

+618
-118
lines changed

19 files changed

+618
-118
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ authors = [
1414
]
1515
description = "Python client for NERSC SF API"
1616
readme = "README.md"
17-
requires-python = ">=3.7"
17+
requires-python = ">=3.8"
1818
dependencies = [
1919
"authlib",
2020
"httpx",

scripts/run.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ def run_unasync():
3434
"AsyncJobSacct": "JobSacct",
3535
"AsyncJobSqueue": "JobSqueue",
3636
"AsyncOAuth2Client": "OAuth2Client",
37+
"AsyncStorage": "Storage",
38+
"AsyncGlobusStorage": "GlobusStorage",
39+
"AsyncGlobusTransfer": "GlobusTransfer",
3740
"aclose": "close",
3841
"_ASYNC_SLEEP": "_SLEEP",
3942
}
@@ -100,7 +103,7 @@ def _to_str_enum(code: str) -> str:
100103
def _fix_date_import(code: str) -> str:
101104
replacements = {
102105
"import date": "import date as date_",
103-
"date: Optional\[date\]": "date: Optional[date_]",
106+
"date: Optional\\[date\\]": "date: Optional[date_]",
104107
}
105108

106109
for target, replacement in replacements.items():
@@ -193,7 +196,7 @@ def resources_codegen(
193196
name = name if name != "int" else f"_{name}"
194197
name = name.replace("-", "_")
195198
names.append(f' {name} = "{value}"')
196-
names.append(f" \"\"\" {s['system_type']}: {s['full_name']}\"\"\"")
199+
names.append(f' """ {s["system_type"]}: {s["full_name"]}"""')
197200

198201
resources = "\n".join(names)
199202
now = datetime.datetime.now()

src/sfapi_client/_async/client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from .._models.resources import Resource
2222
from .groups import AsyncGroup, AsyncGroupMember
2323
from .users import AsyncUser
24+
from .storage import AsyncGlobusStorage, AsyncStorage
2425
from .projects import AsyncProject, AsyncRole
2526
from .paths import AsyncRemotePath
2627

@@ -252,6 +253,7 @@ def __init__(
252253
self.__http_client = None
253254
self._api = None
254255
self._resources = None
256+
self._storage = None
255257
self._wait_interval = wait_interval
256258
self._access_token = access_token
257259

@@ -504,15 +506,27 @@ def resources(self) -> AsyncResources:
504506

505507
return self._resources
506508

509+
@property
510+
def storage(self) -> AsyncStorage:
511+
"""Storage related objects and methods
512+
513+
:return AsyncStorage: Object with storage related objects and methods
514+
"""
515+
if self._storage is None:
516+
self._storage = AsyncStorage(self)
517+
return self._storage
518+
507519

508520
AsyncCompute.model_rebuild()
521+
AsyncGlobusStorage.model_rebuild()
509522
AsyncGroup.model_rebuild()
510523
AsyncUser.model_rebuild()
511524
AsyncProject.model_rebuild()
512525
AsyncRemotePath.model_rebuild()
513526
AsyncRole.model_rebuild()
514527
AsyncGroupMember.model_rebuild()
515528

529+
516530
# Ensure that the job models are built, we need to import here to
517531
# avoid circular imports
518532
from .jobs import AsyncJobSacct, AsyncJobSqueue # noqa: E402

src/sfapi_client/_async/compute.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Dict, List, Optional, Union, Callable
1+
from typing import Dict, List, Optional, Union
22
import json
33
from pydantic import PrivateAttr, ConfigDict
44
from ..exceptions import SfApiError
@@ -7,7 +7,6 @@
77
from .._models import (
88
AppRoutersStatusModelsStatus as ComputeBase,
99
Task,
10-
StatusValue,
1110
PublicHost as Machine,
1211
BodyRunCommandUtilitiesCommandMachinePost as RunCommandBody,
1312
AppRoutersComputeModelsCommandOutput as RunCommandResponse,
@@ -16,26 +15,12 @@
1615
from .paths import AsyncRemotePath
1716
from .._monitor import AsyncJobMonitor
1817
from .._compute import CommandResult, SubmitJobResponse, SubmitJobResponseStatus
18+
from .._utils import check_auth
1919

2020
# Patch to return str names from Enum of py3.11
2121
Machine.__str__ = lambda self: self.value
2222

2323

24-
def check_auth(method: Callable):
25-
def wrapper(self, *args, **kwargs):
26-
if self.client._client_id is None and self.client._access_token is None:
27-
raise SfApiError(
28-
f"Cannot call {self.__class__.__name__}.{method.__name__}() with an unauthenticated client." # noqa: E501
29-
)
30-
elif self.status in [StatusValue.unavailable]:
31-
raise SfApiError(
32-
f"Compute resource {self.name} is {self.status.name}, {self.notes}"
33-
)
34-
return method(self, *args, **kwargs)
35-
36-
return wrapper
37-
38-
3924
class AsyncCompute(ComputeBase):
4025
client: Optional["AsyncClient"] # noqa: F821
4126
_monitor: AsyncJobMonitor = PrivateAttr()

src/sfapi_client/_async/groups.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,9 @@
1-
from typing import Optional, Union, List, Any, Callable
2-
from functools import wraps
1+
from typing import Optional, Union, List, Any
32
from pydantic import ValidationError, Field, BaseModel, ConfigDict
43
from .._models import BatchGroupAction as GroupAction, UserStats as GroupMemberBase
54
from ..exceptions import SfApiError
65
from .users import AsyncUser
7-
8-
9-
def check_auth(method: Callable):
10-
@wraps(method)
11-
def wrapper(self, *args, **kwargs):
12-
if self._client_id is None:
13-
raise SfApiError(
14-
f"Cannot call {self.__class__.__name__}.{method.__name__}() with an unauthenticated client." # noqa: E501
15-
)
16-
return method(self, *args, **kwargs)
17-
18-
return wrapper
6+
from .._utils import check_auth
197

208

219
class AsyncGroupMember(GroupMemberBase):

src/sfapi_client/_async/storage.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
from pathlib import Path
2+
from typing import Optional, Union, List, Any
3+
from pydantic import ConfigDict
4+
import sys
5+
import math
6+
7+
from .._utils import _ASYNC_SLEEP
8+
from ..paths import AsyncRemotePath
9+
from .._utils import check_auth
10+
11+
from .compute import Machine
12+
from .._models import (
13+
AppRoutersStatusModelsStatus as StorageBase,
14+
GlobusTransfer as GlobusTransferModel,
15+
BodyStartGlobusTransferStorageGlobusTransferPost as GlobusBodyPost,
16+
GlobusTransferResult,
17+
GlobusStatus,
18+
)
19+
20+
GLOBUS_TERMINAL_STATES = [
21+
GlobusStatus.CANCELED,
22+
GlobusStatus.FAILED,
23+
GlobusStatus.SUCCEEDED,
24+
]
25+
26+
27+
class AsyncStorage:
28+
def __init__(self, client: "AsyncClient"): # noqa: F821
29+
self.client = client
30+
31+
async def globus(
32+
self,
33+
):
34+
"""Create a globus transfer object to start and monitor transfers
35+
36+
- Must select the Globus option when creating the SuperFacility key
37+
38+
```python
39+
>>> from sfapi_client import AsyncClient
40+
>>> async with AsyncClient(client_id, client_secret) as client:
41+
>>> globus = client.storage.globus()
42+
```
43+
44+
:return AsyncGlobusStorage: Globus object to start and monitor transfers
45+
"""
46+
response = await self.client.get("status/globus")
47+
values = response.json()
48+
values["client"] = self.client
49+
_globus = AsyncGlobusStorage.model_validate(values)
50+
51+
return _globus
52+
53+
54+
class AsyncGlobusTransfer(GlobusTransferResult):
55+
globus: Optional["AsyncGlobusStorage"] # noqa: F821
56+
transfer_id: str
57+
model_config = ConfigDict(arbitrary_types_allowed=True)
58+
59+
async def update(self):
60+
"""Updates the status of the transfer"""
61+
job_state = await self._fetch_state()
62+
self._update(job_state)
63+
64+
def _update(self, new_job_state: Any):
65+
for k in new_job_state.model_fields_set:
66+
v = getattr(new_job_state, k)
67+
setattr(self, k, v)
68+
69+
return self
70+
71+
async def _wait_until(self, states: List[GlobusStatus], timeout: int = sys.maxsize):
72+
max_iteration = math.ceil(timeout / self.globus.client._wait_interval)
73+
iteration = 0
74+
75+
while self.globus_status not in states:
76+
await self.update()
77+
await _ASYNC_SLEEP(self.globus.client._wait_interval)
78+
79+
if iteration == max_iteration:
80+
raise TimeoutError()
81+
82+
iteration += 1
83+
84+
return self.globus_status
85+
86+
async def _wait_until_complete(self, timeout: int = sys.maxsize):
87+
return await self._wait_until(GLOBUS_TERMINAL_STATES, timeout)
88+
89+
def __await__(self):
90+
return self._wait_until_complete().__await__()
91+
92+
async def complete(self, timeout: int = sys.maxsize):
93+
"""Wait for the transfer to complete
94+
95+
>>> from sfapi_client import AsyncClient
96+
>>> async with AsyncClient(client_id, client_secret) as client:
97+
>>> globus = client.storage.globus()
98+
>>> res = await globus.transfer(
99+
"globus-transfer-uuid"
100+
)
101+
>>> await res.complete()
102+
103+
:param int timeout: time to wait for the transfer to complete, defaults to sys.maxsize
104+
:return GlobusStart: Gives the file status for the transfer
105+
"""
106+
return await self._wait_until_complete(timeout)
107+
108+
async def _fetch_state(self):
109+
r = await self.globus.client.get(f"storage/globus/transfer/{self.transfer_id}")
110+
json_response = r.json()
111+
json_response["transfer_id"] = self.transfer_id
112+
json_response["globus"] = self.globus
113+
transfer = AsyncGlobusTransfer.model_validate(json_response)
114+
return transfer
115+
116+
117+
class AsyncGlobusStorage(StorageBase):
118+
client: Optional["AsyncClient"] # noqa: F821
119+
model_config = ConfigDict(arbitrary_types_allowed=True)
120+
121+
def __init__(self, **kwargs):
122+
super().__init__(**kwargs)
123+
124+
@check_auth
125+
async def start_transfer(
126+
self,
127+
source_machine: Union[Machine, str],
128+
target_machine: Union[Machine, str],
129+
source_dir: Union[str, Path, AsyncRemotePath],
130+
target_dir: Union[str, Path, AsyncRemotePath],
131+
label: Optional[str] = None,
132+
) -> AsyncGlobusTransfer:
133+
"""Start a Globus transfer throught the SuperFacility API
134+
135+
- Must select the Globus option when creating the SuperFacility key
136+
137+
```python
138+
>>> from sfapi_client import AsyncClient
139+
>>> async with AsyncClient(client_id, client_secret) as client:
140+
>>> globus_client = client.storage.globus()
141+
>>> res = await globus_client.start_transfer(
142+
Machine.Perlmutter,
143+
"/pscratch/sd/u/user/globus",
144+
Machine.dtns,
145+
"/global/cfs/cdirs/m0000/globus"
146+
)
147+
```
148+
149+
:param str source_dir: Path to file or directory on the source to transfer
150+
:param str target_dir: Path to directory on the target to transfer files to
151+
:param Optional[str] label: Label for the transfer,
152+
defaults to None and the API will create a label for the transfer
153+
:return AsyncGlobusTransfer
154+
"""
155+
156+
if None in [source_machine, source_dir, target_machine, target_dir]:
157+
# Check that all parametes are not none
158+
raise ValueError("sources, and targets cannot be None")
159+
160+
# Make machine names match those in the API endpoint
161+
source_name = (
162+
"dtn" if source_machine in [Machine.dtns, Machine.dtn01] else source_machine
163+
)
164+
target_name = (
165+
"dtn" if target_machine in [Machine.dtns, Machine.dtn01] else target_machine
166+
)
167+
168+
body = GlobusBodyPost(
169+
source_uuid=source_name,
170+
target_uuid=target_name,
171+
source_dir=str(source_dir),
172+
target_dir=str(target_dir),
173+
label=label,
174+
)
175+
176+
r = await self.client.post("storage/globus/transfer", data=body.model_dump())
177+
new_transfer = GlobusTransferModel.model_validate(r.json())
178+
transfer_id = new_transfer.transfer_id
179+
r = await self.client.get(f"storage/globus/transfer/{transfer_id}")
180+
json_response = r.json()
181+
json_response["transfer_id"] = transfer_id
182+
json_response["globus"] = self
183+
transfer = AsyncGlobusTransfer.model_validate(json_response)
184+
return transfer
185+
186+
@check_auth
187+
async def transfer(self, transfer_id: str) -> GlobusTransferResult:
188+
"""Check on Globus transfer status
189+
190+
- Must select the Globus option when creating the SuperFacility key
191+
192+
>>> from sfapi_client import AsyncClient
193+
>>> async with AsyncClient(client_id, client_secret) as client:
194+
>>> globus = client.storage.globus()
195+
>>> res = await globus.transfer(
196+
"globus-transfer-uuid"
197+
)
198+
199+
:param str transfer_uuid: Globus UUID for the transfer
200+
:return GlobusTransferResult
201+
"""
202+
if transfer_id is None:
203+
raise ValueError("Must provide a transfer_uuid")
204+
205+
r = await self.client.get(f"storage/globus/transfer/{transfer_id}")
206+
json_response = r.json()
207+
json_response["transfer_id"] = transfer_id
208+
json_response["globus"] = self
209+
transfer = AsyncGlobusTransfer.model_validate(json_response)
210+
return transfer

src/sfapi_client/_async/users.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
from typing import List, Optional, Callable
2-
from functools import wraps
1+
from typing import List, Optional
32

43
from pydantic import ConfigDict
54

@@ -10,18 +9,7 @@
109
)
1110
from .projects import AsyncProject, AsyncRole
1211
from ..exceptions import SfApiError
13-
14-
15-
def check_auth(method: Callable):
16-
@wraps(method)
17-
def wrapper(self, *args, **kwargs):
18-
if self._client_id is None:
19-
raise SfApiError(
20-
f"Cannot call {self.__class__.__name__}.{method.__name__}() with an unauthenticated client." # noqa: E501
21-
)
22-
return method(self, *args, **kwargs)
23-
24-
return wrapper
12+
from .._utils import check_auth
2513

2614

2715
class AsyncUser(UserBase):

0 commit comments

Comments
 (0)