feat: add turn admission hook#214
Conversation
6b1602b to
8deff5c
Compare
frostming
left a comment
There was a problem hiding this comment.
Not complete reviewed yet. I think we should first resolve the fundamental design choice.
| class SteeringHandle: | ||
| """Control surface exposed to model hooks through turn state.""" | ||
|
|
||
| session_id: str | ||
| buffer: SteeringBuffer |
There was a problem hiding this comment.
Looks like too many abstraction layers, consider uniting SteeringHandle and SteeringBuffer
There was a problem hiding this comment.
The split between SteeringHandle and SteeringBuffer is probably premature for this PR.
This PR currently treats steering as session-scoped, but there is an unresolved design question around whether steering should eventually target a session, a specific active turn, or a forked tape/turn branch. If that becomes first-class later, a public handle over a different internal routing/storage model might make sense.
For now, since the PR only implements session-scoped steering, I agree we can collapse this into one simpler type and avoid the extra abstraction.
| async def _resolve_message_session(self, message: ChannelMessage) -> str: | ||
| session_id = await self.framework.resolve_session(message) | ||
| message.session_id = session_id | ||
| setattr(message, "_runtime_session_id", session_id) # noqa: B010 |
There was a problem hiding this comment.
What is this property used for and what is different between it and message.session_id?
| STEER = "steer" | ||
|
|
||
|
|
||
| TurnAdmissionAction = AdmitAction | Literal["process", "drop", "wait", "steer"] |
There was a problem hiding this comment.
Let's drop the enum, and only use literals
| session_id: str | ||
| steering: SteeringHandle | ||
| active_tasks: set[asyncio.Task] = field(default_factory=set) | ||
| pending_queue: deque[Envelope] = field(default_factory=deque) |
There was a problem hiding this comment.
I hope the structure of pending and steering is symmetrical, which can reduce the cognitive burden.
Motivation
ChannelManagercurrently schedules each inbound channel message as a new turn immediately. The default concurrent behavior is simple, but plugins do not have a hook to decide whether the next message for an already-active session should be processed now, queued, dropped, or offered to the running turn as steering input.This PR adds an optional scheduling decision point while keeping the default behavior unchanged.
Design
Add a new first-result hook:
Nonemeans no decision; if every implementation returnsNone, Bub keeps the current default concurrent scheduling behavior.AdmitDecisionsupports four actions:PROCESS: schedule immediatelyDROP: discard explicitlyWAIT: process after the active turn finishesSTEER: enqueue as per-session steering input for model hooks to consume viastate["_runtime_steering"]Undrained steering messages are promoted to the front of the pending queue when the active turn finishes, preserving FIFO order.
Scope
ChannelManagerpath; directprocess_inbound()calls are unaffectedDROPexplicitlyVerification
uv run pytest -quv run ruff check .uv run mypy src