Reap stale observations & serialize concurrent writes per CIK #175

Merged
sroussey merged 12 commits into main from claude/elegant-edison-344cc7 on Jun 30, 2026

@sroussey

Contributor

Summary

This PR implements two critical data-integrity features for concurrent filing processing:

  1. Stale observation reaping: After S-1 extraction completes, delete observations from prior runs that were not re-observed in the current run (orphaned by section failures or reclassification). A "reap gate" blocks the reaper if any section failed, preventing silent data loss.

  2. Per-CIK write serialization: Add a keyed mutex to SpacReportWriter and temporal repos (CrowdfundingTemporalRepo, RegAOfferingRepo, InvestmentOfferingRepo, PortalRepo) to serialize read-derive-write cycles on the same CIK, preventing lost updates and history-chain forks when concurrent filings are processed.

Key Changes

  • New reaping infrastructure (src/resolver/reapStaleObservations.ts, .test.ts):

    • reapStaleObservations() deletes observations with created_at < run_start for a filing+extractor pair
    • hasBlockingSectionFailure() gates the reaper: blocks if any section recorded a zero-row failure code (MODEL_INVALID_OUTPUT, MODEL_EMPTY, LOW_CONFIDENCE_ALL, UNVERIFIED_SOURCE_SPAN)
    • Integrated into ProcessAccessionDocFormTask post-extraction
  • New keyed-mutex utility (src/util/KeyedMutex.ts, .test.ts):

    • Per-key AsyncMutex with refcounting and auto-eviction at zero refs
    • Enables bounded concurrent processing of different keys while serializing same-key access
  • Temporal repo locking:

    • CrowdfundingTemporalRepo, RegAOfferingRepo, InvestmentOfferingRepo, PortalRepo now wrap read-guard-write cycles in KeyedMutex.lock()
    • Shared isStaleByAsOf() guard prevents out-of-order filing dates from regressing mutable "current" rows
  • SpacReportWriter serialization (src/storage/spac/SpacReportWriter.ts):

    • Module-scoped cikWriteMutexes map serializes all record* methods per CIK
    • Prevents interleaved read-derive-write cycles that could lose updates or fork history
  • Observation idempotency (src/resolver/EntityObserver.ts):

    • observePerson() and observeCompany() now call removePriorPersonJunctions() / removePriorCompanyJunctions() before re-observing
    • Ensures address/phone co-occurrence counts stay stable on replay (not inflated)
  • Upsert-by-natural-key robustness (PersonObservationRepo, CompanyObservationRepo):

    • Catch UNIQUE constraint errors and retry with a fresh insert (handles race where prior observation was deleted between query and update)
  • New test coverage:

    • ProcessAccessionDocFormTask.reapgate.test.ts: comprehensive reaping scenarios (full prospectus, management-only, transient failures, blocking dead-letters)
    • BackfillMergerProxiesTask.test.ts: backfill of merger proxies ingested before their SPAC S-1
    • SpacReportWriter.test.ts: concurrent same-CIK writes serialize correctly
    • CrowdfundingTemporalRepo.test.ts: same-millisecond stale snapshots don't collide
    • RegAOfferingRepo.test.ts, InvestmentOfferingRepo.test.ts, PortalRepo.test.ts: saveOfferingAsOf() / saveInvestmentOfferingAsOf() / savePortalAsOf() guard against stale writes
  • CLI & utility improvements:

    • runCommand() wrapper for error handling in query/resolve commands
    • parseDate() extended to handle the compact `yyyyMMdd` format

https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR

claude added 12 commits June 24, 2026 22:34
…rite race

db reset: resetAllDatabases() truncated only 45 of 75 registered repositories,
silently leaving 30 tables (all SPAC, XBRL, observation provenance, section16,
form144, offering, dead-letter, family-tier, etc.) fully populated after a
"reset" — producing a corrupt mix of stale derived rows whose referenced
canonical entities had been truncated. Add the 30 missing deleteAll() calls and
a source-parity test (resetAllDatabases.test.ts) that fails if a DefaultDI repo
token is ever added without a matching reset, mirroring form-wiring.test.ts.

SPAC write race: every SpacReportWriter.record* method is an unsynchronised
read-derive-write over a CIK's spac/spac_deal/spac_history rows, but the form
tasks map over filings with concurrencyLimit > 1, so two same-CIK filings
interleave their recompute+rebuild+snapshot cycles — lost-updating the derived
row and forking the history chain (two open rows). Add a module-scoped,
refcounted per-CIK AsyncMutex (writers are constructed fresh per call, so an
instance field would not serialise across them) wrapping all five record*
methods, mirroring the resolver per-key mutex. Adds a regression test that
asserts the critical sections never overlap.
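
A minimal sketch of the per-key serialization idea, assuming a promise-chain lock in place of the PR's AsyncMutex. `KeyedLock`, `recordFiling`, and `trace` are hypothetical names used for illustration only; the actual code in SpacReportWriter.ts may differ.

```typescript
// Hypothetical sketch: refcounted per-key lock; an entry is evicted at zero refs.
class KeyedLock {
  private readonly entries = new Map<string, { tail: Promise<void>; refs: number }>();

  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    let entry = this.entries.get(key);
    if (!entry) {
      entry = { tail: Promise.resolve(), refs: 0 };
      this.entries.set(key, entry);
    }
    entry.refs++;
    const previous = entry.tail;
    let release: () => void = () => {};
    entry.tail = new Promise<void>((resolve) => {
      release = resolve;
    });
    await previous; // wait for every earlier holder of this key
    try {
      return await fn();
    } finally {
      release(); // wake the next waiter, if any
      entry.refs--;
      if (entry.refs === 0) this.entries.delete(key);
    }
  }

  get size(): number {
    return this.entries.size;
  }
}

// Module-scoped, so the lock is shared even when writers are constructed per call.
const cikLocks = new KeyedLock();
const trace: string[] = [];

async function recordFiling(cik: string, label: string): Promise<void> {
  await cikLocks.run(cik, async () => {
    trace.push(`${label}:start`);
    await Promise.resolve(); // yield point standing in for read-derive-write I/O
    trace.push(`${label}:end`);
  });
}
```

With this shape, two calls for the same CIK run their critical sections back to back, while calls for different CIKs are free to interleave.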

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
…t race

The observation natural key (accession_number, extractor_id, observation_index)
was registered as a NON-unique index, so the schema's "is UNIQUE" docstring was
a lie at the storage layer and upsertByNaturalKey (query-then-insert, no mutex)
let two concurrent inserts for one logical mention both write a row.

- Move the natural-key tuple from indexes to uniqueIndexes in DefaultDI and
  TestingDI (both observation tables), matching the canonical-tier wiring.
- upsertByNaturalKey now recovers from a lost insert race: on a UNIQUE violation
  it re-reads the winning row and merges onto it, so both callers converge on
  one observation (mirrors the resolver's create-then-requery pattern).
- Relocate isUniqueConstraintError from resolver/ to util/ (it is a pure,
  backend-agnostic helper) so storage can use it without a storage->resolver
  backward dependency; update the 6 resolver imports.
- Test storage now mirrors the unique index and a new test pins the
  same-natural-key concurrent-insert convergence (fails under the old config).
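
The recovery path in the second bullet can be sketched as follows. This is an illustration under stated assumptions: `ObservationTable` is a hypothetical in-memory stand-in for the storage layer, the `isUniqueConstraintError` body here is a guess at a message-based check (only the helper's name comes from the commit), and the merge is a plain field overlay.

```typescript
type Observation = {
  accession_number: string;
  extractor_id: string;
  observation_index: number;
  name?: string;
  address?: string;
};

// Hypothetical in-memory table that enforces uniqueness on the natural key.
class ObservationTable {
  private readonly rows = new Map<string, Observation>();

  static keyOf(o: Observation): string {
    return `${o.accession_number}|${o.extractor_id}|${o.observation_index}`;
  }
  async find(key: string): Promise<Observation | undefined> {
    return this.rows.get(key);
  }
  async insert(row: Observation): Promise<void> {
    const key = ObservationTable.keyOf(row);
    if (this.rows.has(key)) throw new Error(`UNIQUE constraint failed: ${key}`);
    this.rows.set(key, { ...row });
  }
  async update(row: Observation): Promise<void> {
    this.rows.set(ObservationTable.keyOf(row), { ...row });
  }
  get count(): number {
    return this.rows.size;
  }
}

// Assumed message-based check; the real helper may inspect backend error codes.
function isUniqueConstraintError(err: unknown): boolean {
  return err instanceof Error && /unique constraint/i.test(err.message);
}

async function upsertByNaturalKey(table: ObservationTable, incoming: Observation): Promise<Observation> {
  const key = ObservationTable.keyOf(incoming);
  const existing = await table.find(key);
  if (existing) {
    const merged = { ...existing, ...incoming };
    await table.update(merged);
    return merged;
  }
  try {
    await table.insert(incoming);
    return incoming;
  } catch (err) {
    if (!isUniqueConstraintError(err)) throw err;
    // Lost the insert race: re-read the winning row and merge onto it.
    const winner = await table.find(key);
    if (!winner) throw err;
    const merged = { ...winner, ...incoming };
    await table.update(merged);
    return merged;
  }
}
```

Two concurrent calls for the same natural key both miss on the initial query, one insert wins, and the loser converges on the winner's row instead of writing a duplicate.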

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
- parseDate: the yyyyMMdd branch scrambled fields (e.g. "20230415" -> year 15);
  fix the regex + route it through the year-first branch, correct the dd-MM-yyyy
  comment to match the actual MM-dd-yyyy behavior, and add the first tests.
- Form 144: wire in the dead `firstOf` guard at the two access sites so a
  repeated securitiesInformation/broker element can't null the proposed-sale
  block.
- SecFetchJob: a per-attempt timeout was classified non-retriable ("timed out"
  didn't match the "timeout" pattern), so stalled fetches failed on the first
  timeout; key retry off the timeout controller directly and broaden the
  message pattern. Also give combineSignals a cleanup() so its fallback path
  stops leaking an abort listener per attempt onto the long-lived signal.
- sectionRunner: a "<section>-partial" dead letter was never resolvable and
  lingered on the version-gated retry worklist forever; reconcile it (record on
  drop, resolve on a clean run).
- spacRollup: stop summing redemption-typed events on top of the deal column
  (the deal column is the sole source) — closes a latent double-count.
- spacDealGrouping: same-filing_date merger/redemption ties now resolve
  deterministically (form precedence then accession) instead of by the backend's
  unspecified row order.
- RetryDeadLettersTask: isolate each accession so one hard parse/store error no
  longer abandons the rest of the sweep; report a failed count.
- fetchAndStoreSubmission: rethrow TaskAbortedError instead of recording the
  interrupted CIK as a failure (mirrors the facts sibling).
- bootstrap skip-set: only treat successfully-processed CIKs as done so a
  transient failure is retried on the next non-force bootstrap.
- StoreCikLastUpdatedTask: compute progress from rows processed instead of
  dividing a batch counter by the row count (the bar never advanced).
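
For the parseDate item at the top of this list, a year-first reading of the compact form might look like the sketch below. `parseCompactDate` and the string-valued `DateParts` shape are assumptions for illustration; the real parseDate handles more formats and its return type is not shown in this PR.

```typescript
type DateParts = { year: string; month: string; day: string };

// Hypothetical helper: reads "yyyyMMdd" strictly year-first.
function parseCompactDate(input: string): DateParts | undefined {
  const match = /^(\d{4})(\d{2})(\d{2})$/.exec(input);
  if (!match) return undefined;
  const [, year, month, day] = match;
  return { year, month, day };
}
```

Anchoring the pattern with `^` and `$` and fixing the group widths at 4, 2, 2 keeps "20230415" from being split any other way.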

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
- query: wrap every action in runCommand so a bad --format or repo failure
  renders as a clean "x <message>" with exit code 1 instead of an uncaught
  stack-trace dump (verified end to end).
- resolve/canonical/sponsor-family/underwriter-family: stop calling
  process.exit(1) mid-command — it bypassed the sec.ts finally that stops the
  fetch queue and closes the DB/PG pool. resolve now runs under runCommand and
  throws; the others set process.exitCode + return. Also give `sec resolve
  --kind person --all` the same per-observation try/catch the company branch
  has, so one bad row can't abort the batch.
- version coverage resolver: throw instead of process.exit, and honor
  --format json (the resolver branch always printed human text).
- init: the first-run wizard pointed at non-existent `sec bootstrap cik` /
  `bootstrap index`; point at the real `bootstrap download ciks` /
  `bootstrap ingest cik-names`.
- GlobalOptions: drop --json / --verbose / --no-color, which were parsed and
  advertised in --help but never consumed (only --dry-run is wired); update the
  tests that pinned the dead flags.
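
A rough sketch of the runCommand idea, assuming it reports failure through a returned exit code so the caller can set `process.exitCode` and still reach its `finally` cleanup. The signature, the injectable `log` parameter, and the return value are hypothetical; only the "x <message>" rendering and exit code 1 come from the commit text.

```typescript
// Hypothetical shape: returns the exit code instead of calling process.exit().
async function runCommand(
  action: () => Promise<void>,
  log: (line: string) => void = console.error,
): Promise<number> {
  try {
    await action();
    return 0;
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log(`x ${message}`); // clean one-line failure instead of a stack trace
    return 1;
  }
}
```

A caller would then assign the result to `process.exitCode` and return normally, which lets an enclosing `finally` stop the fetch queue and close the pool.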

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
… Crowdfunding history PK

Form D mutable-current row: the InvestmentOffering row (keyed cik+file_number)
had no out-of-order guard and processFormD dropped filing_date entirely, so a
back-catalog older D / D-A overwrote the current row with stale industry_group /
security-type flags / date_of_first_sale — the regression the temporal contract
forbids and that every sibling mutable-current repo (RegA/Portal/Crowdfunding)
already guards. Add an `as_of` column, thread filing_date through processFormD ->
processOffering, and skip the mutable write when the incoming filing is older
than the row's anchor (an undated filing is treated as stale against a dated
row). The append-only history is always written. Each Form D fully restates the
offering, so no field-merge is needed — only the date guard. New regression test.
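
The date guard described above can be sketched as a small predicate. The argument order, the treatment of an undated existing row, and the tie behavior for equal dates are assumptions here; the commit only states that an older incoming filing is skipped and that an undated filing is stale against a dated row.

```typescript
type AsOf = string | null | undefined;

// Hypothetical signature; dates are assumed to be ISO yyyy-MM-dd strings,
// which order correctly under plain string comparison.
function isStaleByAsOf(incoming: AsOf, current: AsOf): boolean {
  if (!current) return false; // undated (or absent) row: nothing to regress
  if (!incoming) return true; // undated filing against a dated row: stale
  return incoming < current; // equal dates are treated as not stale (assumption)
}
```

The caller would skip only the mutable "current" write when this returns true and still append the history row.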

Crowdfunding history PK: history rows were keyed (cik, valid_from) where
valid_from = new Date().toISOString() (ms resolution), so two snapshots for one
CIK landing in the same millisecond — realistic under a batch re-process that
writes a closed-immediately snapshot per filing — collided and the second
silently overwrote the first, dropping a history version. Add the source
`accession_number` to the row and the PK (meaningful provenance + a guaranteed
cross-filing disambiguator), threaded via saveCrowdfundingWithHistory's options;
Form C passes it. New regression test (fails under the old 2-tuple PK).
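
To illustrate the collision, here is a small sketch using a Map as a stand-in for the history table. `HistoryRow`, `oldKey`, `newKey`, and `store` are hypothetical names, and the `name` field is only there to make the two rows distinguishable.

```typescript
type HistoryRow = { cik: string; valid_from: string; accession_number: string; name: string };

const oldKey = (r: HistoryRow): string => `${r.cik}|${r.valid_from}`;
const newKey = (r: HistoryRow): string => `${r.cik}|${r.valid_from}|${r.accession_number}`;

// Writes rows into a Map keyed by the given primary-key function (last write wins).
function store(rows: HistoryRow[], keyOf: (r: HistoryRow) => string): Map<string, HistoryRow> {
  const table = new Map<string, HistoryRow>();
  rows.forEach((r) => table.set(keyOf(r), r));
  return table;
}

// Two snapshots for one CIK stamped within the same millisecond.
const sameMillisecond: HistoryRow[] = [
  { cik: "0001", valid_from: "2026-06-24T22:34:00.000Z", accession_number: "acc-1", name: "v1" },
  { cik: "0001", valid_from: "2026-06-24T22:34:00.000Z", accession_number: "acc-2", name: "v2" },
];

const underOldPk = store(sameMillisecond, oldKey); // second row overwrites the first
const underNewPk = store(sameMillisecond, newKey); // both versions survive
```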

No DB migration needed (no persistent database yet).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
…potent junctions

Observation rows are upserted by positional (accession, extractor_id,
observation_index), so a re-extraction that yields FEWER entities — or
reclassifies one company->person (a different table at the same index) —
overwrites the live rows but never deletes the now-stale high-index / wrong-kind
rows. Those orphans stay joined to canonical entities through their identity
links, manufacturing entities the filing no longer describes (and a later
`sec resolve` would re-link them). This violated the documented "replays are
idempotent and order-safe" invariant for the observation/canonical tier.

- ProcessAccessionDocFormTask captures runStart before extraction and, after a
  successful run, calls reapStaleObservations(accession, extractor_id, runStart):
  any observation for that filing+extractor not refreshed this run (created_at <
  runStart) is removed along with its identity links (all resolver versions),
  provenance, and — via each link — its address/phone junction contributions.
  Re-observed rows keep their stable observation_id, so the beneficial-ownership
  / related-party / spac_merger_extraction FKs that reference it stay valid.
  Centralized in the dispatcher, so no form-storage module changes.
- EntityObserver now decrements an observation's prior junction contribution
  before re-recording, so a replay nets out to the same co-occurrence count
  instead of a blind +1 (the entangled junction-idempotency Major).
- New repo primitives: junction removeObservation (decrement/delete); identity
  link listForObservation + deleteForObservation; provenance deleteForObservation;
  observation listByAccessionAndExtractor + deleteByObservationId.
- Tests: a smaller re-extraction leaves no orphan observations/links/junctions;
  a company->person reclassification removes the stale-kind orphan; junction
  counts stay at 1 across repeated replays. All verified to fail when the
  respective fix is neutered.
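
The core of the reap step can be sketched over an in-memory table. This is a simplified illustration: the real function also removes identity links, provenance, and junction contributions, and the numeric `created_at` and the Map-based table are assumptions made to keep the example short.

```typescript
type Observation = {
  observation_id: string;
  accession_number: string;
  extractor_id: string;
  created_at: number; // epoch ms; refreshed whenever the row is re-observed
};

// Deletes observations for one filing+extractor that the current run did not refresh.
function reapStaleObservations(
  table: Map<string, Observation>,
  accession: string,
  extractorId: string,
  runStart: number,
): string[] {
  const reaped: string[] = [];
  Array.from(table.values()).forEach((obs) => {
    const inScope = obs.accession_number === accession && obs.extractor_id === extractorId;
    if (inScope && obs.created_at < runStart) {
      table.delete(obs.observation_id);
      reaped.push(obs.observation_id);
    }
  });
  return reaped;
}
```

Because the comparison is scoped to one accession and extractor, rows belonging to other filings are never touched, whatever their timestamps.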

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
processMergerProxy gates on a known SPAC and silently no-ops when the `spac` row
does not yet exist (e.g. a DEFM14A ingested before its S-1). The dispatcher then
records a success:true run, so the normal unprocessed-run sweep excludes the
filing forever — the merger target / PIPE for that deal is dropped, and unlike
the analogous redemption gate there was no recovery path.

Add BackfillMergerProxiesTask + `sec spac backfill-merger-proxies`, mirroring
BackfillRedemptionsTask: enumerate known-SPAC merger proxies (the 14A/14C merger
+ revised-proxy forms, derived from FORM_TO_EXTRACTOR_ID) from the bootstrapped
filing metadata and re-process those that still lack a spac_merger_extraction
row, so they extract now that a spac row exists. Skipping already-extracted
proxies avoids redundant AI re-runs. Per-filing failures are isolated. Adds a
selection test and documents the command in CLAUDE.md.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
…ling's facts

XbrlFactRepo.replaceForAccession did deleteSearch then putBulk with no
transaction, so a putBulk rejection (e.g. a schema/maxLength overflow on one row
aborting the batch) after the delete left the filing with ZERO facts — and the
"never throws" caller (extractAndStoreXbrl) swallowed it as a NO_XBRL success,
masking the loss.

Reorder to write-first: putBulk the new rows (upsert by the (accession_number,
fact_index) PK), THEN delete only the rows a prior longer extract left whose
fact_index the new set omits. A putBulk failure now throws before any delete,
leaving the prior facts intact; the storage abstraction exposes no transaction,
so the worst residual case (a failure during the stale-tail delete) is a
superset the next re-extract cleans up — never data loss. The existing
stale-row-clearing test still holds; a new test pins that a putBulk rejection
keeps the prior facts (it fails under the old delete-first order with count 0).
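
A sketch of the write-first ordering, using a synchronous in-memory store for brevity (the real repository is presumably asynchronous). `FactStore`, its `MAX_LENGTH` limit, and `deleteWhere` are hypothetical stand-ins for the storage abstraction.

```typescript
type Fact = { accession_number: string; fact_index: number; value: string };

// Hypothetical in-memory store keyed by (accession_number, fact_index).
class FactStore {
  private static readonly MAX_LENGTH = 8; // stand-in for a schema maxLength
  private readonly rows = new Map<string, Fact>();

  // Upserts the batch, rejecting it whole if any row violates the schema.
  putBulk(facts: Fact[]): void {
    if (facts.some((f) => f.value.length > FactStore.MAX_LENGTH)) {
      throw new Error("putBulk rejected: value exceeds maxLength");
    }
    facts.forEach((f) => this.rows.set(`${f.accession_number}|${f.fact_index}`, f));
  }

  deleteWhere(predicate: (f: Fact) => boolean): void {
    Array.from(this.rows.entries()).forEach(([key, fact]) => {
      if (predicate(fact)) this.rows.delete(key);
    });
  }

  countFor(accession: string): number {
    return Array.from(this.rows.values()).filter((f) => f.accession_number === accession).length;
  }
}

function replaceForAccession(store: FactStore, accession: string, facts: Fact[]): void {
  // 1. Write first: a rejection throws here, before anything is deleted.
  store.putBulk(facts);
  // 2. Then trim only the stale tail left by a prior, longer extract.
  const kept = new Set(facts.map((f) => f.fact_index));
  store.deleteWhere((f) => f.accession_number === accession && !kept.has(f.fact_index));
}
```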

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
…ed this run

reapStaleObservations runs in ProcessAccessionDocFormTask's success branch and
deletes any observation for (accession, extractor) not refreshed this run. But
sectionRunner swallows a section's zero-row outcomes into a dead-letter WITHOUT
aborting the filing, so the processor still returns success. A section that
transiently fails on a re-extraction (network blip / rate limit / empty or
truncated model response) writes nothing, and the reaper then deletes that
section's still-valid prior observations (plus identity links and junction
counts) as false orphans — silent data loss.

Gate the reap on a new hasBlockingSectionFailure() policy: skip it whenever a
section recorded a fresh dead-letter THIS run for any reason that means it
yielded no rows (MODEL_INVALID_OUTPUT, MODEL_EMPTY, LOW_CONFIDENCE_ALL, a
whole-section UNVERIFIED_SOURCE_SPAN). A genuinely-absent section
(SECTION_NOT_FOUND) and a partial success (`-partial` marker) do not block, so
the reaper still removes real orphans on a clean run. If the dead-letter check
itself errors, default to NOT reaping — retaining a stale superset is
recoverable; deleting valid rows is not.
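
The gate policy can be sketched as below. The `DeadLetter` shape, the `-partial` section-name suffix check, the numeric timestamps, and the `shouldReap` wrapper are assumptions for illustration; the failure codes are the ones listed above.

```typescript
type DeadLetter = { section: string; code: string; recorded_at: number };

// Codes that mean the section yielded no rows this run.
const BLOCKING_CODES = new Set([
  "MODEL_INVALID_OUTPUT",
  "MODEL_EMPTY",
  "LOW_CONFIDENCE_ALL",
  "UNVERIFIED_SOURCE_SPAN",
]);

function hasBlockingSectionFailure(deadLetters: DeadLetter[], runStart: number): boolean {
  return deadLetters.some(
    (d) =>
      d.recorded_at >= runStart && // only dead letters recorded during this run
      !d.section.endsWith("-partial") && // a partial success does not block
      BLOCKING_CODES.has(d.code), // SECTION_NOT_FOUND is not in the set
  );
}

// Fail closed: if the dead-letter lookup itself errors, do not reap.
function shouldReap(loadDeadLetters: () => DeadLetter[], runStart: number): boolean {
  try {
    return !hasBlockingSectionFailure(loadDeadLetters(), runStart);
  } catch {
    return false;
  }
}
```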

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
…bort handling

Three isolated correctness fixes from the review pass:

- parseDate accepted any digit-count match, so "20251301" (month 13) or
  "2025-12-32" parsed to a bogus { month: "13" } / { day: "32" } and corrupted
  date ordering / as_of guards downstream. Reject months outside 1-12 and days
  outside 1-31.

- mergerFormRank gave PRER (preliminary-revised) the same rank as DEFR, so a
  same-date PRER tiebreak outranked the definitive DEFM. Rank by the actual
  filing lifecycle PREM < PRER < DEFM < DEFR so a definitive proxy supersedes a
  preliminary-revised one (only a definitive-revised DEFR outranks DEFM).

- BackfillMergerProxiesTask / BackfillRedemptionsTask swallowed every per-filing
  error in their sweep, including TaskAbortedError — so a cooperative
  cancellation kept grinding through the whole worklist. Check context.signal
  before each filing and rethrow TaskAbortedError, mirroring RetryDeadLettersTask.
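
The lifecycle ranking in the second item can be sketched as a prefix lookup plus a deterministic comparator. The prefix matching, the `Filing` shape, and `latestFiling` are assumptions for illustration; only the order PREM < PRER < DEFM < DEFR and the accession tiebreak come from the commit messages.

```typescript
type Filing = { form: string; filing_date: string; accession_number: string };

// Lifecycle order, lowest to highest precedence.
const LIFECYCLE = ["PREM", "PRER", "DEFM", "DEFR"];

function mergerFormRank(form: string): number {
  const upper = form.toUpperCase();
  return LIFECYCLE.findIndex((prefix) => upper.startsWith(prefix)); // -1 if unknown
}

// Orders by filing date, then form precedence, then accession number.
function compareFilings(a: Filing, b: Filing): number {
  if (a.filing_date !== b.filing_date) return a.filing_date < b.filing_date ? -1 : 1;
  const byRank = mergerFormRank(a.form) - mergerFormRank(b.form);
  if (byRank !== 0) return byRank;
  if (a.accession_number === b.accession_number) return 0;
  return a.accession_number < b.accession_number ? -1 : 1;
}

function latestFiling(filings: Filing[]): Filing | undefined {
  return [...filings].sort(compareFilings).pop();
}
```

Because the comparator never falls back to input order, the winner is the same no matter how the backend returns the rows.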

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
…ile_number)

The as_of staleness guard in Form_D.storage did an unsynchronized
get -> compare -> save of the mutable current offering row. The form tasks map
over a CIK's filings with concurrencyLimit 5/10, so a D and D/A for the same
(cik, file_number) processed concurrently could both read the same prior row,
both decide they were not stale, and let the older filing's write land last —
regressing the current row (the same lost-update class the SPAC writer fixed
with a per-CIK lock).

Move the read-guard-write into InvestmentOfferingRepo.saveInvestmentOfferingAsOf
and run it inside a per-(cik, file_number) lock. Introduce a reusable KeyedMutex
primitive (generalizing the hand-rolled withCikLock / resolver per-key mutex)
rather than copy the pattern a third time.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
Form D was the only mutable "current" row whose out-of-order as_of guard ran
inside a per-key lock. Its siblings — RegAOffering (1-A / 1-K / 1-Z), Portal
(CFPORTAL) and Crowdfunding (Form C family) — did the same get -> compare ->
build -> save unsynchronized. Because the form tasks map over a CIK's filings
with concurrencyLimit 5/10, two filings for the same key processed concurrently
could both read the same prior row, both decide they were not stale, and let the
older filing's write land last — regressing the current row (and, for CFPORTAL,
potentially resurrecting a withdrawn portal).

Bring them all into alignment:

- Extract the shared staleness predicate into util/asOfGuard.ts (isStaleByAsOf),
  the single source of the "" / null / undated-incoming semantics.
- Add atomic, per-key guarded-write methods that read-merge-write inside a
  KeyedMutex, taking a builder that receives the row read inside the lock so the
  per-form field merges (1-K/1-Z carry no tier/SIC; C-AR carries no portal CIK)
  stay correct:
    - InvestmentOfferingRepo.saveInvestmentOfferingAsOf (refactored to the
      builder shape + shared predicate)
    - RegAOfferingRepo.saveOfferingAsOf
    - PortalRepo.savePortalAsOf
    - CrowdfundingTemporalRepo.saveCurrentByFilingDate (filing_date is its
      as-of marker; still always writes the history snapshot via skipMutableUpdate)
- Rewire Form_D / Form_1_A / Form_1_K / Form_1_Z / Form_CFPORTAL / Form_C to the
  new methods; behavior is otherwise unchanged.
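
The builder-inside-the-lock shape can be sketched as follows. This is a simplified illustration: `withKeyLock` is a bare promise chain per key with no refcounting or eviction (unlike the KeyedMutex this PR describes), the `Portal` shape and the in-memory `rows` map are hypothetical, and the staleness check is inlined rather than using the shared predicate.

```typescript
type Portal = { cik: string; status: string; as_of: string };

const rows = new Map<string, Portal>();
const chains = new Map<string, Promise<unknown>>();

// Simplified per-key lock: each call runs after the previous one for that key.
function withKeyLock<T>(key: string, fn: () => Promise<T>): Promise<T> {
  const previous = chains.get(key) ?? Promise.resolve();
  const next = previous.then(() => fn());
  chains.set(key, next.catch(() => undefined)); // keep the chain alive after errors
  return next;
}

// Returns true if the mutable row was written, false if the filing was stale.
async function savePortalAsOf(
  cik: string,
  asOf: string,
  build: (current: Portal | undefined) => Portal,
): Promise<boolean> {
  return withKeyLock(cik, async () => {
    const current = rows.get(cik); // read happens inside the lock
    await Promise.resolve(); // stand-in for storage latency
    if (current && asOf < current.as_of) return false; // older filing: skip
    rows.set(cik, build(current)); // builder sees the row read under the lock
    return true;
  });
}
```

Passing a builder rather than a finished row matters because a field merge computed before acquiring the lock could be based on a row that another filing has since replaced.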

Adds repo-level concurrency tests (newer filing wins regardless of submission
order) and a unit test for the shared predicate. Also strips a stray NUL byte
that a prior commit left in an InvestmentOfferingRepo comment.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018GW5p9FW5RPHXPjRwvwywR
@sroussey sroussey self-assigned this Jun 30, 2026
@sroussey sroussey merged commit 7a0b20b into main Jun 30, 2026
1 check passed
sroussey pushed a commit that referenced this pull request Jul 2, 2026
Correctness:
- parseDate: restore the literal year via setUTCFullYear before the calendar
  probe. Date.UTC remaps years 0-99 to 1900-1999, so a valid 4-digit year like
  '0099-01-01' was wrongly rejected as an invalid calendar date. Regression test
  added; Feb-30-style rollover detection is unaffected.
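
A small sketch of a calendar probe that keeps the literal year. It uses the three-argument form of `setUTCFullYear` on a fixed base date rather than `Date.UTC`, which is one possible way to avoid the 0-99 remapping and may not match the exact code in this commit; `isValidCalendarDate` is a hypothetical name.

```typescript
// Returns true only if (year, month, day) survives a round trip without rollover.
function isValidCalendarDate(year: number, month: number, day: number): boolean {
  const probe = new Date(0);
  // setUTCFullYear takes the year literally, so 99 stays year 99 (not 1999).
  probe.setUTCFullYear(year, month - 1, day);
  return (
    probe.getUTCFullYear() === year &&
    probe.getUTCMonth() === month - 1 &&
    probe.getUTCDate() === day
  );
}

// For contrast: Date.UTC remaps two-digit years into the 1900s.
const remappedYear = new Date(Date.UTC(99, 0, 1)).getUTCFullYear();
```

An out-of-range day such as Feb 30 rolls forward into March, so the month and day no longer match and the date is rejected.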

Resilience regression from #178's stricter parseDate throw:
- FetchQuarterlyIndexTask / FetchQuarterlyFormIdxTask: wrap the per-row
  secDate() in try/catch so a single calendar-invalid EDGAR 'Date Filed' row
  is skipped+warned instead of aborting the whole quarter's batch (parseDate
  now throws where it previously rolled forward).

Cleanup:
- XbrlFactRepo.clearForAccession delegates to replaceForAccession(acc, [],
  { intentionalClear: true }), removing a byte-identical duplicate delete loop.

Skipped (documented in review): the 4-way junction-repo copy-paste (would need
a shared CanonicalJunctionRepo base — larger refactor beyond this diff); the
CLI --date throw (operator fail-fast is acceptable); wiring clearForAccession
into xbrlEnrichment (pre-existing behavior #177 deliberately left unchanged);
and 5 concurrency observations that are pre-existing #175 reap-path limitations
or documented single-process scope, not wave-2 regressions.

2 participants