Skip to content

Commit 46a3b2a

Browse files
committed
Fix custom-labels for DuckDB
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 91a6b57 commit 46a3b2a

4 files changed

Lines changed: 51 additions & 25 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/duckdb-bench/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ publish = false
1717
[dependencies]
1818
anyhow = { workspace = true }
1919
clap = { workspace = true, features = ["derive"] }
20+
custom-labels = { workspace = true }
2021
similar = { workspace = true }
2122
tokio = { workspace = true }
2223
tracing = { workspace = true }

benchmarks/duckdb-bench/src/main.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::path::PathBuf;
77

88
use clap::Parser;
99
use clap::value_parser;
10+
use custom_labels::Labelset;
1011
use duckdb_bench::DuckClient;
1112
use tokio::runtime::Runtime;
1213
use vortex::metrics::tracing::set_global_labels;
@@ -25,6 +26,27 @@ use vortex_bench::runner::SqlBenchmarkRunner;
2526
use vortex_bench::runner::filter_queries;
2627
use vortex_bench::setup_logging_and_tracing;
2728

29+
fn with_query_labels<T>(
30+
benchmark_name: &str,
31+
query_idx: usize,
32+
format: Format,
33+
f: impl FnOnce() -> T,
34+
) -> T {
35+
let labels = vec![
36+
("format", format.to_string()),
37+
("benchmark_name", benchmark_name.to_owned()),
38+
("query_idx", query_idx.to_string()),
39+
];
40+
set_global_labels(labels.clone());
41+
42+
let mut labelset = Labelset::clone_from_current();
43+
for (key, value) in labels {
44+
labelset.set(key, value);
45+
}
46+
47+
labelset.enter(f)
48+
}
49+
2850
/// Common arguments shared across benchmarks
2951
#[derive(Parser)]
3052
struct Args {
@@ -171,17 +193,13 @@ fn main() -> anyhow::Result<()> {
171193
Ok(ctx)
172194
},
173195
|ctx, query_idx, format, query| {
174-
set_global_labels(vec![
175-
("format", format.to_string()),
176-
("benchmark_name", benchmark_name.clone()),
177-
("query_idx", query_idx.to_string()),
178-
]);
179-
180-
// Make sure to reopen the duckdb connection between iterations
181-
if !args.reuse {
182-
ctx.reopen()?;
183-
}
184-
ctx.execute_query_result(query)
196+
with_query_labels(&benchmark_name, query_idx, format, || {
197+
// Make sure to reopen the duckdb connection between iterations
198+
if !args.reuse {
199+
ctx.reopen()?;
200+
}
201+
ctx.execute_query_result(query)
202+
})
185203
},
186204
)?;
187205

vortex-duckdb/src/datasource.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,21 @@ pub struct DataSourceLocal {
158158
batch_id: Option<u64>,
159159
}
160160

161+
fn sync_current_labels_from_global() {
162+
unsafe {
163+
use custom_labels::sys;
164+
165+
if sys::current().is_null() {
166+
let ls = sys::new(0);
167+
sys::replace(ls);
168+
};
169+
}
170+
171+
for (key, value) in get_global_labels() {
172+
CURRENT_LABELSET.set(key, value);
173+
}
174+
}
175+
161176
/// Returns scan progress as a percentage (0.0–100.0).
162177
fn progress(bytes_read: &AtomicU64, bytes_total: &AtomicU64) -> f64 {
163178
let read = bytes_read.load(Ordering::Relaxed);
@@ -364,20 +379,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
364379
_init: &TableInitInput<Self>,
365380
global: &Self::GlobalState,
366381
) -> VortexResult<Self::LocalState> {
367-
unsafe {
368-
use custom_labels::sys;
369-
370-
if sys::current().is_null() {
371-
let ls = sys::new(0);
372-
sys::replace(ls);
373-
};
374-
}
375-
376-
let global_labels = get_global_labels();
377-
378-
for (key, value) in global_labels {
379-
CURRENT_LABELSET.set(key, value);
380-
}
382+
sync_current_labels_from_global();
381383

382384
Ok(DataSourceLocal {
383385
iterator: global.iterator.clone(),
@@ -393,6 +395,10 @@ impl<T: DataSourceTableFunction> TableFunction for T {
393395
global_state: &Self::GlobalState,
394396
chunk: &mut DataChunkRef,
395397
) -> VortexResult<()> {
398+
// DuckDB can reuse a thread-local state across multiple benchmark queries, so refresh
399+
// the thread-local labels on every callback to keep `format`/`query_idx` current.
400+
sync_current_labels_from_global();
401+
396402
loop {
397403
if local_state.exporter.is_none() {
398404
let mut ctx = SESSION.create_execution_ctx();

0 commit comments

Comments
 (0)