Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/seclab_taskflow_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from agents.run import DEFAULT_MAX_TURNS
from dotenv import find_dotenv, load_dotenv
from openai import AsyncOpenAI
import httpx

from .capi import get_AI_endpoint, get_AI_token, get_provider

Expand Down Expand Up @@ -182,6 +183,7 @@ def __init__(
base_url=resolved_endpoint,
api_key=resolved_token,
default_headers=provider.extra_headers or None,
timeout=httpx.Timeout(connect=10.0, read=300.0, write=300.0, pool=60.0),
)
Comment on lines 182 to 187
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client read timeout is set to 300s while the runner’s STREAM_IDLE_TIMEOUT is 1800s. Since these are two independent timeouts affecting streaming behavior, the effective idle-kill behavior can become unclear to maintainers and may not match the intended 30-minute threshold. Consider centralizing/documenting the relationship between these timeouts (or aligning them) so the configured behavior is unambiguous.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are intentionally different — they guard different failure modes at different layers:

  • httpx.Timeout(read=300s) — transport-level. Fires when an individual socket read delivers no bytes within 300s, catching connections that have silently gone dead. This is the first line of defense.
  • STREAM_IDLE_TIMEOUT(1800s) — Application-level. Catches hangs where the connection is technically alive but no events arrive (e.g. the async generator is stuck, or the server stops sending SSE frames while keeping the connection open).

The read timeout fires per individual socket read; the idle timeout fires when no complete event has been yielded for 30 minutes. In practice the httpx timeout catches most dead-connection cases and the idle timeout is a backstop for subtler hangs. I'll add a comment in the code clarifying the relationship.

set_tracing_disabled(True)
self.run_hooks = run_hooks or TaskRunHooks()
Expand All @@ -198,6 +200,7 @@ def _ToolsToFinalOutputFunction(
else:
model_impl = OpenAIChatCompletionsModel(model=model, openai_client=client)

self._openai_client = client
self.agent = Agent(
name=name,
instructions=instructions,
Expand All @@ -209,6 +212,11 @@ def _ToolsToFinalOutputFunction(
hooks=agent_hooks or TaskAgentHooks(),
)

async def close(self) -> None:
    """Release the wrapped AsyncOpenAI client, tearing down its httpx connection pool."""
    client = self._openai_client
    if client is not None:
        await client.close()

async def run(self, prompt: str, max_turns: int = DEFAULT_MAX_TURNS) -> result.RunResult:
    """Drive the agent to completion on *prompt* and return the finished run result."""
    return await Runner.run(
        starting_agent=self.agent,
        input=prompt,
        max_turns=max_turns,
        hooks=self.run_hooks,
    )
Expand Down
3 changes: 3 additions & 0 deletions src/seclab_taskflow_agent/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def main(
),
debug=debug,
)
# Force-exit on success to prevent asyncio event loop spin on
# dangling connections/tasks that survive cleanup.
os._exit(0)
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling os._exit(0) on success will bypass normal interpreter shutdown (no flushing stdio buffers, no finally/atexit handlers, no coverage/profiling hooks), which can truncate output and make the CLI unsafe to embed/invoke from tests or other Python code. Prefer fixing the underlying dangling-task issue (which this PR already addresses) and exiting normally (return from main / let asyncio.run finish, or raise typer.Exit(0) / sys.exit(0) if an explicit exit is needed).

Suggested change
# Force-exit on success to prevent asyncio event loop spin on
# dangling connections/tasks that survive cleanup.
os._exit(0)
return

Copilot uses AI. Check for mistakes.
except KeyboardInterrupt:
typer.echo("\nInterrupted.", err=True)
raise typer.Exit(code=130)
Expand Down
46 changes: 43 additions & 3 deletions src/seclab_taskflow_agent/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
MAX_API_RETRY = 5  # Maximum number of consecutive API error retries
TASK_RETRY_LIMIT = 3  # Maximum retry attempts for a failed task
TASK_RETRY_BACKOFF = 10  # Initial backoff in seconds between task retries
# Application-level backstop: kill a streaming run if no events yielded for 30 min.
# Complements the TCP-level httpx.Timeout(read=300s) in agent.py which catches
# dead sockets; this catches subtler hangs where the connection stays open but
# the server (or async generator) stops producing events.
STREAM_IDLE_TIMEOUT = 1800  # seconds (30 minutes)


def _resolve_model_config(
Expand Down Expand Up @@ -367,6 +372,7 @@ async def deploy_task_agents(
server_prompts=server_prompts,
important_guidelines=important_guidelines,
)
agent0 = None
agent0 = TaskAgent(
name=primary_name,
Comment on lines 421 to 428
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agent0 is referenced in the outer finally block, but it’s only initialized to None here after multiple earlier operations in the try block (e.g., building handoff agents / system prompt). If an exception is raised before this line runs, the finally block will raise UnboundLocalError when checking agent0, preventing cleanup. Initialize agent0 to None at the very start of the outer try (or before it) and then assign the TaskAgent once created; also avoids the current redundant double-assignment.

Copilot uses AI. Check for mistakes.
Comment on lines 421 to 428
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agent0 is initialized to None only after several operations that can raise (e.g., building prompts / handoff agent setup). If an exception occurs before line 416 executes, the finally block later will reference an unbound local agent0 when attempting to close it, raising UnboundLocalError and masking the original failure. Define agent0: TaskAgent | None = None before the outer try: (or otherwise ensure it is always bound) before entering code that may throw.

This issue also appears in the following locations of the same file:

  • line 505
  • line 824
  • line 824

Copilot uses AI. Check for mistakes.
instructions=prompt_with_handoff_instructions(system_prompt) if handoffs else system_prompt,
Expand All @@ -391,9 +397,31 @@ async def _run_streamed() -> None:
while rate_limit_backoff:
try:
result = agent0.run_streamed(prompt, max_turns=max_turns)
async for event in result.stream_events():
if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)
stream = None
try:
stream = result.stream_events()
async_iter = stream.__aiter__()
while True:
try:
event = await asyncio.wait_for(
async_iter.__anext__(),
timeout=STREAM_IDLE_TIMEOUT,
)
except StopAsyncIteration:
break
except asyncio.TimeoutError:
logging.error(
f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
"connection likely dead, raising APITimeoutError"
)
raise APITimeoutError("Stream idle timeout exceeded")
if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)
finally:
if stream is not None:
aclose = getattr(stream, "aclose", None)
if aclose is not None:
await aclose()
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The finally cleanup calls await aclose() directly. Any exception raised during aclose() will override the original streaming error/timeout and can break the retry path. Consider wrapping the aclose() call in a try/except (logging on failure) so cleanup failures don’t mask the root cause.

Suggested change
await aclose()
try:
await aclose()
except Exception:
logging.exception("Failed to close streamed response")

Copilot uses AI. Check for mistakes.
await render_model_output("\n\n", async_task=async_task, task_id=task_id)
return
except APITimeoutError:
Expand Down Expand Up @@ -433,6 +461,10 @@ async def _run_streamed() -> None:
return complete

finally:
# Close the AsyncOpenAI client to release httpx connection pool.
# Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
if agent0 is not None:
await agent0.close()
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This finally only closes the primary TaskAgent’s AsyncOpenAI client. Handoff agents are currently created via TaskAgent(...).agent, which keeps the underlying AsyncOpenAI client alive but makes it impossible to close() later, so their httpx connection pools can still leak and reintroduce the same CLOSE_WAIT/kqueue-spin behavior. Consider retaining the handoff TaskAgent wrappers (or their clients) and closing all of them here; also wrap agent0.close() in a try/except so a close failure doesn’t mask an earlier exception or prevent MCP cleanup.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the primary TaskAgent’s AsyncOpenAI client is being closed here. Any additional handoff agents created earlier via TaskAgent(...).agent will still retain their own AsyncOpenAI/httpx pools (the wrapper instance is discarded, but the Agent keeps a reference to the client through its model). To fully prevent lingering CLOSE_WAIT sockets / event-loop spin, track and close all TaskAgent-owned clients (or share a single client across agents) in this finally block.

Suggested change
# Close the AsyncOpenAI client to release httpx connection pool.
# Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
if agent0 is not None:
await agent0.close()
# Close all AsyncOpenAI clients reachable from the task agent graph to
# release every httpx connection pool created for handoff agents too.
# Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
if agent0 is not None:
root_agent = getattr(agent0, "agent", None)
agents_to_visit: list[Agent[Any]] = []
if root_agent is not None:
agents_to_visit.append(root_agent)
seen_agent_ids: set[int] = set()
extra_clients: list[Any] = []
seen_client_ids: set[int] = set()
primary_client = getattr(getattr(root_agent, "model", None), "client", None)
if primary_client is not None:
seen_client_ids.add(id(primary_client))
while agents_to_visit:
current_agent = agents_to_visit.pop()
current_agent_id = id(current_agent)
if current_agent_id in seen_agent_ids:
continue
seen_agent_ids.add(current_agent_id)
current_client = getattr(getattr(current_agent, "model", None), "client", None)
if current_client is not None and id(current_client) not in seen_client_ids:
seen_client_ids.add(id(current_client))
extra_clients.append(current_client)
for handoff in getattr(current_agent, "handoffs", ()) or ():
handoff_agent = getattr(handoff, "agent", handoff)
if handoff_agent is not None:
agents_to_visit.append(handoff_agent)
await agent0.close()
for client in extra_clients:
close_client = getattr(client, "close", None)
if close_client is None:
continue
try:
await close_client()
except Exception:
logging.exception("Exception while closing handoff agent client")

Copilot uses AI. Check for mistakes.
start_cleanup.set()
cleanup_attempts_left = len(entries)
while cleanup_attempts_left and entries:
Expand All @@ -443,6 +475,14 @@ async def _run_streamed() -> None:
continue
except Exception:
logging.exception("Exception in mcp server cleanup task")
# Cancel the MCP session task if it's still running to prevent
# the asyncio event loop from spinning on a dangling task.
if not mcp_sessions.done():
mcp_sessions.cancel()
try:
await mcp_sessions
except (asyncio.CancelledError, Exception):
pass
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After mcp_sessions.cancel(), the code does await mcp_sessions with no timeout. This can hang indefinitely if the session task suppresses cancellation during cleanup (it catches CancelledError inside its cleanup loop). Use a bounded wait (e.g., asyncio.wait_for(..., timeout=MCP_CLEANUP_TIMEOUT)) and log if cancellation doesn’t complete, so shutdown can’t re-hang in the cleanup path.

Suggested change
await mcp_sessions
except (asyncio.CancelledError, Exception):
pass
await asyncio.wait_for(mcp_sessions, timeout=MCP_CLEANUP_TIMEOUT)
except asyncio.TimeoutError:
logging.warning(
"Timed out waiting for MCP session task cancellation after %s seconds",
MCP_CLEANUP_TIMEOUT,
)
except asyncio.CancelledError:
pass
except Exception:
logging.exception("Exception while waiting for mcp session task cancellation")

Copilot uses AI. Check for mistakes.


async def run_main(
Expand Down
Loading