Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/task-graph/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export * from "./task-graph/ITaskGraph";
export * from "./task-graph/RunContext";
export * from "./task-graph/RunScheduler";
export * from "./task-graph/StreamPump";
export * from "./task-graph/SubGraphEventBridge";
export * from "./task-graph/TaskGraph";
export * from "./task-graph/TaskGraphEvents";
export * from "./task-graph/TaskGraphRunner";
Expand Down
79 changes: 77 additions & 2 deletions packages/task-graph/src/task-graph/SubGraphEventBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,35 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { getLogger } from "@workglow/util";
import type { TaskGraph } from "./TaskGraph";

/**
* Default maximum bridge nesting depth before re-emit listeners are dropped.
* Each level installs one listener per event type and re-emits up; without a
* cap, a pathologically nested compound task (e.g. a MapTask containing a
* GraphAsTask containing a MapTask…) amplifies a single inner emit into N
* parent emits before reaching any wire subscriber, and downstream consumers
* with a bounded event log can evict legitimate events under sustained
* fan-out.
*/
const DEFAULT_MAX_BRIDGE_DEPTH = 16;

/**
* Symbol-keyed marker we attach to each subgraph so nested calls can derive
* the current depth from the parent without changing call sites. Stored on a
* symbol to avoid colliding with any user-set property.
*/
const BRIDGE_DEPTH = Symbol.for("@workglow/task-graph/SubGraphEventBridge.depth");

/**
* Track parent graphs that have already emitted the depth-cap warning so a
* pathologically nested compound (e.g. an iterator with 1k iterations all at
* over-cap) does not spam one warn per iteration. Held weakly so an evicted
* parent graph can still be garbage-collected.
*/
const warnedParents = new WeakSet<TaskGraph>();

/**
* Forward a subgraph's per-task events (task_complete, task_progress, and the
* task_stream_* trio) up to the parent graph, so tasks nested inside a compound
Expand All @@ -14,16 +41,55 @@ import type { TaskGraph } from "./TaskGraph";
* and progress. Bubbles recursively: a nested compound forwards its own
* subgraph to its parent, which forwards onward.
*
* Depth is tracked on the parent graph via a symbol-keyed marker so callers do
* not need to thread a counter through. Once `depth >= maxDepth` the call
* degrades to a no-op (with a single warn log) — see {@link DEFAULT_MAX_BRIDGE_DEPTH}.
*
* @returns a teardown that unsubscribes every bridged listener. Callers MUST
* invoke it in a `finally` so a rejecting/aborted/early-terminated subgraph
* run cannot leak subscriptions (which would double-emit on a later run).
*/
export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskGraph): () => void {
export function bridgeSubGraphTaskEvents(
subGraph: TaskGraph,
parentGraph: TaskGraph,
depth: number = (parentGraph as unknown as Record<symbol, number>)[BRIDGE_DEPTH] ?? 0,
maxDepth: number = DEFAULT_MAX_BRIDGE_DEPTH
): () => void {
// A subgraph bridging to itself would re-emit each event back onto the same
// graph it just observed, looping forever. This cannot arise from normal
// composition (a compound task's subGraph and parentGraph are distinct
// instances) but guard anyway so a malformed hierarchy degrades to a no-op.
if (subGraph === parentGraph) return () => {};
if (depth >= maxDepth) {
// Stamp the subgraph at the cap so any nested bridge call (whose
// parentGraph is this subgraph) derives `depth >= maxDepth` and also
// short-circuits — otherwise the depth counter resets at the next level
// and the cap leaks downstream.
const subGraphAsRecord = subGraph as unknown as Record<symbol, number>;
const previousDepth = subGraphAsRecord[BRIDGE_DEPTH];
subGraphAsRecord[BRIDGE_DEPTH] = maxDepth;
if (!warnedParents.has(parentGraph)) {
warnedParents.add(parentGraph);
getLogger().warn("bridgeSubGraphTaskEvents depth cap hit; dropping bridge", {
depth,
maxDepth,
});
}
return () => {
if (previousDepth === undefined) {
delete subGraphAsRecord[BRIDGE_DEPTH];
} else {
subGraphAsRecord[BRIDGE_DEPTH] = previousDepth;
}
};
}

// Stamp the subgraph with its bridge depth so any nested bridge call (whose
// parentGraph is this subgraph) derives `depth + 1` automatically.
const subGraphWithDepth = subGraph as unknown as Record<symbol, number>;
const previousDepth = subGraphWithDepth[BRIDGE_DEPTH];
subGraphWithDepth[BRIDGE_DEPTH] = depth + 1;

const offs = [
subGraph.subscribe("task_complete", (id, out) => parentGraph.emit("task_complete", id, out)),
subGraph.subscribe("task_progress", (id, p, m, ...a) =>
Expand All @@ -37,5 +103,14 @@ export function bridgeSubGraphTaskEvents(subGraph: TaskGraph, parentGraph: TaskG
parentGraph.emit("task_stream_end", id, out)
),
];
return () => offs.forEach((off) => off());
return () => {
offs.forEach((off) => off());
// Restore the previous depth marker so a later, independently-rooted bridge
// of the same subgraph instance does not inherit a stale counter.
if (previousDepth === undefined) {
delete subGraphWithDepth[BRIDGE_DEPTH];
} else {
subGraphWithDepth[BRIDGE_DEPTH] = previousDepth;
}
};
}
192 changes: 191 additions & 1 deletion packages/test/src/test/task-graph/TaskCompleteEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import type { IExecuteContext, StreamEvent } from "@workglow/task-graph";
import {
bridgeSubGraphTaskEvents,
Dataflow,
FallbackTask,
GraphAsTask,
Expand All @@ -16,8 +17,10 @@ import {
WhileTask,
Workflow,
} from "@workglow/task-graph";
import type { ILogger } from "@workglow/util";
import { NullLogger, setLogger } from "@workglow/util";
import type { DataPortSchema } from "@workglow/util/schema";
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";

class TCEAddOne extends Task<{ value: number }, { value: number }> {
static override readonly type = "TCEAddOne";
Expand Down Expand Up @@ -398,3 +401,190 @@ describe("task_complete graph event", () => {
expect(seen.filter((id) => id === "leaky")).toHaveLength(1);
});
});

describe("bridgeSubGraphTaskEvents depth cap", () => {
// Each test installs a stub logger and restores a NullLogger after, so warn
// spies do not leak into other tests (and we never emit to the real console).
let warnSpy: ReturnType<typeof vi.fn>;

function installStubLogger(): void {
warnSpy = vi.fn();
const stub: ILogger = {
debug: () => {},
info: () => {},
warn: warnSpy as unknown as ILogger["warn"],
error: () => {},
fatal: () => {},
child() {
return stub;
},
time: () => {},
timeEnd: () => {},
group: () => {},
groupEnd: () => {},
};
setLogger(stub);
}

afterEach(() => {
setLogger(new NullLogger());
});

it("stops bridging past maxDepth (default 16)", () => {
installStubLogger();
// Build a chain of TaskGraphs: g0 (root) -> g1 -> ... -> g17 (innermost).
// Bridging is chained so each gK forwards into gK-1, mirroring how a stack
// of GraphAsTask wrappers nests bridges at run time.
const N = 17;
const graphs: TaskGraph[] = [];
for (let i = 0; i <= N; i++) graphs.push(new TaskGraph());

const unbridges: Array<() => void> = [];
for (let i = 1; i <= N; i++) {
// graphs[i] is the inner subgraph; graphs[i-1] is its parent.
unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1]));
}

// Track all task IDs that reach the root (graphs[0]).
const rootIds: string[] = [];
graphs[0].subscribe("task_progress", (id) => rootIds.push(String(id)));

// graphs[16] is bridged at depth 15 (< 16), so its events propagate
// through all 16 hops and must reach the root.
graphs[16].emit("task_progress", "at-cap", 1, "msg");
expect(rootIds).toContain("at-cap");

// graphs[17] is where the depth hits 16 (= maxDepth), so that bridge is
// dropped; its events must NOT reach the root.
graphs[17].emit("task_progress", "over-cap", 1, "msg");
expect(rootIds).not.toContain("over-cap");

// The cap warning was emitted.
expect(warnSpy).toHaveBeenCalled();

unbridges.forEach((off) => off());
});

it("logs warning with depth fields when the cap is hit", () => {
installStubLogger();
const N = 18;
const graphs: TaskGraph[] = [];
for (let i = 0; i <= N; i++) graphs.push(new TaskGraph());
const unbridges: Array<() => void> = [];
for (let i = 1; i <= N; i++) {
unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1]));
}

expect(warnSpy).toHaveBeenCalled();
const firstCall = warnSpy.mock.calls[0];
expect(firstCall[0]).toBe("bridgeSubGraphTaskEvents depth cap hit; dropping bridge");
expect(firstCall[1]).toEqual({ depth: 16, maxDepth: 16 });

unbridges.forEach((off) => off());
});

it("accepts a custom maxDepth override", () => {
installStubLogger();
const N = 6;
const graphs: TaskGraph[] = [];
for (let i = 0; i <= N; i++) graphs.push(new TaskGraph());

const unbridges: Array<() => void> = [];
for (let i = 1; i <= N; i++) {
// Explicitly pass maxDepth=4 to each level. Depth is derived from the
// parent's stamped marker, so the first call to exceed the cap is the
// bridge whose computed depth is 4.
unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1], undefined, 4));
}

expect(warnSpy).toHaveBeenCalled();
const firstCall = warnSpy.mock.calls.find(
(c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge"
);
expect(firstCall).toBeDefined();
expect(firstCall![1]).toEqual({ depth: 4, maxDepth: 4 });

unbridges.forEach((off) => off());
});

it("stamped over-cap subgraph prevents downstream bridges from restarting depth", () => {
installStubLogger();
// Build a 5-level chain with maxDepth=2. Bridges install left-to-right
// (outermost first). The first call to exceed the cap MUST stamp the
// subgraph so the next bridge (whose parent is that subgraph) also sees
// depth >= maxDepth — otherwise the parent's BRIDGE_DEPTH reads as
// undefined, depth resets to 0, and the cap leaks downstream.
const maxDepth = 2;
const N = 5;
const graphs: TaskGraph[] = [];
for (let i = 0; i <= N; i++) graphs.push(new TaskGraph());

const reached: number[] = [];
for (let i = 0; i < graphs.length; i++) {
const depth = i;
graphs[i].subscribe("task_progress", () => {
reached.push(depth);
});
}

const unbridges: Array<() => void> = [];
for (let i = 1; i <= N; i++) {
unbridges.push(bridgeSubGraphTaskEvents(graphs[i], graphs[i - 1], undefined, maxDepth));
}

// graphs[1] and graphs[2] install (depth 0, 1). graphs[3] hits cap and
// stamps itself at maxDepth so graphs[4] and graphs[5] also no-op. Emit
// from the innermost graph: only its own subscriber should fire — every
// bridge i=3..5 short-circuited, so no event propagates outward at all.
graphs[N].emit("task_progress", "inner-id", 42, "msg");

// No bridge from level 2 onward installed, so only the innermost graph
// (depth N) saw the emit. depth=0 (outermost) must NOT be reached.
expect(reached).toEqual([N]);
expect(reached).not.toContain(0);

unbridges.forEach((off) => off());
});

it("warn fires exactly once per parentGraph at over-cap", () => {
installStubLogger();
const parent = new TaskGraph();
// Call 50 times with the same parent at over-cap (depth seeded above cap).
const teardowns: Array<() => void> = [];
for (let i = 0; i < 50; i++) {
const sub = new TaskGraph();
teardowns.push(bridgeSubGraphTaskEvents(sub, parent, 16, 16));
}

const capCalls = warnSpy.mock.calls.filter(
(c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge"
);
expect(capCalls).toHaveLength(1);

teardowns.forEach((off) => off());
});

it("distinct parents at over-cap each warn once", () => {
installStubLogger();
const parentA = new TaskGraph();
const parentB = new TaskGraph();
const subA = new TaskGraph();
const subB = new TaskGraph();

const off1 = bridgeSubGraphTaskEvents(subA, parentA, 16, 16);
const off2 = bridgeSubGraphTaskEvents(subB, parentB, 16, 16);
// Repeat each parent a few more times — should not produce additional warns.
const off3 = bridgeSubGraphTaskEvents(new TaskGraph(), parentA, 16, 16);
const off4 = bridgeSubGraphTaskEvents(new TaskGraph(), parentB, 16, 16);

const capCalls = warnSpy.mock.calls.filter(
(c) => c[0] === "bridgeSubGraphTaskEvents depth cap hit; dropping bridge"
);
expect(capCalls).toHaveLength(2);

off1();
off2();
off3();
off4();
});
});