Skip to content

Commit 8325606

Browse files
committed
better
1 parent 6d4dfca commit 8325606

10 files changed

Lines changed: 165 additions & 32 deletions

File tree

vortex-duckdb/cpp/include/duckdb_vx/table_function.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ typedef struct {
8585

8686
typedef struct {
8787
idx_t batch_index;
88-
size_t file_index;
88+
size_t file_index_column_pos;
89+
size_t file_row_number_column_pos;
90+
bool has_file_index_column_pos;
91+
bool has_file_row_number_column_pos;
8992
} duckdb_vx_partition_data;
9093

9194
// vtable mimicking subset of TableFunction.

vortex-duckdb/cpp/table_function.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,6 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu
338338
}
339339

340340
OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
341-
if (input.partition_info.RequiresPartitionColumns()) {
342-
throw InternalException("TableScan::GetPartitionData: partition columns not supported");
343-
}
344341
auto &bind = input.bind_data->Cast<CTableBindData>();
345342
void *const ffi_bind = bind.ffi_data->DataPtr();
346343
void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
@@ -350,7 +347,16 @@ OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPart
350347
bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);
351348

352349
OperatorPartitionData out(partition_data.batch_index);
353-
out.partition_data = {ColumnPartitionData(Value::UBIGINT(partition_data.file_index))};
350+
for (idx_t col : input.partition_info.partition_columns) {
351+
if (partition_data.has_file_index_column_pos && col == partition_data.file_index_column_pos) {
352+
out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index_column_pos));
353+
} else if (partition_data.has_file_row_number_column_pos &&
354+
col == partition_data.file_row_number_column_pos) {
355+
out.partition_data.emplace_back(Value::BIGINT(partition_data.file_row_number_column_pos));
356+
} else {
357+
throw InternalException("get_partition_data - did not find constant for the given partition");
358+
}
359+
}
354360
return out;
355361
}
356362

@@ -391,6 +397,8 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
391397
tf.table_scan_progress = c_table_scan_progress;
392398
tf.statistics = c_statistics;
393399

400+
// What columns uniquely identify a row for deferred re-fetch in a
401+
// multi-file scan: (file index, row index in file).
394402
tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
395403
return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER};
396404
};

vortex-duckdb/src/datasource.rs

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,16 @@ use vortex::array::optimizer::ArrayOptimizer;
2828
use vortex::array::stats::StatsSet;
2929
use vortex::dtype::DType;
3030
use vortex::dtype::FieldNames;
31+
use vortex::dtype::PType;
3132
use vortex::error::VortexExpect;
3233
use vortex::error::VortexResult;
3334
use vortex::error::vortex_err;
3435
use vortex::expr::Expression;
3536
use vortex::expr::and_collect;
37+
use vortex::expr::cast;
3638
use vortex::expr::col;
39+
use vortex::expr::merge;
40+
use vortex::expr::pack;
3741
use vortex::expr::root;
3842
use vortex::expr::select;
3943
use vortex::expr::stats::Precision;
@@ -42,6 +46,7 @@ use vortex::file::v2::FileStatsLayoutReader;
4246
use vortex::io::kanal_ext::KanalExt;
4347
use vortex::io::runtime::BlockingRuntime;
4448
use vortex::io::runtime::current::ThreadSafeIterator;
49+
use vortex::layout::layouts::row_idx::row_idx;
4550
use vortex::layout::scan::multi::MultiLayoutChild;
4651
use vortex::layout::scan::multi::MultiLayoutDataSource;
4752
use vortex::metrics::tracing::get_global_labels;
@@ -75,9 +80,11 @@ use crate::duckdb::Value;
7580
use crate::exporter::ArrayExporter;
7681
use crate::exporter::ConversionCache;
7782

78-
/// "file_index" virtual column, may be requested either by user or optimizer.
83+
// See MultiFileReader for constants
84+
85+
/// "file_index" virtual column
7986
static FILE_INDEX_COLUMN_IDX: u64 = 9223372036854775810;
80-
/// "file_row_number" virtual column, may be requested either by user or optimizer.
87+
/// "file_row_number" virtual column
8188
static FILE_ROW_NUMBER_COLUMN_IDX: u64 = 9223372036854775809;
8289

8390
/// See duckdb/src/common/constants.cpp
@@ -156,7 +163,7 @@ pub struct DataSourceLocal {
156163
iterator: DataSourceIterator,
157164
exporter: Option<ArrayExporter>,
158165
batch_id: u64,
159-
file_idx: usize,
166+
file_index: usize,
160167
}
161168

162169
/// Returns scan progress as a percentage (0.0–100.0).
@@ -302,7 +309,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
302309
let request = ScanRequest {
303310
projection,
304311
filter: filter_expr,
305-
ordered: false,
312+
ordered: file_row_number_column_pos.is_some(),
306313
..Default::default()
307314
};
308315

@@ -321,11 +328,11 @@ impl<T: DataSourceTableFunction> TableFunction for T {
321328
let stream = scan
322329
.partitions()
323330
.enumerate()
324-
.map(move |(file_idx, partition)| {
331+
.map(move |(file_index, partition)| {
325332
// We create a new conversion cache scoped to the partition, since there's no point
326333
// caching anything across partitions.
327334
let cache = Arc::new(ConversionCache {
328-
file_idx,
335+
file_index,
329336
..Default::default()
330337
});
331338
let tx = tx.clone();
@@ -391,7 +398,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
391398
iterator: global.iterator.clone(),
392399
exporter: None,
393400
batch_id: 0,
394-
file_idx: 0,
401+
file_index: 0,
395402
})
396403
}
397404

@@ -409,8 +416,8 @@ impl<T: DataSourceTableFunction> TableFunction for T {
409416
return Ok(());
410417
};
411418
let (array_result, conversion_cache) = result?;
412-
local_state.file_idx = conversion_cache.file_idx;
413419
let array_result = array_result.optimize_recursive(ctx.session())?;
420+
local_state.file_index = conversion_cache.file_index;
414421

415422
let array_result: StructArray = if let Some(array) = array_result.as_opt::<Struct>()
416423
{
@@ -441,7 +448,11 @@ impl<T: DataSourceTableFunction> TableFunction for T {
441448
.exporter
442449
.as_mut()
443450
.vortex_expect("error: exporter missing");
444-
let has_more_data = exporter.export(chunk, global_state.file_index_column_pos)?;
451+
let has_more_data = exporter.export(
452+
chunk,
453+
global_state.file_index_column_pos,
454+
global_state.file_row_number_column_pos,
455+
)?;
445456

446457
global_state
447458
.bytes_read
@@ -461,7 +472,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
461472
if let Some(pos) = global_state.file_index_column_pos {
462473
chunk
463474
.get_vector_mut(pos)
464-
.reference_value(&Value::from(local_state.file_idx as u64));
475+
.reference_value(&Value::from(local_state.file_index as u64));
465476
}
466477

467478
Ok(())
@@ -533,12 +544,13 @@ impl<T: DataSourceTableFunction> TableFunction for T {
533544

534545
fn partition_data(
535546
_bind_data: &Self::BindData,
536-
_global_init_data: &Self::GlobalState,
547+
global_init_data: &Self::GlobalState,
537548
local_init_data: &mut Self::LocalState,
538549
) -> PartitionData {
539550
PartitionData {
540551
batch_index: local_init_data.batch_id,
541-
file_index: local_init_data.file_idx,
552+
file_index_column_pos: global_init_data.file_index_column_pos,
553+
file_row_number_column_pos: global_init_data.file_row_number_column_pos,
542554
}
543555
}
544556

@@ -583,8 +595,6 @@ struct ProjectionWithVirtualColumns {
583595

584596
/// Creates a projection expression from raw projection/column ID slices and
585597
/// column names.
586-
/// If FILE_INDEX_COLUMN_IDX is present, returns its position as second
587-
/// parameter
588598
fn extract_projection_expr(
589599
projection_ids: Option<&[u64]>,
590600
column_ids: &[u64],
@@ -624,8 +634,21 @@ fn extract_projection_expr(
624634
.map(|col_id| Arc::from(column_fields[col_id as usize].name.as_str()))
625635
.collect::<FieldNames>();
626636

637+
// file_index column will be filled later when exporting the chunk.
638+
639+
let select = select(names, root());
640+
let projection = if file_row_number_column_pos.is_some() {
641+
// Append row_idx to end - it will be rearranged to correct position
642+
// in scan()
643+
let row_idx = cast(row_idx(), DType::Primitive(PType::I64, false.into()));
644+
let row_idx_struct = pack([("file_row_number", row_idx)], false.into());
645+
merge([select, row_idx_struct])
646+
} else {
647+
select
648+
};
649+
627650
ProjectionWithVirtualColumns {
628-
projection: select(names, root()),
651+
projection,
629652
file_index_column_pos,
630653
file_row_number_column_pos,
631654
}

vortex-duckdb/src/duckdb/table_function/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ use crate::duckdb_try;
3838

3939
pub struct PartitionData {
4040
pub batch_index: u64,
41-
pub file_index: usize,
41+
pub file_index_column_pos: Option<usize>,
42+
pub file_row_number_column_pos: Option<usize>,
4243
}
4344

4445
#[derive(Debug, Default)]

vortex-duckdb/src/duckdb/table_function/partition.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,18 @@ pub(crate) unsafe extern "C-unwind" fn get_partition_data_callback<T: TableFunct
2424
let data = T::partition_data(bind_data, global_init_data, local_init_data);
2525
let out = unsafe { &mut *partition_data_out };
2626
out.batch_index = data.batch_index;
27-
out.file_index = data.file_index;
27+
28+
if let Some(pos) = data.file_index_column_pos {
29+
out.has_file_index_column_pos = true;
30+
out.file_index_column_pos = pos;
31+
} else {
32+
out.has_file_index_column_pos = false;
33+
}
34+
35+
if let Some(pos) = data.file_row_number_column_pos {
36+
out.has_file_row_number_column_pos = true;
37+
out.file_row_number_column_pos = pos;
38+
} else {
39+
out.has_file_row_number_column_pos = false;
40+
}
2841
}

vortex-duckdb/src/e2e_test/object_cache_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl TableFunction for TestTableFunction {
114114
) -> PartitionData {
115115
PartitionData {
116116
batch_index: 0,
117-
file_index: 0,
117+
file_index_column_pos: 0,
118118
}
119119
}
120120

vortex-duckdb/src/exporter/cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ pub struct ConversionCache {
2121
pub dict_cache: DashMap<usize, (ArrayRef, ReusableDict)>,
2222
pub values_cache: DashMap<usize, (ArrayRef, Arc<Mutex<Vector>>)>,
2323
pub canonical_cache: DashMap<usize, (ArrayRef, Canonical)>,
24-
pub file_idx: usize,
24+
pub file_index: usize,
2525
}

vortex-duckdb/src/exporter/mod.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,19 @@ impl ArrayExporter {
7878
pub fn export(
7979
&mut self,
8080
chunk: &mut DataChunkRef,
81-
file_idx_pos: Option<usize>,
81+
file_index_column_pos: Option<usize>,
82+
file_row_number_column_pos: Option<usize>,
8283
) -> VortexResult<bool> {
8384
chunk.reset();
8485
if self.remaining == 0 {
8586
return Ok(false);
8687
}
8788

8889
let zero_projection = self.fields.is_empty();
89-
let expected_cols = self.fields.len() + file_idx_pos.is_some() as usize;
90+
91+
// file_row_number column is already populated in scan construction
92+
let expected_cols = self.fields.len() + file_index_column_pos.is_some() as usize;
93+
9094
let chunk_cols = chunk.column_count();
9195
if !zero_projection && chunk_cols != expected_cols {
9296
vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}");
@@ -110,13 +114,27 @@ impl ArrayExporter {
110114
return Ok(true);
111115
}
112116

113-
let mut fields = self.fields.iter_mut();
117+
let mut fields = self.fields.iter();
114118
for i in 0..chunk_cols {
115-
if let Some(file_idx_pos) = file_idx_pos
116-
&& i == file_idx_pos
119+
// file_index column: skip this index - it will be filled after
120+
// chunk export.
121+
if let Some(pos) = file_index_column_pos
122+
&& i == pos
117123
{
118124
continue;
119125
}
126+
127+
// file_row_number column: populate the column at index with
128+
// contents of last column - row index column is appended at end in
129+
// scan construction
130+
if let Some(pos) = file_row_number_column_pos
131+
&& i == pos
132+
{
133+
let field = &self.fields[chunk_cols - 1];
134+
field.export(position, chunk_len, chunk.get_vector_mut(i), &mut self.ctx)?;
135+
continue;
136+
}
137+
120138
let field = fields.next().vortex_expect("field count mismatch");
121139
field.export(position, chunk_len, chunk.get_vector_mut(i), &mut self.ctx)?;
122140
}

vortex-file/src/multi/mod.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use vortex_error::vortex_bail;
1616
use vortex_io::filesystem::FileListing;
1717
use vortex_io::filesystem::FileSystemRef;
1818
use vortex_layout::LayoutReaderRef;
19+
use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
1920
use vortex_layout::scan::multi::LayoutReaderFactory;
2021
use vortex_layout::scan::multi::MultiLayoutDataSource;
2122
use vortex_scan::DataSource;
@@ -142,7 +143,8 @@ impl MultiFileDataSource {
142143
let (first_file_listing, first_fs) = &all_files[0];
143144
let open_fn = self.open_options_fn.as_ref();
144145
let first_file = open_file(first_fs, first_file_listing, &self.session, open_fn).await?;
145-
let first_reader = layout_reader_with_stats(&first_file)?;
146+
let first_reader =
147+
layout_reader_with_stats_and_row_idxs(&first_file, self.session.clone())?;
146148

147149
let factories: Vec<Arc<dyn LayoutReaderFactory>> = all_files[1..]
148150
.iter()
@@ -228,8 +230,14 @@ async fn open_file(
228230

229231
/// Creates a layout reader from a VortexFile, wrapping with `FileStatsLayoutReader` when
230232
/// file-level statistics are available.
231-
fn layout_reader_with_stats(file: &crate::VortexFile) -> VortexResult<LayoutReaderRef> {
233+
/// Also wraps with RowIdxLayoutReader
234+
fn layout_reader_with_stats_and_row_idxs(
235+
file: &crate::VortexFile,
236+
session: VortexSession,
237+
) -> VortexResult<LayoutReaderRef> {
232238
let mut reader = file.layout_reader()?;
239+
reader = Arc::new(RowIdxLayoutReader::new(0, reader, session));
240+
233241
if let Some(stats) = file.file_stats().cloned() {
234242
reader = Arc::new(FileStatsLayoutReader::new(
235243
reader,
@@ -258,6 +266,9 @@ impl LayoutReaderFactory for VortexFileReaderFactory {
258266
self.open_options_fn.as_ref(),
259267
)
260268
.await?;
261-
Ok(Some(layout_reader_with_stats(&file)?))
269+
Ok(Some(layout_reader_with_stats_and_row_idxs(
270+
&file,
271+
self.session.clone(),
272+
)?))
262273
}
263274
}

0 commit comments

Comments
 (0)