diff --git a/src/acp/connection.py b/src/acp/connection.py index 32f150e..ff1cb19 100644 --- a/src/acp/connection.py +++ b/src/acp/connection.py @@ -86,6 +86,8 @@ def __init__( self._closed = False self._disconnected = False self._sender = (sender_factory or self._default_sender_factory)(self._writer, self._tasks) + self._observers: list[StreamObserver] = list(observers or []) + self._receive_timeout = receive_timeout if listening: self._recv_task = self._tasks.create( self._receive_loop(), @@ -103,8 +105,6 @@ def __init__( self._run_notification, ) self._dispatcher.start() - self._observers: list[StreamObserver] = list(observers or []) - self._receive_timeout = receive_timeout async def close(self) -> None: """Stop the receive loop and cancel any in-flight handler tasks.""" diff --git a/tests/test_rpc.py b/tests/test_rpc.py index ecacaa5..bce2ff8 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -676,3 +676,28 @@ async def test_spawn_agent_process_roundtrip(tmp_path): assert test_client.notifications assert process.returncode is not None + + +@pytest.mark.asyncio +async def test_connection_init_under_eager_task_factory(server): + # Regression: under asyncio.eager_task_factory the receive loop runs synchronously + # up to its first await inside Connection.__init__, so every attribute it reads + # (e.g. _receive_timeout) must be assigned before _tasks.create(_receive_loop()). + loop = asyncio.get_running_loop() + previous_factory = loop.get_task_factory() + loop.set_task_factory(asyncio.eager_task_factory) + try: + conn = Connection( + lambda method, params, is_notification: None, + server.client_writer, + server.client_reader, + receive_timeout=0.5, + ) + finally: + loop.set_task_factory(previous_factory) + + assert conn._receive_timeout == 0.5 + # Let the loop tick once so any deferred receive-task crash would land. + await asyncio.sleep(0) + assert conn._disconnected is False + await conn.close()