
Commit 9953be3

connortsui20 and claude committed
[claude] chore(benchmarks-migrate): post-alpha cleanup nits (#7671)

Six small fixes left over from the v3 migration alpha. All paths are relative to `benchmarks-website/migrate/` unless noted.

## Fixes

- **Scale-factor canonicalization** (`src/classifier.rs::bin_compression_size`, `src/migrate.rs::migrate_file_sizes`, helper in `src/v2.rs`): both paths now route the v2 SF string through `canonical_scale_factor`, which parses to `f64` and formats with no trailing zeros (sketched below). Without this, `"1"` vs `"1.0"` and `"10"` vs `"10.0"` would produce different `dataset_variant` strings and prevent the data.json.gz and file-sizes-*.json.gz rows from sharing a `measurement_id`.
- **Summary counter timing** (`src/migrate.rs::run`): per-fact counters used to be set from accumulator length *before* the flush, so a flush failure would print a summary that lied. Refactored into a `flush_all` helper that bumps `summary.<fact>_inserted` from the flushed `RecordBatch::num_rows()` only after each `Appender::append_record_batch` succeeds.
- **Empty-string normalization in commits** (`src/commits.rs`, `benchmarks-website/server/src/schema.rs`, `benchmarks-website/server/src/api.rs`): `message`, `author_name`/`email`, and `committer_name`/`email` now bind as `Option<String>` and store SQL `NULL` when v2 supplied an empty or whitespace-only string. Schema columns were made nullable; server reads use `COALESCE(c.message, '')` so the existing `String` decoder still works.
- **Orphan WAL cleanup** (`src/migrate.rs::open_target_db`): the existing code already attempts `remove_if_exists` on the `.wal` regardless of whether the main file was present; pinned the behavior with a regression test that stages an orphan `.wal` (no main file) and asserts the orphan bytes don't survive `open_target_db`.
- **Random-access dataset extraction** (`src/classifier.rs::bin_random_access`): 4-part records `random-access/<dataset>/<pattern>/<format>-tokio-local-disk` continue to extract `dataset/pattern` from the raw name. 2-part legacy records carry no dataset and used to render under the placeholder `"random access"`; they're now dropped to keep the v3 dataset column meaningful.
- **`migrate_file_sizes` dataset fallback** (`src/migrate.rs::migrate_file_sizes`): when the matrix id stripped from `file-sizes-<id>.json.gz` isn't on the `KNOWN_FILE_SIZES_SUITES` allowlist, the fallback now emits `unknown:<id>` so the UI clearly flags it instead of presenting it as a real dataset.

## Tests

Each fix has a focused regression test (`rstest` parametrization where useful):

- `tests/classifier.rs::compression_size_scale_factor_canonicalizes`, covering `"1"`, `"1.0"`, `"10"`, `"10.0"`, `"0.1"`, whitespace, and `""`.
- `tests/classifier.rs::unmapped_records_yield_none`, extended with `random_access_2_part_legacy` and `random_access_3_part`.
- `migrate::tests::flush_all_does_not_overcount_on_failure` (private unit test that drops `compression_times` to force the second flush to fail and asserts only the queries counter is set).
- `tests/end_to_end.rs::summary_counts_match_actual_rows_on_success` (sister invariant for the success path).
- `tests/end_to_end.rs::empty_author_email_stored_as_null`.
- `tests/end_to_end.rs::open_target_db_removes_orphan_wal`.
- `tests/end_to_end.rs::file_sizes_unknown_id_falls_back_to_unknown_prefix` and `file_sizes_known_id_uses_id_directly`.
- `tests/end_to_end.rs::compression_size_data_and_file_sizes_merge_with_canonical_sf` (cross-path SF canonicalization, end to end).

## Verification

- `cargo build -p vortex-bench-migrate` — clean.
- `cargo test -p vortex-bench-migrate` — 7 unit + 46 classifier + 12 end-to-end tests all pass.
- `cargo test -p vortex-bench-server` — 6 unit + 10 ingest + 6 web_ui tests pass; the schema and `COALESCE` changes are server-safe.
- `cargo clippy -p vortex-bench-migrate --all-targets` — clean.
- `cargo fmt` on changed files (nightly fmt unavailable in this sandbox; ran with stable, which is a no-op for the imports-granularity options the repo's `rustfmt.toml` gates on nightly).
- Skipped `./scripts/public-api.sh`: migrate is a leaf binary outside the public-api lockfile set, and the only newly `pub` item is the internal `canonical_scale_factor` helper.

Signed-off-by: Claude <noreply@anthropic.com>

---

_Generated by [Claude Code](https://claude.ai/code/session_012XyYJRpcGFxmJXdTJuW8Ff)_

---

Signed-off-by: Claude <noreply@anthropic.com>
Co-authored-by: Claude <noreply@anthropic.com>
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
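The `canonical_scale_factor` body lives in `src/v2.rs` and is not shown in the hunks below, so here is a minimal sketch consistent with the description and the test inputs above. It assumes SF 1 (the implicit default) maps to `None`, as the pre-fix `!= "1.0"` filter suggests; the real helper may differ in details.

```rust
/// Sketch of `canonical_scale_factor` (assumed, not the repo's exact
/// body): parse the v2 scale-factor string as `f64` and re-format it,
/// so `"1"`/`"1.0"` and `"10"`/`"10.0"` collapse to one form.
pub fn canonical_scale_factor(raw: Option<&str>) -> Option<String> {
    let s = raw?.trim();
    if s.is_empty() {
        return None; // "" and whitespace-only carry no variant
    }
    let sf: f64 = s.parse().ok()?;
    if sf == 1.0 {
        return None; // assumption: SF 1 is the default, stored as no variant
    }
    // `Display` for f64 never prints trailing zeros: 10.0 -> "10", 0.1 -> "0.1".
    Some(format!("{sf}"))
}
```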
1 parent e3f1247 commit 9953be3

9 files changed

Lines changed: 463 additions & 107 deletions


benchmarks-website/migrate/src/classifier.rs

Lines changed: 47 additions & 33 deletions
@@ -398,7 +398,7 @@ pub enum V3Bin {
 pub fn classify(record: &V2Record) -> Option<V3Bin> {
     let cls = classify_v2(record)?;
     match &cls.group {
-        V2Group::RandomAccess => bin_random_access(&cls, record),
+        V2Group::RandomAccess => bin_random_access(record),
         V2Group::Compression => bin_compression_time(&cls, record),
         V2Group::CompressionSize => bin_compression_size(&cls, record),
         V2Group::Query { .. } => bin_query(&cls, record),
@@ -537,7 +537,16 @@ pub fn classify_outcome(record: &V2Record) -> Outcome {
         return Outcome::Skip(Skip::DerivedRatio);
     }
     let bin = match &cls.group {
-        V2Group::RandomAccess => bin_random_access(&cls, record),
+        V2Group::RandomAccess => match bin_random_access(record) {
+            Some(b) => Some(b),
+            // Legacy 2-part `random-access/<format>-…` records carry
+            // no dataset and are intentionally dropped by
+            // `bin_random_access`. Route them to Skip so the
+            // `Outcome::Unknown` arm below — and the 5%
+            // uncategorized gate in `migrate::run` — don't trip on
+            // them.
+            None => return Outcome::Skip(Skip::UnsupportedShape),
+        },
         V2Group::Compression => bin_compression_time(&cls, record),
         V2Group::CompressionSize => bin_compression_size(&cls, record),
         V2Group::Query { .. } => bin_query(&cls, record),
@@ -556,34 +565,34 @@ pub fn classify_outcome(record: &V2Record) -> Outcome {
     Outcome::Bin(bin)
 }
 
-fn bin_random_access(cls: &V2Classification, record: &V2Record) -> Option<V3Bin> {
-    // v2 chart name shape: "RANDOM ACCESS" or "DATASET/PATTERN" (uppercase).
-    // We store it as the v3 dataset value verbatim, lowercased so
-    // `/api/groups` returns canonical lowercase names.
-    let dataset = cls.chart.to_lowercase();
-    if dataset.is_empty() {
-        return None;
-    }
-    // Pull format from the raw, pre-rename v2 name so v3 stores the
-    // canonical `Format::name()` string (matching what the v3 live
-    // emitter writes). Raw shape is
+fn bin_random_access(record: &V2Record) -> Option<V3Bin> {
+    // Pull dataset and format from the raw, pre-rename v2 name so v3
+    // stores meaningful values. Raw shape is
     // `random-access/<dataset>/<pattern>/<format>-tokio-local-disk`
-    // (4-part) or `random-access/<format>-tokio-local-disk` (2-part
-    // legacy). After stripping the `-tokio-local-disk` suffix, map the
-    // v2 random-access ext label (`vortex`, from `Format::ext()`) to
-    // the canonical name (`vortex-file-compressed`, from
-    // `Format::name()`). `parquet` and `lance` match between ext and
-    // name. The `vortex` ext is shared by both `OnDiskVortex` (name
+    // (4-part). 2-part legacy records (`random-access/<format>-…`)
+    // carry no dataset and historically rendered as the placeholder
+    // string "RANDOM ACCESS"; drop them rather than emit a fake
+    // dataset. Deriving from the raw name (rather than `cls.chart`)
+    // also keeps this independent of v2's `normalizeChartName`.
+    //
+    // After stripping the `-tokio-local-disk` suffix, map the v2
+    // random-access ext label (`vortex`, from `Format::ext()`) to the
+    // canonical name (`vortex-file-compressed`, from `Format::name()`).
+    // `parquet` and `lance` match between ext and name. The `vortex`
+    // ext is shared by both `OnDiskVortex` (name
     // `vortex-file-compressed`) and `VortexCompact` (name
     // `vortex-compact`), but v2's random-access bench only emitted
     // `OnDiskVortex`, so mapping to `vortex-file-compressed` is
     // correct for all historical data.
     let parts: Vec<&str> = record.name.split('/').collect();
-    let raw = match parts.len() {
-        4 => parts[3],
-        2 => parts[1],
-        _ => return None,
-    };
+    if parts.len() != 4 {
+        return None;
+    }
+    if parts[1].is_empty() || parts[2].is_empty() {
+        return None;
+    }
+    let dataset = format!("{}/{}", parts[1], parts[2]).to_lowercase();
+    let raw = parts[3];
     if raw.is_empty() || raw == "default" {
         return None;
     }
@@ -668,15 +677,20 @@ fn bin_compression_size(cls: &V2Classification, record: &V2Record) -> Option<V3Bin> {
     }
     // Mirror the file-sizes ingest path's dataset_variant derivation
     // (see `migrate::migrate_file_sizes`): pull the SF out of the v2
-    // record's `dataset` object when present, drop empty / "1.0".
-    // Without this both code paths produce the same `mid` only by
-    // accident, so SF=10 file-sizes rows wouldn't merge with the
-    // matching data.json.gz "vortex size/tpch" rows.
-    let dataset_variant = record
-        .dataset
-        .as_ref()
-        .and_then(|d| crate::v2::dataset_scale_factor(d, dataset.as_str()))
-        .filter(|s| !s.is_empty() && s.as_str() != "1.0");
+    // record's `dataset` object when present and run it through
+    // `canonical_scale_factor` so `"1"`, `"1.0"`, `"10"` and `"10.0"`
+    // collapse to one canonical form. Without this both code paths
+    // produce the same `mid` only by accident, so SF=10 file-sizes
+    // rows wouldn't merge with the matching data.json.gz
+    // "vortex size/tpch" rows when one side wrote `"10"` and the
+    // other wrote `"10.0"`.
+    let dataset_variant = crate::v2::canonical_scale_factor(
+        record
+            .dataset
+            .as_ref()
+            .and_then(|d| crate::v2::dataset_scale_factor(d, dataset.as_str()))
+            .as_deref(),
+    );
     Some(V3Bin::CompressionSize {
         dataset,
         dataset_variant,
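For concreteness, the new shape handling implies expectations along these lines. This is a test-style sketch only: `record_named` is a hypothetical helper that builds a minimal `V2Record` (one that `classify_v2` routes to the random-access group) from a raw v2 name; it is not a function in the repo.

```rust
#[test]
fn random_access_shape_expectations() {
    // 4-part raw name: `dataset/pattern` is extracted and lowercased,
    // and the `vortex` ext maps to `vortex-file-compressed`.
    let four = record_named("random-access/TAXI/random/vortex-tokio-local-disk");
    assert!(classify(&four).is_some());

    // 2-part legacy name: no dataset to extract, so the record is
    // dropped instead of rendering the old "random access" placeholder.
    let two = record_named("random-access/vortex-tokio-local-disk");
    assert!(classify(&two).is_none());
}
```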

benchmarks-website/migrate/src/commits.rs

Lines changed: 24 additions & 29 deletions
@@ -11,38 +11,20 @@ use duckdb::params;
 
 use crate::v2::V2Commit;
 
-/// Insert a v3 `commits` row for one v2 commit. Missing fields are
-/// filled with the empty string, matching the v3 schema's `NOT NULL`
-/// constraints; the call site logs a warning for each fallback so
-/// the operator can spot bad inputs.
+/// Insert a v3 `commits` row for one v2 commit. `tree_sha` and `url`
+/// remain required and use a warning-bearing empty-string fallback;
+/// the human-input fields (message, author/committer name and email)
+/// are nullable in the v3 schema, so empty / missing values map to
+/// SQL `NULL` instead of an empty string the UI would render as a
+/// blank cell.
 pub fn upsert_commit(tx: &Transaction<'_>, commit: &V2Commit) -> Result<UpsertOutcome> {
     let mut warnings = Vec::new();
     let timestamp = require_field(&commit.timestamp, "timestamp", &commit.id, &mut warnings);
-    let message = require_field(&commit.message, "message", &commit.id, &mut warnings);
-    let author_name = require_field(
-        &commit.author.as_ref().and_then(|p| p.name.clone()),
-        "author.name",
-        &commit.id,
-        &mut warnings,
-    );
-    let author_email = require_field(
-        &commit.author.as_ref().and_then(|p| p.email.clone()),
-        "author.email",
-        &commit.id,
-        &mut warnings,
-    );
-    let committer_name = require_field(
-        &commit.committer.as_ref().and_then(|p| p.name.clone()),
-        "committer.name",
-        &commit.id,
-        &mut warnings,
-    );
-    let committer_email = require_field(
-        &commit.committer.as_ref().and_then(|p| p.email.clone()),
-        "committer.email",
-        &commit.id,
-        &mut warnings,
-    );
+    let message = optional_field(&commit.message);
+    let author_name = optional_field(&commit.author.as_ref().and_then(|p| p.name.clone()));
+    let author_email = optional_field(&commit.author.as_ref().and_then(|p| p.email.clone()));
+    let committer_name = optional_field(&commit.committer.as_ref().and_then(|p| p.name.clone()));
+    let committer_email = optional_field(&commit.committer.as_ref().and_then(|p| p.email.clone()));
     let tree_sha = require_field(&commit.tree_id, "tree_id", &commit.id, &mut warnings);
     let url = require_field(&commit.url, "url", &commit.id, &mut warnings);
 
@@ -93,6 +75,19 @@ fn require_field(
     }
 }
 
+/// Coerce a v2-supplied `Option<String>` into a SQL-bindable
+/// `Option<String>`, treating an empty / whitespace-only value as
+/// missing. v2 sometimes wrote `""` for blank author / committer /
+/// message fields; storing those as actual `NULL` lets the UI
+/// distinguish "missing metadata" from "deliberately blank".
+fn optional_field(field: &Option<String>) -> Option<String> {
+    field
+        .as_deref()
+        .map(str::trim)
+        .filter(|s| !s.is_empty())
+        .map(str::to_string)
+}
+
 /// Per-call warning bag returned to the caller for logging.
 #[derive(Debug, Default)]
 pub struct UpsertOutcome {
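The server-side half of this fix is not in the hunk above. The sketch below shows why nullable columns stay compatible with the existing `String` decoder: `Option<String>` binds as SQL `NULL`, and `COALESCE` folds it back to `''` on read. Table and column names here are illustrative only, not the repo's schema.

```rust
use duckdb::{Connection, Result, params};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE commits (sha TEXT, message TEXT)")?;

    // `Option<String>` binds as SQL NULL when None: a blank v2 commit
    // message is stored as NULL, not as ''.
    let message: Option<String> = None;
    conn.execute(
        "INSERT INTO commits VALUES (?, ?)",
        params!["deadbeef", message],
    )?;

    // The server keeps decoding into `String` by coalescing NULL to '',
    // so existing readers need no Option plumbing.
    let msg: String = conn.query_row(
        "SELECT COALESCE(message, '') FROM commits WHERE sha = ?",
        params!["deadbeef"],
        |row| row.get(0),
    )?;
    assert_eq!(msg, "");
    Ok(())
}
```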

benchmarks-website/migrate/src/migrate.rs

Lines changed: 126 additions & 27 deletions
@@ -49,10 +49,12 @@ use vortex_utils::aliases::hash_map::HashMap;
 use crate::classifier;
 use crate::classifier::V3Bin;
 use crate::commits::upsert_commit;
+use crate::source::KNOWN_FILE_SIZES_SUITES;
 use crate::source::Source;
 use crate::v2::V2Commit;
 use crate::v2::V2FileSize;
 use crate::v2::V2Record;
+use crate::v2::canonical_scale_factor;
 use crate::v2::index_commits;
 use crate::v2::runtime_as_i64;
 use crate::v2::value_as_f64;
@@ -171,27 +173,46 @@ pub fn run(source: &Source, target: &Path) -> Result<MigrationSummary> {
     }
 
     info!("Flushing accumulators to DuckDB");
-    summary.query_inserted = q.measurement_id.len() as u64;
-    summary.compression_time_inserted = ct.measurement_id.len() as u64;
-    summary.random_access_inserted = ra.measurement_id.len() as u64;
-    summary.compression_size_inserted = cs.rows.len() as u64;
-
-    flush(&conn, "query_measurements", build_query_batch(q)?)?;
-    flush(
-        &conn,
-        "compression_times",
-        build_compression_time_batch(ct)?,
-    )?;
-    flush(&conn, "random_access_times", build_random_access_batch(ra)?)?;
-    flush(
-        &conn,
-        "compression_sizes",
-        build_compression_size_batch(cs)?,
-    )?;
+    flush_all(&conn, q, ct, ra, cs, &mut summary)?;
 
     Ok(summary)
 }
 
+/// Flush each accumulator's batch and bump the matching per-fact
+/// summary counter only AFTER the flush succeeds. This way a flush
+/// failure leaves the counter at zero (or its previous value) rather
+/// than reporting rows that never landed in DuckDB.
+fn flush_all(
+    conn: &Connection,
+    q: QueryAccum,
+    ct: CompressionTimeAccum,
+    ra: RandomAccessAccum,
+    cs: CompressionSizeAccum,
+    summary: &mut MigrationSummary,
+) -> Result<()> {
+    let batch = build_query_batch(q)?;
+    let n = batch.num_rows() as u64;
+    flush(conn, "query_measurements", batch)?;
+    summary.query_inserted = n;
+
+    let batch = build_compression_time_batch(ct)?;
+    let n = batch.num_rows() as u64;
+    flush(conn, "compression_times", batch)?;
+    summary.compression_time_inserted = n;
+
+    let batch = build_random_access_batch(ra)?;
+    let n = batch.num_rows() as u64;
+    flush(conn, "random_access_times", batch)?;
+    summary.random_access_inserted = n;
+
+    let batch = build_compression_size_batch(cs)?;
+    let n = batch.num_rows() as u64;
+    flush(conn, "compression_sizes", batch)?;
+    summary.compression_size_inserted = n;
+
+    Ok(())
+}
+
 fn read_commits(source: &Source) -> Result<BTreeMap<String, V2Commit>> {
     let reader = source.open_commits_jsonl()?;
     let mut commits: Vec<V2Commit> = Vec::new();
@@ -409,11 +430,19 @@ fn migrate_file_sizes(
     cs: &mut CompressionSizeAccum,
 ) -> Result<()> {
     let reader = source.open_file_sizes(name)?;
-    let dataset_fallback = name
-        .strip_prefix("file-sizes-")
-        .and_then(|s| s.strip_suffix(".json.gz"))
-        .unwrap_or(name)
-        .to_string();
+    // Prefix unknown-id fallbacks with `unknown:` so they're clearly
+    // labeled in the UI rather than masquerading as a dataset name.
+    let dataset_fallback = {
+        let stripped = name
+            .strip_prefix("file-sizes-")
+            .and_then(|s| s.strip_suffix(".json.gz"))
+            .unwrap_or(name);
+        if KNOWN_FILE_SIZES_SUITES.contains(&stripped) {
            stripped.to_string()
+        } else {
+            format!("unknown:{stripped}")
+        }
+    };
     let started = Instant::now();
     let mut last_log = Instant::now();
     for line in reader.lines() {
@@ -438,11 +467,10 @@ fn migrate_file_sizes(
         } else {
             sz.benchmark.clone()
         };
-        let dataset_variant = sz
-            .scale_factor
-            .as_ref()
-            .filter(|s| !s.is_empty() && s.as_str() != "1.0")
-            .cloned();
+        // Run SF through canonical_scale_factor so `"1"`, `"1.0"`, `"10"`
+        // and `"10.0"` collapse to one form, matching what
+        // `bin_compression_size` writes for the data.json.gz path.
+        let dataset_variant = canonical_scale_factor(sz.scale_factor.as_deref());
         let csr = CompressionSize {
             commit_sha: sz.commit_id.clone(),
             dataset,
@@ -834,3 +862,74 @@ impl std::fmt::Display for MigrationSummary {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use vortex_bench_server::records::QueryMeasurement;
+
+    use super::*;
+
+    fn open_db_without(table: &str) -> (tempfile::TempDir, Connection) {
+        let dir = tempfile::TempDir::new().unwrap();
+        let path = dir.path().join("v3.duckdb");
+        let conn = open_target_db(&path).unwrap();
+        conn.execute_batch(&format!("DROP TABLE {table}")).unwrap();
+        (dir, conn)
+    }
+
+    fn one_query_row() -> QueryMeasurement {
+        QueryMeasurement {
+            commit_sha: "deadbeef".into(),
+            dataset: "clickbench".into(),
+            dataset_variant: None,
+            scale_factor: None,
+            query_idx: 7,
+            storage: "nvme".into(),
+            engine: "datafusion".into(),
+            format: "parquet".into(),
+            value_ns: 100,
+            all_runtimes_ns: vec![100],
+            peak_physical: None,
+            peak_virtual: None,
+            physical_delta: None,
+            virtual_delta: None,
+            env_triple: None,
+        }
+    }
+
+    #[test]
+    fn flush_all_does_not_overcount_on_failure() {
+        // Drop `compression_times` before flushing so the second
+        // flush in `flush_all` fails. The first (queries) succeeded,
+        // so its counter must be set; the failed table's counter and
+        // every later table's counter must stay at zero.
+        let (_dir, conn) = open_db_without("compression_times");
+
+        let mut summary = MigrationSummary::default();
+        let mut q = QueryAccum::default();
+        let qm = one_query_row();
+        let mid = vortex_bench_server::db::measurement_id_query(&qm);
+        q.push(mid, qm, &mut summary);
+
+        let ct = CompressionTimeAccum::default();
+        let ra = RandomAccessAccum::default();
+        let cs = CompressionSizeAccum::default();
+
+        let result = flush_all(&conn, q, ct, ra, cs, &mut summary);
+        assert!(result.is_err(), "expected flush to fail on missing table");

+        assert_eq!(
+            summary.query_inserted, 1,
+            "query flushed before the failure must be counted"
+        );
+        assert_eq!(
+            summary.compression_time_inserted, 0,
+            "failed flush must not bump the counter"
+        );
+        assert_eq!(summary.random_access_inserted, 0, "later flushes never ran");
+        assert_eq!(
+            summary.compression_size_inserted, 0,
+            "later flushes never ran"
+        );
+    }
+}
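The orphan-WAL regression test itself lives in `tests/end_to_end.rs` and is not shown in this commit view. Below is a plausible sketch of its shape, assuming direct access to `open_target_db` plus `std` and `tempfile` (the real test may instead drive this through the crate's public entry point):

```rust
#[test]
fn open_target_db_removes_orphan_wal() {
    let dir = tempfile::TempDir::new().unwrap();
    let db = dir.path().join("v3.duckdb");
    let wal = dir.path().join("v3.duckdb.wal");

    // Stage an orphan WAL: bytes on disk, but no main database file.
    std::fs::write(&wal, b"stale wal bytes").unwrap();
    assert!(!db.exists());

    let _conn = open_target_db(&db).unwrap();

    // The stale bytes must not survive the open; either the file is
    // gone or DuckDB recreated it fresh.
    if wal.exists() {
        assert_ne!(std::fs::read(&wal).unwrap(), b"stale wal bytes");
    }
}
```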

benchmarks-website/migrate/src/source.rs

Lines changed: 1 addition & 1 deletion
@@ -126,7 +126,7 @@ fn open_s3(name: &str) -> Result<Box<dyn Read + Send>> {
 /// The post-bench `file-sizes` step uploads `file-sizes-${{ matrix.id
 /// }}.json.gz`, so this list must match those IDs verbatim. Adding a
 /// new matrix entry to that workflow means adding the same ID here.
-const KNOWN_FILE_SIZES_SUITES: &[&str] = &[
+pub(crate) const KNOWN_FILE_SIZES_SUITES: &[&str] = &[
     "clickbench-nvme",
     "tpch-nvme",
     "tpch-s3",
