Skip to content

Commit b2c933d

Browse files
Copilotcgillum
andcommitted
Implement ClientSession reuse with proper lifecycle management
Co-authored-by: cgillum <2704139+cgillum@users.noreply.github.com>
1 parent 9c1fa21 commit b2c933d

File tree

2 files changed

+375
-9
lines changed

2 files changed

+375
-9
lines changed

azure/durable_functions/models/utils/http_utils.py

Lines changed: 96 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,65 @@
1-
from typing import Any, List, Union
1+
from typing import Any, List, Union, Optional
2+
import asyncio
23

34
import aiohttp
45

56

7+
# Global session and lock for thread-safe initialization
8+
_client_session: Optional[aiohttp.ClientSession] = None
9+
_session_lock: asyncio.Lock = asyncio.Lock()
10+
11+
12+
async def _get_session() -> aiohttp.ClientSession:
13+
"""Get or create the shared ClientSession.
14+
15+
Returns
16+
-------
17+
aiohttp.ClientSession
18+
The shared client session with configured timeout and connection pooling.
19+
"""
20+
global _client_session
21+
22+
# Double-check locking pattern for async
23+
if _client_session is None or _client_session.closed:
24+
async with _session_lock:
25+
# Check again after acquiring lock
26+
if _client_session is None or _client_session.closed:
27+
# Configure timeout and connection pooling
28+
timeout = aiohttp.ClientTimeout(
29+
total=60, # Total timeout for entire request
30+
connect=30, # Timeout for connection establishment
31+
sock_connect=30, # Socket connection timeout
32+
sock_read=30 # Socket read timeout
33+
)
34+
35+
# Configure TCP connector with connection pooling
36+
connector = aiohttp.TCPConnector(
37+
limit=100, # Maximum number of connections
38+
limit_per_host=30, # Maximum connections per host
39+
ttl_dns_cache=300, # DNS cache TTL in seconds
40+
enable_cleanup_closed=True # Enable cleanup of closed connections
41+
)
42+
43+
_client_session = aiohttp.ClientSession(
44+
timeout=timeout,
45+
connector=connector
46+
)
47+
48+
return _client_session
49+
50+
51+
async def _close_session() -> None:
52+
"""Close the shared ClientSession if it exists.
53+
54+
This function should be called during worker shutdown.
55+
"""
56+
global _client_session
57+
58+
if _client_session is not None and not _client_session.closed:
59+
await _client_session.close()
60+
_client_session = None
61+
62+
663
async def post_async_request(url: str,
764
data: Any = None,
865
trace_parent: str = None,
@@ -25,19 +82,29 @@ async def post_async_request(url: str,
2582
[int, Any]
2683
Tuple with the Response status code and the data returned from the request
2784
"""
28-
async with aiohttp.ClientSession() as session:
29-
headers = {}
30-
if trace_parent:
31-
headers["traceparent"] = trace_parent
32-
if trace_state:
33-
headers["tracestate"] = trace_state
85+
session = await _get_session()
86+
headers = {}
87+
if trace_parent:
88+
headers["traceparent"] = trace_parent
89+
if trace_state:
90+
headers["tracestate"] = trace_state
91+
92+
try:
3493
async with session.post(url, json=data, headers=headers) as response:
3594
# We disable aiohttp's input type validation
3695
# as the server may respond with alternative
3796
# data encodings. This is potentially unsafe.
3897
# More here: https://docs.aiohttp.org/en/stable/client_advanced.html
3998
data = await response.json(content_type=None)
4099
return [response.status, data]
100+
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
101+
# On connection errors, close and recreate session for next request
102+
# This handles cases where the remote host process recycles
103+
global _client_session
104+
if _client_session is not None and not _client_session.closed:
105+
await _client_session.close()
106+
_client_session = None
107+
raise
41108

42109

43110
async def get_async_request(url: str) -> List[Any]:
@@ -53,12 +120,22 @@ async def get_async_request(url: str) -> List[Any]:
53120
[int, Any]
54121
Tuple with the Response status code and the data returned from the request
55122
"""
56-
async with aiohttp.ClientSession() as session:
123+
session = await _get_session()
124+
125+
try:
57126
async with session.get(url) as response:
58127
data = await response.json(content_type=None)
59128
if data is None:
60129
data = ""
61130
return [response.status, data]
131+
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
132+
# On connection errors, close and recreate session for next request
133+
# This handles cases where the remote host process recycles
134+
global _client_session
135+
if _client_session is not None and not _client_session.closed:
136+
await _client_session.close()
137+
_client_session = None
138+
raise
62139

63140

64141
async def delete_async_request(url: str) -> List[Union[int, Any]]:
@@ -74,7 +151,17 @@ async def delete_async_request(url: str) -> List[Union[int, Any]]:
74151
[int, Any]
75152
Tuple with the Response status code and the data returned from the request
76153
"""
77-
async with aiohttp.ClientSession() as session:
154+
session = await _get_session()
155+
156+
try:
78157
async with session.delete(url) as response:
79158
data = await response.json(content_type=None)
80159
return [response.status, data]
160+
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
161+
# On connection errors, close and recreate session for next request
162+
# This handles cases where the remote host process recycles
163+
global _client_session
164+
if _client_session is not None and not _client_session.closed:
165+
await _client_session.close()
166+
_client_session = None
167+
raise

0 commit comments

Comments
 (0)