Skip to content

Commit b0f2e4a

Browse files
committed
initial
Signed-off-by: Mikhail Kot <to@myrrc.dev>
1 parent f418844 commit b0f2e4a

10 files changed

Lines changed: 189 additions & 87 deletions

File tree

vortex-duckdb/cpp/include/duckdb_vx/table_function.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ typedef struct {
8383
bool has_null;
8484
} duckdb_column_statistics;
8585

86+
typedef struct {
87+
idx_t batch_index;
88+
size_t file_index;
89+
} duckdb_vx_partition_data;
90+
8691
// vtable mimicking subset of TableFunction.
8792
// See duckdb/include/function/tfunc.hpp
8893
typedef struct {
@@ -123,10 +128,10 @@ typedef struct {
123128

124129
double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);
125130

126-
idx_t (*get_partition_data)(const void *bind_data,
127-
void *init_global_data,
128-
void *init_local_data,
129-
duckdb_vx_error *error_out);
131+
void (*get_partition_data)(const void *bind_data,
132+
void *init_global_data,
133+
void *init_local_data,
134+
duckdb_vx_partition_data *partition_data_out);
130135
} duckdb_vx_tfunc_vtab_t;
131136

132137
// A single function for configuring the DuckDB table function vtable.

vortex-duckdb/cpp/table_function.cpp

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ DUCKDB_INCLUDES_BEGIN
1010
#include "duckdb.h"
1111
#include "duckdb/catalog/catalog.hpp"
1212
#include "duckdb/common/insertion_order_preserving_map.hpp"
13+
#include "duckdb/common/multi_file/multi_file_reader.hpp"
1314
#include "duckdb/function/table_function.hpp"
1415
#include "duckdb/main/capi/capi_internal.hpp"
1516
#include "duckdb/main/connection.hpp"
@@ -19,6 +20,7 @@ DUCKDB_INCLUDES_END
1920
using namespace duckdb;
2021
using vortex::CData;
2122
using vortex::IntoErrString;
23+
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
2224

2325
struct CTableFunctionInfo final : TableFunctionInfo {
2426
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
@@ -343,12 +345,12 @@ OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPart
343345
void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
344346
void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
345347

346-
duckdb_vx_error error_out = nullptr;
347-
const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out);
348-
if (error_out) {
349-
throw InvalidInputException(IntoErrString(error_out));
350-
}
351-
return OperatorPartitionData(batch_index);
348+
duckdb_vx_partition_data partition_data;
349+
bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);
350+
351+
OperatorPartitionData out(partition_data.batch_index);
352+
out.partition_data = {ColumnPartitionData(Value::UBIGINT(partition_data.file_index))};
353+
return out;
352354
}
353355

354356
extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
@@ -377,11 +379,9 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
377379

378380
tf.projection_pushdown = true;
379381
tf.filter_pushdown = true;
380-
// We can prune out filter columns that are unused in the remainder of the query plan.
381-
// e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function.
382382
tf.filter_prune = true;
383+
tf.late_materialization = true;
383384
tf.sampling_pushdown = false;
384-
tf.late_materialization = false;
385385

386386
tf.pushdown_complex_filter = c_pushdown_complex_filter;
387387
tf.cardinality = c_cardinality;
@@ -390,8 +390,15 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
390390
tf.table_scan_progress = c_table_scan_progress;
391391
tf.statistics = c_statistics;
392392

393+
tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
394+
return {COLUMN_IDENTIFIER_FILE_INDEX};
395+
};
396+
393397
tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
394-
return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
398+
return {
399+
{COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
400+
{COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
401+
};
395402
};
396403

397404
tf.arguments.resize(vtab->parameter_count);

vortex-duckdb/src/datasource.rs

Lines changed: 65 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,21 @@ use crate::duckdb::DataChunkRef;
6767
use crate::duckdb::DuckdbStringMapRef;
6868
use crate::duckdb::ExpressionRef;
6969
use crate::duckdb::LogicalType;
70+
use crate::duckdb::PartitionData;
7071
use crate::duckdb::TableFilterSetRef;
7172
use crate::duckdb::TableFunction;
7273
use crate::duckdb::TableInitInput;
74+
use crate::duckdb::Value;
7375
use crate::exporter::ArrayExporter;
7476
use crate::exporter::ConversionCache;
7577

76-
/// Taken from
77-
/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/common/constants.hpp#L44
78-
///
79-
/// If DuckDB requests a zero-column projection from read_vortex like count(*),
80-
/// its planner tries to get any column:
81-
/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/planner/operator/logical_get.cpp#L149
82-
///
83-
/// If you define COLUMN_IDENTIFIER_EMPTY, planner takes it, otherwise the
84-
/// first column. As we don't want to fill the output chunk and we can leave
85-
/// it uninitialized in this case, we define COLUMN_IDENTIFIER_EMPTY as a
86-
/// virtual column.
87-
/// See virtual_columns in vortex-duckdb/cpp/table_function.cpp
88-
static EMPTY_COLUMN_IDX: u64 = 18446744073709551614;
78+
/// File index virtual column, may be requested either by user or optimizer.
79+
static FILE_INDEX_COLUMN_IDX: u64 = 9223372036854775810;
80+
81+
/// See duckdb/src/common/constants.cpp
82+
fn is_virtual_column(id: u64) -> bool {
83+
id >= 9223372036854775808u64
84+
}
8985

9086
/// A trait for table functions that resolve to a [`DataSourceRef`].
9187
///
@@ -149,14 +145,15 @@ pub struct DataSourceGlobal {
149145
batch_id: AtomicU64,
150146
bytes_total: Arc<AtomicU64>,
151147
bytes_read: AtomicU64,
148+
file_index_column_pos: Option<usize>,
152149
}
153150

154151
/// Per-thread local scan state.
155152
pub struct DataSourceLocal {
156153
iterator: DataSourceIterator,
157154
exporter: Option<ArrayExporter>,
158-
/// The unique batch id of the last chunk exported via scan().
159-
batch_id: Option<u64>,
155+
batch_id: u64,
156+
file_idx: usize,
160157
}
161158

162159
/// Returns scan progress as a percentage (0.0–100.0).
@@ -281,7 +278,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
281278
let column_ids = init_input.column_ids();
282279
let projection_ids = init_input.projection_ids();
283280

284-
let projection_expr =
281+
let (projection_expr, file_idx_pos) =
285282
extract_projection_expr(projection_ids, column_ids, &bind_data.column_fields);
286283
let filter_expr = extract_table_filter_expr(
287284
init_input.table_filter_set(),
@@ -317,10 +314,14 @@ impl<T: DataSourceTableFunction> TableFunction for T {
317314
// first available array chunk.
318315
let stream = scan
319316
.partitions()
320-
.map(move |partition| {
317+
.enumerate()
318+
.map(move |(file_idx, partition)| {
321319
// We create a new conversion cache scoped to the partition, since there's no point
322320
// caching anything across partitions.
323-
let cache = Arc::new(ConversionCache::default());
321+
let cache = Arc::new(ConversionCache {
322+
file_idx,
323+
..Default::default()
324+
});
324325
let tx = tx.clone();
325326

326327
RUNTIME.handle().spawn(async move {
@@ -356,6 +357,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
356357
batch_id: AtomicU64::new(0),
357358
bytes_total: Arc::new(AtomicU64::new(0)),
358359
bytes_read: AtomicU64::new(0),
360+
file_index_column_pos: file_idx_pos,
359361
})
360362
}
361363

@@ -381,7 +383,8 @@ impl<T: DataSourceTableFunction> TableFunction for T {
381383
Ok(DataSourceLocal {
382384
iterator: global.iterator.clone(),
383385
exporter: None,
384-
batch_id: None,
386+
batch_id: 0,
387+
file_idx: 0,
385388
})
386389
}
387390

@@ -399,6 +402,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
399402
return Ok(());
400403
};
401404
let (array_result, conversion_cache) = result?;
405+
local_state.file_idx = conversion_cache.file_idx;
402406
let array_result = array_result.optimize_recursive(ctx.session())?;
403407

404408
let array_result: StructArray = if let Some(array) = array_result.as_opt::<Struct>()
@@ -423,30 +427,36 @@ impl<T: DataSourceTableFunction> TableFunction for T {
423427
ctx,
424428
)?);
425429
// Relaxed since there is no intra-instruction ordering required.
426-
local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed));
430+
local_state.batch_id = global_state.batch_id.fetch_add(1, Ordering::Relaxed);
427431
}
428432

429433
let exporter = local_state
430434
.exporter
431435
.as_mut()
432436
.vortex_expect("error: exporter missing");
437+
let has_more_data = exporter.export(chunk, global_state.file_index_column_pos)?;
433438

434-
let has_more_data = exporter.export(chunk)?;
435439
global_state
436440
.bytes_read
437441
.fetch_add(chunk.len(), Ordering::Relaxed);
438442

439443
if !has_more_data {
440444
// This exporter is fully consumed.
441445
local_state.exporter = None;
442-
local_state.batch_id = None;
446+
local_state.batch_id = 0;
443447
} else {
444448
break;
445449
}
446450
}
447451

448452
assert!(!chunk.is_empty());
449453

454+
if let Some(pos) = global_state.file_index_column_pos {
455+
chunk
456+
.get_vector_mut(pos)
457+
.reference_value(&Value::from(local_state.file_idx as u64));
458+
}
459+
450460
Ok(())
451461
}
452462

@@ -518,10 +528,11 @@ impl<T: DataSourceTableFunction> TableFunction for T {
518528
_bind_data: &Self::BindData,
519529
_global_init_data: &Self::GlobalState,
520530
local_init_data: &mut Self::LocalState,
521-
) -> VortexResult<u64> {
522-
local_init_data
523-
.batch_id
524-
.ok_or_else(|| vortex_err!("batch id missing, no batches exported"))
531+
) -> PartitionData {
532+
PartitionData {
533+
batch_index: local_init_data.batch_id,
534+
file_index: local_init_data.file_idx,
535+
}
525536
}
526537

527538
fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef) {
@@ -557,39 +568,46 @@ fn extract_schema_from_dtype(dtype: &DType) -> VortexResult<Vec<DuckdbField>> {
557568
Ok(fields)
558569
}
559570

560-
/// Creates a projection expression from raw projection/column ID slices and column names.
571+
/// Creates a projection expression from raw projection/column ID slices and
572+
/// column names.
573+
/// If FILE_INDEX_COLUMN_IDX is present, its position is returned as the
574+
/// second tuple element.
561575
fn extract_projection_expr(
562576
projection_ids: Option<&[u64]>,
563577
column_ids: &[u64],
564578
column_fields: &[DuckdbField],
565-
) -> Expression {
566-
// Projection ids may be empty, in which case you need to use projection_ids
579+
) -> (Expression, Option<usize>) {
580+
// If projection ids are empty, use column_ids.
567581
// See duckdb/src/planner/operator/logical_get.cpp#L168
568-
let (projection_ids, has_projection_ids) = match projection_ids {
582+
let (ids, has_projection_ids) = match projection_ids {
569583
Some(ids) => (ids, true),
570584
None => (column_ids, false),
571585
};
572586

573-
// duckdb index is u64 (size_t) but in Rust u64 and usize are different things.
587+
let mut file_idx_pos = None;
588+
574589
#[expect(clippy::cast_possible_truncation)]
575-
let names = projection_ids
590+
let names = ids
576591
.iter()
577-
.filter(|p| **p != EMPTY_COLUMN_IDX)
578-
.map(|mut idx| {
579-
if has_projection_ids {
580-
idx = &column_ids[*idx as usize];
592+
.enumerate()
593+
.map(|(column_pos, &column_id)| {
594+
let column_id = if has_projection_ids {
595+
column_ids[column_id as usize]
596+
} else {
597+
column_id
598+
};
599+
600+
if column_id == FILE_INDEX_COLUMN_IDX {
601+
file_idx_pos = Some(column_pos);
581602
}
582603

583-
#[expect(clippy::cast_possible_truncation)]
584-
&column_fields
585-
.get(*idx as usize)
586-
.vortex_expect("prune idx in column names")
587-
.name
604+
column_id
588605
})
589-
.map(|s| Arc::from(s.as_str()))
606+
.filter(|&col_id| !is_virtual_column(col_id))
607+
.map(|col_id| Arc::from(column_fields[col_id as usize].name.as_str()))
590608
.collect::<FieldNames>();
591609

592-
select(names, root())
610+
(select(names, root()), file_idx_pos)
593611
}
594612

595613
/// Creates a table filter expression from the table filter set, column metadata, additional
@@ -604,6 +622,10 @@ fn extract_table_filter_expr(
604622
let mut table_filter_exprs: HashSet<Expression> = if let Some(filter) = table_filter_set {
605623
filter
606624
.into_iter()
625+
.filter(|(idx, _)| {
626+
let idx_u: usize = idx.as_();
627+
!is_virtual_column(column_ids[idx_u].as_())
628+
})
607629
.map(|(idx, ex)| {
608630
let idx_u: usize = idx.as_();
609631
let col_idx: usize = column_ids[idx_u].as_();

vortex-duckdb/src/duckdb/table_function/mod.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ use crate::duckdb::table_function::statistics::statistics;
3636
use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback;
3737
use crate::duckdb_try;
3838

39+
pub struct PartitionData {
40+
pub batch_index: u64,
41+
pub file_index: usize,
42+
}
43+
3944
#[derive(Debug, Default)]
4045
pub struct ColumnStatistics {
4146
pub min: Option<Value>,
@@ -137,10 +142,10 @@ pub trait TableFunction: Sized + Debug {
137142
/// Returns the idx of the current partition being processed by a local thread.
138143
/// This *must* be globally unique.
139144
fn partition_data(
140-
_bind_data: &Self::BindData,
141-
_global_init_data: &Self::GlobalState,
142-
_local_init_data: &mut Self::LocalState,
143-
) -> VortexResult<u64>;
145+
bind_data: &Self::BindData,
146+
global_init_data: &Self::GlobalState,
147+
local_init_data: &mut Self::LocalState,
148+
) -> PartitionData;
144149

145150
/// Returns a vector of key-value pairs for EXPLAIN output
146151
fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef);

vortex-duckdb/src/duckdb/table_function/partition.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,26 @@
33

44
use std::ffi::c_void;
55

6-
use cpp::duckdb_vx_error;
76
use vortex::error::VortexExpect;
87

98
use crate::cpp;
10-
use crate::cpp::idx_t;
119
use crate::duckdb::TableFunction;
1210

1311
/// Native callback returning the partition data of a table function.
1412
pub(crate) unsafe extern "C-unwind" fn get_partition_data_callback<T: TableFunction>(
1513
bind_data: *const c_void,
1614
global_init_data: *mut c_void,
1715
local_init_data: *mut c_void,
18-
error_out: *mut duckdb_vx_error,
19-
) -> idx_t {
16+
partition_data_out: *mut cpp::duckdb_vx_partition_data,
17+
) {
2018
let bind_data =
2119
unsafe { bind_data.cast::<T::BindData>().as_ref() }.vortex_expect("bind_data null pointer");
2220
let global_init_data = unsafe { global_init_data.cast::<T::GlobalState>().as_ref() }
2321
.vortex_expect("global_init_data null pointer");
2422
let local_init_data = unsafe { local_init_data.cast::<T::LocalState>().as_mut() }
2523
.vortex_expect("local_init_data null pointer");
26-
27-
match T::partition_data(bind_data, global_init_data, local_init_data) {
28-
Ok(batch_id) => batch_id,
29-
Err(e) => {
30-
// Set the error in the error output.
31-
let msg = e.to_string();
32-
unsafe { error_out.write(cpp::duckdb_vx_error_create(msg.as_ptr().cast(), msg.len())) };
33-
0
34-
}
35-
}
24+
let data = T::partition_data(bind_data, global_init_data, local_init_data);
25+
let out = unsafe { &mut *partition_data_out };
26+
out.batch_index = data.batch_index;
27+
out.file_index = data.file_index;
3628
}

0 commit comments

Comments
 (0)