diff --git a/.github/workflows/publish-preview.yml b/.github/workflows/publish-preview.yml index 471d3dcee..911610aa6 100644 --- a/.github/workflows/publish-preview.yml +++ b/.github/workflows/publish-preview.yml @@ -19,7 +19,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (`@sqlite.org/sqlite-wasm`), + # making `bun i` produce a tree where tsgo could not resolve the + # type-only import in `packages/sqlite/src/storage/_sqlite/browser.ts`. + bun-version: "1.3.11" - run: bun i - run: bun run build - run: bunx pkg-pr-new publish './packages/util' './packages/storage' './packages/job-queue' './packages/task-graph' './packages/knowledge-base' './packages/tasks' './packages/ai' './packages/ai-provider' './packages/workglow' './examples/cli' diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8a4cee8a0..8304113df 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -27,7 +27,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - run: bun run build - name: Upload build artifacts @@ -67,7 +71,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -91,7 +99,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -115,7 +127,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -135,7 +151,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -155,7 +175,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -179,7 +203,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -207,7 +235,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -239,7 +271,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -271,7 +307,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -299,7 +339,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -327,7 +371,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download build artifacts uses: actions/download-artifact@v8 @@ -365,7 +413,11 @@ jobs: node-version: 24 - uses: oven-sh/setup-bun@v2 with: - bun-version: latest + # Pinned: a recent `latest` regressed the isolated-linker symlink + # creation for scoped packages with periods (e.g. `@sqlite.org/sqlite-wasm`), + # producing a tree where tsgo cannot resolve type-only imports in + # `packages/sqlite/src/storage/_sqlite/browser.ts`. Bump deliberately. + bun-version: "1.3.11" - run: bun i - name: Download Vitest coverage fragments run: mkdir -p coverage-parts diff --git a/packages/ai/src/provider/AiProviderRegistry.ts b/packages/ai/src/provider/AiProviderRegistry.ts index a773bd906..fc5bef94a 100644 --- a/packages/ai/src/provider/AiProviderRegistry.ts +++ b/packages/ai/src/provider/AiProviderRegistry.ts @@ -6,11 +6,7 @@ import { TaskInput, TaskOutput } from "@workglow/task-graph"; import type { StreamEvent } from "@workglow/task-graph"; -import { - createServiceToken, - globalServiceRegistry, - WORKER_MANAGER, -} from "@workglow/util/worker"; +import { createServiceToken, globalServiceRegistry, WORKER_MANAGER } from "@workglow/util/worker"; import type { JsonSchema, ServiceRegistry } from "@workglow/util/worker"; import { DirectExecutionStrategy } from "../execution/DirectExecutionStrategy"; import type { IAiExecutionStrategy, AiStrategyResolver } from "../execution/IAiExecutionStrategy"; diff --git a/packages/knowledge-base/src/knowledge-base/ScopedTabularStorage.ts b/packages/knowledge-base/src/knowledge-base/ScopedTabularStorage.ts index 0455509fe..6bc87581b 100644 --- a/packages/knowledge-base/src/knowledge-base/ScopedTabularStorage.ts +++ b/packages/knowledge-base/src/knowledge-base/ScopedTabularStorage.ts @@ -226,6 +226,45 @@ export class ScopedTabularStorage< }, options); } + withTransaction(fn: (tx: this) => Promise): Promise { + // Two responsibilities here: + // 1. Forward the inner's `tx` proxy into `fn`, so writes from the + // callback go through the transaction-bound resources rather than + // re-entering the public mutex path on the parent (which would + // deadlock on single-connection backends, or run on the wrong + // connection on real `pg.Pool`). + // 2. Defer events emitted by the scoped wrapper until the inner + // transaction actually commits — Scoped emits on its own emitter + // (separate from the inner's `put` events), so the inner's + // deferred-event flush does not cover them. + const deferred: Array<[TabularEventName, unknown[]]> = []; + return this.inner + .withTransaction((innerTx: AnyTabularStorage) => { + const txWrapper = new ScopedTabularStorage< + Schema, + PrimaryKeyNames, + Entity, + PrimaryKey, + InsertType + >(innerTx, this.kbId); + // Override the tx wrapper's emitter to buffer instead of fan out; + // listeners on the original wrapper see events only after COMMIT. + (txWrapper as unknown as { events: { emit: (n: string, ...a: unknown[]) => void } }).events = + { + emit: (name: string, ...args: unknown[]) => { + deferred.push([name as TabularEventName, args]); + }, + }; + return fn(txWrapper as unknown as this); + }) + .then((result) => { + for (const [name, args] of deferred) { + (this.events.emit as (n: TabularEventName, ...a: unknown[]) => void)(name, ...args); + } + return result; + }); + } + // Lifecycle — no-op for shared storage async setupDatabase(): Promise { // No-op: shared storage lifecycle is managed externally diff --git a/packages/mcp/src/util/_server-registry/McpServerRegistry.ts b/packages/mcp/src/util/_server-registry/McpServerRegistry.ts index 7ccbdad73..bf5c9e5f6 100644 --- a/packages/mcp/src/util/_server-registry/McpServerRegistry.ts +++ b/packages/mcp/src/util/_server-registry/McpServerRegistry.ts @@ -111,14 +111,8 @@ function compactMcpServer(value: unknown): string | undefined { * Registers the MCP server default factories and the "mcp-server" input resolver/compactor * on the given registry. Called by `bootstrapWorkglow` and `createOrchestrationContext`. */ -export function registerMcpServerDefaults( - registry: ServiceRegistry = globalServiceRegistry -): void { - registry.registerIfAbsent( - MCP_SERVERS, - (): Map => new Map(), - true - ); +export function registerMcpServerDefaults(registry: ServiceRegistry = globalServiceRegistry): void { + registry.registerIfAbsent(MCP_SERVERS, (): Map => new Map(), true); registry.registerIfAbsent( MCP_SERVER_REPOSITORY, (): McpServerRepository => new InMemoryMcpServerRepository(), diff --git a/packages/postgres/src/storage/PostgresTabularStorage.ts b/packages/postgres/src/storage/PostgresTabularStorage.ts index 12c04ee6a..0e32f32e6 100644 --- a/packages/postgres/src/storage/PostgresTabularStorage.ts +++ b/packages/postgres/src/storage/PostgresTabularStorage.ts @@ -494,16 +494,13 @@ export class PostgresTabularStorage< } /** - * Stores or updates a row in the database. - * Uses UPSERT (INSERT ... ON CONFLICT DO UPDATE) for atomic operations. - * - * @param entity - The entity to store (may be missing auto-generated keys) - * @returns The entity with any server-generated fields updated - * @emits "put" event with the updated entity when successful + * Builds the parameterized `INSERT … ON CONFLICT … RETURNING *` SQL for a + * single entity, applying the auto-generated-key policy. Extracted so + * {@link put} and {@link putBulk} can share the column/parameter logic + * while routing the actual query through different connections (the pool + * vs. a transaction-bound client). */ - async put(entity: InsertType): Promise { - const db = this.db; - + private buildPutSql(entity: InsertType): { sql: string; params: ValueOptionType[] } { // Determine which columns to include in INSERT const columnsToInsert: string[] = []; const paramsToInsert: ValueOptionType[] = []; @@ -568,7 +565,7 @@ export class PostgresTabularStorage< valueColumns.length > 0 ? ` ON CONFLICT (${this.primaryKeyColumnList('"')}) DO UPDATE - SET + SET ${(valueColumns as string[]) .map((col) => { const colIdx = columnsToInsert.indexOf(String(col)); @@ -585,105 +582,367 @@ export class PostgresTabularStorage< RETURNING * `; - const params = paramsToInsert; - const result = await db.query(sql, params); + return { sql, params: paramsToInsert }; + } - const updatedEntity = result.rows[0] as Entity; - // Convert blob fields from SQL to JS values - const updatedRecord = updatedEntity as Record; + /** Hydrate a row returned by Postgres back into entity-shaped JS values. */ + private hydrateRow(row: unknown): Entity { + const entity = row as Entity; + const record = entity as Record; for (const key in this.schema.properties) { - updatedRecord[key] = this.sqlToJsValue(key, updatedRecord[key] as ValueOptionType); + record[key] = this.sqlToJsValue(key, record[key] as ValueOptionType); } + return entity; + } + + /** + * Acquires a client that BEGIN/COMMIT-style transactions can safely run on. + * + * - Real `pg.Pool` exposes `connect()`, which dedicates a client for the + * transaction duration. + * - PGlite-style single-connection wrappers (PGLitePool, raw PGlite) + * serialize every `query()` through one underlying session, so issuing + * the BEGIN/COMMIT pair directly on `this.db.query` is equivalent. + * - Anything else — a custom no-`connect()` adapter that fans queries + * across sessions — would dispatch the BEGIN and the bracketed queries + * to different sessions, silently breaking atomicity. We refuse to + * operate on it rather than provide a false guarantee. + * + * Whitelist mirrors {@link + * PostgresRateLimiterStorage.tryReserveExecution}: the rate limiter has + * the same atomicity requirement, and divergence here would let a pool + * type be safe in one place and unsafe in another. + * + * Caller MUST invoke `release()` on the returned client. The no-op release + * for single-connection wrappers is safe. + */ + private async acquireConnection(): Promise<{ + query: Pool["query"]; + release: () => void; + }> { + const supportsConnect = + typeof (this.db as unknown as { connect?: unknown }).connect === "function"; + if (supportsConnect) { + return await ( + this.db as unknown as { + connect: () => Promise<{ query: Pool["query"]; release: () => void }>; + } + ).connect(); + } + + // Without connect() we route BEGIN/COMMIT through this.db.query directly. + // That is only safe if every query funnels through one underlying session. + // Recognize: + // - PGLitePool — our own wrapper class; constructor name preserved. + // - PGlite — third-party, ships minified so `constructor.name` can + // be obfuscated (observed: "q"). Detect via duck-typing on methods + // PGlite uniquely exposes (`waitReady` Promise + `exec`). + const dbAny = this.db as unknown as { + waitReady?: unknown; + exec?: unknown; + constructor?: { name?: string }; + }; + const ctorName = dbAny.constructor?.name; + const looksLikePGlite = typeof dbAny.exec === "function" && dbAny.waitReady !== undefined; + const looksLikePGLitePool = ctorName === "PGLitePool"; + if (!looksLikePGlite && !looksLikePGLitePool) { + throw new Error( + `PostgresTabularStorage.putBulk requires a pg.Pool with connect() or a known single-connection wrapper (PGLitePool, PGlite); got ${ctorName ?? typeof this.db}. A multi-connection pool without connect() would dispatch BEGIN and the bracketed INSERTs to different sessions, breaking atomicity.` + ); + } + return { + query: this.db.query.bind(this.db) as Pool["query"], + release: () => {}, + }; + } + + /** + * Per-instance promise-chain mutex. Only meaningful on single-connection + * backends (PGlite / PGLitePool) — there `withTransaction` runs on the + * shared session and we need to keep external callers from slipping into + * the open transaction. On a real `pg.Pool` (anything exposing + * `connect()`) the mutex would serialize independent reads/writes that + * Postgres is happy to fan across separate pool clients, turning the + * pool's main benefit into a per-instance bottleneck — so we short-circuit + * to a no-op on that path. Real-pool isolation comes from + * `withTransaction` dedicating its own client via `pool.connect()`. + * + * The Proxy returned by {@link createTxView} routes back to the private + * `_*Internal` methods directly, so calls made *through* the `tx` handle + * inside `fn` do not deadlock against the mutex held by `withTransaction`. + */ + private mutexChain: Promise = Promise.resolve(); + private get serializeOps(): boolean { + return typeof (this.db as unknown as { connect?: unknown }).connect !== "function"; + } + private async mutex(fn: () => Promise): Promise { + if (!this.serializeOps) return fn(); + const prev = this.mutexChain; + let release!: () => void; + this.mutexChain = new Promise((resolve) => { + release = resolve; + }); + await prev; + try { + return await fn(); + } finally { + release(); + } + } - this.events.emit("put", updatedEntity); + /** + * True while the parent instance is between `BEGIN` and `COMMIT`/`ROLLBACK`. + * Used only to fail fast when `fn` captures the *original* storage instead + * of the `tx` handle and tries to recursively call `withTransaction` — + * that would deadlock against its own mutex on the PGlite path. Calls + * routed through `tx` hit the proxy's `withTransaction` override before + * reaching here. + */ + private inTransaction: boolean = false; + + /** + * Emits a `put` event. Overridden on the {@link createTxView} proxy to + * push into a per-transaction buffer instead, so listeners never observe + * rows that are about to roll back. + */ + protected emitPut(entity: Entity): void { + this.events.emit("put", entity); + } + + /** + * Stores or updates a row in the database. + * Uses UPSERT (INSERT ... ON CONFLICT DO UPDATE) for atomic operations. + * + * @param entity - The entity to store (may be missing auto-generated keys) + * @returns The entity with any server-generated fields updated + * @emits "put" event with the updated entity when successful (deferred until + * commit if inside a {@link withTransaction}) + */ + async put(entity: InsertType): Promise { + return this.mutex(() => this._putInternal(entity)); + } + + private async _putInternal(entity: InsertType): Promise { + const { sql, params } = this.buildPutSql(entity); + const result = await this.db.query(sql, params); + const updatedEntity = this.hydrateRow(result.rows[0]); + this.emitPut(updatedEntity); return updatedEntity; } /** - * Stores multiple rows in the database in a bulk operation. - * Uses individual put calls to ensure auto-generated keys are handled correctly. + * Stores multiple rows atomically inside a single `BEGIN` / `COMMIT`. All + * inserts share one connection, which (a) reduces pool churn vs. a + * `Promise.all` fan-out, (b) gives all-or-nothing semantics, and (c) lets + * Postgres group the writes into a single commit with one fsync rather than + * one per row. * - * @param entities - Array of entities to store (may be missing auto-generated keys) - * @returns Array of entities with any server-generated fields updated - * @emits "put" event for each entity stored + * Per-row UPSERT — instead of a multi-VALUES insert — keeps the + * auto-generated key policy and `RETURNING *` shape identical to {@link put} + * and stays correct when a batch mixes rows that include the auto-gen key + * with rows that omit it. + * + * `put` events are deferred until after `COMMIT` so listeners do not see + * rows that are about to roll back. When this call is nested inside a + * {@link withTransaction}, deferral extends to that outer commit. */ async putBulk(entities: InsertType[]): Promise { - if (entities.length === 0) return []; - - // Use individual put calls to ensure auto-generated keys are handled correctly - return await Promise.all(entities.map((entity) => this.put(entity))); + return this.mutex(() => this._putBulkInternal(entities)); + } - /* Original bulk implementation - keeping for reference but using simpler approach above + private async _putBulkInternal(entities: InsertType[]): Promise { if (entities.length === 0) return []; - const db = this.db; + // Already inside an outer transaction (called via the `tx` view inside + // `withTransaction`)? Skip our own BEGIN/COMMIT — Postgres `BEGIN` + // inside an active transaction is a warning + no-op, and the inner + // `COMMIT` would commit the OUTER transaction prematurely. Run the + // inserts directly on `this.db`, which the proxy has already swapped to + // the transaction-bound client (real pool) or the shared session + // (PGlite). + if (this.inTransaction) { + const updated: Entity[] = []; + for (const entity of entities) { + const { sql, params } = this.buildPutSql(entity); + const result = await this.db.query(sql, params); + updated.push(this.hydrateRow(result.rows[0])); + } + for (const entity of updated) this.emitPut(entity); + return updated; + } - // Prepare all parameters and build VALUES clause - const allParams: any[] = []; - const valuesPerRow = this.primaryKeyColumns().length + this.valueColumns().length; - let paramIndex = 1; + const conn = await this.acquireConnection(); + const updatedEntities: Entity[] = []; + try { + await conn.query("BEGIN"); + try { + for (const entity of entities) { + const { sql, params } = this.buildPutSql(entity); + const result = await conn.query(sql, params); + updatedEntities.push(this.hydrateRow(result.rows[0])); + } + await conn.query("COMMIT"); + } catch (err) { + try { + await conn.query("ROLLBACK"); + } catch { + // prefer the original error if rollback fails + } + throw err; + } + } finally { + conn.release(); + } - // Build the VALUES clauses - one for each entity - const valuesClauses = entities - .map((entity) => { - const { key, value } = this.separateKeyValueFromCombined(entity); - const primaryKeyParams = this.getPrimaryKeyAsOrderedArray(key); - const valueParams = this.getValueAsOrderedArray(value); - const entityParams = [...primaryKeyParams, ...valueParams]; - - // Add all parameters for this entity to the flat array - allParams.push(...entityParams); - - // Create placeholders for this row using PostgreSQL $1, $2, etc. - const placeholders = Array(valuesPerRow) - .fill(0) - .map(() => `$${paramIndex++}`) - .join(", "); - return `(${placeholders})`; - }) - .join(", "); + for (const entity of updatedEntities) this.emitPut(entity); + return updatedEntities; + } - const sql = ` - INSERT INTO "${this.table}" ( - ${this.primaryKeyColumnList('"')} ${this.valueColumnList() ? ", " + this.valueColumnList('"') : ""} - ) - VALUES ${valuesClauses} - ${ - !this.valueColumnList() - ? "" - : ` - ON CONFLICT (${this.primaryKeyColumnList('"')}) DO UPDATE - SET - ${(this.valueColumns() as string[]) - .map((col) => { - // For the UPDATE part, we need to reference the excluded values - return `"${col}" = EXCLUDED."${col}"`; - }) - .join(", ")} - ` - } - RETURNING * - `; + /** + * Build a Proxy view of `this` for the `withTransaction` callback. The + * proxy: + * + * - Swaps `db` for the transaction-bound handle so every query inside + * `fn` runs on it: the dedicated client returned by `pool.connect()` + * for a real `pg.Pool`, or the shared session for PGlite/PGLitePool. + * - Routes any public method `foo` whose private sibling `_fooInternal` + * exists to that sibling, so calls made through `tx` bypass the + * mutex (PGlite path) and do not deadlock. The naming convention is + * the only sync mechanism — adding a public method with a matching + * `_fooInternal` is enough; no explicit map to keep in step. + * - Reports `inTransaction === true`, which is what + * {@link _putBulkInternal} keys off to skip its own BEGIN/COMMIT + * and run on the swapped `db` directly. + * - Overrides {@link emitPut} to queue events on a per-transaction + * buffer; the outer `withTransaction` flushes that buffer after + * `COMMIT` (or discards on `ROLLBACK`). + * - Throws on nested `withTransaction` — Postgres has no autonomous + * `BEGIN`. Use SAVEPOINT directly for nested rollback boundaries. + */ + private createTxView( + txDb: { query: Pool["query"] }, + deferredPutEvents: Entity[] + ): this { + const target = this; + return new Proxy(target, { + get(t, prop, receiver) { + if (prop === "withTransaction") { + return () => { + throw new Error( + "PostgresTabularStorage.withTransaction does not support nesting. " + + "Use SAVEPOINT directly or refactor to a single transaction." + ); + }; + } + if (prop === "db") return txDb; + if (prop === "inTransaction") return true; + if (prop === "emitPut") { + return (entity: Entity) => deferredPutEvents.push(entity); + } + if (typeof prop === "string") { + const internal = (t as unknown as Record)[`_${prop}Internal`]; + if (typeof internal === "function") { + return (...args: unknown[]) => + (internal as (...a: unknown[]) => unknown).apply(receiver, args); + } + } + const value = Reflect.get(t, prop, receiver); + return typeof value === "function" ? value.bind(receiver) : value; + }, + }) as this; + } - const result = await db.query(sql, allParams); + /** + * Runs `fn` inside a single Postgres transaction. + * + * **Real `pg.Pool`** — acquires a dedicated client via `pool.connect()`, + * runs `BEGIN`/`COMMIT`/`ROLLBACK` on that client, and routes every + * query inside `fn` through it. The parent storage instance keeps fanning + * external traffic across the pool, so external callers run *in parallel* + * with the open transaction (no per-instance mutex). This is the natural + * Postgres concurrency model. + * + * **PGlite / PGLitePool** — single underlying session: the parent's mutex + * is acquired for the duration of `fn` so external callers queue behind + * the transaction instead of slipping into it. `BEGIN`/`COMMIT` run on + * the shared `this.db`. + * + * `put` events emitted from inside `fn` (whether via `tx.put`, + * `tx.putBulk`, or any other writer) are buffered on a per-transaction + * queue and flushed to the parent's event emitter after `COMMIT`. If `fn` + * throws, the buffer is discarded along with the rolled-back rows so + * listeners never observe writes that did not actually commit. + * + * Recursive calls — either through the original instance or through `tx` + * — throw rather than reusing the outer transaction implicitly. Use + * SAVEPOINT directly for nested rollback boundaries. + */ + override async withTransaction(fn: (tx: this) => Promise): Promise { + const supportsConnect = + typeof (this.db as unknown as { connect?: unknown }).connect === "function"; + + if (supportsConnect) { + // Real pg.Pool: dedicate a client to this transaction; the parent's + // pool stays available for external callers, who run in parallel on + // other clients. We deliberately do NOT set `this.inTransaction` here + // — it would make external `_putBulkInternal` calls short-circuit + // their own BEGIN/COMMIT even though they are not actually nested. + // Nested calls via a captured original `this` simply acquire another + // pool client and run as an independent transaction; Postgres handles + // the concurrency, so no nesting guard is required on this path. + const client = await ( + this.db as unknown as { + connect: () => Promise<{ query: Pool["query"]; release: () => void }>; + } + ).connect(); + try { + return await this.runInTransaction(fn, { query: client.query.bind(client) }); + } finally { + client.release(); + } + } - const updatedEntities = result.rows.map((row) => { - const entity = row as Entity; - // Convert blob fields from SQL to JS values - for (const key in this.schema.properties) { - // @ts-ignore - entity[key] = this.sqlToJsValue(key, entity[key]); + // PGlite/PGLitePool: single underlying session. Serialize against + // external callers via the mutex, and set `inTransaction` so that a + // nested `withTransaction` invoked via the original (rather than `tx`) + // throws instead of deadlocking on its own mutex. + if (this.inTransaction) { + throw new Error( + "PostgresTabularStorage.withTransaction does not support nesting. " + + "Use SAVEPOINT directly or refactor to a single transaction." + ); + } + return this.mutex(async () => { + this.inTransaction = true; + try { + return await this.runInTransaction(fn, { query: this.db.query.bind(this.db) }); + } finally { + this.inTransaction = false; } - return entity; }); + } - // Emit events for each entity - for (const entity of updatedEntities) { - this.events.emit("put", entity); + private async runInTransaction( + fn: (tx: this) => Promise, + txDb: { query: Pool["query"] } + ): Promise { + const deferredPutEvents: Entity[] = []; + await txDb.query("BEGIN"); + let result: T; + try { + result = await fn(this.createTxView(txDb, deferredPutEvents)); + await txDb.query("COMMIT"); + } catch (err) { + try { + await txDb.query("ROLLBACK"); + } catch { + // prefer the original error if rollback fails + } + throw err; } - - return updatedEntities; - */ + // Flush deferred events only on commit success. + for (const entity of deferredPutEvents) this.events.emit("put", entity); + return result; } /** @@ -694,6 +953,10 @@ export class PostgresTabularStorage< * @emits "get" event with the key when successful */ async get(key: PrimaryKey): Promise { + return this.mutex(() => this._getInternal(key)); + } + + private async _getInternal(key: PrimaryKey): Promise { const db = this.db; const whereClauses = (this.primaryKeyColumns() as string[]) .map((discriminatorKey, i) => `"${discriminatorKey}" = $${i + 1}`) @@ -725,6 +988,10 @@ export class PostgresTabularStorage< * @emits "delete" event with the key when successful */ async delete(value: PrimaryKey | Entity): Promise { + return this.mutex(() => this._deleteInternal(value)); + } + + private async _deleteInternal(value: PrimaryKey | Entity): Promise { const db = this.db; const { key } = this.separateKeyValueFromCombined(value as Entity); const whereClauses = (this.primaryKeyColumns() as string[]) @@ -742,6 +1009,10 @@ export class PostgresTabularStorage< * @returns Promise resolving to an array of entries or undefined if not found */ async getAll(options?: QueryOptions): Promise { + return this.mutex(() => this._getAllInternal(options)); + } + + private async _getAllInternal(options?: QueryOptions): Promise { this.validateGetAllOptions(options); const db = this.db; let sql = `SELECT * FROM "${this.table}"`; @@ -782,6 +1053,10 @@ export class PostgresTabularStorage< * @emits "clearall" event when successful */ async deleteAll(): Promise { + return this.mutex(() => this._deleteAllInternal()); + } + + private async _deleteAllInternal(): Promise { const db = this.db; await db.query(`DELETE FROM "${this.table}"`); this.events.emit("clearall"); @@ -793,6 +1068,10 @@ export class PostgresTabularStorage< * @returns Promise resolving to the count of stored items */ async size(): Promise { + return this.mutex(() => this._sizeInternal()); + } + + private async _sizeInternal(): Promise { const db = this.db; const result = await db.query(`SELECT COUNT(*) FROM "${this.table}"`); return parseInt(result.rows[0].count, 10); @@ -802,8 +1081,12 @@ export class PostgresTabularStorage< * Counts rows matching the specified search criteria. */ override async count(criteria?: SearchCriteria): Promise { + return this.mutex(() => this._countInternal(criteria)); + } + + private async _countInternal(criteria?: SearchCriteria): Promise { if (!criteria || Object.keys(criteria).length === 0) { - return await this.size(); + return await this._sizeInternal(); } this.validateQueryParams(criteria); @@ -823,6 +1106,10 @@ export class PostgresTabularStorage< * @returns Array of entities or undefined if no records found */ async getBulk(offset: number, limit: number): Promise { + return this.mutex(() => this._getBulkInternal(offset, limit)); + } + + private async _getBulkInternal(offset: number, limit: number): Promise { const db = this.db; const orderByClause = this.primaryKeyColumns() .map((col) => `"${String(col)}"`) @@ -894,6 +1181,10 @@ export class PostgresTabularStorage< * @param criteria - Object with column names as keys and values or SearchConditions */ async deleteSearch(criteria: DeleteSearchCriteria): Promise { + return this.mutex(() => this._deleteSearchInternal(criteria)); + } + + private async _deleteSearchInternal(criteria: DeleteSearchCriteria): Promise { const criteriaKeys = Object.keys(criteria) as Array; if (criteriaKeys.length === 0) { return; @@ -915,6 +1206,13 @@ export class PostgresTabularStorage< async query( criteria: SearchCriteria, options?: QueryOptions + ): Promise { + return this.mutex(() => this._queryInternal(criteria, options)); + } + + private async _queryInternal( + criteria: SearchCriteria, + options?: QueryOptions ): Promise { this.validateQueryParams(criteria, options); const db = this.db; @@ -966,6 +1264,13 @@ export class PostgresTabularStorage< override async queryIndex( criteria: SearchCriteria, options: CoveringIndexQueryOptions + ): Promise[]> { + return this.mutex(() => this._queryIndexInternal(criteria, options)); + } + + private async _queryIndexInternal( + criteria: SearchCriteria, + options: CoveringIndexQueryOptions ): Promise[]> { this.validateSelect(options); this.validateQueryParams(criteria, options); diff --git a/packages/sqlite/src/storage/SqliteTabularStorage.ts b/packages/sqlite/src/storage/SqliteTabularStorage.ts index 42e37f11d..dc5876cf9 100644 --- a/packages/sqlite/src/storage/SqliteTabularStorage.ts +++ b/packages/sqlite/src/storage/SqliteTabularStorage.ts @@ -382,12 +382,18 @@ export class SqliteTabularStorage< } /** - * Stores a key-value pair in the database - * @param entity - The entity to store (may be missing auto-generated keys) - * @returns The entity with any server-generated fields updated - * @emits 'put' event when successful + * Synchronously inserts a single entity using prepared `INSERT OR REPLACE … + * RETURNING *`. Extracted so {@link putBulk} can call it inside a single + * `db.transaction(...)` — better-sqlite3 transactions require a sync body, + * and every step here (key generation, statement prep, bind, RETURNING) is + * synchronous in our SQLite drivers. + * + * @param entity - The entity to insert (may be missing auto-generated keys) + * @param emitEvent - When false, suppresses the `put` event so the caller + * can defer emission until the surrounding transaction + * has actually committed. */ - async put(entity: InsertType): Promise { + private executePutSync(entity: InsertType, emitEvent: boolean = true): Entity { const db = this.db; let entityToInsert = entity as unknown as Entity; @@ -570,96 +576,191 @@ export class SqliteTabularStorage< updatedRecord[k] = this.sqlToJsValue(k, updatedRecord[k] as ValueOptionType); } - this.events.emit("put", updatedEntity); + if (emitEvent) this.emitPut(updatedEntity); return updatedEntity; } /** - * Stores multiple key-value pairs in the database in a bulk operation - * @param entities - Array of entities to store (may be missing auto-generated keys) - * @returns Array of entities with any server-generated fields updated - * @emits 'put' event for each entity stored + * Emits a `put` event. Overridden on the {@link createTxView} proxy to push + * into a per-transaction buffer instead, so listeners never observe rows + * that are about to roll back. + */ + protected emitPut(entity: Entity): void { + this.events.emit("put", entity); + } + + /** + * Stores a key-value pair in the database + * @param entity - The entity to store (may be missing auto-generated keys) + * @returns The entity with any server-generated fields updated + * @emits 'put' event when successful (deferred until commit if inside withTransaction) + */ + async put(entity: InsertType): Promise { + return this.mutex(() => this._putInternal(entity)); + } + + private async _putInternal(entity: InsertType): Promise { + return this.executePutSync(entity); + } + + /** + * Stores multiple entities in a single SQL transaction. + * + * Wrapping the per-row `INSERT OR REPLACE … RETURNING *` calls in + * {@link Sqlite.Database.transaction} collapses N fsyncs into one + * `COMMIT`, which is the dominant cost of SQLite writes — typically a + * 100×–1000× speedup over Promise-fanned individual inserts. + * + * `put` events are deferred until after the transaction commits so listeners + * never observe rows that are about to roll back. When this call is nested + * inside a {@link withTransaction}, deferral extends to that outer commit. */ async putBulk(entities: InsertType[]): Promise { + return this.mutex(() => this._putBulkInternal(entities)); + } + + private async _putBulkInternal(entities: InsertType[]): Promise { if (entities.length === 0) return []; - // Use individual put calls to ensure auto-generated keys are handled correctly - // Each put() call will handle auto-generated keys appropriately - return await Promise.all(entities.map((entity) => this.put(entity))); + const updatedEntities: Entity[] = []; + const transaction = this.db.transaction((items: InsertType[]) => { + for (const item of items) { + updatedEntities.push(this.executePutSync(item, false)); + } + }); + transaction(entities); - /* Original bulk implementation - keeping for reference but using simpler approach above - const db = this.db; + for (const entity of updatedEntities) this.emitPut(entity); + return updatedEntities; + } - // For SQLite bulk inserts with RETURNING, we need to do them individually - // or use a transaction with multiple INSERT statements - const updatedEntities: Entity[] = []; + /** + * Per-instance promise-chain mutex. Every public read/write method awaits + * the previous holder before running and yields the lock when it settles; + * `withTransaction` holds it for the duration of the user's callback so + * concurrent calls from outside `fn` queue behind the transaction instead + * of slipping into it. + * + * The Proxy returned by {@link createTxView} routes back to the private + * `_*Internal` methods directly, so calls made *through* the `tx` handle + * inside `fn` do not deadlock against the mutex held by `withTransaction`. + */ + private mutexChain: Promise = Promise.resolve(); + private async mutex(fn: () => Promise): Promise { + const prev = this.mutexChain; + let release!: () => void; + this.mutexChain = new Promise((resolve) => { + release = resolve; + }); + await prev; + try { + return await fn(); + } finally { + release(); + } + } - // Use a transaction for better performance - const transaction = db.transaction((entitiesToInsert: any[]) => { - for (const entity of entitiesToInsert) { - const { key, value } = this.separateKeyValueFromCombined(entity); - const sql = ` - INSERT OR REPLACE INTO \`${ - this.table - }\` (${this.primaryKeyColumnList()} ${this.valueColumnList() ? ", " + this.valueColumnList() : ""}) - VALUES ( - ${this.primaryKeyColumns() - .map(() => "?") - .join(", ")} - ${ - this.valueColumns().length > 0 - ? ", " + - this.valueColumns() - .map(() => "?") - .join(", ") - : "" - } - ) - RETURNING * - `; - const stmt = db.prepare(sql); - const primaryKeyParams = this.getPrimaryKeyAsOrderedArray(key); - const valueParams = this.getValueAsOrderedArray(value); - const params = [...primaryKeyParams, ...valueParams]; - - // Ensure all params are SQLite-compatible (same validation as put method) - for (let i = 0; i < params.length; i++) { - let param = params[i]; - if (param === undefined) { - params[i] = null; - } else if (param !== null && typeof param === "object") { - // TypeScript now knows param is an object (not null), so we can use instanceof - const paramObj: object = param as object; - if ( - !(paramObj instanceof Uint8Array) && - (typeof Buffer === "undefined" || !(paramObj instanceof Buffer)) - ) { - params[i] = JSON.stringify(paramObj) as ValueOptionType; - } + /** + * Build a Proxy view of `this` for the `withTransaction` callback. The + * proxy: + * + * - Routes any public method `foo` whose private sibling `_fooInternal` + * exists to that sibling, so calls made *through* `tx` bypass the + * mutex held by `withTransaction` and do not deadlock. The naming + * convention is the only sync mechanism — adding a new public method + * with a matching `_fooInternal` is enough; no map to keep in step. + * - Overrides {@link emitPut} so events emitted inside `fn` queue on a + * per-transaction buffer instead of firing immediately, then get + * flushed by `withTransaction` after `COMMIT` (or discarded on + * `ROLLBACK`). + * - Throws on nested `withTransaction` — SQLite has no autonomous + * `BEGIN`. Use SAVEPOINT directly for nested rollback boundaries. + */ + private createTxView(deferredPutEvents: Entity[]): this { + const target = this; + return new Proxy(target, { + get(t, prop, receiver) { + if (prop === "withTransaction") { + return () => { + throw new Error( + "SqliteTabularStorage.withTransaction does not support nesting. " + + "Run nested rollback boundaries with SAVEPOINT directly, or refactor to a single transaction." + ); + }; + } + if (prop === "emitPut") { + return (entity: Entity) => deferredPutEvents.push(entity); + } + if (typeof prop === "string") { + const internal = (t as unknown as Record)[`_${prop}Internal`]; + if (typeof internal === "function") { + return (...args: unknown[]) => + (internal as (...a: unknown[]) => unknown).apply(receiver, args); } } + const value = Reflect.get(t, prop, receiver); + return typeof value === "function" ? value.bind(receiver) : value; + }, + }) as this; + } - // @ts-ignore - const updatedEntity = stmt.get(...params) as Entity; + /** + * Runs `fn` inside a single SQLite transaction. Uses raw `BEGIN` / + * `COMMIT` / `ROLLBACK` rather than {@link Sqlite.Database.transaction} + * because `fn` is async — better-sqlite3's transaction wrapper requires a + * synchronous body. + * + * Concurrent ops on the same storage instance from *outside* `fn` queue + * on this storage's mutex until the transaction commits or rolls back, so + * unrelated work cannot accidentally run inside the open transaction. + * The `tx` handle passed to `fn` is a Proxy that routes back to internal + * (unlocked) implementations, so calls *through* `tx` inside `fn` do not + * deadlock against the mutex. + * + * Nested `withTransaction` calls on `tx` throw rather than reusing the + * outer transaction implicitly. Use {@link Sqlite.Database} `SAVEPOINT`s + * directly if you need nested rollback boundaries. + */ + /** + * True while the parent instance is between `BEGIN` and `COMMIT`/`ROLLBACK`. + * Used only to fail fast when `fn` captures the *original* storage instead + * of the `tx` handle and tries to recursively call `withTransaction` — + * that would deadlock against its own mutex. Calls routed through `tx` + * hit the proxy's `withTransaction` override before reaching here. + */ + private inTransaction: boolean = false; - // Convert all columns according to schema - for (const k in this.schema.properties) { - // @ts-ignore - updatedEntity[k] = this.sqlToJsValue(k, updatedEntity[k]); + override async withTransaction(fn: (tx: this) => Promise): Promise { + if (this.inTransaction) { + throw new Error( + "SqliteTabularStorage.withTransaction does not support nesting. " + + "Run nested rollback boundaries with SAVEPOINT directly, or refactor to a single transaction." + ); + } + return this.mutex(async () => { + const deferredPutEvents: Entity[] = []; + this.inTransaction = true; + try { + this.db.exec("BEGIN"); + let result: T; + try { + result = await fn(this.createTxView(deferredPutEvents)); + this.db.exec("COMMIT"); + } catch (err) { + try { + this.db.exec("ROLLBACK"); + } catch { + // prefer the original error if rollback fails + } + throw err; } - - updatedEntities.push(updatedEntity); + // Flush deferred events only on commit success. + for (const entity of deferredPutEvents) this.events.emit("put", entity); + return result; + } finally { + this.inTransaction = false; } }); - - transaction(entities); - - for (const entity of updatedEntities) { - this.events.emit("put", entity); - } - - return updatedEntities; - */ } /** @@ -669,6 +770,10 @@ export class SqliteTabularStorage< * @emits 'get' event when successful */ async get(key: PrimaryKey): Promise { + return this.mutex(() => this._getInternal(key)); + } + + private async _getInternal(key: PrimaryKey): Promise { const db = this.db; const whereClauses = (this.primaryKeyColumns() as string[]) .map((key) => `\`${key}\` = ?`) @@ -700,6 +805,10 @@ export class SqliteTabularStorage< * @emits 'delete' event when successful */ async delete(key: PrimaryKey): Promise { + return this.mutex(() => this._deleteInternal(key)); + } + + private async _deleteInternal(key: PrimaryKey): Promise { const db = this.db; const whereClauses = (this.primaryKeyColumns() as string[]) .map((key) => `${key} = ?`) @@ -717,6 +826,10 @@ export class SqliteTabularStorage< * @returns Promise resolving to an array of entries or undefined if not found */ async getAll(options?: QueryOptions): Promise { + return this.mutex(() => this._getAllInternal(options)); + } + + private async _getAllInternal(options?: QueryOptions): Promise { this.validateGetAllOptions(options); const db = this.db; let sql = `SELECT * FROM \`${this.table}\``; @@ -759,6 +872,10 @@ export class SqliteTabularStorage< * @emits 'clearall' event when successful */ async deleteAll(): Promise { + return this.mutex(() => this._deleteAllInternal()); + } + + private async _deleteAllInternal(): Promise { const db = this.db; db.exec(`DELETE FROM \`${this.table}\``); this.events.emit("clearall"); @@ -769,6 +886,10 @@ export class SqliteTabularStorage< * @returns The count of entries */ async size(): Promise { + return this.mutex(() => this._sizeInternal()); + } + + private async _sizeInternal(): Promise { const db = this.db; const stmt = db.prepare(` SELECT COUNT(*) AS count FROM \`${this.table}\` @@ -780,8 +901,12 @@ export class SqliteTabularStorage< * Counts rows matching the specified search criteria. */ override async count(criteria?: SearchCriteria): Promise { + return this.mutex(() => this._countInternal(criteria)); + } + + private async _countInternal(criteria?: SearchCriteria): Promise { if (!criteria || Object.keys(criteria).length === 0) { - return await this.size(); + return await this._sizeInternal(); } this.validateQueryParams(criteria); @@ -800,6 +925,10 @@ export class SqliteTabularStorage< * @returns Array of entities or undefined if no records found */ async getBulk(offset: number, limit: number): Promise { + return this.mutex(() => this._getBulkInternal(offset, limit)); + } + + private async _getBulkInternal(offset: number, limit: number): Promise { const db = this.db; const orderByClause = this.primaryKeyColumns() .map((col) => `\`${String(col)}\``) @@ -869,6 +998,10 @@ export class SqliteTabularStorage< * @param criteria - Object with column names as keys and values or SearchConditions */ async deleteSearch(criteria: DeleteSearchCriteria): Promise { + return this.mutex(() => this._deleteSearchInternal(criteria)); + } + + private async _deleteSearchInternal(criteria: DeleteSearchCriteria): Promise { const criteriaKeys = Object.keys(criteria) as Array; if (criteriaKeys.length === 0) { return; @@ -892,6 +1025,13 @@ export class SqliteTabularStorage< async query( criteria: SearchCriteria, options?: QueryOptions + ): Promise { + return this.mutex(() => this._queryInternal(criteria, options)); + } + + private async _queryInternal( + criteria: SearchCriteria, + options?: QueryOptions ): Promise { this.validateQueryParams(criteria, options); const db = this.db; @@ -948,6 +1088,13 @@ export class SqliteTabularStorage< override async queryIndex( criteria: SearchCriteria, options: CoveringIndexQueryOptions + ): Promise[]> { + return this.mutex(() => this._queryIndexInternal(criteria, options)); + } + + private async _queryIndexInternal( + criteria: SearchCriteria, + options: CoveringIndexQueryOptions ): Promise[]> { this.validateSelect(options); this.validateQueryParams(criteria, options); diff --git a/packages/storage/src/tabular/BaseTabularStorage.ts b/packages/storage/src/tabular/BaseTabularStorage.ts index 02d9f9b84..49b95b8b0 100644 --- a/packages/storage/src/tabular/BaseTabularStorage.ts +++ b/packages/storage/src/tabular/BaseTabularStorage.ts @@ -727,6 +727,15 @@ export abstract class BaseTabularStorage< ); } + /** + * Runs `fn` inside a transaction. The default implementation provides no + * rollback semantics — it simply invokes `fn(this)`. Concrete subclasses + * with native transaction support (SQLite, PostgreSQL) override this. + */ + async withTransaction(fn: (tx: this) => Promise): Promise { + return await fn(this); + } + /** * Sets up the database/storage for the repository. * Must be called before using any other methods (except for in-memory implementations). diff --git a/packages/storage/src/tabular/CachedTabularStorage.ts b/packages/storage/src/tabular/CachedTabularStorage.ts index befb107f0..a37e78c08 100644 --- a/packages/storage/src/tabular/CachedTabularStorage.ts +++ b/packages/storage/src/tabular/CachedTabularStorage.ts @@ -328,6 +328,31 @@ export class CachedTabularStorage< await this.cache.deleteSearch(criteria); } + /** + * Runs `fn` inside the durable store's transaction. The cache layer is + * intentionally bypassed for the transaction's duration — coordinating + * two-phase commit between the durable store and an in-memory cache is + * out of scope, and callers asking for `withTransaction` are asking for + * atomicity, which only the durable can provide. Inside `fn`, reads and + * writes go straight through the durable's transaction handle. + * + * After `fn` resolves and the durable commits, the cache may hold stale + * rows for any keys the transaction mutated; subsequent reads through + * this wrapper repopulate the cache on miss, so callers who need the + * cache hot for those rows should issue a read-through after the + * transaction resolves. Inheriting `BaseTabularStorage`'s no-op default + * here would silently lose the rollback / atomicity guarantee, since the + * default just runs `fn(this)` against the cached wrapper itself. + */ + override async withTransaction(fn: (tx: this) => Promise): Promise { + await this.initializeCache(); + return this.durable.withTransaction( + fn as unknown as ( + tx: ITabularStorage + ) => Promise + ); + } + /** * Invalidates the cache by clearing it and resetting initialization flag */ diff --git a/packages/storage/src/tabular/ITabularStorage.ts b/packages/storage/src/tabular/ITabularStorage.ts index db27fa60f..6670434cb 100644 --- a/packages/storage/src/tabular/ITabularStorage.ts +++ b/packages/storage/src/tabular/ITabularStorage.ts @@ -175,6 +175,16 @@ export interface ITabularStorage< > { // Core methods put(value: InsertType): Promise; + /** + * Stores multiple entities in a single bulk operation. + * + * **Ordering guarantee:** the returned array is in the same order as the + * input — `result[i]` always corresponds to `values[i]`. Callers may rely on + * this to align bulk inserts with parallel arrays (e.g. chunks paired with + * embeddings). Backends are responsible for preserving the order even when + * the underlying engine does not formally guarantee it (see each backend's + * implementation). + */ putBulk(values: InsertType[]): Promise; get(key: PrimaryKey): Promise; delete(key: PrimaryKey | Entity): Promise; @@ -287,6 +297,56 @@ export interface ITabularStorage< options?: TabularSubscribeOptions ): () => void; + /** + * Runs `fn` inside a single transaction. If `fn` throws, all writes performed + * inside it are rolled back; otherwise they commit atomically. Mutation + * events (e.g. `put`) emitted inside `fn` are buffered and delivered after + * the transaction commits, so listeners never observe rows that are about + * to roll back. + * + * Backends differ in how strong the guarantee is: + * - **SQLite**: real `BEGIN` / `COMMIT` / `ROLLBACK`. + * - **PostgreSQL**: real `BEGIN` / `COMMIT` / `ROLLBACK`. On a real + * `pg.Pool` (anything exposing `connect()`) the implementation + * dedicates a client via `pool.connect()` and runs the transaction on + * that client, leaving the parent's pool free for external traffic + * in parallel. On single-connection wrappers (PGLitePool, raw PGlite) + * the transaction runs on the shared session and concurrent calls on + * the same instance are serialized behind a per-instance mutex so + * they cannot slip into the open transaction. + * - **Supabase, in-memory, file system, IndexedDB**: best-effort. The + * callback runs to completion and rejection propagates, but partial + * writes are not rolled back because the backend does not expose a + * transaction surface usable by this API. + * + * **Concurrency contract:** + * - On backends with native transaction support (SQLite, PostgreSQL), + * concurrent calls on the same storage instance are isolated from the + * open transaction: SQLite and the single-connection Postgres path + * serialize them through a per-instance mutex; the real-pool Postgres + * path runs them on independent pool clients in parallel. Either way, + * unrelated writes never accidentally commit or roll back along with + * `fn`. + * - On best-effort backends concurrent writes have no atomicity barrier + * to begin with — the contract on those backends is "runs `fn`", not + * "isolates `fn`". + * + * The `tx` handle passed to `fn` is **not** the same object as `this` for + * backends with native transaction support — it is a Proxy that routes + * writes through the transaction-bound resources (the dedicated client on + * real `pg.Pool`, the bypass-mutex internal methods on SQLite/PGlite) and + * routes events through the transaction's deferred-emit queue. Callers + * MUST use `tx` for everything inside `fn`. Capturing the outer `this` and + * calling methods on it from inside `fn` will deadlock against the held + * mutex (single-connection backends) or run on the wrong connection + * (`pg.Pool`), and is unsupported. + * + * Nested `withTransaction` calls — either via the original instance or + * via `tx` — throw rather than reusing the outer transaction implicitly. + * Use SAVEPOINT directly if you need nested rollback boundaries. + */ + withTransaction(fn: (tx: this) => Promise): Promise; + /** * Sets up the database/storage for the repository. * Must be called before using any other methods (except for in-memory implementations). @@ -300,6 +360,10 @@ export interface ITabularStorage< [Symbol.asyncDispose](): Promise; } -export type AnyTabularStorage = Omit, "queryIndex"> & { +export type AnyTabularStorage = Omit< + ITabularStorage, + "queryIndex" | "withTransaction" +> & { queryIndex(criteria: any, options: any): Promise; + withTransaction(fn: (tx: any) => Promise): Promise; }; diff --git a/packages/storage/src/tabular/TelemetryTabularStorage.ts b/packages/storage/src/tabular/TelemetryTabularStorage.ts index 108f9613b..84ea2f1b2 100644 --- a/packages/storage/src/tabular/TelemetryTabularStorage.ts +++ b/packages/storage/src/tabular/TelemetryTabularStorage.ts @@ -136,6 +136,24 @@ export class TelemetryTabularStorage< return this.inner.subscribeToChanges(callback, options); } + withTransaction(fn: (tx: this) => Promise): Promise { + // Construct a tx-bound telemetry wrapper that forwards to the inner's + // transaction handle (`innerTx`) — the proxy from the inner's + // `createTxView`. Passing the outer `this` would re-enter `inner.put()` + // through the public, mutex-acquiring path and deadlock against the held + // mutex on single-connection backends, or run on the wrong connection on + // real `pg.Pool`. + return traced("workglow.storage.tabular.withTransaction", this.storageName, () => + this.inner.withTransaction((innerTx) => { + const txWrapper = new TelemetryTabularStorage( + this.storageName, + innerTx as ITabularStorage + ) as this; + return fn(txWrapper); + }) + ); + } + setupDatabase(): Promise { return this.inner.setupDatabase(); } diff --git a/packages/storage/src/vector/IVectorStorage.ts b/packages/storage/src/vector/IVectorStorage.ts index a3598a755..97ca435ec 100644 --- a/packages/storage/src/vector/IVectorStorage.ts +++ b/packages/storage/src/vector/IVectorStorage.ts @@ -20,8 +20,12 @@ import type { TabularEventListeners, } from "../tabular/ITabularStorage"; -export type AnyVectorStorage = Omit, "queryIndex"> & { +export type AnyVectorStorage = Omit< + IVectorStorage, + "queryIndex" | "withTransaction" +> & { queryIndex(criteria: any, options: any): Promise; + withTransaction(fn: (tx: any) => Promise): Promise; }; /** diff --git a/packages/supabase/src/storage/SupabaseTabularStorage.ts b/packages/supabase/src/storage/SupabaseTabularStorage.ts index 87015a138..18189ac79 100644 --- a/packages/supabase/src/storage/SupabaseTabularStorage.ts +++ b/packages/supabase/src/storage/SupabaseTabularStorage.ts @@ -5,7 +5,7 @@ */ import type { RealtimeChannel, SupabaseClient } from "@supabase/supabase-js"; -import { createServiceToken } from "@workglow/util"; +import { createServiceToken, uuid4 } from "@workglow/util"; import { DataPortSchemaObject, FromSchema, @@ -389,48 +389,81 @@ export class SupabaseTabularStorage< * @returns The entity with any server-generated fields updated * @emits "put" event with the updated entity when successful */ - async put(entity: InsertType): Promise { - // Handle auto-generated keys - let entityToInsert = { ...entity }; + /** + * Apply the auto-generated-key policy and the optional-field-to-null + * normalization that Supabase's REST upsert expects. Shared by {@link put} + * and {@link putBulk} so a row goes over the wire identically regardless of + * whether it was inserted alone or as part of a batch. + * + * For UUID-strategy auto-gen keys we generate the UUID client-side rather + * than letting `gen_random_uuid()` fill it server-side. Otherwise the input + * row has no primary key, and {@link putBulk}'s ordering-preservation step + * (re-keying the response by PK) cannot work — `result[i]` would no longer + * be guaranteed to correspond to `values[i]`. SQLite's UUID path uses the + * same trick. + */ + private normalizeForUpsert(entity: InsertType): Record { + const entityToInsert = { ...entity } as Record; if (this.hasAutoGeneratedKey() && this.autoGeneratedKeyName) { const keyName = String(this.autoGeneratedKeyName); - const entityRecord = entity as Record; - const clientProvidedValue = entityRecord[keyName]; + const clientProvidedValue = entityToInsert[keyName]; const hasClientValue = clientProvidedValue !== undefined && clientProvidedValue !== null; - let shouldOmitKey = false; + let shouldFillKey = false; if (this.clientProvidedKeys === "never") { - // Never use client value, let database generate - shouldOmitKey = true; + shouldFillKey = true; } else if (this.clientProvidedKeys === "always") { if (!hasClientValue) { throw new Error( `Auto-generated key "${keyName}" is required when clientProvidedKeys is "always"` ); } - shouldOmitKey = false; + shouldFillKey = false; } else { - // "if-missing" - omit key if client didn't provide it - shouldOmitKey = !hasClientValue; + shouldFillKey = !hasClientValue; } - if (shouldOmitKey) { - // Omit the auto-generated key so Supabase generates it - delete (entityToInsert as Record)[keyName]; + if (shouldFillKey) { + if (this.autoGeneratedKeyStrategy === "uuid") { + // Generate client-side so the input carries a PK and the bulk + // response can be re-aligned to input order. + entityToInsert[keyName] = uuid4(); + } else { + // Autoincrement integer keys cannot be generated client-side; let + // Postgres fill via SERIAL. putBulk falls back to response order in + // this case (and Postgres preserves INSERT ... RETURNING order in + // practice). + delete entityToInsert[keyName]; + } } } // Normalize optional fields: convert undefined to null for optional fields - const normalizedEntity = { ...entityToInsert } as Record; const requiredSet = new Set(this.valueSchema.required ?? []); for (const key in this.valueSchema.properties) { - if (!(key in normalizedEntity) || normalizedEntity[key] === undefined) { + if (!(key in entityToInsert) || entityToInsert[key] === undefined) { if (!requiredSet.has(key)) { - normalizedEntity[key] = null; + entityToInsert[key] = null; } } } + + return entityToInsert; + } + + /** Hydrate a row returned by Supabase back into entity-shaped JS values. */ + private hydrateRow(row: unknown): Entity { + const entity = row as Entity; + const record = entity as Record; + for (const key in this.schema.properties) { + record[key] = this.sqlToJsValue(key, record[key] as ValueOptionType); + } + return entity; + } + + async put(entity: InsertType): Promise { + const normalizedEntity = this.normalizeForUpsert(entity); const { data, error } = await this.client .from(this.table) .upsert(normalizedEntity, { onConflict: this.primaryKeyColumnList() }) @@ -438,31 +471,96 @@ export class SupabaseTabularStorage< .single(); if (error) throw error; - const updatedEntity = data as Entity; - - // Convert all columns from SQL to JS values - const updatedRecord = updatedEntity as Record; - for (const key in this.schema.properties) { - updatedRecord[key] = this.sqlToJsValue(key, updatedRecord[key] as ValueOptionType); - } + const updatedEntity = this.hydrateRow(data); this.events.emit("put", updatedEntity); return updatedEntity; } /** - * Stores multiple rows in the database in a bulk operation. - * Uses individual put calls to ensure auto-generated keys are handled correctly. + * Stores multiple rows by issuing a single PostgREST `upsert` carrying every + * row in one request. PostgREST runs the underlying `INSERT ... ON CONFLICT` + * inside one server-side transaction, so this is both atomic and a single + * round trip — far cheaper than `Promise.all` fanning out one HTTPS request + * per row. + * + * **Ordering:** {@link normalizeForUpsert} fills UUID auto-gen keys + * client-side, so for the common UUID case every input row carries a PK + * and the response is re-aligned to input order by primary-key match. The + * only remaining case where ordering relies on Postgres's `INSERT ... + * RETURNING` row order (stable in practice but not formally contracted) is + * autoincrement-integer auto-gen keys, where the database has to assign + * the key. * - * @param entities - Array of entities to store (may be missing auto-generated keys) - * @returns Array of entities with any server-generated fields updated - * @emits "put" event for each entity stored + * `put` events are deferred until after the upsert resolves so listeners do + * not see rows from a request that ultimately failed. */ async putBulk(entities: InsertType[]): Promise { if (entities.length === 0) return []; - // Use individual put calls to ensure auto-generated keys are handled correctly - return await Promise.all(entities.map((entity) => this.put(entity))); + const normalizedEntities = entities.map((entity) => this.normalizeForUpsert(entity)); + const { data, error } = await this.client + .from(this.table) + .upsert(normalizedEntities, { onConflict: this.primaryKeyColumnList() }) + .select(); + + if (error) throw error; + if (!data) return []; + + const returnedRows = (data as unknown[]).map((row) => this.hydrateRow(row)); + + // Reorder by primary key when every input row carries its full PK. With an + // auto-generated key omitted from the input, we have no key to match on, + // so trust the response order (Postgres's INSERT ... RETURNING preserves + // VALUES order in practice). + const orderedEntities = this.alignBulkResponseToInputOrder( + normalizedEntities, + returnedRows + ); + + for (const entity of orderedEntities) { + this.events.emit("put", entity); + } + return orderedEntities; + } + + /** + * If every input row supplies its full primary key, build a PK index over + * the response and emit rows in input order. Otherwise return the response + * as-is — there is no key to match on, so we have to trust the backend. + */ + private alignBulkResponseToInputOrder( + inputs: ReadonlyArray>, + responseRows: Entity[] + ): Entity[] { + if (inputs.length !== responseRows.length) { + // Server returned a different cardinality (e.g. dedup); fall back to as-is. + return responseRows; + } + const pkColumns = this.primaryKeyColumns().map(String); + for (const input of inputs) { + for (const col of pkColumns) { + if (input[col] === undefined || input[col] === null) { + return responseRows; + } + } + } + + const fingerprint = (row: Record): string => + pkColumns.map((c) => String(row[c])).join(""); + + const responseByPk = new Map(); + for (const row of responseRows) { + responseByPk.set(fingerprint(row as unknown as Record), row); + } + + const ordered: Entity[] = []; + for (const input of inputs) { + const match = responseByPk.get(fingerprint(input)); + if (!match) return responseRows; // unexpected; fall back + ordered.push(match); + } + return ordered; } /** diff --git a/packages/task-graph/src/task-graph/TransformRegistry.ts b/packages/task-graph/src/task-graph/TransformRegistry.ts index b42f740af..372bb0886 100644 --- a/packages/task-graph/src/task-graph/TransformRegistry.ts +++ b/packages/task-graph/src/task-graph/TransformRegistry.ts @@ -26,9 +26,7 @@ export const TRANSFORM_DEFS = createServiceToken> * Registers the transform defs map default factory on the given registry. * Called by `bootstrapWorkglow` and `createOrchestrationContext`. */ -export function registerTransformDefaults( - registry: ServiceRegistry = globalServiceRegistry -): void { +export function registerTransformDefaults(registry: ServiceRegistry = globalServiceRegistry): void { registry.registerIfAbsent( TRANSFORM_DEFS, (): Map> => TransformRegistry.all, diff --git a/packages/test/src/test/storage-tabular/PostgresTabularStorage.integration.test.ts b/packages/test/src/test/storage-tabular/PostgresTabularStorage.integration.test.ts index 80fb3c70b..b87c3ff41 100644 --- a/packages/test/src/test/storage-tabular/PostgresTabularStorage.integration.test.ts +++ b/packages/test/src/test/storage-tabular/PostgresTabularStorage.integration.test.ts @@ -8,7 +8,7 @@ import { PGlite } from "@electric-sql/pglite"; import { PostgresTabularStorage } from "@workglow/postgres/storage"; import { setLogger, uuid4 } from "@workglow/util"; import type { Pool } from "pg"; -import { afterAll, describe } from "vitest"; +import { afterAll, describe, expect, it } from "vitest"; import { getTestingLogger } from "../../binding/TestingLogger"; import { AllTypesPrimaryKeyNames, @@ -82,4 +82,205 @@ describe("PostgresTabularStorage", () => { UuidPrimaryKeyNames ) ); + + describe("withTransaction", () => { + async function makeTxStorage() { + const storage = new PostgresTabularStorage< + typeof CompoundSchema, + typeof CompoundPrimaryKeyNames + >( + db, + `tx_test_${uuid4().replace(/-/g, "_")}`, + CompoundSchema, + CompoundPrimaryKeyNames + ); + await storage.setupDatabase(); + return storage; + } + + it("rolls back all writes when the callback throws", async () => { + const storage = await makeTxStorage(); + + await expect( + storage.withTransaction(async (tx) => { + await tx.put({ name: "a", type: "x", option: "v1", success: true }); + await tx.put({ name: "b", type: "x", option: "v2", success: true }); + throw new Error("boom"); + }) + ).rejects.toThrow("boom"); + + expect(await storage.get({ name: "a", type: "x" })).toBeUndefined(); + expect(await storage.get({ name: "b", type: "x" })).toBeUndefined(); + }); + + it("defers put events until COMMIT and discards them on ROLLBACK", async () => { + const storage = await makeTxStorage(); + const observed: string[] = []; + storage.on("put", (entity) => observed.push(entity.name)); + + await storage.withTransaction(async (tx) => { + await tx.put({ name: "committed", type: "x", option: "v", success: true }); + expect(observed).toEqual([]); + }); + expect(observed).toEqual(["committed"]); + + observed.length = 0; + + await expect( + storage.withTransaction(async (tx) => { + await tx.put({ name: "doomed", type: "x", option: "v", success: true }); + throw new Error("rollback"); + }) + ).rejects.toThrow("rollback"); + + expect(observed).toEqual([]); + }); + + it("supports tx.putBulk inside withTransaction without a nested BEGIN/COMMIT", async () => { + const storage = await makeTxStorage(); + + await storage.withTransaction(async (tx) => { + await tx.putBulk([ + { name: "bulk1", type: "x", option: "a", success: true }, + { name: "bulk2", type: "x", option: "b", success: true }, + ]); + }); + + expect((await storage.get({ name: "bulk1", type: "x" }))?.option).toEqual("a"); + expect((await storage.get({ name: "bulk2", type: "x" }))?.option).toEqual("b"); + }); + + it("rolls back tx.putBulk writes when the surrounding callback throws", async () => { + const storage = await makeTxStorage(); + + await expect( + storage.withTransaction(async (tx) => { + await tx.putBulk([ + { name: "drop1", type: "x", option: "a", success: true }, + { name: "drop2", type: "x", option: "b", success: true }, + ]); + throw new Error("rollback bulk"); + }) + ).rejects.toThrow("rollback bulk"); + + expect(await storage.get({ name: "drop1", type: "x" })).toBeUndefined(); + expect(await storage.get({ name: "drop2", type: "x" })).toBeUndefined(); + }); + }); + + // The "real pg.Pool" path of withTransaction acquires a dedicated client via + // `pool.connect()` and runs BEGIN/COMMIT on that client, leaving the parent's + // pool free for external traffic. To exercise this code path without spinning + // up a real Postgres in CI, we wrap PGlite with a thin adapter that exposes + // `connect()`. The adapter routes every "client" query back to the same + // PGlite session — that's not a faithful reproduction of pg.Pool's per-client + // session isolation, but it is enough to verify (a) the dedicated-client + // code path executes, (b) BEGIN/COMMIT/ROLLBACK plumbing works, and (c) the + // parent's mutex is not held during `fn`, so external callers run in + // parallel. + describe("withTransaction (dedicated-client path via fake pool)", () => { + function makeFakePool(pglite: PGlite): unknown { + return new Proxy(pglite, { + get(target, prop, receiver) { + if (prop === "connect") { + return async () => ({ + query: (pglite.query as (...args: unknown[]) => unknown).bind(pglite), + release: () => {}, + }); + } + const value = Reflect.get(target, prop, receiver); + return typeof value === "function" ? value.bind(target) : value; + }, + }); + } + + async function makePoolStorage() { + const pglite = new PGlite(); + const fakePool = makeFakePool(pglite) as Pool; + const storage = new PostgresTabularStorage< + typeof CompoundSchema, + typeof CompoundPrimaryKeyNames + >( + fakePool, + `tx_pool_test_${uuid4().replace(/-/g, "_")}`, + CompoundSchema, + CompoundPrimaryKeyNames + ); + await storage.setupDatabase(); + return { storage, pglite }; + } + + it("commits writes via the dedicated client", async () => { + const { storage, pglite } = await makePoolStorage(); + try { + await storage.withTransaction(async (tx) => { + await tx.put({ name: "p1", type: "x", option: "v1", success: true }); + await tx.put({ name: "p2", type: "x", option: "v2", success: true }); + }); + expect((await storage.get({ name: "p1", type: "x" }))?.option).toEqual("v1"); + expect((await storage.get({ name: "p2", type: "x" }))?.option).toEqual("v2"); + } finally { + await pglite.close(); + } + }); + + it("rolls back when the callback throws", async () => { + const { storage, pglite } = await makePoolStorage(); + try { + await expect( + storage.withTransaction(async (tx) => { + await tx.put({ name: "r1", type: "x", option: "v", success: true }); + throw new Error("boom"); + }) + ).rejects.toThrow("boom"); + expect(await storage.get({ name: "r1", type: "x" })).toBeUndefined(); + } finally { + await pglite.close(); + } + }); + + it("does not block external put() on the parent's mutex while fn is awaiting", async () => { + // This is the property the dedicated-client path exists to provide. The + // parent's mutex must NOT be held while `withTransaction`'s `fn` awaits, + // so external callers run in parallel. We verify it by holding `fn` open + // until an external `storage.put()` resolves: if the mutex were held, + // the external put would queue and never complete before we release. + const { storage, pglite } = await makePoolStorage(); + try { + let releaseInner!: () => void; + const inner = new Promise((r) => { + releaseInner = r; + }); + + const txP = storage.withTransaction(async (tx) => { + await tx.put({ name: "inside-tx", type: "x", option: "tx", success: true }); + await inner; + }); + + // Yield enough microtasks for `withTransaction` to enter `fn`. + await Promise.resolve(); + await Promise.resolve(); + + // External put on the same instance — would deadlock against a held + // mutex, completes quickly when no mutex is held. + let externalPutDone = false; + const externalP = storage + .put({ name: "external", type: "x", option: "ext", success: true }) + .then((r) => { + externalPutDone = true; + return r; + }); + + // Wait long enough for the external put to round-trip through PGlite. + await new Promise((r) => setTimeout(r, 50)); + expect(externalPutDone).toBe(true); + + releaseInner(); + await txP; + await externalP; + } finally { + await pglite.close(); + } + }); + }); }); diff --git a/packages/test/src/test/storage-tabular/SqliteTabularStorage.integration.test.ts b/packages/test/src/test/storage-tabular/SqliteTabularStorage.integration.test.ts index f8b2b656f..4326544f4 100644 --- a/packages/test/src/test/storage-tabular/SqliteTabularStorage.integration.test.ts +++ b/packages/test/src/test/storage-tabular/SqliteTabularStorage.integration.test.ts @@ -128,3 +128,115 @@ describe("SqliteTabularStorage.queryIndex", () => { expect(rows).toEqual([]); }); }); + +describe("SqliteTabularStorage.withTransaction", () => { + async function makeStorage() { + const storage = new SqliteTabularStorage( + ":memory:", + `tx_test_${uuid4().replace(/-/g, "_")}`, + CompoundSchema, + CompoundPrimaryKeyNames + ); + await storage.setupDatabase(); + return storage; + } + + it("rolls back all writes when the callback throws", async () => { + const storage = await makeStorage(); + + await expect( + storage.withTransaction(async (tx) => { + await tx.put({ name: "a", type: "x", option: "v1", success: true }); + await tx.put({ name: "b", type: "x", option: "v2", success: true }); + throw new Error("boom"); + }) + ).rejects.toThrow("boom"); + + expect(await storage.get({ name: "a", type: "x" })).toBeUndefined(); + expect(await storage.get({ name: "b", type: "x" })).toBeUndefined(); + }); + + it("rejects nested withTransaction calls instead of corrupting the outer transaction", async () => { + const storage = await makeStorage(); + + await expect( + storage.withTransaction(async (tx) => { + await tx.put({ name: "outer", type: "x", option: "v", success: true }); + await tx.withTransaction(async () => { + // unreachable + }); + }) + ).rejects.toThrow(/does not support nesting/); + + // Outer transaction was rolled back as a result of the nested-call error. + expect(await storage.get({ name: "outer", type: "x" })).toBeUndefined(); + }); + + it("defers put events until COMMIT and discards them on ROLLBACK", async () => { + const storage = await makeStorage(); + const observed: string[] = []; + storage.on("put", (entity) => observed.push(entity.name)); + + await storage.withTransaction(async (tx) => { + await tx.put({ name: "committed", type: "x", option: "v", success: true }); + // No events should have been observed yet + expect(observed).toEqual([]); + }); + expect(observed).toEqual(["committed"]); + + observed.length = 0; + + await expect( + storage.withTransaction(async (tx) => { + await tx.put({ name: "doomed", type: "x", option: "v", success: true }); + throw new Error("rollback"); + }) + ).rejects.toThrow("rollback"); + + // The doomed put should never have been observed by listeners. + expect(observed).toEqual([]); + }); + + it("serializes external put() calls behind a running withTransaction (rollback case)", async () => { + // The concurrency contract: while withTransaction is awaiting its callback, + // an unrelated `storage.put()` invoked from outside `fn` must NOT slip into + // the open transaction. The mutex enforces this — the external put queues + // until COMMIT/ROLLBACK releases the lock. + const storage = await makeStorage(); + + let releaseInner: () => void = () => {}; + const innerCanFinish = new Promise((resolve) => { + releaseInner = resolve; + }); + + const txPromise = storage.withTransaction(async (tx) => { + await tx.put({ name: "tx-row", type: "x", option: "tx", success: true }); + // Hold the transaction open until the external caller has been queued. + await innerCanFinish; + throw new Error("rollback"); + }); + + // Yield enough microtasks for the external put() to observe withTransaction's mutex hold. + await Promise.resolve(); + await Promise.resolve(); + + const externalPut = storage.put({ + name: "external-row", + type: "x", + option: "external", + success: true, + }); + + // Let the inner callback throw, triggering ROLLBACK. + releaseInner(); + await expect(txPromise).rejects.toThrow("rollback"); + + // External put runs *after* ROLLBACK and is its own committed write. + await externalPut; + + // tx-row was rolled back, external-row was committed independently. + expect(await storage.get({ name: "tx-row", type: "x" })).toBeUndefined(); + const ext = await storage.get({ name: "external-row", type: "x" }); + expect(ext?.option).toEqual("external"); + }); +}); diff --git a/packages/test/src/test/storage-tabular/genericTabularStorageTests.ts b/packages/test/src/test/storage-tabular/genericTabularStorageTests.ts index 37227c9b6..016bc3b32 100644 --- a/packages/test/src/test/storage-tabular/genericTabularStorageTests.ts +++ b/packages/test/src/test/storage-tabular/genericTabularStorageTests.ts @@ -226,6 +226,47 @@ export function runGenericTabularStorageTests( expect(Array.isArray(returned)).toBe(true); expect(returned.length).toEqual(0); }); + + it("should return putBulk() entities in the same order as the input", async () => { + // Inputs deliberately interleave name/type combos so any backend that + // returns rows in PK order (rather than input order) would mismatch. + const entities = [ + { name: "key3", type: "string3", option: "third", success: true }, + { name: "key1", type: "string1", option: "first", success: false }, + { name: "key2", type: "string2", option: "second", success: true }, + ]; + + const returned = await repository.putBulk(entities); + + expect(returned).toHaveLength(entities.length); + for (let i = 0; i < entities.length; i++) { + expect(returned[i].name).toEqual(entities[i].name); + expect(returned[i].type).toEqual(entities[i].type); + expect(returned[i].option).toEqual(entities[i].option); + } + }); + + describe("withTransaction", () => { + it("should commit writes performed inside a successful transaction", async () => { + await repository.withTransaction(async (tx) => { + await tx.put({ name: "tx1", type: "ok", option: "committed", success: true }); + await tx.put({ name: "tx2", type: "ok", option: "committed", success: true }); + }); + + const r1 = await repository.get({ name: "tx1", type: "ok" }); + const r2 = await repository.get({ name: "tx2", type: "ok" }); + expect(r1?.option).toEqual("committed"); + expect(r2?.option).toEqual("committed"); + }); + + it("should propagate the result of the transaction callback", async () => { + const result = await repository.withTransaction(async (tx) => { + await tx.put({ name: "tx", type: "result", option: "val", success: true }); + return 42; + }); + expect(result).toEqual(42); + }); + }); }); // Only run compound index tests if createCompoundRepository is provided @@ -2034,6 +2075,25 @@ export function runAutoGeneratedKeyTests( expect(retrieved!.title).toBe("Retrievable"); expect(retrieved!.content).toBe("Can be found"); }); + + it("should return putBulk entities with auto-generated UUIDs in input order", async () => { + // Distinct titles let us assert positional alignment without relying on PKs + // (since the PKs are server- or client-generated and unknown to the caller). + const entities = [ + { title: "first", content: "1" }, + { title: "second", content: "2" }, + { title: "third", content: "3" }, + ]; + + const results = await repository.putBulk(entities as any); + + expect(results).toHaveLength(entities.length); + for (let i = 0; i < entities.length; i++) { + expect(results[i].title).toBe(entities[i].title); + expect(results[i].content).toBe(entities[i].content); + expect(typeof results[i].id).toBe("string"); + } + }); }); }); } diff --git a/packages/util/src/media/imageHydrationResolver.ts b/packages/util/src/media/imageHydrationResolver.ts index 8b6286b8b..af1e60b58 100644 --- a/packages/util/src/media/imageHydrationResolver.ts +++ b/packages/util/src/media/imageHydrationResolver.ts @@ -34,9 +34,7 @@ async function resolveImage( * Registers the "image" input resolver on the given registry. * Called by `bootstrapWorkglow` and `createOrchestrationContext`. */ -export function registerImageDefaults( - registry: ServiceRegistry = globalServiceRegistry -): void { +export function registerImageDefaults(registry: ServiceRegistry = globalServiceRegistry): void { registerInputResolver("image", resolveImage, registry); } diff --git a/packages/util/src/telemetry/TelemetryRegistry.ts b/packages/util/src/telemetry/TelemetryRegistry.ts index 66b279234..9a3ca6cc3 100644 --- a/packages/util/src/telemetry/TelemetryRegistry.ts +++ b/packages/util/src/telemetry/TelemetryRegistry.ts @@ -38,9 +38,7 @@ function createDefaultTelemetryProvider(): ITelemetryProvider { * Registers the default telemetry provider factory on the given registry. * Called by `bootstrapWorkglow` / `createOrchestrationContext`. */ -export function registerTelemetryDefaults( - registry: ServiceRegistry = globalServiceRegistry -): void { +export function registerTelemetryDefaults(registry: ServiceRegistry = globalServiceRegistry): void { registry.registerIfAbsent(TELEMETRY_PROVIDER, createDefaultTelemetryProvider, true); }