diff --git a/.server-changes/mollifier-drainer-replay.md b/.server-changes/mollifier-drainer-replay.md new file mode 100644 index 0000000000..a8ff2d4bb0 --- /dev/null +++ b/.server-changes/mollifier-drainer-replay.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier drainer replay: replay buffered entries into `engine.trigger`, stale-entry sweep, a drainer-health gauge, and run-engine cancelled/failed run APIs. diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index 60c234402d..9996eb7b30 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server"; import { PassThrough } from "stream"; import * as Worker from "~/services/worker.server"; import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server"; +import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server"; import { bootstrap } from "./bootstrap"; import { LocaleContextProvider } from "./components/primitives/LocaleProvider"; import { @@ -228,6 +229,7 @@ Worker.init().catch((error) => { }); initMollifierDrainerWorker(); +initMollifierStaleSweepWorker(); bootstrap().catch((error) => { logError(error); diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 9e4743d740..a7e9cdef25 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1063,13 +1063,16 @@ const EnvironmentSchema = z // Separate switch for the drainer (consumer side) so it can be split // off onto a dedicated worker service. Unset → inherits // TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to - // flip two switches. In multi-replica deployments, set this to "0" - // explicitly on every replica except the one dedicated drainer - // service — otherwise every replica's polling loop races for the - // same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill - // switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a - // no-op because the gate-side singleton refuses to construct a - // buffer when the system is off. + // flip two switches. Multi-replica drainers are correct — `popAndMarkDraining` + // is an atomic ZPOPMIN + status flip in one Lua call, so only one replica + // can win any given entry — but inefficient: polling load (SMEMBERS + + // per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY` + // is per-process so engine load also multiplies. Splitting the drainer + // onto a dedicated worker keeps that traffic off the request-serving + // replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch; + // setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a + // no-op because the gate-side singleton refuses to construct a buffer + // when the system is off. TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"), TRIGGER_MOLLIFIER_REDIS_HOST: z @@ -1098,6 +1101,26 @@ const EnvironmentSchema = z TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3), TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000), TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500), + // Periodic sweep that scans buffer queue LISTs for entries whose + // dwell exceeds the stale threshold. Independent of the drainer — + // its job is exactly to make a stuck/offline drainer visible to + // ops. Defaults: enabled when the mollifier is enabled, run every + // 5 minutes, alert on anything that's been dwelling for 5+ minutes + // (matches the sweep interval — "anything still here when we + // check" is the simplest threshold that converges). + TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z + .string() + .default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), + TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce + .number() + .int() + .positive() + .default(5 * 60_000), + TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce + .number() + .int() + .positive() + .default(5 * 60_000), BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce .number() diff --git a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts index 5f985b684c..a26f6c3e74 100644 --- a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts @@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEventRepository } from "~/v3/eventRepository/index.server"; +import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server"; import { DefaultQueueManager } from "../concerns/queues.server"; import type { TriggerTaskRequest } from "../types"; @@ -176,6 +177,14 @@ export class TriggerFailedTaskService { event.setAttribute("runId", failedRunFriendlyId); event.failWithError(taskRunError); + // `emitRunFailedEvent: false` because this call site owns the + // trace-event lifecycle via the outer `traceEvent({ + // incomplete: false, isError: true })`. Letting the engine + // emit `runFailed` here would race the + // `completeFailedRunEvent` listener against the outer trace + // event's own completion write for the same (traceId, spanId). + // We re-trigger the alerts side directly after the trace + // event closes, below. return await this.engine.createFailedTaskRun({ friendlyId: failedRunFriendlyId, environment: { @@ -200,12 +209,30 @@ export class TriggerFailedTaskService { spanId: event.spanId, traceContext: traceContext as Record, taskEventStore: store, + emitRunFailedEvent: false, ...(queueName !== undefined && { queue: queueName }), ...(lockedQueueId !== undefined && { lockedQueueId }), }); } ); + // Alerts side of `runFailed` — the engine emit was suppressed + // above so the trace-event completion isn't double-written; we + // still need the alert pipeline to fire so customers' ERROR + // channels see the failure. Best-effort: a failed enqueue logs + // but doesn't block returning the friendlyId, mirroring the + // engine handler's behaviour at runEngineHandlers.server.ts:81. + try { + await PerformTaskRunAlertsService.enqueue(failedRun.id); + } catch (alertsError) { + logger.warn("TriggerFailedTaskService: alert enqueue failed", { + taskId: request.taskId, + friendlyId: failedRun.friendlyId, + error: + alertsError instanceof Error ? alertsError.message : String(alertsError), + }); + } + return failedRun.friendlyId; } catch (createError) { const createErrorMsg = diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts index 486449f8e1..fc75210be3 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts @@ -1,10 +1,15 @@ -import { createHash } from "node:crypto"; -import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker"; +import { MollifierDrainer } from "@trigger.dev/redis-worker"; +import { prisma } from "~/db.server"; import { env } from "~/env.server"; +import { engine as runEngine } from "~/v3/runEngine.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { getMollifierBuffer } from "./mollifierBuffer.server"; -import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server"; +import { + createDrainerHandler, + isRetryablePgError, +} from "./mollifierDrainerHandler.server"; +import type { MollifierSnapshot } from "./mollifierSnapshot.server"; // Distinct error class for the deterministic "fail loud at boot" throws // below. The bootstrap in `mollifierDrainerWorker.server.ts` catches @@ -25,7 +30,7 @@ export class MollifierConfigurationError extends Error { } } -function initializeMollifierDrainer(): MollifierDrainer { +function initializeMollifierDrainer(): MollifierDrainer { const buffer = getMollifierBuffer(); if (!buffer) { // Unreachable in normal config: getMollifierDrainer() gates on the @@ -68,40 +73,13 @@ function initializeMollifierDrainer(): MollifierDrainer maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, }); - // No-op ack handler: the trigger has ALREADY been written to Postgres - // via engine.trigger (dual-write at the call site). Popping + acking - // here proves the dequeue mechanism works end-to-end without duplicating - // the work. A later change replaces this with an engine.trigger replay - // that performs the actual Postgres write. - const drainer = new MollifierDrainer({ + const drainer = new MollifierDrainer({ buffer, - handler: async (input) => { - // Hash the (re-serialised, canonical) payload on the drain side rather - // than on the trigger hot path. Burst-time CPU stays with engine.trigger; - // the drainer is the natural place for the audit-equivalence checksum. - // Re-serialisation is identity for the BufferedTriggerPayload shape - // (only strings/numbers/plain objects), so this hash matches what the - // call site wrote into Redis. - const reserialised = serialiseSnapshot(input.payload); - const payloadHash = createHash("sha256").update(reserialised).digest("hex"); - logger.info("mollifier.drained", { - runId: input.runId, - envId: input.envId, - orgId: input.orgId, - taskId: input.payload.taskId, - attempts: input.attempts, - ageMs: Date.now() - input.createdAt.getTime(), - payloadBytes: reserialised.length, - payloadHash, - }); - }, + handler: createDrainerHandler({ engine: runEngine, prisma }), concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY, maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK, - // A no-op handler shouldn't throw, but if something does (e.g. an - // unexpected deserialise failure), don't loop — let it FAIL terminally - // so the entry is observable in metrics. - isRetryable: () => false, + isRetryable: isRetryablePgError, }); return drainer; @@ -114,7 +92,7 @@ function initializeMollifierDrainer(): MollifierDrainer // handler registration, leaving a narrow window where a SIGTERM landing // between `start()` and `process.once("SIGTERM", ...)` would skip the // graceful stop. The split is intentional. -export function getMollifierDrainer(): MollifierDrainer | null { +export function getMollifierDrainer(): MollifierDrainer | null { if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null; return singleton("mollifierDrainer", initializeMollifierDrainer); } diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts new file mode 100644 index 0000000000..59d21180cc --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -0,0 +1,228 @@ +import { context, trace, TraceFlags } from "@opentelemetry/api"; +import type { RunEngine } from "@internal/run-engine"; +import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; +import type { MollifierDrainerHandler } from "@trigger.dev/redis-worker"; +import { startSpan } from "~/v3/tracing.server"; +import type { MollifierSnapshot } from "./mollifierSnapshot.server"; + +const tracer = trace.getTracer("mollifier-drainer"); + +export function isRetryablePgError(err: unknown): boolean { + if (!(err instanceof Error)) return false; + const msg = err.message ?? ""; + // Prisma surfaces P1001 ("Can't reach database server") via two + // different error classes — `PrismaClientKnownRequestError` exposes + // it as `err.code`, `PrismaClientInitializationError` exposes it as + // `err.errorCode`. Check both so reconnection-time errors retry + // regardless of which class fires. + const code = (err as { code?: string }).code; + const errorCode = (err as { errorCode?: string }).errorCode; + if (code === "P2024") return true; + if (code === "P1001" || errorCode === "P1001") return true; + if (msg.includes("Can't reach database server")) return true; + if (msg.includes("Connection lost")) return true; + if (msg.includes("ECONNRESET")) return true; + return false; +} + +export function createDrainerHandler(deps: { + engine: RunEngine; + prisma: PrismaClientOrTransaction; +}): MollifierDrainerHandler { + return async (input) => { + const dwellMs = Date.now() - input.createdAt.getTime(); + + // Re-attach to the trace started by the caller's mollifier.queued span + // (its traceId + spanId were captured into the snapshot at buffer time). + // Without this the drainer would emit mollifier.drained in a brand-new + // trace and the engine.trigger instrumentation would inherit an empty + // active context — leaving the run-detail page with only the root span. + const snapshotTraceId = + typeof input.payload.traceId === "string" ? input.payload.traceId : undefined; + const snapshotSpanId = + typeof input.payload.spanId === "string" ? input.payload.spanId : undefined; + + const parentContext = + snapshotTraceId && snapshotSpanId + ? trace.setSpanContext(context.active(), { + traceId: snapshotTraceId, + spanId: snapshotSpanId, + traceFlags: TraceFlags.SAMPLED, + isRemote: true, + }) + : context.active(); + + // Cancel-wins-over-trigger. If a cancel API call + // landed on this entry while it was QUEUED, the snapshot carries + // `cancelledAt` + `cancelReason`. Skip the normal materialise path + // and write a CANCELED PG row directly. The existing runCancelled + // handler writes the TaskEvent. + const cancelledAtStr = + typeof input.payload.cancelledAt === "string" ? input.payload.cancelledAt : undefined; + if (cancelledAtStr) { + const cancelReason = + typeof input.payload.cancelReason === "string" + ? input.payload.cancelReason + : "Canceled by user"; + await context.with(parentContext, async () => { + await startSpan(tracer, "mollifier.drained.cancelled", async (span) => { + span.setAttribute("mollifier.drained", true); + span.setAttribute("mollifier.dwell_ms", dwellMs); + span.setAttribute("mollifier.attempts", input.attempts); + span.setAttribute("mollifier.run_friendly_id", input.runId); + span.setAttribute("mollifier.cancel_bifurcation", true); + span.setAttribute("taskRunId", input.runId); + try { + await deps.engine.createCancelledRun( + { + snapshot: input.payload as any, + cancelledAt: new Date(cancelledAtStr), + cancelReason, + }, + deps.prisma, + ); + } catch (err) { + // createCancelledRun throws a conflict when the normal trigger + // replay path won the race and already materialised a live + // (non-CANCELED) row for this friendlyId. Its contract leaves + // the resolution to us: honour the cancel by actually + // cancelling the now-live run. Letting the conflict propagate + // would instead reach the drainer's terminal-failure path + // (isRetryablePgError() is false for it), buffer.fail() the + // entry, and silently lose the cancellation while the run + // keeps executing. + const isConflict = + err instanceof Error && err.message.startsWith("createCancelledRun conflict"); + if (!isConflict) { + throw err; + } + span.setAttribute("mollifier.cancel_conflict", true); + const friendlyId = + typeof input.payload.friendlyId === "string" + ? input.payload.friendlyId + : input.runId; + await deps.engine.cancelRun({ + runId: RunId.fromFriendlyId(friendlyId), + completedAt: new Date(cancelledAtStr), + reason: cancelReason, + }); + } + }); + }); + return; + } + + await context.with(parentContext, async () => { + await startSpan(tracer, "mollifier.drained", async (span) => { + span.setAttribute("mollifier.drained", true); + span.setAttribute("mollifier.dwell_ms", dwellMs); + span.setAttribute("mollifier.attempts", input.attempts); + span.setAttribute("mollifier.run_friendly_id", input.runId); + span.setAttribute("taskRunId", input.runId); + + try { + await deps.engine.trigger(input.payload as any, deps.prisma); + } catch (err) { + // The retryable-PG class re-throws so the drainer's outer + // worker loop can `buffer.requeue` (handled in + // `MollifierDrainer.drainOne`). For non-retryable failures we + // write a terminal SYSTEM_FAILURE row to PG via the engine's + // existing `createFailedTaskRun` (used by batch-trigger for + // the same purpose) so the customer sees the run in their + // dashboard / SDK instead of silently losing it when the + // buffer entry TTLs out. If THAT insert also fails (PG truly + // unreachable), rethrow so the drainer's outer catch falls + // through to its existing `buffer.fail` terminal-marker path. + if (isRetryablePgError(err)) { + throw err; + } + const reason = err instanceof Error ? err.message : String(err); + span.setAttribute("mollifier.terminal_failure_reason", reason); + const snapshot = input.payload as Record; + const env = snapshot.environment as + | { + id: string; + type: any; + project: { id: string }; + organization: { id: string }; + } + | undefined; + if (!env) { + // Snapshot too malformed to even construct a TaskRun row. + // Drainer's outer catch will buffer.fail this entry. + throw err; + } + // Extract batch association from the snapshot if present. + // Without this, a SYSTEM_FAILURE row for a buffered batch + // child won't be linked to its batch, and the batch parent's + // completion tracking can hang indefinitely waiting on a + // child that landed but isn't visible to the batch. + const rawBatch = snapshot.batch; + const batch = + rawBatch && + typeof rawBatch === "object" && + "id" in rawBatch && + typeof (rawBatch as { id: unknown }).id === "string" && + "index" in rawBatch && + typeof (rawBatch as { index: unknown }).index === "number" + ? (rawBatch as { id: string; index: number }) + : undefined; + try { + await deps.engine.createFailedTaskRun({ + friendlyId: input.runId, + environment: env, + taskIdentifier: String(snapshot.taskIdentifier ?? ""), + payload: typeof snapshot.payload === "string" ? snapshot.payload : undefined, + payloadType: + typeof snapshot.payloadType === "string" ? snapshot.payloadType : undefined, + error: { + type: "STRING_ERROR", + raw: `Mollifier drainer terminal failure: ${reason}`, + }, + parentTaskRunId: + typeof snapshot.parentTaskRunId === "string" + ? snapshot.parentTaskRunId + : undefined, + rootTaskRunId: + typeof snapshot.rootTaskRunId === "string" + ? snapshot.rootTaskRunId + : undefined, + depth: typeof snapshot.depth === "number" ? snapshot.depth : 0, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true, + batch, + traceId: typeof snapshot.traceId === "string" ? snapshot.traceId : undefined, + spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined, + taskEventStore: + typeof snapshot.taskEventStore === "string" + ? snapshot.taskEventStore + : undefined, + queue: typeof snapshot.queue === "string" ? snapshot.queue : undefined, + lockedQueueId: + typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined, + }); + } catch (writeErr) { + // The terminal SYSTEM_FAILURE write itself failed. If it + // failed because PG is transiently unreachable, rethrow the + // *write* error so the drainer requeues — buffer.fail()ing on + // the original non-retryable error would lose the run with no + // PG row ever landing. Once PG recovers the requeued entry + // writes its failure row and the customer sees it. + if (isRetryablePgError(writeErr)) { + span.setAttribute("mollifier.terminal_write_retryable", true); + throw writeErr; + } + // PG reachable but the write was rejected for another reason + // (genuinely bad snapshot). Rethrow the original trigger error + // so the drainer falls back to buffer.fail. + span.setAttribute( + "mollifier.terminal_write_error", + writeErr instanceof Error ? writeErr.message : String(writeErr) + ); + throw err; + } + } + }); + }); + }; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts new file mode 100644 index 0000000000..e77b0916b5 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts @@ -0,0 +1,233 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { logger as defaultLogger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; +import { MollifierStaleSweepState, type StaleSweepStateStore } from "./mollifierStaleSweepState.server"; +import { + recordStaleEntry as defaultRecordStaleEntry, + reportStaleEntrySnapshot as defaultReportStaleEntrySnapshot, +} from "./mollifierTelemetry.server"; + +// One pass of the sweep scans a bounded slice of orgs from the buffer's +// queue LIST, identified by a durable cursor in Redis. Per-env entry +// scan is also bounded so a single pathological env can't extend the +// pass. +const DEFAULT_MAX_ENTRIES_PER_ENV = 1000; +// Max orgs visited per tick. Together with `maxEntriesPerEnv` this +// caps Redis traffic per pass. One "cycle" (visiting every org once) +// takes `ceil(N_orgs / cap)` ticks, after which the cursor wraps and a +// fresh org list is taken. +const DEFAULT_MAX_ORGS_PER_PASS = 100; + +export type StaleSweepConfig = { + // Entries whose dwell exceeds this threshold are flagged stale. Set + // it well below `entryTtlSeconds * 1000` so ops have lead time before + // TTL-induced silent loss; the default (half of entryTtlSeconds) + // matches the cadence in the plan doc. + staleThresholdMs: number; + maxEntriesPerEnv?: number; + // Hard cap on orgs visited per tick. Bounds the per-pass Redis traffic + // and wall-time. Default 100 — at typical fleet sizes one or two + // ticks cover everyone; under incident-scale fan-out a full cycle + // takes a handful of ticks (~minutes) which is still well below the + // staleness signal latency that ops cares about. + maxOrgsPerPass?: number; +}; + +export type StaleSweepDeps = { + getBuffer?: () => MollifierBuffer | null; + // Durable cursor + per-env counts hash. Required: the sweep is + // useless without persistent state across ticks. The webapp wires up + // a real `MollifierStaleSweepState`; tests pass one constructed + // against the test container. + state: StaleSweepStateStore; + // No `envId` arg — `envId` is a high-cardinality metric attribute and + // is intentionally not emitted as a metric label. The structured warn + // log below carries envId for forensic drill-down. + recordStaleEntry?: () => void; + reportStaleEntrySnapshot?: (snapshot: Map) => void; + logger?: { warn: (message: string, fields: Record) => void }; + now?: () => number; +}; + +export type StaleSweepResult = { + orgsScanned: number; + envsScanned: number; + entriesScanned: number; + staleCount: number; +}; + +// Walks a bounded slice of `orgs → envs → entries`, emitting an OTel +// counter tick and a structured warning log for each buffer entry whose +// dwell exceeds the stale threshold. Read-only on the buffer's own +// state; writes only to the sweep's three dedicated keys +// (`mollifier:stale_sweep:*`). The sweep does NOT remove or salvage +// buffer entries; that decision is deferred to a separate retention- +// policy change. The signal here exists so ops sees the drainer falling +// behind well before TTL-induced loss kicks in. +// +// Sharding contract: +// - Cursor starts at 0. On cursor=0 the org list is refreshed by +// snapshotting `buffer.listOrgs()` into the durable LIST — that is +// the cycle's frozen view of orgs to visit. +// - Each tick consumes up to `maxOrgsPerPass` orgs from the LIST, +// advances the cursor, and persists. +// - When the cursor reaches the end of the LIST it wraps to 0; the next +// tick rebuilds the org list, capturing any orgs that joined the +// buffer mid-cycle. +// - The per-env counts HASH carries over across ticks: an env visited +// on tick N and not revisited until tick N+M keeps its last-known +// stale count in the gauge for that window. This is the price of +// sharding — accepted because the alternative (re-scan everything +// every tick) does not bound work. +export async function runStaleSweepOnce( + config: StaleSweepConfig, + deps: StaleSweepDeps, +): Promise { + const getBuffer = deps.getBuffer ?? getMollifierBuffer; + const recordStale = deps.recordStaleEntry ?? defaultRecordStaleEntry; + const reportSnapshot = + deps.reportStaleEntrySnapshot ?? defaultReportStaleEntrySnapshot; + const log = deps.logger ?? defaultLogger; + const now = (deps.now ?? Date.now)(); + const maxEntries = config.maxEntriesPerEnv ?? DEFAULT_MAX_ENTRIES_PER_ENV; + const maxOrgsPerPass = config.maxOrgsPerPass ?? DEFAULT_MAX_ORGS_PER_PASS; + + const buffer = getBuffer(); + if (!buffer) { + // Replace any previous snapshot with empty so a previously-paging + // env doesn't stay latched if mollifier is turned off mid-flight. + // Also clear the durable state so a re-enable starts from a clean + // slate instead of resuming on a stale cursor. + await deps.state.clearAll(); + reportSnapshot(new Map()); + return { orgsScanned: 0, envsScanned: 0, entriesScanned: 0, staleCount: 0 }; + } + + let cursor = await deps.state.readCursor(); + if (cursor === 0) { + // Fresh cycle — capture the current set of orgs into the frozen + // LIST. Any orgs that join after this snapshot wait until the next + // cycle to be visited. Acceptable for an observational sweep; the + // staleness signal would only fire on entries that have been + // dwelling for `staleThresholdMs` anyway, so they're not new. + const orgs = await buffer.listOrgs(); + await deps.state.rebuildOrgList(orgs); + } + + const { orgs: slice, total } = await deps.state.readOrgListSlice( + cursor, + maxOrgsPerPass, + ); + + let envsScanned = 0; + let entriesScanned = 0; + let staleCount = 0; + + for (const orgId of slice) { + const envs = await buffer.listEnvsForOrg(orgId); + for (const envId of envs) { + envsScanned += 1; + let envStale = 0; + const entries = await buffer.listEntriesForEnv(envId, maxEntries); + for (const entry of entries) { + entriesScanned += 1; + const dwellMs = now - entry.createdAt.getTime(); + if (dwellMs > config.staleThresholdMs) { + recordStale(); + log.warn("mollifier.stale_entry", { + runId: entry.runId, + envId, + orgId, + dwellMs, + staleThresholdMs: config.staleThresholdMs, + }); + envStale += 1; + } + } + // Persist the per-env count to the durable hash. HSET when stale + // > 0, HDEL when it dropped back to zero — the hash is the source + // of truth for the gauge snapshot below. + await deps.state.setEnvStaleCount(envId, envStale); + // Track that this env was visited during the current cycle. The + // reconcile step at cycle wrap uses this to HDEL counts hash + // entries for envs that fully drained mid-cycle (they disappear + // from listEnvsForOrg, so the inner loop above never reaches them + // and never HDELs their hash field — without reconcile the gauge + // would stay elevated forever). + await deps.state.markEnvVisited(envId); + staleCount += envStale; + } + } + + // Advance the cursor. If the slice consumed the end of the LIST, wrap + // to 0 so the next tick rebuilds the org list and starts a new cycle. + const advanced = cursor + slice.length; + const wrapped = advanced >= total; + const newCursor = wrapped ? 0 : advanced; + await deps.state.writeCursor(newCursor); + + if (wrapped) { + // Cycle ended. HDEL any env still in the counts hash that didn't + // appear in any tick of the just-completed cycle — these are envs + // that fully drained from the buffer mid-cycle and would otherwise + // hold their stale gauge value forever. Also DELs the visited set + // so the next cycle starts clean. + await deps.state.reconcileVisited(); + } + + // Emit the snapshot from the durable hash, which carries values for + // envs visited in earlier ticks too. This is what makes the gauge + // stable across ticks (and across webapp restarts). + const snapshot = await deps.state.readAllEnvStaleCounts(); + reportSnapshot(snapshot); + + return { orgsScanned: slice.length, envsScanned, entriesScanned, staleCount }; +} + +export type StaleSweepIntervalHandle = { + stop: () => Promise; +}; + +// Production wrapper: schedule `runStaleSweepOnce` on a fixed interval. +// One pass at a time — if a sweep is still running when the timer fires +// the next tick is skipped (a backed-up Redis would otherwise queue +// overlapping sweeps that all log the same stale entries). +export function startStaleSweepInterval( + config: StaleSweepConfig & { intervalMs: number }, + deps: StaleSweepDeps, +): StaleSweepIntervalHandle { + let stopped = false; + let inFlight = false; + + const tick = async () => { + if (stopped || inFlight) return; + inFlight = true; + try { + await runStaleSweepOnce(config, deps); + } catch (err) { + const log = deps.logger ?? defaultLogger; + log.warn("mollifier.stale_sweep.failed", { + err: err instanceof Error ? err.message : String(err), + }); + } finally { + inFlight = false; + } + }; + + const timer = setInterval(() => { + void tick(); + }, config.intervalMs); + + return { + stop: async () => { + stopped = true; + clearInterval(timer); + // Close the durable-state Redis client if the deps own a real + // `MollifierStaleSweepState`. Tests may inject a fake without a + // `close()`; guard accordingly. + if (deps.state instanceof MollifierStaleSweepState) { + await deps.state.close(); + } + }, + }; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierStaleSweepState.server.ts b/apps/webapp/app/v3/mollifier/mollifierStaleSweepState.server.ts new file mode 100644 index 0000000000..da44d65808 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierStaleSweepState.server.ts @@ -0,0 +1,176 @@ +import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis"; +import { Logger } from "@trigger.dev/core/logger"; + +// Durable per-tick state for the sharded stale sweep. Four Redis keys, +// all in the `mollifier:` namespace alongside the buffer's own state: +// +// mollifier:stale_sweep:cursor STRING next position in org_list (0 = fresh cycle) +// mollifier:stale_sweep:org_list LIST org IDs frozen at the start of the cycle +// mollifier:stale_sweep:counts HASH envId -> last-known stale count +// mollifier:stale_sweep:visited SET envIds visited during the current cycle +// +// The state survives webapp restarts: a restarted process picks up the +// cursor where the previous one left off and re-emits the last-known +// gauge values immediately, rather than blinking to zero until the next +// cycle visits each env. +// +// The `visited` set exists to GC the `counts` hash at cycle wrap: an env +// that drains completely between sweep ticks disappears from +// `buffer.listEnvsForOrg`, so the sweep's inner loop never revisits it +// and never HDELs its counts entry. Without the visited-set GC the +// counts hash retains the env's last-known stale count forever and the +// gauge stays permanently elevated. At cursor wrap we diff the hash +// against the cycle's visited set and HDEL the difference. +// +// Storage is owned by this class rather than added to MollifierBuffer +// because the keys are sweep-internal — the buffer abstracts the +// drainer/queue state, this abstracts sweep state. They share a +// namespace prefix but no API surface. + +export interface StaleSweepStateStore { + readCursor(): Promise; + writeCursor(value: number): Promise; + /** Replaces the cycle's frozen org_list. Called at cursor=0. */ + rebuildOrgList(orgs: string[]): Promise; + /** Returns up to `count` org IDs starting at `start`, plus the LIST's total length. */ + readOrgListSlice(start: number, count: number): Promise<{ orgs: string[]; total: number }>; + /** HSET when count > 0, HDEL when count === 0 (so the snapshot reflects current truth). */ + setEnvStaleCount(envId: string, count: number): Promise; + readAllEnvStaleCounts(): Promise>; + /** SADD `envId` to the current cycle's visited set. Called once per env scanned per tick. */ + markEnvVisited(envId: string): Promise; + /** + * HDEL every env in the counts hash that is NOT in the visited set, then + * DEL the visited set. Called when the cursor wraps (cycle ends) so + * envs that fully drained mid-cycle get cleaned out of the gauge. + */ + reconcileVisited(): Promise; + clearAll(): Promise; + close(): Promise; +} + +const CURSOR_KEY = "mollifier:stale_sweep:cursor"; +const ORG_LIST_KEY = "mollifier:stale_sweep:org_list"; +const COUNTS_KEY = "mollifier:stale_sweep:counts"; +const VISITED_KEY = "mollifier:stale_sweep:visited"; + +export class MollifierStaleSweepState implements StaleSweepStateStore { + private readonly redis: Redis; + private readonly logger: Logger; + + constructor(options: { redisOptions: RedisOptions; logger?: Logger }) { + this.logger = options.logger ?? new Logger("MollifierStaleSweepState", "debug"); + this.redis = createRedisClient( + { ...options.redisOptions, maxRetriesPerRequest: 20 }, + { + onError: (error) => { + this.logger.error("MollifierStaleSweepState redis client error:", { error }); + }, + }, + ); + } + + async readCursor(): Promise { + const raw = await this.redis.get(CURSOR_KEY); + if (raw === null) return 0; + const n = Number.parseInt(raw, 10); + return Number.isFinite(n) && n >= 0 ? n : 0; + } + + async writeCursor(value: number): Promise { + await this.redis.set(CURSOR_KEY, String(value)); + } + + async rebuildOrgList(orgs: string[]): Promise { + // DEL + RPUSH in a pipeline — close enough to atomic for an + // observational sweep (the inFlight guard at startStaleSweepInterval + // serialises sweep passes; nothing else writes these keys). + const pipeline = this.redis.pipeline(); + pipeline.del(ORG_LIST_KEY); + if (orgs.length > 0) { + pipeline.rpush(ORG_LIST_KEY, ...orgs); + } + await pipeline.exec(); + } + + async readOrgListSlice( + start: number, + count: number, + ): Promise<{ orgs: string[]; total: number }> { + const pipeline = this.redis.pipeline(); + pipeline.lrange(ORG_LIST_KEY, start, start + count - 1); + pipeline.llen(ORG_LIST_KEY); + const results = await pipeline.exec(); + if (!results) return { orgs: [], total: 0 }; + const [lrangeErr, lrangeRes] = results[0] as [Error | null, string[] | null]; + const [llenErr, llenRes] = results[1] as [Error | null, number | null]; + if (lrangeErr || llenErr) { + this.logger.error("MollifierStaleSweepState.readOrgListSlice failed", { + lrangeErr: lrangeErr?.message, + llenErr: llenErr?.message, + }); + return { orgs: [], total: 0 }; + } + return { orgs: lrangeRes ?? [], total: llenRes ?? 0 }; + } + + async setEnvStaleCount(envId: string, count: number): Promise { + if (count > 0) { + await this.redis.hset(COUNTS_KEY, envId, String(count)); + } else { + await this.redis.hdel(COUNTS_KEY, envId); + } + } + + async readAllEnvStaleCounts(): Promise> { + const raw = await this.redis.hgetall(COUNTS_KEY); + const out = new Map(); + for (const [envId, value] of Object.entries(raw)) { + const n = Number.parseInt(value, 10); + if (Number.isFinite(n)) out.set(envId, n); + } + return out; + } + + async markEnvVisited(envId: string): Promise { + await this.redis.sadd(VISITED_KEY, envId); + } + + async reconcileVisited(): Promise { + // HKEYS + SMEMBERS in a pipeline, then HDEL the difference locally. + // For typical fleet sizes (counts and visited both bounded by the + // count of buffered envs) this is well within a single RTT plus one + // small HDEL. + const pipeline = this.redis.pipeline(); + pipeline.hkeys(COUNTS_KEY); + pipeline.smembers(VISITED_KEY); + const results = await pipeline.exec(); + if (!results) return; + const [hkeysErr, hkeysRes] = results[0] as [Error | null, string[] | null]; + const [smembersErr, smembersRes] = results[1] as [Error | null, string[] | null]; + if (hkeysErr || smembersErr) { + this.logger.error("MollifierStaleSweepState.reconcileVisited failed", { + hkeysErr: hkeysErr?.message, + smembersErr: smembersErr?.message, + }); + return; + } + const hashEnvs = hkeysRes ?? []; + const visited = new Set(smembersRes ?? []); + const orphans = hashEnvs.filter((envId) => !visited.has(envId)); + const cleanup = this.redis.pipeline(); + if (orphans.length > 0) { + cleanup.hdel(COUNTS_KEY, ...orphans); + } + cleanup.del(VISITED_KEY); + await cleanup.exec(); + } + + async clearAll(): Promise { + await this.redis.del(CURSOR_KEY, ORG_LIST_KEY, COUNTS_KEY, VISITED_KEY); + } + + async close(): Promise { + await this.redis.quit(); + } +} diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index 0fe302584c..f9c7ca72f1 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -15,3 +15,87 @@ export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason ...(reason ? { reason } : {}), }); } + +// Counts subscriptions hitting `/realtime/v1/runs/` for a run that +// lives only in the mollifier buffer (no PG row yet). The route opens +// the Electric stream anyway so the eventual drainer-INSERT propagates +// to the client; this counter is the signal of how often customers +// subscribe inside the buffered window. +export const realtimeBufferedSubscriptionsCounter = meter.createCounter( + "mollifier.realtime_subscriptions.buffered", + { + description: + "Realtime subscriptions opened against a runId that exists only in the mollifier buffer", + }, +); + +// No `envId` attribute — `envId` is a banned high-cardinality metric +// label per the repo's OTel rules. The structured warn log emitted +// alongside the counter tick (in `mollifierStaleSweep.server.ts`) +// carries the envId / orgId / runId for forensic drill-down; the +// metric stays an aggregate. +export function recordRealtimeBufferedSubscription(): void { + realtimeBufferedSubscriptionsCounter.add(1); +} + +// Counts buffer entries that have been waiting in the queue ZSET longer +// than the configured stale threshold. Useful for historical "stale +// events over time" views, but not directly alertable on its own — a +// single stuck entry observed by N sweep ticks adds N to the counter, +// so `rate()` over an alerting window reflects (entries × ticks), not +// "entries that are stale right now". +export const staleEntriesCounter = meter.createCounter( + "mollifier.stale_entries", + { + description: + "Mollifier buffer entries whose dwell exceeds the stale threshold (per sweep pass)", + }, +); + +// No `envId` attribute — see comment above. +export function recordStaleEntry(): void { + staleEntriesCounter.add(1); +} + +// Alertable signal: the total count of stale entries observed by the +// latest sweep. The sweep snapshots the full picture on each pass so +// the gauge drops back to 0 when the drainer catches up instead of +// staying latched. Recommended alert: +// mollifier_stale_entries_current > 0 for 5m +export const staleEntriesGauge = meter.createObservableGauge( + "mollifier.stale_entries.current", + { + description: + "Buffer entries whose dwell exceeds the stale threshold, as observed by the latest sweep pass", + }, +); + +let latestStaleTotal = 0; + +export function reportStaleEntrySnapshot(snapshot: Map): void { + // Sum across envs. Per-env breakdown is intentionally NOT emitted as + // a metric label (high-cardinality); the structured warn log lines + // from the sweep carry per-env detail for ops to drill down. + let total = 0; + for (const count of snapshot.values()) { + total += count; + } + latestStaleTotal = total; +} + +meter.addBatchObservableCallback( + (result) => { + result.observe(staleEntriesGauge, latestStaleTotal); + }, + [staleEntriesGauge], +); + +// Electric SQL's shape-stream protocol adds a `handle=` query param on +// every reconnect after the initial GET. Gating the realtime-buffered +// log/counter on its absence keeps the signal at one tick per +// subscription instead of one tick per ~20s live-poll iteration — +// without it the counter would over-count by the long-poll factor. +export function isInitialBufferedSubscriptionRequest(url: string | URL): boolean { + const u = typeof url === "string" ? new URL(url) : url; + return !u.searchParams.has("handle"); +} diff --git a/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts b/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts new file mode 100644 index 0000000000..de05ab2467 --- /dev/null +++ b/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts @@ -0,0 +1,73 @@ +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { signalsEmitter } from "~/services/signals.server"; +import { + startStaleSweepInterval, + type StaleSweepIntervalHandle, +} from "./mollifier/mollifierStaleSweep.server"; +import { MollifierStaleSweepState } from "./mollifier/mollifierStaleSweepState.server"; + +declare global { + // eslint-disable-next-line no-var + var __mollifierStaleSweepRegistered__: boolean | undefined; + // eslint-disable-next-line no-var + var __mollifierStaleSweepHandle__: StaleSweepIntervalHandle | undefined; +} + +/** + * Bootstraps the mollifier stale-entry sweep. + * + * Independent of the drainer — its purpose is to alert when entries are + * piling up despite the drainer being supposedly healthy, so it runs + * any time the mollifier itself is enabled (gated separately from + * `TRIGGER_MOLLIFIER_DRAINER_ENABLED`). The sweep is read-only: it + * counts and logs stale entries but does not remove or salvage them. + * + * The Remix dev server re-evaluates `entry.server.tsx` on every change, + * so the registration guard + handle cache make the bootstrap + * idempotent across hot reloads. + */ +export function initMollifierStaleSweepWorker(): void { + if (env.TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED !== "1") return; + if (global.__mollifierStaleSweepRegistered__) return; + + logger.debug("Initializing mollifier stale-entry sweep", { + intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS, + staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS, + }); + + // Construct the sweep's durable-state Redis client using the same + // mollifier-Redis credentials as the buffer. Keeping this client + // separate from the buffer's own client keeps state ownership clean: + // the buffer abstracts queue/entry state, this abstracts sweep state. + const state = new MollifierStaleSweepState({ + redisOptions: { + keyPrefix: "", + host: env.TRIGGER_MOLLIFIER_REDIS_HOST, + port: env.TRIGGER_MOLLIFIER_REDIS_PORT, + username: env.TRIGGER_MOLLIFIER_REDIS_USERNAME, + password: env.TRIGGER_MOLLIFIER_REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }, + }); + + const handle = startStaleSweepInterval( + { + intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS, + staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS, + }, + { state }, + ); + + // `handle.stop` is now async (it closes the Redis client). The signals + // emitter swallows promise rejections from listeners, so wrap it in a + // void-returning shim to be explicit about discarding the promise. + const onShutdown = (): void => { + void handle.stop(); + }; + signalsEmitter.on("SIGTERM", onShutdown); + signalsEmitter.on("SIGINT", onShutdown); + global.__mollifierStaleSweepRegistered__ = true; + global.__mollifierStaleSweepHandle__ = handle; +} diff --git a/apps/webapp/test/mollifierDrainerHandler.test.ts b/apps/webapp/test/mollifierDrainerHandler.test.ts new file mode 100644 index 0000000000..59c11eb14f --- /dev/null +++ b/apps/webapp/test/mollifierDrainerHandler.test.ts @@ -0,0 +1,323 @@ +import { describe, expect, it, vi } from "vitest"; +import { trace } from "@opentelemetry/api"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { + createDrainerHandler, + isRetryablePgError, +} from "~/v3/mollifier/mollifierDrainerHandler.server"; + +describe("isRetryablePgError", () => { + it("returns true for P2024 (connection pool timeout)", () => { + const err = Object.assign(new Error("Timed out fetching a new connection"), { + code: "P2024", + }); + expect(isRetryablePgError(err)).toBe(true); + }); + + it("returns true for generic connection-lost messages", () => { + expect(isRetryablePgError(new Error("Connection lost"))).toBe(true); + expect(isRetryablePgError(new Error("Can't reach database server"))).toBe(true); + }); + + it("returns false for validation errors", () => { + expect(isRetryablePgError(new Error("Invalid payload"))).toBe(false); + }); + + it("returns false for non-Error inputs", () => { + expect(isRetryablePgError("string error")).toBe(false); + expect(isRetryablePgError({ message: "object" })).toBe(false); + }); +}); + +describe("createDrainerHandler", () => { + it("invokes engine.trigger with the deserialised snapshot", async () => { + const trigger = vi.fn(async () => ({ friendlyId: "run_x" })); + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + await handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", payload: "{}" }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(trigger).toHaveBeenCalledOnce(); + const callArg = trigger.mock.calls[0][0] as { taskIdentifier: string }; + expect(callArg.taskIdentifier).toBe("t"); + }); + + it("re-attaches the snapshot's traceId so engine.trigger inherits the original trace", async () => { + // Captures the active traceId at the moment engine.trigger is invoked. + // Without context propagation it would be a fresh traceId, leaving the + // run-detail page with only the root span. + let observedTraceId: string | undefined; + const trigger = vi.fn(async () => { + observedTraceId = trace.getActiveSpan()?.spanContext().traceId; + return { friendlyId: "run_x" }; + }); + + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + const snapshotTraceId = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + const snapshotSpanId = "bbbbbbbbbbbbbbbb"; + + await handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { + taskIdentifier: "t", + traceId: snapshotTraceId, + spanId: snapshotSpanId, + }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(observedTraceId).toBe(snapshotTraceId); + }); + + it("rethrows retryable PG errors so MollifierDrainer requeues the entry", async () => { + const err = new Error("Can't reach database server"); + const trigger = vi.fn(async () => { + throw err; + }); + const createFailedTaskRun = vi.fn(); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t" }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("Can't reach database server"); + // Retryable: we do NOT write a SYSTEM_FAILURE row, the entry should + // be requeued for another shot. + expect(createFailedTaskRun).not.toHaveBeenCalled(); + }); + + const envFixture = { + id: "env_a", + type: "DEVELOPMENT", + project: { id: "proj_1" }, + organization: { id: "org_1" }, + }; + + it("writes a SYSTEM_FAILURE PG row when engine.trigger fails non-retryably", async () => { + const trigger = vi.fn(async () => { + throw new Error("validation failed: payload too large"); + }); + const createFailedTaskRun = vi.fn(async () => ({ + id: "internal", + friendlyId: "run_x", + })); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any), + ).resolves.toBeUndefined(); + + expect(trigger).toHaveBeenCalledOnce(); + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + const arg = createFailedTaskRun.mock.calls[0][0] as { error: { raw: string } }; + expect(arg.error.raw).toContain("validation failed"); + }); + + it("propagates the batch association into createFailedTaskRun (so batch parents don't hang on missing children)", async () => { + // Devin's ANALYSIS report on PR #3754: the terminal-failure path + // extracts most snapshot fields (parentTaskRunId, rootTaskRunId, + // depth, etc.) but dropped `batch`. If the original trigger was + // part of a batch, the SYSTEM_FAILURE row isn't associated with + // the batch, so the batch parent's completion-tracking can hang + // indefinitely waiting on a child that landed but isn't linked. + const trigger = vi.fn(async () => { + throw new Error("validation failed: payload too large"); + }); + const createFailedTaskRun = vi.fn(async () => ({ + id: "internal", + friendlyId: "run_x", + })); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_batched", + envId: "env_a", + orgId: "org_1", + payload: { + taskIdentifier: "t", + environment: envFixture, + batch: { id: "batch_xyz", index: 7 }, + }, + attempts: 0, + createdAt: new Date(), + } as any), + ).resolves.toBeUndefined(); + + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + const arg = createFailedTaskRun.mock.calls[0][0] as { + batch?: { id: string; index: number }; + }; + expect(arg.batch).toEqual({ id: "batch_xyz", index: 7 }); + }); + + it("rethrows the original error when createFailedTaskRun also fails (PG genuinely unreachable)", async () => { + const triggerErr = new Error("engine rejected the snapshot"); + const trigger = vi.fn(async () => { + throw triggerErr; + }); + const createFailedTaskRun = vi.fn(async () => { + throw new Error("connection refused"); + }); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("engine rejected the snapshot"); + // Drainer's outer drainOne loop now decides retry vs buffer.fail. + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + }); + + it("honours the cancel when a buffered cancel races a materialised non-CANCELED row", async () => { + // Cancel-wins-over-trigger. If the normal trigger + // replay path materialised a live PENDING row before the cancel + // bifurcation drained, engine.createCancelledRun throws a conflict — + // its documented contract is that "the caller must decide between + // engine.cancelRun() and skipping". The drainer handler must honour + // the cancel intent by actually cancelling the now-live run; otherwise + // the conflict propagates, isRetryablePgError() returns false, and the + // drainer buffer.fail()s the entry — silently losing the cancellation + // while the run keeps executing. + const friendlyId = RunId.generate().friendlyId; + const createCancelledRun = vi.fn(async () => { + throw new Error( + `createCancelledRun conflict: existing run ${friendlyId} has status PENDING` + ); + }); + const cancelRun = vi.fn(async () => ({ alreadyFinished: false })); + const handler = createDrainerHandler({ + engine: { createCancelledRun, cancelRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: friendlyId, + envId: "env_a", + orgId: "org_1", + payload: { + friendlyId, + taskIdentifier: "t", + environment: envFixture, + cancelledAt: new Date().toISOString(), + cancelReason: "Canceled by user", + }, + attempts: 0, + createdAt: new Date(), + } as any) + ).resolves.toBeUndefined(); + + // The live run is actually cancelled, by its internal id. + expect(cancelRun).toHaveBeenCalledOnce(); + expect(cancelRun.mock.calls[0][0].runId).toBe(RunId.fromFriendlyId(friendlyId)); + }); + + it("requeues on a transient PG outage during the SYSTEM_FAILURE fallback write", async () => { + // engine.trigger failed non-retryably, so we try to write a terminal + // SYSTEM_FAILURE row. If THAT write fails because PG is transiently + // unreachable, rethrowing the *original* non-retryable error makes the + // drainer buffer.fail() the entry — losing the run with no PG row ever + // landing. Rethrow the retryable write error instead so the drainer + // requeues; once PG recovers the failure row lands and the customer + // sees it. + const trigger = vi.fn(async () => { + throw new Error("validation failed: payload too large"); + }); + const createFailedTaskRun = vi.fn(async () => { + throw new Error("Can't reach database server"); + }); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any) + ).rejects.toThrow("Can't reach database server"); + }); + + it("rethrows the original error when the snapshot lacks an environment block", async () => { + const triggerErr = new Error("engine rejected the snapshot"); + const trigger = vi.fn(async () => { + throw triggerErr; + }); + const createFailedTaskRun = vi.fn(); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t" /* no environment */ }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("engine rejected the snapshot"); + expect(createFailedTaskRun).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/webapp/test/mollifierDrainerWorker.test.ts b/apps/webapp/test/mollifierDrainerWorker.test.ts index e5f38229d8..0d4e931fd8 100644 --- a/apps/webapp/test/mollifierDrainerWorker.test.ts +++ b/apps/webapp/test/mollifierDrainerWorker.test.ts @@ -1,4 +1,17 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; + +// Importing `~/v3/mollifier/mollifierDrainer.server` (below) transitively +// loads `~/v3/runEngine.server`, whose top-level `singleton(...)` call +// eagerly constructs a RunEngine. That spins up Prisma + Redis workers +// that try to connect to localhost — which in CI (no PG, no Redis) +// produces an unhandled `PrismaClientInitializationError` that fails +// the test run even though the assertions all pass. Mocking the +// runEngine module short-circuits the singleton so no worker starts. +vi.mock("~/v3/runEngine.server", () => ({ engine: {} })); +// Same problem: prisma.server.ts's top-level singleton tries to open a +// PG client. The test never makes a query; an empty stub is enough. +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + import { MollifierConfigurationError } from "~/v3/mollifier/mollifierDrainer.server"; import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server"; diff --git a/apps/webapp/test/mollifierStaleSweep.test.ts b/apps/webapp/test/mollifierStaleSweep.test.ts new file mode 100644 index 0000000000..dddaa9d247 --- /dev/null +++ b/apps/webapp/test/mollifierStaleSweep.test.ts @@ -0,0 +1,801 @@ +import { describe, expect, it, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { runStaleSweepOnce } from "~/v3/mollifier/mollifierStaleSweep.server"; +import { MollifierStaleSweepState } from "~/v3/mollifier/mollifierStaleSweepState.server"; + +const SNAPSHOT = { + taskIdentifier: "hello-world", + payload: '{"x":1}', + payloadType: "application/json", + traceContext: {}, +}; + +// In-memory fake state for unit tests that don't have a Redis container. +// The testcontainer tests use a real MollifierStaleSweepState against +// the test Redis instead. +function makeFakeState() { + let cursor = 0; + let orgList: string[] = []; + const counts = new Map(); + let visited = new Set(); + return { + readCursor: async () => cursor, + writeCursor: async (v: number) => { + cursor = v; + }, + rebuildOrgList: async (orgs: string[]) => { + orgList = [...orgs]; + }, + readOrgListSlice: async (start: number, count: number) => ({ + orgs: orgList.slice(start, start + count), + total: orgList.length, + }), + setEnvStaleCount: async (envId: string, count: number) => { + if (count > 0) counts.set(envId, count); + else counts.delete(envId); + }, + readAllEnvStaleCounts: async () => new Map(counts), + markEnvVisited: async (envId: string) => { + visited.add(envId); + }, + reconcileVisited: async () => { + for (const envId of [...counts.keys()]) { + if (!visited.has(envId)) counts.delete(envId); + } + visited = new Set(); + }, + clearAll: async () => { + cursor = 0; + orgList = []; + counts.clear(); + visited = new Set(); + }, + close: async () => {}, + }; +} + +function spyDeps() { + // Counter ticks — metric carries no `envId` label (high-cardinality) + // so the spy is a simple call count. Per-env detail lives on the + // structured warn log and the snapshot map. + let staleEntryCount = 0; + const snapshots: Array> = []; + const warnings: Array<{ message: string; fields: Record }> = []; + return { + get staleEntryCount() { + return staleEntryCount; + }, + snapshots, + warnings, + deps: { + recordStaleEntry: () => { + staleEntryCount += 1; + }, + reportStaleEntrySnapshot: (snapshot: Map) => { + // Clone so post-sweep assertions see what was reported *at that + // call site*, not whatever subsequent passes mutate the source + // map into. + snapshots.push(new Map(snapshot)); + }, + logger: { + warn: (message: string, fields: Record) => { + warnings.push({ message, fields }); + }, + }, + }, + }; +} + +describe("runStaleSweepOnce — unit", () => { + it("returns zeros when the buffer is null", async () => { + // Mirrors the prod gate: if TRIGGER_MOLLIFIER_ENABLED=0 the buffer + // singleton is null and the sweep is a no-op. We don't want it to + // emit a metric (or throw) just because mollifier is disabled. + const spies = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 1000 }, + { ...spies.deps, getBuffer: () => null, state: makeFakeState() }, + ); + expect(result).toEqual({ + orgsScanned: 0, + envsScanned: 0, + entriesScanned: 0, + staleCount: 0, + }); + expect(spies.staleEntryCount).toBe(0); + expect(spies.warnings).toEqual([]); + const snapshots = spies.snapshots; + // An empty snapshot is still reported so any previously-paging env + // (from a prior sweep before mollifier was disabled) clears. + expect(snapshots).toHaveLength(1); + expect(snapshots[0].size).toBe(0); + }); +}); + +describe("runStaleSweepOnce — testcontainers", () => { + redisTest( + "flags every entry whose dwell exceeds the stale threshold", + { timeout: 20_000 }, + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + // Three entries across two envs in the same org. The sweep below + // runs against a `now` advanced by 5 minutes, so all three have + // dwell ~5min and ALL THREE are stale against a 1-minute + // threshold — there is no "fresh" entry in this scenario. The + // assertions below pin the all-three-stale shape. + await buffer.accept({ + runId: "run_stale_a", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_stale_b", + envId: "env_b", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_stale_c", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + // Yank the system clock forward 5 minutes for the sweep — way + // past the threshold below. The `now` deps seam lets us drive + // the threshold without actually waiting in real time. + const futureNow = Date.now() + 5 * 60 * 1000; + + const spies = spyDeps(); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { + ...spies.deps, + getBuffer: () => buffer, + state, + now: () => futureNow, + }, + ); + + expect(result.envsScanned).toBe(2); + expect(result.entriesScanned).toBe(3); + expect(result.staleCount).toBe(3); + // All three entries exceed the threshold; each emits one + // counter tick + one warning. + expect(spies.staleEntryCount).toBe(3); + expect(spies.warnings).toHaveLength(3); + for (const w of spies.warnings) { + expect(w.message).toBe("mollifier.stale_entry"); + expect(w.fields.staleThresholdMs).toBe(60 * 1000); + expect(w.fields.dwellMs).toBeGreaterThan(60 * 1000); + } + // Snapshot drives the alertable gauge — env_a has 2 stale + // entries, env_b has 1. Per-env detail is still passed to + // `reportStaleEntrySnapshot` for forensic value even though the + // gauge itself aggregates the total. + expect(spies.snapshots).toHaveLength(1); + expect(Object.fromEntries(spies.snapshots[0])).toEqual({ + env_a: 2, + env_b: 1, + }); + } finally { + await state.close(); + } + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "snapshot omits envs that have entries but none stale (durable hash HDEL's zeros)", + { timeout: 20_000 }, + async ({ redisOptions }) => { + // Critical for alert behaviour: a previous sweep flagged env_a + // stale, alert fired, drainer caught up. The next sweep must + // remove env_a from the durable counts hash so the gauge drops + // below the alert threshold instead of staying latched at the + // last stale value. With the sharded design the snapshot is + // sourced from the HASH directly — visiting an env with zero + // stale entries HDEL's it, so it's simply absent from the + // snapshot (telemetry sums values, so absence is equivalent to + // zero for the gauge). + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await buffer.accept({ + runId: "run_just_arrived", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const spies = spyDeps(); + await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...spies.deps, getBuffer: () => buffer, state }, + ); + expect(spies.snapshots).toHaveLength(1); + // env_a has entries but none stale → not in the snapshot. + expect(spies.snapshots[0].has("env_a")).toBe(false); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "leaves fresh entries alone (dwell below threshold)", + { timeout: 20_000 }, + async ({ redisOptions }) => { + // Regression guard for the inequality direction. A bug that flipped + // `dwellMs > threshold` to `dwellMs >= threshold` would flag every + // entry the first time the sweep runs after a perfectly synchronised + // accept call — the dashboard would page on every burst. + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await buffer.accept({ + runId: "run_fresh_only", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const spies = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...spies.deps, getBuffer: () => buffer, state }, + ); + expect(result.staleCount).toBe(0); + expect(spies.staleEntryCount).toBe(0); + expect(spies.warnings).toEqual([]); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "shards work across ticks: cursor advances by maxOrgsPerPass and wraps after a full cycle", + { timeout: 30_000 }, + async ({ redisOptions }) => { + // Without sharding the sweep walks every org/env every tick — at + // any meaningful backlog that runs longer than the tick interval + // and the next tick gets dropped by the inFlight guard. Sharding + // splits the work: each tick visits at most `maxOrgsPerPass` orgs, + // advances a durable cursor, and resumes from there next tick. + // Over `ceil(N / cap)` ticks the cycle covers every org. + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + for (let i = 0; i < 5; i++) { + await buffer.accept({ + runId: `run_shard_${i}`, + envId: `env_shard_${i}`, + orgId: `org_shard_${i}`, + payload: JSON.stringify(SNAPSHOT), + }); + } + const futureNow = Date.now() + 5 * 60 * 1000; + const spies = spyDeps(); + const cfg = { staleThresholdMs: 60 * 1000, maxOrgsPerPass: 2 }; + const baseDeps = { + ...spies.deps, + getBuffer: () => buffer, + state, + now: () => futureNow, + }; + + // Tick 1: cursor starts at 0, scans 2 orgs. + const r1 = await runStaleSweepOnce(cfg, baseDeps); + expect(r1.orgsScanned).toBe(2); + expect(spies.snapshots[0].size).toBe(2); + + // Tick 2: cursor was 2, scans 2 more orgs. + const r2 = await runStaleSweepOnce(cfg, baseDeps); + expect(r2.orgsScanned).toBe(2); + // Snapshot is the durable HASH — accumulates across ticks. + expect(spies.snapshots[1].size).toBe(4); + + // Tick 3: cursor was 4, scans the last 1 org and wraps to 0. + const r3 = await runStaleSweepOnce(cfg, baseDeps); + expect(r3.orgsScanned).toBe(1); + expect(spies.snapshots[2].size).toBe(5); + + // Tick 4: cycle complete, cursor is back at 0 — starts over. + const r4 = await runStaleSweepOnce(cfg, baseDeps); + expect(r4.orgsScanned).toBe(2); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "clears an env from the durable snapshot on revisit when it has entries but none currently stale", + { timeout: 30_000 }, + async ({ redisOptions }) => { + // Stale state in the durable hash must be HDEL'd, not just left + // stale, when a previously-flagged env no longer has any entries + // whose dwell exceeds the threshold (drainer caught up, alert + // condition cleared). The same `entry` flips from stale to + // not-stale between two sweep ticks by varying the sweep's `now` + // — tick 1 uses a future clock so the entry is flagged stale; + // tick 2 uses real time so the same entry has near-zero dwell and + // is no longer stale. The env stays in the active set throughout + // (queue still has an entry), so the cursor revisits it and the + // hash field is cleared. + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await buffer.accept({ + runId: "run_drain", + envId: "env_drain", + orgId: "org_drain", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const spies = spyDeps(); + const cfg = { staleThresholdMs: 60 * 1000, maxOrgsPerPass: 10 }; + + // Tick 1 with future clock: entry's dwell is 5min vs 1min + // threshold → flagged stale. + await runStaleSweepOnce(cfg, { + ...spies.deps, + getBuffer: () => buffer, + state, + now: () => futureNow, + }); + expect(spies.snapshots[0].get("env_drain")).toBe(1); + + // Tick 2 with real time: same entry, but its dwell is now ~ms + // vs the same 1min threshold → not stale. The env is revisited + // (cursor wrapped to 0 after tick 1, only 1 org in the list), + // setEnvStaleCount called with 0 → HDEL. + await runStaleSweepOnce(cfg, { + ...spies.deps, + getBuffer: () => buffer, + state, + }); + expect(spies.snapshots[1].has("env_drain")).toBe(false); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "evicts fully-drained envs from the counts hash at cycle wrap (no permanent alert)", + { timeout: 30_000 }, + async ({ redisOptions }) => { + // Devin's BUG report on PR #3754: an env that drains completely + // between sweep ticks disappears from `mollifier:org-envs:${orgId}` + // entirely, so the inner loop at runStaleSweepOnce never visits it + // and `setEnvStaleCount(envId, 0)` (which HDELs the field) is + // never called. The counts hash retains the env's last-known + // stale count forever, the gauge stays elevated, and the + // recommended alert `> 0 for 5m` fires indefinitely. + // + // Fix: at cycle wrap (cursor returned to 0) HDEL any env in the + // counts hash that wasn't visited during the just-completed cycle. + // Verified here by: + // 1. Flagging env_will_drain stale, confirming it's in the hash + // 2. Draining its only entry — now invisible to listEnvsForOrg + // 3. Running a sweep tick that triggers cycle wrap + // 4. Asserting the env is no longer in the snapshot + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await buffer.accept({ + runId: "run_will_drain", + envId: "env_will_drain", + orgId: "org_will_drain", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const cfg = { staleThresholdMs: 60 * 1000, maxOrgsPerPass: 10 }; + const spies = spyDeps(); + + // Tick 1: env_will_drain is flagged stale → enters counts hash. + // Cursor wraps to 0 (only 1 org in the list). + await runStaleSweepOnce(cfg, { + ...spies.deps, + getBuffer: () => buffer, + state, + now: () => futureNow, + }); + expect(spies.snapshots[0].get("env_will_drain")).toBe(1); + + // Drain the only entry. mollifier:queue:env_will_drain is now + // empty, and the buffer's atomic Lua removes env_will_drain + // from `mollifier:org-envs:org_will_drain` (and removes the org + // from `mollifier:orgs` since it has no other envs). The env is + // now invisible to listEnvsForOrg. + const popped = await buffer.pop("env_will_drain"); + expect(popped?.runId).toBe("run_will_drain"); + + // Tick 2: cursor was 0 after tick 1's wrap, so this rebuilds + // the org list (now empty) and immediately wraps again. The + // wrap-handler must HDEL env_will_drain from the counts hash + // because it wasn't in the visited set for this cycle. + await runStaleSweepOnce(cfg, { + ...spies.deps, + getBuffer: () => buffer, + state, + now: () => futureNow, + }); + expect(spies.snapshots[1].has("env_will_drain")).toBe(false); + // And the durable hash is genuinely empty, not just absent from + // this snapshot. + expect((await state.readAllEnvStaleCounts()).size).toBe(0); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "scans across multiple orgs", + { timeout: 20_000 }, + async ({ redisOptions }) => { + // The drainer pops with org-level fairness, so the sweep must + // walk every org/env to surface stale entries across all of them + // — not just stop at the first env it finds. If a future refactor + // collapsed listOrgs/listEnvsForOrg into a single env-flat list, + // this test catches a regression there. + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await buffer.accept({ + runId: "run_x", + envId: "env_x", + orgId: "org_x", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_y", + envId: "env_y", + orgId: "org_y", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const spies = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...spies.deps, getBuffer: () => buffer, state, now: () => futureNow }, + ); + expect(result.orgsScanned).toBe(2); + expect(result.envsScanned).toBe(2); + expect(result.staleCount).toBe(2); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "state survives process restart: a second state instance picks up the cursor and counts", + { timeout: 30_000 }, + async ({ redisOptions }) => { + // This is the headline reason the sweep state is durable in Redis + // instead of process-local — a webapp restart mid-cycle must not + // re-emit the gauge as fresh-zero for previously-flagged envs nor + // restart the cursor walk from scratch. Simulated here by closing + // state1 (its Redis client quits cleanly) and constructing state2 + // against the same Redis. The cursor + counts that state1 wrote + // are visible to state2 on its first tick. + const buffer = new MollifierBuffer({ redisOptions }); + const state1 = new MollifierStaleSweepState({ redisOptions }); + try { + await buffer.accept({ + runId: "run_a", + envId: "env_a", + orgId: "org_a", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_b", + envId: "env_b", + orgId: "org_b", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const cfg = { staleThresholdMs: 60 * 1000, maxOrgsPerPass: 1 }; + const spies1 = spyDeps(); + + // Tick 1 with state1: visits 1 of 2 orgs. + await runStaleSweepOnce(cfg, { + ...spies1.deps, + getBuffer: () => buffer, + state: state1, + now: () => futureNow, + }); + expect(spies1.snapshots[0].size).toBe(1); + } finally { + // Simulate webapp restart: state1's Redis client closes cleanly. + await state1.close(); + } + + // New process boots, constructs a fresh state pointing at the + // same Redis. The cycle's frozen org_list, the cursor, and the + // counts hash are all preserved — state2 picks up at the second + // org of the cycle. + const state2 = new MollifierStaleSweepState({ redisOptions }); + try { + const futureNow = Date.now() + 5 * 60 * 1000; + const cfg = { staleThresholdMs: 60 * 1000, maxOrgsPerPass: 1 }; + const spies2 = spyDeps(); + + await runStaleSweepOnce(cfg, { + ...spies2.deps, + getBuffer: () => buffer, + state: state2, + now: () => futureNow, + }); + // Snapshot now has BOTH envs: the one tick 1 flagged (still in + // the counts hash from state1) plus the one tick 2 just flagged. + // A non-durable design would show only the second. + expect(spies2.snapshots[0].size).toBe(2); + } finally { + await state2.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "cycle wrap rebuilds the org list, so orgs that joined mid-cycle get visited on the next cycle", + { timeout: 30_000 }, + async ({ redisOptions }) => { + // The docstring promises "orgs joining mid-cycle wait until the + // next cycle to be visited." The mechanism is rebuildOrgList at + // cursor=0: a fresh snapshot of buffer.listOrgs() replaces the + // previous frozen LIST. Verified here by adding a third org + // between cycles and asserting it shows up only in the next + // cycle's snapshot. + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await buffer.accept({ + runId: "run_init_a", + envId: "env_init_a", + orgId: "org_init_a", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_init_b", + envId: "env_init_b", + orgId: "org_init_b", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const spies = spyDeps(); + const cfg = { staleThresholdMs: 60 * 1000, maxOrgsPerPass: 10 }; + const baseDeps = { + ...spies.deps, + getBuffer: () => buffer, + state, + now: () => futureNow, + }; + + // Tick 1: cycle 1. Visits both initial orgs; cursor wraps to 0. + await runStaleSweepOnce(cfg, baseDeps); + expect(spies.snapshots[0].size).toBe(2); + + // Mid-flight: a third org joins the buffer. It must NOT have + // been part of cycle 1's frozen LIST. + await buffer.accept({ + runId: "run_mid", + envId: "env_mid", + orgId: "org_mid", + payload: JSON.stringify(SNAPSHOT), + }); + + // Tick 2: cycle 2 begins (cursor was 0 after tick 1's wrap). + // rebuildOrgList captures all 3 orgs; this tick visits all 3. + const r2 = await runStaleSweepOnce(cfg, baseDeps); + expect(r2.orgsScanned).toBe(3); + expect(spies.snapshots[1].size).toBe(3); + expect(spies.snapshots[1].has("env_mid")).toBe(true); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "empty buffer (no orgs) advances cleanly with zero work and an empty snapshot", + { timeout: 30_000 }, + async ({ redisOptions }) => { + // `mollifier:orgs` is empty (no entries ever accepted, or every + // entry has been drained). The sweep must handle the boundary: + // rebuildOrgList with [], readOrgListSlice returns total=0, + // the org loop is skipped, and the cursor stays at 0 instead of + // tripping the wrap math. + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + const spies = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000, maxOrgsPerPass: 10 }, + { ...spies.deps, getBuffer: () => buffer, state }, + ); + expect(result).toEqual({ + orgsScanned: 0, + envsScanned: 0, + entriesScanned: 0, + staleCount: 0, + }); + expect(spies.snapshots).toHaveLength(1); + expect(spies.snapshots[0].size).toBe(0); + // Cursor stayed at 0 — nothing to advance through. + expect(await state.readCursor()).toBe(0); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); + + redisTest( + "buffer-null branch wipes the durable state so a re-enable starts fresh", + { timeout: 30_000 }, + async ({ redisOptions }) => { + // The unit test above asserts the snapshot is empty when the + // buffer is null, but doesn't verify the durable state was + // actually cleared. Without clearAll the next re-enable would + // resume on a stale cursor + carry over a stale counts hash. + const buffer = new MollifierBuffer({ redisOptions }); + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await buffer.accept({ + runId: "run_seed", + envId: "env_seed", + orgId: "org_seed", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const cfg = { staleThresholdMs: 60 * 1000, maxOrgsPerPass: 10 }; + const spies = spyDeps(); + + // Tick 1: populate state. + await runStaleSweepOnce(cfg, { + ...spies.deps, + getBuffer: () => buffer, + state, + now: () => futureNow, + }); + expect(spies.snapshots[0].size).toBe(1); + expect((await state.readAllEnvStaleCounts()).size).toBe(1); + + // Tick 2: mollifier flips OFF — getBuffer returns null. The + // sweep must clear the durable state. + await runStaleSweepOnce(cfg, { + ...spies.deps, + getBuffer: () => null, + state, + }); + expect(spies.snapshots[1].size).toBe(0); + expect((await state.readAllEnvStaleCounts()).size).toBe(0); + expect(await state.readCursor()).toBe(0); + } finally { + await state.close(); + await buffer.close(); + } + }, + ); +}); + +describe("MollifierStaleSweepState — direct unit tests", () => { + redisTest("readCursor returns 0 when the key is absent", { timeout: 20_000 }, async ({ redisOptions }) => { + const state = new MollifierStaleSweepState({ redisOptions }); + try { + expect(await state.readCursor()).toBe(0); + } finally { + await state.close(); + } + }); + + redisTest( + "writeCursor + readCursor round-trip; readCursor parses a non-numeric value as 0", + { timeout: 20_000 }, + async ({ redisOptions }) => { + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await state.writeCursor(42); + expect(await state.readCursor()).toBe(42); + + // Defensive: a corrupted/garbage value must not throw or + // propagate NaN into the sweep's cursor arithmetic. + await state["redis"].set("mollifier:stale_sweep:cursor", "not-a-number"); + expect(await state.readCursor()).toBe(0); + } finally { + await state.close(); + } + }, + ); + + redisTest( + "rebuildOrgList replaces the previous list (DEL + RPUSH, in order)", + { timeout: 20_000 }, + async ({ redisOptions }) => { + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await state.rebuildOrgList(["org_a", "org_b", "org_c"]); + let slice = await state.readOrgListSlice(0, 10); + expect(slice.total).toBe(3); + expect(slice.orgs).toEqual(["org_a", "org_b", "org_c"]); + + // Replacement, not append. + await state.rebuildOrgList(["org_x"]); + slice = await state.readOrgListSlice(0, 10); + expect(slice.total).toBe(1); + expect(slice.orgs).toEqual(["org_x"]); + + // Empty rebuild leaves the list empty (DEL fires, no RPUSH). + await state.rebuildOrgList([]); + slice = await state.readOrgListSlice(0, 10); + expect(slice.total).toBe(0); + expect(slice.orgs).toEqual([]); + } finally { + await state.close(); + } + }, + ); + + redisTest( + "setEnvStaleCount HSETs when count > 0 and HDELs when count === 0", + { timeout: 20_000 }, + async ({ redisOptions }) => { + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await state.setEnvStaleCount("env_a", 3); + await state.setEnvStaleCount("env_b", 1); + let counts = await state.readAllEnvStaleCounts(); + expect(Object.fromEntries(counts)).toEqual({ env_a: 3, env_b: 1 }); + + // Zero clears the field (HDEL), not stores 0. + await state.setEnvStaleCount("env_a", 0); + counts = await state.readAllEnvStaleCounts(); + expect(Object.fromEntries(counts)).toEqual({ env_b: 1 }); + expect(counts.has("env_a")).toBe(false); + } finally { + await state.close(); + } + }, + ); + + redisTest( + "clearAll DELs cursor, org_list, and counts in one call", + { timeout: 20_000 }, + async ({ redisOptions }) => { + const state = new MollifierStaleSweepState({ redisOptions }); + try { + await state.writeCursor(7); + await state.rebuildOrgList(["org_a", "org_b"]); + await state.setEnvStaleCount("env_a", 5); + + await state.clearAll(); + + expect(await state.readCursor()).toBe(0); + expect((await state.readOrgListSlice(0, 10)).total).toBe(0); + expect((await state.readAllEnvStaleCounts()).size).toBe(0); + } finally { + await state.close(); + } + }, + ); +}); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index da42247111..cab2925872 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -450,6 +450,177 @@ export class RunEngine { //MARK: - Run functions + /** + * Writes a TaskRun row in CANCELED state directly, bypassing the trigger + * pipeline. Used by the mollifier drainer when a cancel API call lands on + * a buffered run before it materialises. + * + * Skips: queue insertion (no execution), waitpoint creation (the + * mollifier gate refuses to buffer triggerAndWait children, so a + * cancelled buffered run never has a waiting parent to unblock), + * concurrency reservation. Emits `runCancelled` so the existing + * TaskEvent handler writes the cancellation event row — the only side + * effect PG-side cancel has today per audit. + * + * Idempotent: if a row with the same friendlyId already exists (double + * drainer pop after requeue), Prisma's P2002 unique-constraint violation + * is caught and the existing row is returned. The duplicate runCancelled + * emission is skipped — the original drain's emit already wrote the + * TaskEvent. + */ + async createCancelledRun( + { + snapshot, + cancelledAt, + cancelReason, + }: { + snapshot: TriggerParams; + cancelledAt: Date; + cancelReason: string; + }, + tx?: PrismaClientOrTransaction, + ): Promise { + const prisma = tx ?? this.prisma; + return startSpan(this.tracer, "createCancelledRun", async (span) => { + span.setAttribute("friendlyId", snapshot.friendlyId); + span.setAttribute("taskIdentifier", snapshot.taskIdentifier); + const id = RunId.fromFriendlyId(snapshot.friendlyId); + const error: TaskRunError = { type: "STRING_ERROR", raw: cancelReason }; + + try { + const taskRun = await prisma.taskRun.create({ + data: { + id, + engine: "V2", + status: "CANCELED", + friendlyId: snapshot.friendlyId, + runtimeEnvironmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + organizationId: snapshot.environment.organization.id, + projectId: snapshot.environment.project.id, + idempotencyKey: snapshot.idempotencyKey, + idempotencyKeyExpiresAt: snapshot.idempotencyKeyExpiresAt, + idempotencyKeyOptions: snapshot.idempotencyKeyOptions, + taskIdentifier: snapshot.taskIdentifier, + payload: snapshot.payload, + payloadType: snapshot.payloadType, + context: snapshot.context, + traceContext: snapshot.traceContext, + traceId: snapshot.traceId, + spanId: snapshot.spanId, + parentSpanId: snapshot.parentSpanId, + lockedToVersionId: snapshot.lockedToVersionId, + taskVersion: snapshot.taskVersion, + sdkVersion: snapshot.sdkVersion, + cliVersion: snapshot.cliVersion, + concurrencyKey: snapshot.concurrencyKey, + queue: snapshot.queue, + lockedQueueId: snapshot.lockedQueueId, + workerQueue: snapshot.workerQueue, + isTest: snapshot.isTest, + taskEventStore: snapshot.taskEventStore, + // Defensive: the snapshot comes from a cjson-encoded buffer + // payload, where empty Lua tables encode as `{}` not `[]`. If + // the drainer pops a buffered run with no tags, snapshot.tags + // will be an empty object, which Prisma misreads as a relation + // update op. Normalise to a real array (or undefined for the + // empty case). + runTags: Array.isArray(snapshot.tags) && snapshot.tags.length > 0 + ? snapshot.tags + : undefined, + oneTimeUseToken: snapshot.oneTimeUseToken, + parentTaskRunId: snapshot.parentTaskRunId, + rootTaskRunId: snapshot.rootTaskRunId, + replayedFromTaskRunFriendlyId: snapshot.replayedFromTaskRunFriendlyId, + batchId: snapshot.batch?.id, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion, + depth: snapshot.depth, + seedMetadata: snapshot.seedMetadata, + seedMetadataType: snapshot.seedMetadataType, + metadata: snapshot.metadata, + metadataType: snapshot.metadataType, + machinePreset: snapshot.machine, + scheduleId: snapshot.scheduleId, + scheduleInstanceId: snapshot.scheduleInstanceId, + createdAt: snapshot.createdAt, + bulkActionGroupIds: snapshot.bulkActionId ? [snapshot.bulkActionId] : undefined, + planType: snapshot.planType, + realtimeStreamsVersion: snapshot.realtimeStreamsVersion, + streamBasinName: snapshot.streamBasinName, + annotations: snapshot.annotations, + completedAt: cancelledAt, + updatedAt: cancelledAt, + error: error as unknown as Prisma.InputJsonValue, + attemptNumber: 0, + executionSnapshots: { + create: { + engine: "V2", + executionStatus: "FINISHED", + description: "Run cancelled before materialisation", + runStatus: "CANCELED", + environmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + projectId: snapshot.environment.project.id, + organizationId: snapshot.environment.organization.id, + }, + }, + }, + }); + + this.eventBus.emit("runCancelled", { + time: cancelledAt, + run: { + id: taskRun.id, + status: taskRun.status, + friendlyId: taskRun.friendlyId, + spanId: taskRun.spanId, + taskEventStore: taskRun.taskEventStore, + createdAt: taskRun.createdAt, + completedAt: taskRun.completedAt, + error, + updatedAt: taskRun.updatedAt, + attemptNumber: taskRun.attemptNumber ?? 0, + }, + organization: { id: snapshot.environment.organization.id }, + project: { id: snapshot.environment.project.id }, + environment: { id: snapshot.environment.id }, + }); + + return taskRun; + } catch (err) { + // P2002 = unique constraint violation. Double-pop after a drainer + // requeue can reach this. Idempotent: return the existing row + // without re-emitting. + if ( + err instanceof Prisma.PrismaClientKnownRequestError && + err.code === "P2002" + ) { + this.logger.info( + "createCancelledRun: row already exists, returning existing (idempotent)", + { friendlyId: snapshot.friendlyId }, + ); + const existing = await prisma.taskRun.findFirst({ where: { id } }); + if (existing) { + // Only treat the conflict as idempotent when the existing + // row is ALREADY canceled. If a non-canceled row landed + // first (e.g. the drainer's normal `engine.trigger` replay + // path raced ahead of the cancel) we surface a conflict + // rather than silently reporting "cancelled" — the run is + // genuinely live and the caller must decide between + // engine.cancelRun() and skipping. + if (existing.status === "CANCELED") { + return existing; + } + throw new Error( + `createCancelledRun conflict: existing run ${snapshot.friendlyId} has status ${existing.status}`, + ); + } + } + throw err; + } + }); + } + /** "Triggers" one run. */ async trigger( { @@ -881,6 +1052,7 @@ export class RunEngine { taskEventStore, queue: queueOverride, lockedQueueId: lockedQueueIdOverride, + emitRunFailedEvent = true, }: { friendlyId: string; environment: { @@ -908,6 +1080,19 @@ export class RunEngine { queue?: string; /** Resolved TaskQueue.id when the task is locked to a specific queue. */ lockedQueueId?: string; + /** + * Whether to emit the `runFailed` engine-bus event. Defaults to true. + * + * Set to `false` when the caller is ALREADY managing the trace-event + * lifecycle for this run via `repository.traceEvent({ incomplete: false, + * isError: true, ... })`. In that path the outer trace event handles + * span completion itself; emitting `runFailed` from here causes the + * `runFailed` → `completeFailedRunEvent` handler to write a second + * completion row for the same (traceId, spanId), racing with the + * outer trace event's own write. The alert side of `runFailed` is + * preserved by emitting from the caller after `traceEvent` returns. + */ + emitRunFailedEvent?: boolean; }): Promise { return startSpan( this.tracer, @@ -983,6 +1168,57 @@ export class RunEngine { }); } + // Emit `runFailed` so the alert pipeline picks up the + // SYSTEM_FAILURE row and the event-store handler writes the + // completion event into the trace. Without this the mollifier + // drainer's terminal failures (and batch-trigger's + // exceed-limit failures) land in PG silently — visible in the + // dashboard list but never reaching customers' configured + // ERROR alert channels. + // + // Gated by `emitRunFailedEvent` so call sites that already wrap + // this inside `repository.traceEvent({ incomplete: false, + // isError: true })` can opt out — the outer trace event writes + // the completion row itself, and a second write via + // `completeFailedRunEvent` would race against it. Callers that + // disable the emit are responsible for triggering the alerts + // side themselves (e.g. by calling + // `PerformTaskRunAlertsService.enqueue` directly after the + // trace event closes). + if (!emitRunFailedEvent) { + return taskRun; + } + this.eventBus.emit("runFailed", { + time: taskRun.completedAt ?? new Date(), + run: { + id: taskRun.id, + status: taskRun.status, + spanId: taskRun.spanId, + error, + taskEventStore: taskRun.taskEventStore, + createdAt: taskRun.createdAt, + completedAt: taskRun.completedAt, + updatedAt: taskRun.updatedAt, + // This row never attempted execution — it's a synthesised + // terminal failure. The alert payload's `attemptNumber=0` + // is the signal downstream consumers can use to + // distinguish a never-ran failure from a run that + // exhausted its retries. + attemptNumber: 0, + usageDurationMs: 0, + costInCents: 0, + }, + organization: { + id: environment.organization.id, + }, + project: { + id: environment.project.id, + }, + environment: { + id: environment.id, + }, + }); + return taskRun; }, { diff --git a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts new file mode 100644 index 0000000000..f825cdcf8c --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts @@ -0,0 +1,294 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; + +function freshRunId() { + return RunId.generate().friendlyId; +} +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import type { EventBusEventArgs } from "../eventBus.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +function baseEngineOptions(redisOptions: Parameters[0]["queue"]["redis"]) { + return { + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x" as const, + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }; +} + +// engine.createCancelledRun writes a CANCELED +// TaskRun row directly from a buffer snapshot. Verifies the bypass- +// queue / bypass-waitpoint / emit-runCancelled contract. +describe("RunEngine.createCancelledRun", () => { + containerTest( + "writes CANCELED PG row with snapshot fields, completedAt, error", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + const cancelledAt = new Date("2026-05-20T12:00:00.000Z"); + const cancelReason = "Canceled by user"; + + const result = await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: '{"hello":"world"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + queue: "task/test-task", + isTest: false, + tags: ["test-tag"], + }, + cancelledAt, + cancelReason, + }); + + expect(result.status).toBe("CANCELED"); + expect(result.friendlyId).toBe(friendlyId); + expect(result.id).toBe(RunId.fromFriendlyId(friendlyId)); + expect(result.completedAt?.toISOString()).toBe(cancelledAt.toISOString()); + expect(result.taskIdentifier).toBe("test-task"); + expect(result.runTags).toEqual(["test-tag"]); + expect(result.payload).toBe('{"hello":"world"}'); + const err = result.error as { type?: string; raw?: string }; + expect(err.type).toBe("STRING_ERROR"); + expect(err.raw).toBe(cancelReason); + + // Verify the PG row is canonical (findFirst returns the row). + const stored = await prisma.taskRun.findFirst({ + where: { friendlyId }, + }); + expect(stored).not.toBeNull(); + expect(stored!.status).toBe("CANCELED"); + } finally { + await engine.quit(); + } + }, + ); + + containerTest( + "emits runCancelled with correct payload", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + const captured: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (event) => { + captured.push(event); + }); + + try { + const cancelledAt = new Date(); + const cancelReason = "Test cancel"; + const friendlyId = freshRunId(); + await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000cccc000000000000", + spanId: "dddd000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }, + cancelledAt, + cancelReason, + }); + + expect(captured).toHaveLength(1); + expect(captured[0]!.run.status).toBe("CANCELED"); + expect(captured[0]!.run.friendlyId).toBe(friendlyId); + expect(captured[0]!.run.error).toEqual({ type: "STRING_ERROR", raw: cancelReason }); + expect(captured[0]!.organization.id).toBe(env.organization.id); + } finally { + await engine.quit(); + } + }, + ); + + containerTest( + "idempotent on double-pop: second call returns existing row without re-emitting", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + const captured: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (event) => { + captured.push(event); + }); + + try { + const snapshot = { + friendlyId: freshRunId(), + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000eeee000000000000", + spanId: "ffff000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }; + const cancelledAt = new Date(); + const cancelReason = "Test idempotent"; + + const first = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason }); + const second = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason }); + + expect(second.id).toBe(first.id); + // Only the first call's emit fired; the P2002 path skips re-emission. + expect(captured).toHaveLength(1); + } finally { + await engine.quit(); + } + }, + ); + + // Regression: cjson encodes empty Lua tables as `{}`, not `[]`. When + // the drainer pops a buffered run that never had a tag set, the + // deserialised snapshot's `tags` field is an empty object. The old + // implementation passed it straight into Prisma's `runTags:` field; + // Prisma misread the object as a relation update op and threw + // `Argument 'set' is missing`. The drainer caught the error and + // marked the buffer entry FAILED — so the CANCELED PG row never + // landed. + containerTest( + "tolerates snapshot.tags being an empty object (cjson edge case)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + // Cast through unknown to simulate the cjson-decode output shape + // for an empty Lua table — TypeScript's snapshot type says + // string[], but the buffer Lua delivers {} for the empty case. + const result = await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000abcd000000000000", + spanId: "1234000000000000", + queue: "task/test-task", + isTest: false, + tags: {} as unknown as string[], + }, + cancelledAt: new Date(), + cancelReason: "Cancelled — empty tags", + }); + expect(result.status).toBe("CANCELED"); + expect(result.friendlyId).toBe(friendlyId); + // Prisma normalises the absent-tags case to either [] or null + // depending on the column default; assert it's an empty array. + expect(result.runTags).toEqual([]); + } finally { + await engine.quit(); + } + }, + ); + + // Regression: the P2002-on-id idempotency path used to return ANY + // existing row, which would silently report success even if a live + // (non-CANCELED) row landed first. The guard now requires the + // existing row's status to be CANCELED; anything else surfaces a + // conflict so the caller can route to engine.cancelRun() or skip. + containerTest( + "P2002 conflict with non-CANCELED existing row throws (does not silently succeed)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + const id = RunId.fromFriendlyId(friendlyId); + + // Plant a live (non-CANCELED) row with the same id so the + // cancelled-run INSERT hits P2002 and the guard finds a row + // that ISN'T CANCELED. + await prisma.taskRun.create({ + data: { + id, + friendlyId, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + status: "PENDING", + runtimeEnvironmentId: env.id, + projectId: env.project.id, + organizationId: env.organizationId, + queue: "task/test-task", + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + engine: "V2", + }, + }); + + await expect( + engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }, + cancelledAt: new Date(), + cancelReason: "Should not silently overwrite a live row", + }), + ).rejects.toThrow(/createCancelledRun conflict.*PENDING/); + } finally { + await engine.quit(); + } + }, + ); +}); diff --git a/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts new file mode 100644 index 0000000000..84d33baa87 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts @@ -0,0 +1,176 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { EventBusEventArgs } from "../eventBus.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunEngine.createFailedTaskRun", () => { + containerTest("emits runFailed so the alert pipeline wakes up", async ({ prisma, redisOptions }) => { + // The mollifier drainer (and batch-trigger over-limit path) call + // createFailedTaskRun to write a terminal SYSTEM_FAILURE PG row + // for runs that never actually executed. Without an explicit + // runFailed emit, the row lands silently — the + // runEngineHandlers' `runFailed` listener (which enqueues + // PerformTaskRunAlertsService) never fires, so customers' + // configured TASK_RUN alert channels miss the failure entirely. + // + // Regression intent: if the emit is removed or moved out of + // createFailedTaskRun's success path, this test fails. The + // shape assertions pin the fields the alert delivery service + // reads from the event payload (run.id, run.status, error, + // attemptNumber=0 as the never-ran-marker). + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const failedEvents: EventBusEventArgs<"runFailed">[0][] = []; + engine.eventBus.on("runFailed", (event) => { + failedEvents.push(event); + }); + + const friendlyId = generateFriendlyId("run"); + const taskIdentifier = "drainer-terminal-test"; + + const failed = await engine.createFailedTaskRun({ + friendlyId, + environment: { + id: authenticatedEnvironment.id, + type: authenticatedEnvironment.type, + project: { id: authenticatedEnvironment.project.id }, + organization: { id: authenticatedEnvironment.organization.id }, + }, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + error: { + type: "STRING_ERROR", + raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic", + }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + }); + + expect(failed.status).toBe("SYSTEM_FAILURE"); + + expect(failedEvents).toHaveLength(1); + const event = failedEvents[0]; + expect(event.run.id).toBe(failed.id); + expect(event.run.status).toBe("SYSTEM_FAILURE"); + expect(event.run.spanId).toBe("fedcba9876543210"); + // attemptNumber=0 is the marker that the run never executed — + // it's a synthesised terminal failure, not an exhausted-retries + // failure. Downstream consumers can use this to distinguish. + expect(event.run.attemptNumber).toBe(0); + expect(event.run.usageDurationMs).toBe(0); + expect(event.run.costInCents).toBe(0); + expect(event.run.error).toEqual({ + type: "STRING_ERROR", + raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic", + }); + expect(event.organization.id).toBe(authenticatedEnvironment.organization.id); + expect(event.project.id).toBe(authenticatedEnvironment.project.id); + expect(event.environment.id).toBe(authenticatedEnvironment.id); + } finally { + await engine.quit(); + } + }); + + // The TriggerFailedTaskService.call() path wraps createFailedTaskRun + // inside `repository.traceEvent({ incomplete: false, isError: true })` + // which already writes the completion row for the (traceId, spanId). + // Emitting `runFailed` from here would cause the + // `completeFailedRunEvent` handler to race a second write against + // the same span — the `emitRunFailedEvent: false` opt-out is what + // suppresses the emit. The PG row + alert side stay correct because + // the caller enqueues `PerformTaskRunAlertsService.enqueue(run.id)` + // directly after the trace event closes. + containerTest( + "emitRunFailedEvent: false suppresses the bus emit but still creates the PG row", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions, masterQueueConsumersDisabled: true, processWorkerQueueDebounceMs: 50 }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const failedEvents: EventBusEventArgs<"runFailed">[0][] = []; + engine.eventBus.on("runFailed", (event) => { + failedEvents.push(event); + }); + + const friendlyId = generateFriendlyId("run"); + const failed = await engine.createFailedTaskRun({ + friendlyId, + environment: { + id: authenticatedEnvironment.id, + type: authenticatedEnvironment.type, + project: { id: authenticatedEnvironment.project.id }, + organization: { id: authenticatedEnvironment.organization.id }, + }, + taskIdentifier: "outer-trace-event-test", + payload: "{}", + payloadType: "application/json", + error: { type: "STRING_ERROR", raw: "outer trace event manages span" }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + emitRunFailedEvent: false, + }); + + // PG row landed (caller still gets a usable TaskRun). + expect(failed.status).toBe("SYSTEM_FAILURE"); + expect(failed.friendlyId).toBe(friendlyId); + + // Bus emit was suppressed. + expect(failedEvents).toHaveLength(0); + } finally { + await engine.quit(); + } + }, + ); +});