Skip to content

Commit 7fa6674

Browse files
committed
Update _structured.py
1 parent 149320e commit 7fa6674

File tree

1 file changed

+15
-30
lines changed

1 file changed

+15
-30
lines changed

src/l0/_structured.py

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -265,23 +265,15 @@ async def buffering_iterator() -> RawStream:
265265
for attempt in range(max_attempts):
266266
try:
267267
# _internal_run expects a callable factory
268-
# For factory functions, we call them fresh on each attempt to get a new stream
269-
# This is critical for retry logic - streams can only be consumed once
268+
# For factory functions, pass them directly so _internal_run can call fresh on retries
269+
# For direct async iterators (already wrapped in buffering factory above),
270+
# wrap in a lambda - the buffering factory handles replay
270271
if callable(stream_source) and not hasattr(stream_source, "__anext__"):
271-
# It's a factory - call it to get a fresh stream for this attempt
272-
stream_for_attempt = stream_source()
272+
# It's a factory - pass it directly to _internal_run
273+
stream_factory = cast(AwaitableStreamFactory, stream_source)
273274
else:
274-
# It's a direct async iterator (already wrapped in buffering factory above)
275-
# The buffering factory handles replay for us
276-
stream_for_attempt = cast(RawStream, stream_source)
277-
278-
# Wrap in a factory for _internal_run
279-
def make_stream_factory(
280-
s: RawStream = stream_for_attempt,
281-
) -> AwaitableStreamFactory:
282-
return lambda: s
283-
284-
stream_factory = make_stream_factory()
275+
# It's a direct async iterator (wrapped in buffering factory)
276+
stream_factory = lambda src=stream_source: cast(RawStream, src)
285277

286278
# Run through L0 runtime
287279
result = await _internal_run(
@@ -895,24 +887,17 @@ async def buffering_iterator() -> RawStream:
895887
for attempt in range(max_attempts):
896888
try:
897889
# _internal_run expects a callable factory
898-
# For factory functions, we call them fresh on each attempt to get a new stream
899-
# This is critical for retry logic - streams can only be consumed once
890+
# For factory functions, pass them directly so _internal_run can call fresh on retries
891+
# For direct async iterators (already wrapped in buffering factory above),
892+
# wrap in a lambda - the buffering factory handles replay
900893
if callable(stream_source) and not hasattr(stream_source, "__anext__"):
901-
# It's a factory - call it to get a fresh stream for this attempt
902-
stream_for_attempt = stream_source()
894+
# It's a factory - pass it directly to _internal_run
895+
stream_factory = cast(AwaitableStreamFactory, stream_source)
903896
else:
904-
# It's a direct async iterator (already wrapped in buffering factory above)
905-
# The buffering factory handles replay for us
906-
stream_for_attempt = cast(RawStream, stream_source)
907-
908-
# Wrap in a factory for _internal_run
909-
def make_stream_factory(
910-
s: RawStream = stream_for_attempt,
911-
) -> AwaitableStreamFactory:
912-
return lambda: s
913-
914-
stream_factory = make_stream_factory()
897+
# It's a direct async iterator (wrapped in buffering factory)
898+
stream_factory = lambda src=stream_source: cast(RawStream, src)
915899

900+
# Run through L0 runtime
916901
result = await _internal_run(
917902
stream=stream_factory,
918903
on_event=on_event,

0 commit comments

Comments
 (0)