Skip to content

Commit 088ba3b

Browse files
committed
some more fixes
1 parent ffee571 commit 088ba3b

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

pyatlan/client/aio/client.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ class AsyncAtlanClient(AtlanClient):
132132
_async_user_cache: Optional[AsyncUserCache] = PrivateAttr(default=None)
133133

134134
def __init__(self, **kwargs):
135+
# Initialize sync client (handles all validation, env vars, etc.)
135136
super().__init__(**kwargs)
136137

137138
if self.oauth_client_id and self.oauth_client_secret:
@@ -143,8 +144,10 @@ def __init__(self, **kwargs):
143144
client_secret=self.oauth_client_secret,
144145
)
145146

147+
# Build proxy/SSL configuration (reuse from sync client)
146148
transport_kwargs = self._build_transport_proxy_config(kwargs)
147149

150+
# Create async session with custom transport that supports retry and proxy
148151
self._async_session = httpx.AsyncClient(
149152
transport=PyatlanAsyncTransport(retry=self.retry, **transport_kwargs),
150153
headers={
@@ -441,6 +444,9 @@ def _api_logger(self, api, path):
441444
async def _create_params(
442445
self, api, query_params, request_obj, exclude_unset: bool = True
443446
):
447+
"""
448+
Async version of _create_params that uses AsyncAtlanRequest for AtlanObject instances.
449+
"""
444450
params = copy.deepcopy(self._request_params)
445451
if self._async_oauth_token_manager:
446452
token = await self._async_oauth_token_manager.get_token()
@@ -451,6 +457,7 @@ async def _create_params(
451457
params["params"] = query_params
452458
if request_obj is not None:
453459
if isinstance(request_obj, AtlanObject):
460+
# Use AsyncAtlanRequest for async retranslation
454461
async_request = AsyncAtlanRequest(instance=request_obj, client=self)
455462
params["data"] = await async_request.json()
456463
elif api.consumes == APPLICATION_ENCODED_FORM:
@@ -691,12 +698,17 @@ async def _handle_error_response(
691698
"\n".join(error_cause_details) if error_cause_details else ""
692699
)
693700

701+
# Retry with impersonation (if _user_id is present) on authentication failure
694702
if (
695703
(self._user_id or self._async_oauth_token_manager)
696704
and not self._401_has_retried.get()
697705
and response.status_code
698706
== ErrorCode.AUTHENTICATION_PASSTHROUGH.http_error_code
699707
):
708+
"""
709+
Async version of token refresh and retry logic.
710+
Handles token refresh and retries the API request upon a 401 Unauthorized response.
711+
"""
700712
try:
701713
LOGGER.debug("Starting async 401 automatic token refresh.")
702714
return await self._handle_401_token_refresh(
@@ -763,6 +775,7 @@ async def _handle_401_token_refresh(
763775
)
764776

765777
try:
778+
# Use sync impersonation call since it's a quick API call
766779
new_token = await self.impersonate.user(user_id=self._user_id)
767780
except Exception as e:
768781
LOGGER.debug(
@@ -778,9 +791,11 @@ async def _handle_401_token_refresh(
778791
self._request_params["headers"]["authorization"] = f"Bearer {self.api_key}"
779792
LOGGER.debug("Successfully completed async 401 automatic token refresh.")
780793

794+
# Async retry loop to ensure token is active before retrying original request
781795
retry_count = 1
782796
while retry_count <= self.retry.total:
783797
try:
798+
# Use async typedef call to validate token
784799
response = await self.typedef.get(
785800
type_category=[AtlanTypeCategory.STRUCT]
786801
)
@@ -791,9 +806,10 @@ async def _handle_401_token_refresh(
791806
"Retrying async to get typedefs (to ensure token is active) after token refresh failed: %s",
792807
e,
793808
)
794-
await asyncio.sleep(retry_count)
809+
await asyncio.sleep(retry_count) # Linear backoff with async sleep
795810
retry_count += 1
796811

812+
# Retry the API call with the new token
797813
return await self._call_api_internal(
798814
api,
799815
path,

pyatlan/client/atlan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@ def __init__(self, **data):
194194

195195
# Build proxy/SSL configuration with environment variable fallback
196196
transport_kwargs = self._build_transport_proxy_config(data)
197-
198197
# Configure httpx client with custom transport that supports retry and proxy
199198
# Note: We pass proxy/SSL config to the transport, not the client,
200199
# so that retry logic properly respects these settings
@@ -861,6 +860,7 @@ def _handle_401_token_refresh(
861860
1. Impersonates the user (if a user ID is available) to fetch a new token.
862861
2. Updates the authorization header with the refreshed token.
863862
3. Retries the API request with the new token.
863+
864864
returns: HTTP response received after retrying the request with the refreshed token
865865
"""
866866
if self._oauth_token_manager:

0 commit comments

Comments
 (0)