diff --git a/packages/task-graph/src/common.ts b/packages/task-graph/src/common.ts index b995546c3..eaa1f2ced 100644 --- a/packages/task-graph/src/common.ts +++ b/packages/task-graph/src/common.ts @@ -16,6 +16,7 @@ export * from "./task-graph/ITaskGraph"; export * from "./task-graph/RunContext"; export * from "./task-graph/RunScheduler"; export * from "./task-graph/StreamPump"; +export * from "./task-graph/SubGraphEventBridge"; export * from "./task-graph/TaskGraph"; export * from "./task-graph/TaskGraphEvents"; export * from "./task-graph/TaskGraphRunner"; diff --git a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts index 24d199f4b..a6744f888 100644 --- a/packages/task-graph/src/task-graph/SubGraphEventBridge.ts +++ b/packages/task-graph/src/task-graph/SubGraphEventBridge.ts @@ -4,8 +4,35 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { getLogger } from "@workglow/util"; import type { TaskGraph } from "./TaskGraph"; +/** + * Default maximum bridge nesting depth before re-emit listeners are dropped. + * Each level installs one listener per event type and re-emits up; without a + * cap, a pathologically nested compound task (e.g. a MapTask containing a + * GraphAsTask containing a MapTask…) amplifies a single inner emit into N + * parent emits before reaching any wire subscriber, and downstream consumers + * with a bounded event log can evict legitimate events under sustained + * fan-out. + */ +const DEFAULT_MAX_BRIDGE_DEPTH = 16; + +/** + * Symbol-keyed marker we attach to each subgraph so nested calls can derive + * the current depth from the parent without changing call sites. Stored on a + * symbol to avoid colliding with any user-set property. + */ +const BRIDGE_DEPTH = Symbol.for("@workglow/task-graph/SubGraphEventBridge.depth"); + +/** + * Track parent graphs that have already emitted the depth-cap warning so a + * pathologically nested compound (e.g. 
an iterator with 1k iterations all at + * over-cap) does not spam one warn per iteration. Held weakly so an evicted + * parent graph can still be garbage-collected. + */ +const warnedParents = new WeakSet(); + /** * Forward a subgraph's per-task events (task_complete, task_progress, and the * task_stream_* trio) up to the parent graph, so tasks nested inside a compound @@ -14,16 +41,55 @@ import type { TaskGraph } from "./TaskGraph"; * and progress. Bubbles recursively: a nested compound forwards its own * subgraph to its parent, which forwards onward. * + * Depth is tracked on the parent graph via a symbol-keyed marker so callers do + * not need to thread a counter through. Once `depth >= maxDepth` the call + * degrades to a no-op (with a single warn log) — see {@link DEFAULT_MAX_BRIDGE_DEPTH}. + * * @returns a teardown that unsubscribes every bridged listener. Callers MUST * invoke it in a `finally` so a rejecting/aborted/early-terminated subgraph * run cannot leak subscriptions (which would double-emit on a later run). */ -export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskGraph): () => void { +export function bridgeSubGraphTaskEvents( + subGraph: TaskGraph, + parentGraph: TaskGraph, + depth: number = (parentGraph as unknown as Record)[BRIDGE_DEPTH] ?? 0, + maxDepth: number = DEFAULT_MAX_BRIDGE_DEPTH +): () => void { // A subgraph bridging to itself would re-emit each event back onto the same // graph it just observed, looping forever. This cannot arise from normal // composition (a compound task's subGraph and parentGraph are distinct // instances) but guard anyway so a malformed hierarchy degrades to a no-op. if (subGraph === parentGraph) return () => {}; + if (depth >= maxDepth) { + // Stamp the subgraph at the cap so any nested bridge call (whose + // parentGraph is this subgraph) derives `depth >= maxDepth` and also + // short-circuits — otherwise the depth counter resets at the next level + // and the cap leaks downstream. 
+ const subGraphAsRecord = subGraph as unknown as Record; + const previousDepth = subGraphAsRecord[BRIDGE_DEPTH]; + subGraphAsRecord[BRIDGE_DEPTH] = maxDepth; + if (!warnedParents.has(parentGraph)) { + warnedParents.add(parentGraph); + getLogger().warn("bridgeSubGraphTaskEvents depth cap hit; dropping bridge", { + depth, + maxDepth, + }); + } + return () => { + if (previousDepth === undefined) { + delete subGraphAsRecord[BRIDGE_DEPTH]; + } else { + subGraphAsRecord[BRIDGE_DEPTH] = previousDepth; + } + }; + } + + // Stamp the subgraph with its bridge depth so any nested bridge call (whose + // parentGraph is this subgraph) derives `depth + 1` automatically. + const subGraphWithDepth = subGraph as unknown as Record; + const previousDepth = subGraphWithDepth[BRIDGE_DEPTH]; + subGraphWithDepth[BRIDGE_DEPTH] = depth + 1; + const offs = [ subGraph.subscribe("task_complete", (id, out) => parentGraph.emit("task_complete", id, out)), subGraph.subscribe("task_progress", (id, p, m, ...a) => @@ -37,5 +103,14 @@ export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskG parentGraph.emit("task_stream_end", id, out) ), ]; - return () => offs.forEach((off) => off()); + return () => { + offs.forEach((off) => off()); + // Restore the previous depth marker so a later, independently-rooted bridge + // of the same subgraph instance does not inherit a stale counter. 
+ if (previousDepth === undefined) { + delete subGraphWithDepth[BRIDGE_DEPTH]; + } else { + subGraphWithDepth[BRIDGE_DEPTH] = previousDepth; + } + }; } diff --git a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts index 380720f31..6d92ddf8a 100644 --- a/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts +++ b/packages/test/src/test/task-graph/TaskCompleteEvent.test.ts @@ -6,6 +6,7 @@ import type { IExecuteContext, StreamEvent } from "@workglow/task-graph"; import { + bridgeSubGraphTaskEvents, Dataflow, FallbackTask, GraphAsTask, @@ -16,8 +17,10 @@ import { WhileTask, Workflow, } from "@workglow/task-graph"; +import type { ILogger } from "@workglow/util"; +import { NullLogger, setLogger } from "@workglow/util"; import type { DataPortSchema } from "@workglow/util/schema"; -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; class TCEAddOne extends Task<{ value: number }, { value: number }> { static override readonly type = "TCEAddOne"; @@ -398,3 +401,190 @@ describe("task_complete graph event", () => { expect(seen.filter((id) => id === "leaky")).toHaveLength(1); }); }); + +describe("bridgeSubGraphTaskEvents depth cap", () => { + // Each test installs a stub logger and restores a NullLogger after, so warn + // spies do not leak into other tests (and we never emit to the real console). 
+ let warnSpy: ReturnType; + + function installStubLogger(): void { + warnSpy = vi.fn(); + const stub: ILogger = { + debug: () => {}, + info: () => {}, + warn: warnSpy as unknown as ILogger["warn"], + error: () => {}, + fatal: () => {}, + child() { + return stub; + }, + time: () => {}, + timeEnd: () => {}, + group: () => {}, + groupEnd: () => {}, + }; + setLogger(stub); + } + + afterEach(() => { + setLogger(new NullLogger()); + }); + + it("stops bridging past maxDepth (default 16)", () => { + installStubLogger(); + // Build a chain of TaskGraphs: g0 (root) -> g1 -> ... -> g17 (innermost). + // Bridging is chained so each gK forwards into gK-1, mirroring how a stack + // of GraphAsTask wrappers nests bridges at run time. + const N = 17; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + // graphs[i] is the inner subgraph; graphs[i-1] is its parent. + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1])); + } + + // Track all task IDs that reach the root (graphs[0]). + const rootIds: string[] = []; + graphs[0].subscribe("task_progress", (id) => rootIds.push(String(id))); + + // graphs[16] is bridged at depth 15 (< 16), so its events propagate + // through all 16 hops and must reach the root. + graphs[16].emit("task_progress", "at-cap", 1, "msg"); + expect(rootIds).toContain("at-cap"); + + // graphs[17] is where the depth hits 16 (= maxDepth), so that bridge is + // dropped; its events must NOT reach the root. + graphs[17].emit("task_progress", "over-cap", 1, "msg"); + expect(rootIds).not.toContain("over-cap"); + + // The cap warning was emitted. 
+ expect(warnSpy).toHaveBeenCalled(); + + unbridges.forEach((off) => off()); + }); + + it("logs warning with depth fields when the cap is hit", () => { + installStubLogger(); + const N = 18; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1])); + } + + expect(warnSpy).toHaveBeenCalled(); + const firstCall = warnSpy.mock.calls[0]; + expect(firstCall[0]).toBe("bridgeSubGraphTaskEvents depth cap hit; dropping bridge"); + expect(firstCall[1]).toEqual({ depth: 16, maxDepth: 16 }); + + unbridges.forEach((off) => off()); + }); + + it("accepts a custom maxDepth override", () => { + installStubLogger(); + const N = 6; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + // Explicitly pass maxDepth=4 to each level. Depth is derived from the + // parent's stamped marker, so the first call to exceed the cap is the + // bridge whose computed depth is 4. + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1], undefined, 4)); + } + + expect(warnSpy).toHaveBeenCalled(); + const firstCall = warnSpy.mock.calls.find( + (c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge" + ); + expect(firstCall).toBeDefined(); + expect(firstCall![1]).toEqual({ depth: 4, maxDepth: 4 }); + + unbridges.forEach((off) => off()); + }); + + it("stamped over-cap subgraph prevents downstream bridges from restarting depth", () => { + installStubLogger(); + // Build a 5-level chain with maxDepth=2. Bridges install left-to-right + // (outermost first). 
The first call to exceed the cap MUST stamp the + // subgraph so the next bridge (whose parent is that subgraph) also sees + // depth >= maxDepth — otherwise the parent's BRIDGE_DEPTH reads as + // undefined, depth resets to 0, and the cap leaks downstream. + const maxDepth = 2; + const N = 5; + const graphs: TaskGraph[] = []; + for (let i = 0; i <= N; i++) graphs.push(new TaskGraph()); + + const reached: number[] = []; + for (let i = 0; i < graphs.length; i++) { + const depth = i; + graphs[i].subscribe("task_progress", () => { + reached.push(depth); + }); + } + + const unbridges: Array<() => void> = []; + for (let i = 1; i <= N; i++) { + unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1], undefined, maxDepth)); + } + + // graphs[1] and graphs[2] install (depth 0, 1). graphs[3] hits cap and + // stamps itself at maxDepth so graphs[4] and graphs[5] also no-op. Emit + // from the innermost graph: only its own subscriber should fire — every + // bridge i=3..5 short-circuited, so no event propagates outward at all. + graphs[N].emit("task_progress", "inner-id", 42, "msg"); + + // No bridge from level 3 onward was installed, so only the innermost graph + // (depth N) saw the emit. depth=0 (outermost) must NOT be reached. + expect(reached).toEqual([N]); + expect(reached).not.toContain(0); + + unbridges.forEach((off) => off()); + }); + + it("warn fires exactly once per parentGraph at over-cap", () => { + installStubLogger(); + const parent = new TaskGraph(); + // Call 50 times with the same parent at over-cap (depth seeded at the cap: depth === maxDepth). 
+ const teardowns: Array<() => void> = []; + for (let i = 0; i < 50; i++) { + const sub = new TaskGraph(); + teardowns.push(bridgeSubGraphTaskEvents(sub, parent, 16, 16)); + } + + const capCalls = warnSpy.mock.calls.filter( + (c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge" + ); + expect(capCalls).toHaveLength(1); + + teardowns.forEach((off) => off()); + }); + + it("distinct parents at over-cap each warn once", () => { + installStubLogger(); + const parentA = new TaskGraph(); + const parentB = new TaskGraph(); + const subA = new TaskGraph(); + const subB = new TaskGraph(); + + const off1 = bridgeSubGraphTaskEvents(subA, parentA, 16, 16); + const off2 = bridgeSubGraphTaskEvents(subB, parentB, 16, 16); + // Repeat each parent a few more times — should not produce additional warns. + const off3 = bridgeSubGraphTaskEvents(new TaskGraph(), parentA, 16, 16); + const off4 = bridgeSubGraphTaskEvents(new TaskGraph(), parentB, 16, 16); + + const capCalls = warnSpy.mock.calls.filter( + (c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge" + ); + expect(capCalls).toHaveLength(2); + + off1(); + off2(); + off3(); + off4(); + }); +});