-
Notifications
You must be signed in to change notification settings - Fork 24
Fix async streaming hangs on dead connections #212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
e0a587a
abb4ba3
816f53c
5dd6ec2
e628977
6d3d2f7
461116b
af263c3
1550484
238e00f
d7714a8
84ecf1d
7b118b5
9bfd893
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -51,6 +51,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| MAX_API_RETRY = 5 # Maximum number of consecutive API error retries | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TASK_RETRY_LIMIT = 3 # Maximum retry attempts for a failed task | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TASK_RETRY_BACKOFF = 10 # Initial backoff in seconds between task retries | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| STREAM_IDLE_TIMEOUT = 1800 # Kill a streaming run if no events received for this long (seconds) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _resolve_model_config( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -391,7 +392,22 @@ async def _run_streamed() -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while rate_limit_backoff: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| result = agent0.run_streamed(prompt, max_turns=max_turns) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async for event in result.stream_events(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream = result.stream_events() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async_iter = stream.__aiter__() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event = await asyncio.wait_for( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async_iter.__anext__(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timeout=STREAM_IDLE_TIMEOUT, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except StopAsyncIteration: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except asyncio.TimeoutError: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logging.error( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"Stream idle for {STREAM_IDLE_TIMEOUT}s — " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "connection likely dead, raising APITimeoutError" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise APITimeoutError("Stream idle timeout exceeded") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await render_model_output(event.data.delta, async_task=async_task, task_id=task_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream = result.stream_events() | |
| async_iter = stream.__aiter__() | |
| while True: | |
| try: | |
| event = await asyncio.wait_for( | |
| async_iter.__anext__(), | |
| timeout=STREAM_IDLE_TIMEOUT, | |
| ) | |
| except StopAsyncIteration: | |
| break | |
| except asyncio.TimeoutError: | |
| logging.error( | |
| f"Stream idle for {STREAM_IDLE_TIMEOUT}s — " | |
| "connection likely dead, raising APITimeoutError" | |
| ) | |
| raise APITimeoutError("Stream idle timeout exceeded") | |
| if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | |
| await render_model_output(event.data.delta, async_task=async_task, task_id=task_id) | |
| stream = None | |
| try: | |
| stream = result.stream_events() | |
| async_iter = stream.__aiter__() | |
| while True: | |
| try: | |
| event = await asyncio.wait_for( | |
| async_iter.__anext__(), | |
| timeout=STREAM_IDLE_TIMEOUT, | |
| ) | |
| except StopAsyncIteration: | |
| break | |
| except asyncio.TimeoutError: | |
| logging.error( | |
| f"Stream idle for {STREAM_IDLE_TIMEOUT}s — " | |
| "connection likely dead, raising APITimeoutError" | |
| ) | |
| raise APITimeoutError("Stream idle timeout exceeded") | |
| if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): | |
| await render_model_output(event.data.delta, async_task=async_task, task_id=task_id) | |
| finally: | |
| if stream is not None: | |
| aclose = getattr(stream, "aclose", None) | |
| if aclose is not None: | |
| await aclose() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch — addressed in abb4ba3. Added finally block with aclose() to ensure the async generator is cleaned up on timeout/early exit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client read timeout is set to 300s while the runner’s
STREAM_IDLE_TIMEOUTis 1800s. Since these are two independent timeouts affecting streaming behavior, the effective idle-kill behavior can become unclear to maintainers and may not match the intended 30-minute threshold. Consider centralizing/documenting the relationship between these timeouts (or aligning them) so the configured behavior is unambiguous.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are intentionally different — they guard different failure modes at different layers:
httpx.Timeout(read=300s)— TCP-level. Catches dead connections where the socket itself stops delivering bytes (CLOSE_WAIT). This is the first line of defense.STREAM_IDLE_TIMEOUT(1800s)— Application-level. Catches hangs where the connection is technically alive but no events arrive (e.g. the async generator is stuck, or the server stops sending SSE frames while keeping the connection open).The read timeout fires per individual socket read; the idle timeout fires when no complete event has been yielded for 30 minutes. In practice the httpx timeout catches most dead-connection cases and the idle timeout is a backstop for subtler hangs. I'll add a comment in the code clarifying the relationship.