From b76b6d143a96cd02f5e03543b4b49aaf719e666d Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 27 Jun 2026 08:27:51 +0000 Subject: [PATCH 1/3] fix(task-graph): cap bridgeSubGraphTaskEvents depth to prevent event amplification bridgeSubGraphTaskEvents installs one re-emit listener per event type per nesting level. A deeply nested compound task (e.g., a MapTask containing GraphAsTask containing MapTask) turns one inner task_progress into N parent emits before reaching any wire subscriber. Downstream consumers with a bounded event log (the sec/builder ExecutionEventLog is capped at 10k events) can evict legitimate events under sustained nested fan-out. Add a default depth cap of 16 with a logged drop once exceeded, and let callers override via the new maxDepth parameter. Co-Authored-By: Claude Opus 4.7 (1M context) Claude-Session: https://claude.ai/code/session_01V3e3m8cMRy5stFhDzGmZrF --- packages/task-graph/src/common.ts | 1 + .../src/task-graph/SubGraphEventBridge.ts | 55 ++++++++- .../test/task-graph/TaskCompleteEvent.test.ts | 114 +++++++++++++++++- 3 files changed, 167 insertions(+), 3 deletions(-) diff --git a/packages/task-graph/src/common.ts b/packages/task-graph/src/common.ts index b995546c3..eaa1f2ced 100644 --- a/packages/task-graph/src/common.ts +++ b/packages/task-graph/src/common.ts @@ -16,6 +16,7 @@ export * from "./task-graph/ITaskGraph"; export * from "./task-graph/RunContext"; export * from "./task-graph/RunScheduler"; export * from "./task-graph/StreamPump"; +export * from "./task-graph/SubGraphEventBridge"; export * from "./task-graph/TaskGraph"; export * from "./task-graph/TaskGraphEvents"; export * from "./task-graph/TaskGraphRunner"; diff --git a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts index 24d199f4b..a847b0e68 100644 --- a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts +++ b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts @@ -4,8 +4,27 @@ * 
SPDX-License-Identifier: Apache-2.0 */ +import { getLogger } from "@workglow/util"; import type { TaskGraph } from "./TaskGraph"; +/** + * Default maximum bridge nesting depth before re-emit listeners are dropped. + * Each level installs one listener per event type and re-emits up; without a + * cap, a pathologically nested compound task (e.g. a MapTask containing a + * GraphAsTask containing a MapTask…) amplifies a single inner emit into N + * parent emits before reaching any wire subscriber, and downstream consumers + * with a bounded event log can evict legitimate events under sustained + * fan-out. + */ +const DEFAULT_MAX_BRIDGE_DEPTH = 16; + +/** + * Symbol-keyed marker we attach to each subgraph so nested calls can derive + * the current depth from the parent without changing call sites. Stored on a + * symbol to avoid colliding with any user-set property. + */ +const BRIDGE_DEPTH = Symbol.for("@workglow/task-graph/SubGraphEventBridge.depth"); + /** * Forward a subgraph's per-task events (task_complete, task_progress, and the * task_stream_* trio) up to the parent graph, so tasks nested inside a compound @@ -14,16 +33,39 @@ import type { TaskGraph } from "./TaskGraph"; * and progress. Bubbles recursively: a nested compound forwards its own * subgraph to its parent, which forwards onward. * + * Depth is tracked on the parent graph via a symbol-keyed marker so callers do + * not need to thread a counter through. Once `depth >= maxDepth` the call + * degrades to a no-op (with a single warn log) — see {@link DEFAULT_MAX_BRIDGE_DEPTH}. + * * @returns a teardown that unsubscribes every bridged listener. Callers MUST * invoke it in a `finally` so a rejecting/aborted/early-terminated subgraph * run cannot leak subscriptions (which would double-emit on a later run). 
*/ -export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskGraph): () => void { +export function bridgeSubGraphTaskEvents( + subGraph: TaskGraph, + parentGraph: TaskGraph, + depth: number = (parentGraph as unknown as Record<symbol, number | undefined>)[BRIDGE_DEPTH] ?? 0, + maxDepth: number = DEFAULT_MAX_BRIDGE_DEPTH +): () => void { // A subgraph bridging to itself would re-emit each event back onto the same // graph it just observed, looping forever. This cannot arise from normal // composition (a compound task's subGraph and parentGraph are distinct // instances) but guard anyway so a malformed hierarchy degrades to a no-op. if (subGraph === parentGraph) return () => {}; + if (depth >= maxDepth) { + getLogger().warn("bridgeSubGraphTaskEvents depth cap hit; dropping bridge", { + depth, + maxDepth, + }); + return () => {}; + } + + // Stamp the subgraph with its bridge depth so any nested bridge call (whose + // parentGraph is this subgraph) derives `depth + 1` automatically. + const subGraphWithDepth = subGraph as unknown as Record<symbol, number | undefined>; + const previousDepth = subGraphWithDepth[BRIDGE_DEPTH]; + subGraphWithDepth[BRIDGE_DEPTH] = depth + 1; + const offs = [ subGraph.subscribe("task_complete", (id, out) => parentGraph.emit("task_complete", id, out)), subGraph.subscribe("task_progress", (id, p, m, ...a) => @@ -37,5 +79,14 @@ export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskG parentGraph.emit("task_stream_end", id, out) ), ]; - return () => offs.forEach((off) => off()); + return () => { + offs.forEach((off) => off()); + // Restore the previous depth marker so a later, independently-rooted bridge + // of the same subgraph instance does not inherit a stale counter. 
+ if (previousDepth === undefined) { + delete subGraphWithDepth[BRIDGE_DEPTH]; + } else { + subGraphWithDepth[BRIDGE_DEPTH] = previousDepth; + } + }; } diff --git a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts index 380720f31..50d2f74eb 100644 --- a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts +++ b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts @@ -6,6 +6,7 @@ import type { IExecuteContext, StreamEvent } from "@workglow/task-graph"; import { + bridgeSubGraphTaskEvents, Dataflow, FallbackTask, GraphAsTask, @@ -16,8 +17,10 @@ import { WhileTask, Workflow, } from "@workglow/task-graph"; +import type { ILogger } from "@workglow/util"; +import { NullLogger, setLogger } from "@workglow/util"; import type { DataPortSchema } from "@workglow/util/schema"; -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; class TCEAddOne extends Task<{ value: number }, { value: number }> { static override readonly type = "TCEAddOne"; @@ -398,3 +401,112 @@ describe("task_complete graph event", () => { expect(seen.filter((id) => id === "leaky")).toHaveLength(1); }); }); + +describe("bridgeSubGraphTaskEvents depth cap", () => { + // Each test installs a stub logger and restores a NullLogger after, so warn + // spies do not leak into other tests (and we never emit to the real console). 
+ let warnSpy: ReturnType<typeof vi.fn>; + + function installStubLogger(): void { + warnSpy = vi.fn(); + const stub: ILogger = { + debug: () => {}, + info: () => {}, + warn: warnSpy as unknown as ILogger["warn"], + error: () => {}, + fatal: () => {}, + child() { + return stub; + }, + time: () => {}, + timeEnd: () => {}, + group: () => {}, + groupEnd: () => {}, + }; + setLogger(stub); + } + + afterEach(() => { + setLogger(new NullLogger()); + }); + + it("stops bridging past maxDepth (default 16)", () => { + installStubLogger(); + // Build a chain of TaskGraphs: g0 (top) -> g1 -> g2 -> ... -> gN (innermost). + // Bridging is chained so each gK forwards into gK-1, mirroring how a stack + // of GraphAsTask wrappers nests bridges at run time. + const N = 20; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + // graphs[i] is the inner subgraph; graphs[i-1] is its parent. + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1])); + } + + // Emit a task_progress from the innermost graph and count how many bridges + // it traversed up to the outermost. Without the cap this would re-emit N + // times (once per parent); with the cap of 16 it stops after 16 hops. + const reachedDepths: number[] = []; + for (let i = 0; i < graphs.length; i++) { + const depth = i; + graphs[i].subscribe("task_progress", () => { + reachedDepths.push(depth); + }); + } + + graphs[N].emit("task_progress", "inner-id", 42, "msg"); + + // Innermost (depth N) saw the original emit; bridges propagate up at most + // 16 levels (the default cap), so the highest depth reached is N - 16. + const minReached = Math.min(...reachedDepths); + expect(minReached).toBeGreaterThanOrEqual(N - 16); + // The cap fired (depth reached 16), producing at least one warn. 
+ expect(warnSpy).toHaveBeenCalled(); + + unbridges.forEach((off) => off()); + }); + + it("logs warning with depth fields when the cap is hit", () => { + installStubLogger(); + const N = 18; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1])); + } + + expect(warnSpy).toHaveBeenCalled(); + const firstCall = warnSpy.mock.calls[0]; + expect(firstCall[0]).toBe("bridgeSubGraphTaskEvents depth cap hit; dropping bridge"); + expect(firstCall[1]).toEqual({ depth: 16, maxDepth: 16 }); + + unbridges.forEach((off) => off()); + }); + + it("accepts a custom maxDepth override", () => { + installStubLogger(); + const N = 6; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + // Explicitly pass maxDepth=4 to each level. Depth is derived from the + // parent's stamped marker, so the first call to exceed the cap is the + // bridge whose computed depth is 4. 
+ unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1], undefined, 4)); + } + + expect(warnSpy).toHaveBeenCalled(); + const firstCall = warnSpy.mock.calls.find( + (c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge" + ); + expect(firstCall).toBeDefined(); + expect(firstCall![1]).toEqual({ depth: 4, maxDepth: 4 }); + + unbridges.forEach((off) => off()); + }); +}); From ca833f1d97c55a1a6d053496b445cd64bf3e677b Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 28 Jun 2026 08:27:41 +0000 Subject: [PATCH 2/3] fix(task-graph): stamp saturating depth on over-cap bridge + dedupe warn per parent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Without the saturating stamp, when bridgeSubGraphTaskEvents short-circuits at the depth cap, the subgraph is left without a BRIDGE_DEPTH marker — so the next bridge level reads depth as undefined (0) and the cap leaks downstream. Stamp the subgraph at maxDepth before the early return so every nested bridge also short-circuits. Also gate the warn through a module-level WeakSet keyed by parent graph: an iterator with 1k iterations at over-cap was previously emitting 1k warns per run. --- .../src/task-graph/SubGraphEventBridge.ts | 24 +++++- .../test/task-graph/TaskCompleteEvent.test.ts | 81 +++++++++++++++++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts index a847b0e68..e13b99d1d 100644 --- a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts +++ b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts @@ -25,6 +25,14 @@ const DEFAULT_MAX_BRIDGE_DEPTH = 16; */ const BRIDGE_DEPTH = Symbol.for("@workglow/task-graph/SubGraphEventBridge.depth"); +/** + * Track parent graphs that have already emitted the depth-cap warning so a + * pathologically nested compound (e.g. 
an iterator with 1k iterations all at + * over-cap) does not spam one warn per iteration. Held weakly so an evicted + * parent graph can still be garbage-collected. + */ +const warnedParents = new WeakSet(); + /** * Forward a subgraph's per-task events (task_complete, task_progress, and the * task_stream_* trio) up to the parent graph, so tasks nested inside a compound @@ -53,10 +61,18 @@ export function bridgeSubGraphTaskEvents( // instances) but guard anyway so a malformed hierarchy degrades to a no-op. if (subGraph === parentGraph) return () => {}; if (depth >= maxDepth) { - getLogger().warn("bridgeSubGraphTaskEvents depth cap hit; dropping bridge", { - depth, - maxDepth, - }); + // Stamp the subgraph at the cap so any nested bridge call (whose + // parentGraph is this subgraph) derives `depth >= maxDepth` and also + // short-circuits — otherwise the depth counter resets at the next level + // and the cap leaks downstream. + (subGraph as unknown as Record)[BRIDGE_DEPTH] = maxDepth; + if (!warnedParents.has(parentGraph)) { + warnedParents.add(parentGraph); + getLogger().warn("bridgeSubGraphTaskEvents depth cap hit; dropping bridge", { + depth, + maxDepth, + }); + } return () => {}; } diff --git a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts index 50d2f74eb..ed416b412 100644 --- a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts +++ b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts @@ -509,4 +509,85 @@ describe("bridgeSubGraphTaskEvents depth cap", () => { unbridges.forEach((off) => off()); }); + + it("stamped over-cap subgraph prevents downstream bridges from restarting depth", () => { + installStubLogger(); + // Build a 5-level chain with maxDepth=2. Bridges install left-to-right + // (outermost first). 
The first call to exceed the cap MUST stamp the + // subgraph so the next bridge (whose parent is that subgraph) also sees + // depth >= maxDepth — otherwise the parent's BRIDGE_DEPTH reads as + // undefined, depth resets to 0, and the cap leaks downstream. + const maxDepth = 2; + const N = 5; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + + const reached: number[] = []; + for (let i = 0; i < graphs.length; i++) { + const depth = i; + graphs[i].subscribe("task_progress", () => { + reached.push(depth); + }); + } + + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1], undefined, maxDepth)); + } + + // graphs[1] and graphs[2] install (depth 0, 1). graphs[3] hits cap and + // stamps itself at maxDepth so graphs[4] and graphs[5] also no-op. Emit + // from the innermost graph: only its own subscriber should fire — every + // bridge i=3..5 short-circuited, so no event propagates outward at all. + graphs[N].emit("task_progress", "inner-id", 42, "msg"); + + // No bridge from level 2 onward installed, so only the innermost graph + // (depth N) saw the emit. depth=0 (outermost) must NOT be reached. + expect(reached).toEqual([N]); + expect(reached).not.toContain(0); + + unbridges.forEach((off) => off()); + }); + + it("warn fires exactly once per parentGraph at over-cap", () => { + installStubLogger(); + const parent = new TaskGraph(); + // Call 50 times with the same parent at over-cap (depth seeded above cap). 
+ const teardowns: Array<() => void> = []; + for (let i = 0; i < 50; i++) { + const sub = new TaskGraph(); + teardowns.push(bridgeSubGraphTaskEvents(sub, parent, 16, 16)); + } + + const capCalls = warnSpy.mock.calls.filter( + (c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge" + ); + expect(capCalls).toHaveLength(1); + + teardowns.forEach((off) => off()); + }); + + it("distinct parents at over-cap each warn once", () => { + installStubLogger(); + const parentA = new TaskGraph(); + const parentB = new TaskGraph(); + const subA = new TaskGraph(); + const subB = new TaskGraph(); + + const off1 = bridgeSubGraphTaskEvents(subA, parentA, 16, 16); + const off2 = bridgeSubGraphTaskEvents(subB, parentB, 16, 16); + // Repeat each parent a few more times — should not produce additional warns. + const off3 = bridgeSubGraphTaskEvents(new TaskGraph(), parentA, 16, 16); + const off4 = bridgeSubGraphTaskEvents(new TaskGraph(), parentB, 16, 16); + + const capCalls = warnSpy.mock.calls.filter( + (c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge" + ); + expect(capCalls).toHaveLength(2); + + off1(); + off2(); + off3(); + off4(); + }); }); From 00a4beb2095b2f27176fb5aaee4ffedca69ce63a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Jul 2026 18:32:58 +0000 Subject: [PATCH 3/3] Fix stale BRIDGE_DEPTH teardown and tighten depth-cap test assertions --- .../src/task-graph/SubGraphEventBridge.ts | 12 +++++-- .../test/task-graph/TaskCompleteEvent.test.ts | 33 +++++++++---------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts index e13b99d1d..a6744f888 100644 --- a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts +++ b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts @@ -65,7 +65,9 @@ export function bridgeSubGraphTaskEvents( // 
parentGraph is this subgraph) derives `depth >= maxDepth` and also // short-circuits — otherwise the depth counter resets at the next level // and the cap leaks downstream. - (subGraph as unknown as Record)[BRIDGE_DEPTH] = maxDepth; + const subGraphAsRecord = subGraph as unknown as Record; + const previousDepth = subGraphAsRecord[BRIDGE_DEPTH]; + subGraphAsRecord[BRIDGE_DEPTH] = maxDepth; if (!warnedParents.has(parentGraph)) { warnedParents.add(parentGraph); getLogger().warn("bridgeSubGraphTaskEvents depth cap hit; dropping bridge", { @@ -73,7 +75,13 @@ export function bridgeSubGraphTaskEvents( maxDepth, }); } - return () => {}; + return () => { + if (previousDepth === undefined) { + delete subGraphAsRecord[BRIDGE_DEPTH]; + } else { + subGraphAsRecord[BRIDGE_DEPTH] = previousDepth; + } + }; } // Stamp the subgraph with its bridge depth so any nested bridge call (whose diff --git a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts index ed416b412..6d92ddf8a 100644 --- a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts +++ b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts @@ -432,10 +432,10 @@ describe("bridgeSubGraphTaskEvents depth cap", () => { it("stops bridging past maxDepth (default 16)", () => { installStubLogger(); - // Build a chain of TaskGraphs: g0 (top) -> g1 -> g2 -> ... -> gN (innermost). + // Build a chain of TaskGraphs: g0 (root) -> g1 -> ... -> g17 (innermost). // Bridging is chained so each gK forwards into gK-1, mirroring how a stack // of GraphAsTask wrappers nests bridges at run time. 
- const N = 20; + const N = 17; const graphs: TaskGraph[] = []; for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); @@ -445,24 +445,21 @@ describe("bridgeSubGraphTaskEvents depth cap", () => { unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1])); } - // Emit a task_progress from the innermost graph and count how many bridges - // it traversed up to the outermost. Without the cap this would re-emit N - // times (once per parent); with the cap of 16 it stops after 16 hops. - const reachedDepths: number[] = []; - for (let i = 0; i < graphs.length; i++) { - const depth = i; - graphs[i].subscribe("task_progress", () => { - reachedDepths.push(depth); - }); - } + // Track all task IDs that reach the root (graphs[0]). + const rootIds: string[] = []; + graphs[0].subscribe("task_progress", (id) => rootIds.push(String(id))); - graphs[N].emit("task_progress", "inner-id", 42, "msg"); + // graphs[16] is bridged at depth 15 (< 16), so its events propagate + // through all 16 hops and must reach the root. + graphs[16].emit("task_progress", "at-cap", 1, "msg"); + expect(rootIds).toContain("at-cap"); + + // graphs[17] is where the depth hits 16 (= maxDepth), so that bridge is + // dropped; its events must NOT reach the root. + graphs[17].emit("task_progress", "over-cap", 1, "msg"); + expect(rootIds).not.toContain("over-cap"); - // Innermost (depth N) saw the original emit; bridges propagate up at most - // 16 levels (the default cap), so the highest depth reached is N - 16. - const minReached = Math.min(...reachedDepths); - expect(minReached).toBeGreaterThanOrEqual(N - 16); - // The cap fired (depth reached 16), producing at least one warn. + // The cap warning was emitted. expect(warnSpy).toHaveBeenCalled(); unbridges.forEach((off) => off());