From f11303c915b01290fdc08cdd217c71a14c6624c0 Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:25:08 +0530 Subject: [PATCH 1/9] feat: add auto-save scaffolding for periodic tool-result logging Adds _tool_call_counter, _auto_save_interval, _auto_save_dir, _write_auto_save, and _read_tool_log to run_main. When AUTO_SAVE_DIR and AUTO_SAVE_INTERVAL are set, tool results are periodically appended to an NDJSON log file. Disabled by default (interval=0). --- src/seclab_taskflow_agent/runner.py | 45 +++++++++++++++ tests/test_runner.py | 88 +++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+) diff --git a/src/seclab_taskflow_agent/runner.py b/src/seclab_taskflow_agent/runner.py index 5869385..470b26d 100644 --- a/src/seclab_taskflow_agent/runner.py +++ b/src/seclab_taskflow_agent/runner.py @@ -20,6 +20,7 @@ ] import asyncio +import contextlib import json import logging import os @@ -467,8 +468,52 @@ async def run_main( last_mcp_tool_results: list[str] = [] + # Auto-save scaffolding: periodically persist tool results to disk. + # Disabled by default (interval=0). Set AUTO_SAVE_DIR and + # AUTO_SAVE_INTERVAL to enable. + _tool_call_counter = [0] + _auto_save_interval = int(os.getenv("AUTO_SAVE_INTERVAL", "0")) + _auto_save_dir = os.getenv("AUTO_SAVE_DIR", "") + + def _write_auto_save(tool_name: str, result: str) -> None: + """Append tool result to auto-save log (NDJSON, append-only, crash-safe).""" + try: + os.makedirs(_auto_save_dir, exist_ok=True) + save_path = os.path.join(_auto_save_dir, "auto_save_tool_log.ndjson") + entry = json.dumps({ + "turn": _tool_call_counter[0], + "tool": tool_name, + "result_preview": (result or "")[:2000], + }) + with open(save_path, "a") as f: + f.write(entry + "\n") + except Exception as e: + logging.warning(f"Auto-save failed: {e}") + + def _read_tool_log() -> list[dict]: + """Read NDJSON auto-save log. Skips malformed lines.""" + if not _auto_save_dir: + return [] + entries: list[dict] = [] + try: + path = os.path.join(_auto_save_dir, "auto_save_tool_log.ndjson") + if os.path.exists(path): + with open(path) as f: + for line in f: + line = line.strip() + if not line: + continue + with contextlib.suppress(json.JSONDecodeError): + entries.append(json.loads(line)) + except Exception: + logging.debug("Failed to read auto-save tool log", exc_info=True) + return entries + async def on_tool_end_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool, result: str) -> None: last_mcp_tool_results.append(result) + _tool_call_counter[0] += 1 + if _auto_save_dir and _auto_save_interval and _tool_call_counter[0] % _auto_save_interval == 0: + _write_auto_save(tool.name, result) async def on_tool_start_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool) -> None: await render_model_output(f"\n** 🤖🛠️ Tool Call: {tool.name}\n") diff --git a/tests/test_runner.py b/tests/test_runner.py index a50c0f2..56dc844 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -441,3 +441,91 @@ def test_raises_type_error_on_non_iterable_result(self): inputs={}, ) ) + + +# =================================================================== +# Auto-save scaffolding +# =================================================================== + + +class TestAutoSave: + """Tests for auto-save tool log scaffolding in run_main.""" + + @staticmethod + def _run_hook(monkeypatch, tmp_path, interval, calls): + """Set up auto-save env, import run_main to trigger closure, simulate tool calls.""" + monkeypatch.setenv("AUTO_SAVE_DIR", str(tmp_path)) + monkeypatch.setenv("AUTO_SAVE_INTERVAL", str(interval)) + + # We need to exercise the on_tool_end_hook closure. + # The simplest way is to import run_main, but the hook is a closure + # inside run_main that we can't call directly. Instead, we test + # the _write_auto_save / _read_tool_log helpers indirectly by + # simulating what run_main does. + import importlib + import seclab_taskflow_agent.runner as runner_mod + + importlib.reload(runner_mod) + + # Construct the same state the closure would have. + counter = [0] + auto_dir = str(tmp_path) + auto_interval = interval + + import os + + os.makedirs(auto_dir, exist_ok=True) + save_path = os.path.join(auto_dir, "auto_save_tool_log.ndjson") + + for tool_name, result in calls: + counter[0] += 1 + if auto_dir and auto_interval and counter[0] % auto_interval == 0: + entry = json.dumps({ + "turn": counter[0], + "tool": tool_name, + "result_preview": (result or "")[:2000], + }) + with open(save_path, "a") as f: + f.write(entry + "\n") + return save_path + + def test_disabled_by_default(self, monkeypatch, tmp_path): + """No log file created when interval is 0.""" + import os + + save_path = self._run_hook(monkeypatch, tmp_path, 0, [("tool_a", "res")]) + assert not os.path.exists(save_path) + + def test_writes_every_n_calls_when_enabled(self, monkeypatch, tmp_path): + """Log entries are written every N calls.""" + calls = [(f"tool_{i}", f"result_{i}") for i in range(6)] + save_path = self._run_hook(monkeypatch, tmp_path, 3, calls) + with open(save_path) as f: + lines = [line.strip() for line in f if line.strip()] + assert len(lines) == 2 # calls 3 and 6 + + def test_log_format_has_turn_tool_preview(self, monkeypatch, tmp_path): + """Each NDJSON entry has the expected keys.""" + calls = [("search_code", "found 5 matches")] + save_path = self._run_hook(monkeypatch, tmp_path, 1, calls) + with open(save_path) as f: + entry = json.loads(f.readline()) + assert entry["turn"] == 1 + assert entry["tool"] == "search_code" + assert entry["result_preview"] == "found 5 matches" + + def test_result_truncated_to_2000(self, monkeypatch, tmp_path): + """Result preview is capped at 2000 characters.""" + long_result = "x" * 5000 + calls = [("big_tool", long_result)] + save_path = self._run_hook(monkeypatch, tmp_path, 1, calls) + with open(save_path) as f: + entry = json.loads(f.readline()) + assert len(entry["result_preview"]) == 2000 + + def test_survives_write_failure(self): + """Auto-save to an impossible path does not raise.""" + import os + + save_path = os.path.join("/dev/null/impossible", "auto_save_tool_log.ndjson") + assert not os.path.exists(save_path) From c562128893dc32057d13eb428093d5748ceff9d2 Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:36:11 +0530 Subject: [PATCH 2/9] docs: document AUTO_SAVE_DIR and AUTO_SAVE_INTERVAL in README --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 2600fcc..46d50a6 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,12 @@ Failed tasks are automatically retried up to 3 times with increasing backoff before the session is saved. Session checkpoints are stored in the platform-specific application data directory. +### Auto-save tool log + +Set `AUTO_SAVE_DIR` and `AUTO_SAVE_INTERVAL` to enable periodic tool-result +logging. Every N tool calls, the runner appends an NDJSON entry to +`{AUTO_SAVE_DIR}/auto_save_tool_log.ndjson`. Disabled by default (interval=0). + ### Error Output By default, errors are shown as concise one-line messages. Use `--debug` (or From 09ca05b401f6d7a0bf9d5ce480e6743095a7899e Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:46:41 +0530 Subject: [PATCH 3/9] refactor: extract autosave helpers to module-level testable functions Moves write_auto_save() and read_tool_log() from closures inside run_main() to module-level functions with explicit parameters. Tests now exercise the real implementation instead of duplicating the logic. --- src/seclab_taskflow_agent/runner.py | 81 ++++++++++-------- tests/test_runner.py | 123 +++++++++++----------------- 2 files changed, 95 insertions(+), 109 deletions(-) diff --git a/src/seclab_taskflow_agent/runner.py b/src/seclab_taskflow_agent/runner.py index 470b26d..4fb93d0 100644 --- a/src/seclab_taskflow_agent/runner.py +++ b/src/seclab_taskflow_agent/runner.py @@ -16,7 +16,9 @@ "MAX_RATE_LIMIT_BACKOFF", "RATE_LIMIT_BACKOFF", "deploy_task_agents", + "read_tool_log", "run_main", + "write_auto_save", ] import asyncio @@ -53,6 +55,49 @@ TASK_RETRY_LIMIT = 3 # Maximum retry attempts for a failed task TASK_RETRY_BACKOFF = 10 # Initial backoff in seconds between task retries +AUTO_SAVE_LOG_NAME = "auto_save_tool_log.ndjson" + + +def write_auto_save( + auto_save_dir: str, + turn: int, + tool_name: str, + result: str, +) -> None: + """Append tool result to auto-save log (NDJSON, append-only, crash-safe).""" + try: + os.makedirs(auto_save_dir, exist_ok=True) + save_path = os.path.join(auto_save_dir, AUTO_SAVE_LOG_NAME) + entry = json.dumps({ + "turn": turn, + "tool": tool_name, + "result_preview": (result or "")[:2000], + }) + with open(save_path, "a") as f: + f.write(entry + "\n") + except Exception as e: + logging.warning(f"Auto-save failed: {e}") + + +def read_tool_log(auto_save_dir: str) -> list[dict]: + """Read NDJSON auto-save log. Skips malformed lines.""" + if not auto_save_dir: + return [] + entries: list[dict] = [] + try: + path = os.path.join(auto_save_dir, AUTO_SAVE_LOG_NAME) + if os.path.exists(path): + with open(path) as f: + for line in f: + line = line.strip() + if not line: + continue + with contextlib.suppress(json.JSONDecodeError): + entries.append(json.loads(line)) + except Exception: + logging.debug("Failed to read auto-save tool log", exc_info=True) + return entries + def _resolve_model_config( available_tools: AvailableTools, @@ -475,45 +520,11 @@ async def run_main( _auto_save_interval = int(os.getenv("AUTO_SAVE_INTERVAL", "0")) _auto_save_dir = os.getenv("AUTO_SAVE_DIR", "") - def _write_auto_save(tool_name: str, result: str) -> None: - """Append tool result to auto-save log (NDJSON, append-only, crash-safe).""" - try: - os.makedirs(_auto_save_dir, exist_ok=True) - save_path = os.path.join(_auto_save_dir, "auto_save_tool_log.ndjson") - entry = json.dumps({ - "turn": _tool_call_counter[0], - "tool": tool_name, - "result_preview": (result or "")[:2000], - }) - with open(save_path, "a") as f: - f.write(entry + "\n") - except Exception as e: - logging.warning(f"Auto-save failed: {e}") - - def _read_tool_log() -> list[dict]: - """Read NDJSON auto-save log. Skips malformed lines.""" - if not _auto_save_dir: - return [] - entries: list[dict] = [] - try: - path = os.path.join(_auto_save_dir, "auto_save_tool_log.ndjson") - if os.path.exists(path): - with open(path) as f: - for line in f: - line = line.strip() - if not line: - continue - with contextlib.suppress(json.JSONDecodeError): - entries.append(json.loads(line)) - except Exception: - logging.debug("Failed to read auto-save tool log", exc_info=True) - return entries - async def on_tool_end_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool, result: str) -> None: last_mcp_tool_results.append(result) _tool_call_counter[0] += 1 if _auto_save_dir and _auto_save_interval and _tool_call_counter[0] % _auto_save_interval == 0: - _write_auto_save(tool.name, result) + write_auto_save(_auto_save_dir, _tool_call_counter[0], tool.name, result) async def on_tool_start_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool) -> None: await render_model_output(f"\n** 🤖🛠️ Tool Call: {tool.name}\n") diff --git a/tests/test_runner.py b/tests/test_runner.py index 56dc844..4f4923f 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -24,6 +24,8 @@ _merge_reusable_task, _resolve_model_config, _resolve_task_model, + read_tool_log, + write_auto_save, ) @@ -449,83 +451,56 @@ def test_raises_type_error_on_non_iterable_result(self): class TestAutoSave: - """Tests for auto-save tool log scaffolding in run_main.""" - - @staticmethod - def _run_hook(monkeypatch, tmp_path, interval, calls): - """Set up auto-save env, import run_main to trigger closure, simulate tool calls.""" - monkeypatch.setenv("AUTO_SAVE_DIR", str(tmp_path)) - monkeypatch.setenv("AUTO_SAVE_INTERVAL", str(interval)) - - # We need to exercise the on_tool_end_hook closure. - # The simplest way is to import run_main, but the hook is a closure - # inside run_main that we can't call directly. Instead, we test - # the _write_auto_save / _read_tool_log helpers indirectly by - # simulating what run_main does. - import importlib - import seclab_taskflow_agent.runner as runner_mod - - importlib.reload(runner_mod) - - # Construct the same state the closure would have. - counter = [0] - auto_dir = str(tmp_path) - auto_interval = interval - - import os - - os.makedirs(auto_dir, exist_ok=True) - save_path = os.path.join(auto_dir, "auto_save_tool_log.ndjson") - - for tool_name, result in calls: - counter[0] += 1 - if auto_dir and auto_interval and counter[0] % auto_interval == 0: - entry = json.dumps({ - "turn": counter[0], - "tool": tool_name, - "result_preview": (result or "")[:2000], - }) - with open(save_path, "a") as f: - f.write(entry + "\n") - return save_path - - def test_disabled_by_default(self, monkeypatch, tmp_path): - """No log file created when interval is 0.""" - import os - - save_path = self._run_hook(monkeypatch, tmp_path, 0, [("tool_a", "res")]) - assert not os.path.exists(save_path) - - def test_writes_every_n_calls_when_enabled(self, monkeypatch, tmp_path): - """Log entries are written every N calls.""" - calls = [(f"tool_{i}", f"result_{i}") for i in range(6)] - save_path = self._run_hook(monkeypatch, tmp_path, 3, calls) - with open(save_path) as f: - lines = [line.strip() for line in f if line.strip()] - assert len(lines) == 2 # calls 3 and 6 - - def test_log_format_has_turn_tool_preview(self, monkeypatch, tmp_path): + """Tests for write_auto_save / read_tool_log (module-level functions).""" + + def test_disabled_when_no_dir(self): + """read_tool_log returns [] when dir is empty string.""" + assert read_tool_log("") == [] + + def test_write_then_read_roundtrip(self, tmp_path): + """write_auto_save produces entries that read_tool_log can read back.""" + d = str(tmp_path) + write_auto_save(d, turn=1, tool_name="search_code", result="found 5") + write_auto_save(d, turn=2, tool_name="read_file", result="contents") + entries = read_tool_log(d) + assert len(entries) == 2 + assert entries[0]["turn"] == 1 + assert entries[0]["tool"] == "search_code" + assert entries[1]["turn"] == 2 + + def test_log_format_has_turn_tool_preview(self, tmp_path): """Each NDJSON entry has the expected keys.""" - calls = [("search_code", "found 5 matches")] - save_path = self._run_hook(monkeypatch, tmp_path, 1, calls) - with open(save_path) as f: - entry = json.loads(f.readline()) - assert entry["turn"] == 1 - assert entry["tool"] == "search_code" - assert entry["result_preview"] == "found 5 matches" - - def test_result_truncated_to_2000(self, monkeypatch, tmp_path): + d = str(tmp_path) + write_auto_save(d, turn=7, tool_name="search_code", result="found 5 matches") + entries = read_tool_log(d) + assert len(entries) == 1 + assert entries[0] == {"turn": 7, "tool": "search_code", "result_preview": "found 5 matches"} + + def test_result_truncated_to_2000(self, tmp_path): """Result preview is capped at 2000 characters.""" - long_result = "x" * 5000 - calls = [("big_tool", long_result)] - save_path = self._run_hook(monkeypatch, tmp_path, 1, calls) - with open(save_path) as f: - entry = json.loads(f.readline()) - assert len(entry["result_preview"]) == 2000 + d = str(tmp_path) + write_auto_save(d, turn=1, tool_name="big", result="x" * 5000) + entries = read_tool_log(d) + assert len(entries[0]["result_preview"]) == 2000 def test_survives_write_failure(self): - """Auto-save to an impossible path does not raise.""" + """write_auto_save to an impossible path does not raise.""" + write_auto_save("/dev/null/impossible", turn=1, tool_name="t", result="r") + + def test_read_skips_corrupt_trailing_line(self, tmp_path): + """read_tool_log skips truncated/corrupt lines without discarding valid ones.""" import os - save_path = os.path.join("/dev/null/impossible", "auto_save_tool_log.ndjson") - assert not os.path.exists(save_path) + d = str(tmp_path) + write_auto_save(d, turn=1, tool_name="good", result="ok") + # Append a corrupt line simulating crash mid-write + log_path = os.path.join(d, "auto_save_tool_log.ndjson") + with open(log_path, "a") as f: + f.write('{"truncated\n') + entries = read_tool_log(d) + assert len(entries) == 1 + assert entries[0]["tool"] == "good" + + def test_read_empty_dir(self, tmp_path): + """read_tool_log on a dir with no log file returns [].""" + assert read_tool_log(str(tmp_path)) == [] From 72d3da533fbc89ed0755460d1e5815dd60e7e107 Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 15:07:34 +0530 Subject: [PATCH 4/9] fix: add explicit UTF-8 encoding and validate AUTO_SAVE_INTERVAL - Add encoding="utf-8" to open() in write_auto_save and read_tool_log - Catch ValueError on non-numeric AUTO_SAVE_INTERVAL with fallback to 0 - Soften docstring from "crash-safe" to "append-only" --- src/seclab_taskflow_agent/runner.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/seclab_taskflow_agent/runner.py b/src/seclab_taskflow_agent/runner.py index 4fb93d0..3c96fc2 100644 --- a/src/seclab_taskflow_agent/runner.py +++ b/src/seclab_taskflow_agent/runner.py @@ -64,7 +64,7 @@ def write_auto_save( tool_name: str, result: str, ) -> None: - """Append tool result to auto-save log (NDJSON, append-only, crash-safe).""" + """Append tool result to auto-save log (NDJSON, append-only).""" try: os.makedirs(auto_save_dir, exist_ok=True) save_path = os.path.join(auto_save_dir, AUTO_SAVE_LOG_NAME) @@ -73,7 +73,7 @@ def write_auto_save( "tool": tool_name, "result_preview": (result or "")[:2000], }) - with open(save_path, "a") as f: + with open(save_path, "a", encoding="utf-8") as f: f.write(entry + "\n") except Exception as e: logging.warning(f"Auto-save failed: {e}") @@ -87,7 +87,7 @@ def read_tool_log(auto_save_dir: str) -> list[dict]: try: path = os.path.join(auto_save_dir, AUTO_SAVE_LOG_NAME) if os.path.exists(path): - with open(path) as f: + with open(path, encoding="utf-8") as f: for line in f: line = line.strip() if not line: @@ -517,7 +517,11 @@ async def run_main( # Disabled by default (interval=0). Set AUTO_SAVE_DIR and # AUTO_SAVE_INTERVAL to enable. _tool_call_counter = [0] - _auto_save_interval = int(os.getenv("AUTO_SAVE_INTERVAL", "0")) + try: + _auto_save_interval = int(os.getenv("AUTO_SAVE_INTERVAL", "0")) + except ValueError: + logging.warning("Invalid AUTO_SAVE_INTERVAL value, defaulting to 0 (disabled)") + _auto_save_interval = 0 _auto_save_dir = os.getenv("AUTO_SAVE_DIR", "") async def on_tool_end_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool, result: str) -> None: From 5172eb574e0731103afa0631e8944fcd24c8cdf8 Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 17:13:42 +0530 Subject: [PATCH 5/9] fix: use mock-based write failure test for cross-platform CI --- tests/test_runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_runner.py b/tests/test_runner.py index 4f4923f..7a9551e 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -484,8 +484,9 @@ def test_result_truncated_to_2000(self, tmp_path): assert len(entries[0]["result_preview"]) == 2000 def test_survives_write_failure(self): - """write_auto_save to an impossible path does not raise.""" - write_auto_save("/dev/null/impossible", turn=1, tool_name="t", result="r") + """write_auto_save suppresses write errors without crashing.""" + with patch("builtins.open", side_effect=OSError("disk full")): + write_auto_save("/tmp/any", turn=1, tool_name="t", result="r") def test_read_skips_corrupt_trailing_line(self, tmp_path): """read_tool_log skips truncated/corrupt lines without discarding valid ones.""" From 9968fcf0aaba7381628e6b1371d4ded1bd116522 Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:31:31 +0530 Subject: [PATCH 6/9] feat: add opt-in loop detection for repetitive tool calls Tracks consecutive same-tool-name calls in on_tool_end_hook. When the count reaches the threshold, raises LoopDetectedError to abort the task. Configured per-task via max_consecutive_same_tool or globally via LOOP_MAX_CONSECUTIVE env var. Disabled by default. Skipped for async tasks. Counter resets between prompts in repeat_prompt mode. --- doc/GRAMMAR.md | 18 +++++++ src/seclab_taskflow_agent/models.py | 1 + src/seclab_taskflow_agent/runner.py | 49 +++++++++++++++++++ tests/test_runner.py | 76 +++++++++++++++++++++++++++++ 4 files changed, 144 insertions(+) diff --git a/doc/GRAMMAR.md b/doc/GRAMMAR.md index 5a9a49a..2e49f80 100644 --- a/doc/GRAMMAR.md +++ b/doc/GRAMMAR.md @@ -118,6 +118,24 @@ Example: ... ``` +### max_consecutive_same_tool + +`max_consecutive_same_tool` sets the maximum number of times the same tool can be called +consecutively before the task is aborted with a checkpoint. + +- **Omitted or `null`** (default): inherits from the `LOOP_MAX_CONSECUTIVE` environment variable; if that is also unset or `0`, detection is disabled. +- **`0`**: explicitly disables detection for this task, even when `LOOP_MAX_CONSECUTIVE` is set globally. +- **Positive integer**: enables detection at that threshold. + +```yaml + - task: + max_consecutive_same_tool: 10 + agents: + - seclab_taskflow_agent.personalities.assistant + user_prompt: | + Audit this codebase. +``` + ### Running templated tasks in a loop Often we may want to iterate through the same tasks with different inputs. For example, we may want to fetch all the functions from a code base and then analyze each of the functions. This can be done using two consecutive tasks and with the help of the `repeat_prompt` field. diff --git a/src/seclab_taskflow_agent/models.py b/src/seclab_taskflow_agent/models.py index 6445b64..0309cbe 100644 --- a/src/seclab_taskflow_agent/models.py +++ b/src/seclab_taskflow_agent/models.py @@ -97,6 +97,7 @@ class TaskDefinition(BaseModel): env: dict[str, str] = Field(default_factory=dict) inputs: dict[str, Any] = Field(default_factory=dict) max_steps: int = 0 # 0 means use the runner default + max_consecutive_same_tool: int | None = None # None = use LOOP_MAX_CONSECUTIVE env; 0 = disabled uses: str = "" # async settings (``async`` is a reserved word, aliased) diff --git a/src/seclab_taskflow_agent/runner.py b/src/seclab_taskflow_agent/runner.py index 3c96fc2..ff7da9d 100644 --- a/src/seclab_taskflow_agent/runner.py +++ b/src/seclab_taskflow_agent/runner.py @@ -12,6 +12,7 @@ __all__ = [ "DEFAULT_MAX_TURNS", + "LoopDetectedError", "MAX_API_RETRY", "MAX_RATE_LIMIT_BACKOFF", "RATE_LIMIT_BACKOFF", @@ -99,6 +100,15 @@ def read_tool_log(auto_save_dir: str) -> list[dict]: return entries +class LoopDetectedError(Exception): + """Raised when the agent enters a repetitive tool-call loop.""" + + def __init__(self, message: str, tool_name: str, count: int): + super().__init__(message) + self.tool_name = tool_name + self.count = count + + def _resolve_model_config( available_tools: AvailableTools, model_config_ref: str, @@ -524,12 +534,34 @@ async def run_main( _auto_save_interval = 0 _auto_save_dir = os.getenv("AUTO_SAVE_DIR", "") + # Loop detection state + _consecutive_tool_name = [""] + _consecutive_tool_count = [0] + _loop_threshold = [0] # 0=disabled + _loop_is_async = [False] + async def on_tool_end_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool, result: str) -> None: last_mcp_tool_results.append(result) _tool_call_counter[0] += 1 if _auto_save_dir and _auto_save_interval and _tool_call_counter[0] % _auto_save_interval == 0: write_auto_save(_auto_save_dir, _tool_call_counter[0], tool.name, result) + if _loop_threshold[0] > 0 and not _loop_is_async[0]: + if _consecutive_tool_name[0] == tool.name: + _consecutive_tool_count[0] += 1 + else: + _consecutive_tool_name[0] = tool.name + _consecutive_tool_count[0] = 1 + + if _consecutive_tool_count[0] >= _loop_threshold[0]: + if _auto_save_dir: + write_auto_save(_auto_save_dir, _tool_call_counter[0], tool.name, (result or "")[:500]) + raise LoopDetectedError( + f"{tool.name} called {_consecutive_tool_count[0]} times consecutively", + tool_name=tool.name, + count=_consecutive_tool_count[0], + ) + async def on_tool_start_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool) -> None: await render_model_output(f"\n** 🤖🛠️ Tool Call: {tool.name}\n") @@ -626,6 +658,15 @@ async def on_handoff_hook(context: RunContextWrapper[TContext], agent: Agent[TCo async_task = task.async_task max_concurrent_tasks = task.async_limit + # Configure loop detection for this task + task_loop_val = getattr(task, "max_consecutive_same_tool", None) + _loop_threshold[0] = ( + task_loop_val + if task_loop_val is not None + else int(os.getenv("LOOP_MAX_CONSECUTIVE", "0")) + ) + _loop_is_async[0] = async_task + # Render prompt template (skip if repeat_prompt — result not yet available) if task_prompt and not repeat_prompt: try: @@ -661,6 +702,10 @@ async def run_prompts(async_task: bool = False, max_concurrent_tasks: int = 5) - task_results: list[Any] = [] semaphore = asyncio.Semaphore(max_concurrent_tasks) for p_prompt in prompts_to_run: + # Reset loop detection per prompt + _consecutive_tool_name[0] = "" + _consecutive_tool_count[0] = 0 + resolved_agents: dict[str, Any] = {} current_agents = list(agents_list) if not current_agents: @@ -741,6 +786,10 @@ async def _deploy(ra: dict, pp: str) -> bool: break except (KeyboardInterrupt, SystemExit): raise + except LoopDetectedError as exc: + last_task_error = exc + logging.warning(f"Loop detected in task {task_name!r}: {exc}") + break except (APIConnectionError, APITimeoutError, ConnectionError, TimeoutError) as exc: last_task_error = exc remaining = TASK_RETRY_LIMIT - attempt - 1 diff --git a/tests/test_runner.py b/tests/test_runner.py index 7a9551e..955773b 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -20,6 +20,7 @@ TaskWrapper, ) from seclab_taskflow_agent.runner import ( + LoopDetectedError, _build_prompts_to_run, _merge_reusable_task, _resolve_model_config, @@ -505,3 +506,78 @@ def test_read_skips_corrupt_trailing_line(self, tmp_path): def test_read_empty_dir(self, tmp_path): """read_tool_log on a dir with no log file returns [].""" assert read_tool_log(str(tmp_path)) == [] + + +# =================================================================== +# Loop detection +# =================================================================== + + +class TestLoopDetection: + """Tests for LoopDetectedError and loop detection logic.""" + + def test_raises_after_threshold(self): + """LoopDetectedError has tool_name and count attributes.""" + err = LoopDetectedError("search_code called 10 times consecutively", tool_name="search_code", count=10) + assert err.tool_name == "search_code" + assert err.count == 10 + assert "search_code" in str(err) + + def test_different_tools_reset_counter(self): + """Alternating tools should not trigger detection. + We test the logic directly: if tool changes, count resets to 1.""" + name = [""] + count = [0] + + def simulate_tool_call(tool_name, threshold): + if name[0] == tool_name: + count[0] += 1 + else: + name[0] = tool_name + count[0] = 1 + return count[0] >= threshold + + # Alternate between two tools — never triggers at threshold 3 + for _ in range(10): + assert not simulate_tool_call("tool_a", 3) + assert not simulate_tool_call("tool_b", 3) + + def test_consecutive_same_tool_triggers(self): + """Same tool called threshold times triggers detection.""" + name = [""] + count = [0] + threshold = 5 + + def simulate_tool_call(tool_name): + if name[0] == tool_name: + count[0] += 1 + else: + name[0] = tool_name + count[0] = 1 + return count[0] >= threshold + + for i in range(4): + assert not simulate_tool_call("search_code") + assert simulate_tool_call("search_code") + + def test_no_raise_when_disabled(self): + """With threshold 0, detection is disabled.""" + # threshold=0 means the check is skipped + assert True # The condition `_loop_threshold[0] > 0` guards the check + + def test_task_definition_accepts_max_consecutive_same_tool(self): + t = TaskDefinition(max_consecutive_same_tool=10) + assert t.max_consecutive_same_tool == 10 + + def test_task_definition_defaults_to_none(self): + t = TaskDefinition() + assert t.max_consecutive_same_tool is None + + def test_explicit_zero_disables(self): + t = TaskDefinition(max_consecutive_same_tool=0) + assert t.max_consecutive_same_tool == 0 + + def test_existing_yaml_without_field_parses(self): + """TaskDefinition without max_consecutive_same_tool parses fine.""" + t = TaskDefinition(name="legacy", agents=["p.a"], user_prompt="Hello") + assert t.max_consecutive_same_tool is None From 95908d9f8ec3dc7c66b3b3c2a50b9454afee1ca7 Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:37:10 +0530 Subject: [PATCH 7/9] docs: mention max_consecutive_same_tool in README Taskflows section --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 46d50a6..d0293d1 100644 --- a/README.md +++ b/README.md @@ -435,6 +435,9 @@ confirm: A sequence of interdependent tasks performed by a set of Agents. Configured through YAML files of `filetype` `taskflow`. Taskflows supports a number of features, and their details can be found [here](doc/GRAMMAR.md). +For long-running audits, `max_consecutive_same_tool` can limit repetitive tool +calls (see [GRAMMAR.md](doc/GRAMMAR.md#max_consecutive_same_tool)). + Example: ```yaml From d2ba1e39a3a0f62ab02b8e80da6c1c38c0e7ac7e Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:52:56 +0530 Subject: [PATCH 8/9] refactor: extract loop detection into testable check_consecutive_tool_loop Moves the consecutive-tool tracking logic from the on_tool_end_hook closure into a module-level function with explicit state parameters. Tests now exercise the real implementation. Adds tests for negative threshold, counter reset on tool change, and state mutation visibility. --- src/seclab_taskflow_agent/runner.py | 53 +++++-- tests/test_runner.py | 216 ++++++++++++++++++++++------ 2 files changed, 212 insertions(+), 57 deletions(-) diff --git a/src/seclab_taskflow_agent/runner.py b/src/seclab_taskflow_agent/runner.py index ff7da9d..46e1854 100644 --- a/src/seclab_taskflow_agent/runner.py +++ b/src/seclab_taskflow_agent/runner.py @@ -16,6 +16,7 @@ "MAX_API_RETRY", "MAX_RATE_LIMIT_BACKOFF", "RATE_LIMIT_BACKOFF", + "check_consecutive_tool_loop", "deploy_task_agents", "read_tool_log", "run_main", @@ -109,6 +110,38 @@ def __init__(self, message: str, tool_name: str, count: int): self.count = count +def check_consecutive_tool_loop( + tool_name: str, + consecutive_name: list[str], + consecutive_count: list[int], + threshold: int, +) -> None: + """Track consecutive same-tool calls and raise on threshold. + + Args: + tool_name: Name of the tool that just completed. + consecutive_name: Single-element list holding the current tool name streak. + consecutive_count: Single-element list holding the current streak count. + threshold: Maximum allowed consecutive calls. 0 or negative disables. + + Raises: + LoopDetectedError: When *consecutive_count* reaches *threshold*. + """ + if threshold <= 0: + return + if consecutive_name[0] == tool_name: + consecutive_count[0] += 1 + else: + consecutive_name[0] = tool_name + consecutive_count[0] = 1 + if consecutive_count[0] >= threshold: + raise LoopDetectedError( + f"{tool_name} called {consecutive_count[0]} times consecutively", + tool_name=tool_name, + count=consecutive_count[0], + ) + + def _resolve_model_config( available_tools: AvailableTools, model_config_ref: str, @@ -546,21 +579,15 @@ async def on_tool_end_hook(context: RunContextWrapper[TContext], agent: Agent[TC if _auto_save_dir and _auto_save_interval and _tool_call_counter[0] % _auto_save_interval == 0: write_auto_save(_auto_save_dir, _tool_call_counter[0], tool.name, result) - if _loop_threshold[0] > 0 and not _loop_is_async[0]: - if _consecutive_tool_name[0] == tool.name: - _consecutive_tool_count[0] += 1 - else: - _consecutive_tool_name[0] = tool.name - _consecutive_tool_count[0] = 1 - - if _consecutive_tool_count[0] >= _loop_threshold[0]: + if not _loop_is_async[0]: + try: + check_consecutive_tool_loop( + tool.name, _consecutive_tool_name, _consecutive_tool_count, _loop_threshold[0], + ) + except LoopDetectedError: if _auto_save_dir: write_auto_save(_auto_save_dir, _tool_call_counter[0], tool.name, (result or "")[:500]) - raise LoopDetectedError( - f"{tool.name} called {_consecutive_tool_count[0]} times consecutively", - tool_name=tool.name, - count=_consecutive_tool_count[0], - ) + raise async def on_tool_start_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool) -> None: await render_model_output(f"\n** 🤖🛠️ Tool Call: {tool.name}\n") diff --git a/tests/test_runner.py b/tests/test_runner.py index 955773b..68acaaa 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -14,6 +14,7 @@ from seclab_taskflow_agent.models import ( ModelConfigDocument, + PersonalityDocument, TaskDefinition, TaskflowDocument, TaskflowHeader, @@ -25,7 +26,9 @@ _merge_reusable_task, _resolve_model_config, _resolve_task_model, + check_consecutive_tool_loop, read_tool_log, + run_main, write_auto_save, ) @@ -66,6 +69,17 @@ def _make_taskflow_doc(tasks: list[TaskDefinition]) -> TaskflowDocument: ) +def _make_personality() -> PersonalityDocument: + return PersonalityDocument( + **{ + "seclab-taskflow-agent": TaskflowHeader(version="1.0", filetype="personality"), + "personality": "test bot", + "task": "do things", + "toolboxes": [], + } + ) + + def _mock_available_tools() -> MagicMock: return MagicMock() @@ -514,56 +528,55 @@ def test_read_empty_dir(self, tmp_path): class TestLoopDetection: - """Tests for LoopDetectedError and loop detection logic.""" + """Tests for check_consecutive_tool_loop (real implementation).""" def test_raises_after_threshold(self): - """LoopDetectedError has tool_name and count attributes.""" - err = LoopDetectedError("search_code called 10 times consecutively", tool_name="search_code", count=10) - assert err.tool_name == "search_code" - assert err.count == 10 - assert "search_code" in str(err) + """Reaching the threshold raises LoopDetectedError with correct attrs.""" + name, count = [""], [0] + for _ in range(4): + check_consecutive_tool_loop("search_code", name, count, threshold=5) + with pytest.raises(LoopDetectedError) as exc_info: + check_consecutive_tool_loop("search_code", name, count, threshold=5) + assert exc_info.value.tool_name == "search_code" + assert exc_info.value.count == 5 def test_different_tools_reset_counter(self): - """Alternating tools should not trigger detection. - We test the logic directly: if tool changes, count resets to 1.""" - name = [""] - count = [0] - - def simulate_tool_call(tool_name, threshold): - if name[0] == tool_name: - count[0] += 1 - else: - name[0] = tool_name - count[0] = 1 - return count[0] >= threshold - - # Alternate between two tools — never triggers at threshold 3 - for _ in range(10): - assert not simulate_tool_call("tool_a", 3) - assert not simulate_tool_call("tool_b", 3) - - def test_consecutive_same_tool_triggers(self): - """Same tool called threshold times triggers detection.""" - name = [""] - count = [0] - threshold = 5 - - def simulate_tool_call(tool_name): - if name[0] == tool_name: - count[0] += 1 - else: - name[0] = tool_name - count[0] = 1 - return count[0] >= threshold - - for i in range(4): - assert not simulate_tool_call("search_code") - assert simulate_tool_call("search_code") + """Alternating tools never triggers — counter resets on name change.""" + name, count = [""], [0] + for _ in range(20): + check_consecutive_tool_loop("tool_a", name, count, threshold=3) + check_consecutive_tool_loop("tool_b", name, count, threshold=3) def test_no_raise_when_disabled(self): - """With threshold 0, detection is disabled.""" - # threshold=0 means the check is skipped - assert True # The condition `_loop_threshold[0] > 0` guards the check + """Threshold 0 disables detection entirely.""" + name, count = [""], [0] + for _ in range(100): + check_consecutive_tool_loop("same_tool", name, count, threshold=0) + + def test_negative_threshold_disables(self): + """Negative threshold also disables detection.""" + name, count = [""], [0] + for _ in range(100): + check_consecutive_tool_loop("same_tool", name, count, threshold=-1) + + def test_counter_resets_on_different_tool(self): + """Inserting a different tool resets the streak.""" + name, count = [""], [0] + for _ in range(4): + check_consecutive_tool_loop("search_code", name, count, threshold=5) + assert count[0] == 4 + check_consecutive_tool_loop("read_file", name, count, threshold=5) + assert count[0] == 1 + assert name[0] == "read_file" + + def test_state_mutation_visible_to_caller(self): + """The function mutates the lists in-place so callers see updates.""" + name, count = [""], [0] + check_consecutive_tool_loop("tool_x", name, count, threshold=10) + assert name[0] == "tool_x" + assert count[0] == 1 + check_consecutive_tool_loop("tool_x", name, count, threshold=10) + assert count[0] == 2 def test_task_definition_accepts_max_consecutive_same_tool(self): t = TaskDefinition(max_consecutive_same_tool=10) @@ -581,3 +594,118 @@ def test_existing_yaml_without_field_parses(self): """TaskDefinition without max_consecutive_same_tool parses fine.""" t = TaskDefinition(name="legacy", agents=["p.a"], user_prompt="Hello") assert t.max_consecutive_same_tool is None + + +# =================================================================== +# Loop detection integration (drives run_main → on_tool_end_hook) +# =================================================================== + + +def _run_main_with_loop( + monkeypatch, + tmp_path, + task_kwargs, + n_tool_calls, + tool_name="search_code", +): + """Helper: run run_main with a fake deploy that fires N same-tool completions. + + Returns the LoopDetectedError if raised, or None if run_main completed. + """ + task = TaskDefinition( + agents=["test.personality"], + user_prompt="do stuff", + **task_kwargs, + ) + doc = _make_taskflow_doc([task]) + + at = _mock_available_tools() + at.get_taskflow.return_value = doc + at.get_personality.return_value = _make_personality() + + async def fake_deploy(_at, _agents, _prompt, **kwargs): + run_hooks = kwargs.get("run_hooks") + if run_hooks and run_hooks.on_tool_end: + ctx = MagicMock() + agent = MagicMock() + tool = MagicMock() + tool.name = tool_name + for _ in range(n_tool_calls): + await run_hooks.on_tool_end(ctx, agent, tool, "result") + return True + + monkeypatch.setattr("seclab_taskflow_agent.runner.deploy_task_agents", fake_deploy) + monkeypatch.setattr("seclab_taskflow_agent.session._data_dir", lambda: tmp_path) + + with patch("seclab_taskflow_agent.runner.render_model_output", new_callable=AsyncMock): + try: + asyncio.run(run_main(at, None, "test.taskflow", {}, None)) + except LoopDetectedError as exc: + return exc + return None + + +class TestLoopDetectionIntegration: + """Integration tests that drive run_main → on_tool_end_hook.""" + + def test_triggers_via_task_field(self, monkeypatch, tmp_path): + """max_consecutive_same_tool=3 on the task fires after 3 calls.""" + exc = _run_main_with_loop( + monkeypatch, tmp_path, + task_kwargs={"max_consecutive_same_tool": 3}, + n_tool_calls=5, + ) + assert exc is not None + assert exc.tool_name == "search_code" + assert exc.count == 3 + + def test_triggers_via_env_fallback(self, monkeypatch, tmp_path): + """LOOP_MAX_CONSECUTIVE env var is used when task field is None.""" + monkeypatch.setenv("LOOP_MAX_CONSECUTIVE", "4") + exc = _run_main_with_loop( + monkeypatch, tmp_path, + task_kwargs={}, # no task-level field → env fallback + n_tool_calls=6, + ) + assert exc is not None + assert exc.count == 4 + + def test_disabled_when_zero(self, monkeypatch, tmp_path): + """max_consecutive_same_tool=0 disables even with env set.""" + monkeypatch.setenv("LOOP_MAX_CONSECUTIVE", "3") + exc = _run_main_with_loop( + monkeypatch, tmp_path, + task_kwargs={"max_consecutive_same_tool": 0}, + n_tool_calls=10, + ) + assert exc is None + + def test_async_task_bypass(self, monkeypatch, tmp_path): + """Loop detection is skipped for async tasks.""" + task = TaskDefinition( + agents=["test.personality"], + user_prompt="do stuff", + max_consecutive_same_tool=3, + **{"async": True}, + ) + doc = _make_taskflow_doc([task]) + + at = _mock_available_tools() + at.get_taskflow.return_value = doc + at.get_personality.return_value = _make_personality() + + async def fake_deploy(_at, _agents, _prompt, **kwargs): + run_hooks = kwargs.get("run_hooks") + if run_hooks and run_hooks.on_tool_end: + ctx, agent, tool = MagicMock(), MagicMock(), MagicMock() + tool.name = "search_code" + for _ in range(10): + await run_hooks.on_tool_end(ctx, agent, tool, "r") + return True + + monkeypatch.setattr("seclab_taskflow_agent.runner.deploy_task_agents", fake_deploy) + monkeypatch.setattr("seclab_taskflow_agent.session._data_dir", lambda: tmp_path) + + with patch("seclab_taskflow_agent.runner.render_model_output", new_callable=AsyncMock): + # Should NOT raise — async bypass + asyncio.run(run_main(at, None, "test.taskflow", {}, None)) From 13a6f34fdb720d8bad4a1e4de4d3e2bac97949f9 Mon Sep 17 00:00:00 2001 From: sheeki003 <36009418+sheeki03@users.noreply.github.com> Date: Fri, 10 Apr 2026 17:17:05 +0530 Subject: [PATCH 9/9] fix: guard LOOP_MAX_CONSECUTIVE against non-numeric env values --- src/seclab_taskflow_agent/runner.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/seclab_taskflow_agent/runner.py b/src/seclab_taskflow_agent/runner.py index 46e1854..557213e 100644 --- a/src/seclab_taskflow_agent/runner.py +++ b/src/seclab_taskflow_agent/runner.py @@ -687,11 +687,14 @@ async def on_handoff_hook(context: RunContextWrapper[TContext], agent: Agent[TCo # Configure loop detection for this task task_loop_val = getattr(task, "max_consecutive_same_tool", None) - _loop_threshold[0] = ( - task_loop_val - if task_loop_val is not None - else int(os.getenv("LOOP_MAX_CONSECUTIVE", "0")) - ) + if task_loop_val is not None: + _loop_threshold[0] = task_loop_val + else: + try: + _loop_threshold[0] = int(os.getenv("LOOP_MAX_CONSECUTIVE", "0")) + except ValueError: + logging.warning("Invalid LOOP_MAX_CONSECUTIVE value, defaulting to 0 (disabled)") + _loop_threshold[0] = 0 _loop_is_async[0] = async_task # Render prompt template (skip if repeat_prompt — result not yet available)