Skip to content

Commit a3e6376

Browse files
committed
initial
1 parent cc06c60 commit a3e6376

9 files changed

Lines changed: 165 additions & 85 deletions

File tree

vortex-duckdb/cpp/include/duckdb_vx/table_function.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ typedef struct {
8383
bool has_null;
8484
} duckdb_column_statistics;
8585

86+
typedef struct {
87+
idx_t batch_index;
88+
size_t file_index;
89+
} duckdb_vx_partition_data;
90+
8691
// vtable mimicking subset of TableFunction.
8792
// See duckdb/include/function/tfunc.hpp
8893
typedef struct {
@@ -123,10 +128,10 @@ typedef struct {
123128

124129
double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);
125130

126-
idx_t (*get_partition_data)(const void *bind_data,
127-
void *init_global_data,
128-
void *init_local_data,
129-
duckdb_vx_error *error_out);
131+
void (*get_partition_data)(const void *bind_data,
132+
void *init_global_data,
133+
void *init_local_data,
134+
duckdb_vx_partition_data *partition_data_out);
130135
} duckdb_vx_tfunc_vtab_t;
131136

132137
// A single function for configuring the DuckDB table function vtable.

vortex-duckdb/cpp/table_function.cpp

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ DUCKDB_INCLUDES_BEGIN
1010
#include "duckdb.h"
1111
#include "duckdb/catalog/catalog.hpp"
1212
#include "duckdb/common/insertion_order_preserving_map.hpp"
13+
#include "duckdb/common/multi_file/multi_file_reader.hpp"
1314
#include "duckdb/function/table_function.hpp"
1415
#include "duckdb/main/capi/capi_internal.hpp"
1516
#include "duckdb/main/connection.hpp"
@@ -19,6 +20,8 @@ DUCKDB_INCLUDES_END
1920
using namespace duckdb;
2021
using vortex::CData;
2122
using vortex::IntoErrString;
23+
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
24+
// constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;
2225

2326
struct CTableFunctionInfo final : TableFunctionInfo {
2427
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
@@ -343,12 +346,12 @@ OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPart
343346
void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
344347
void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
345348

346-
duckdb_vx_error error_out = nullptr;
347-
const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out);
348-
if (error_out) {
349-
throw InvalidInputException(IntoErrString(error_out));
350-
}
351-
return OperatorPartitionData(batch_index);
349+
duckdb_vx_partition_data partition_data;
350+
bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);
351+
352+
OperatorPartitionData out(partition_data.batch_index);
353+
out.partition_data = {ColumnPartitionData(Value::UBIGINT(partition_data.file_index))};
354+
return out;
352355
}
353356

354357
extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
@@ -380,8 +383,8 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
380383
// We can prune out filter columns that are unused in the remainder of the query plan.
381384
// e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function.
382385
tf.filter_prune = true;
386+
tf.late_materialization = true;
383387
tf.sampling_pushdown = false;
384-
tf.late_materialization = false;
385388

386389
tf.pushdown_complex_filter = c_pushdown_complex_filter;
387390
tf.cardinality = c_cardinality;
@@ -390,8 +393,19 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
390393
tf.table_scan_progress = c_table_scan_progress;
391394
tf.statistics = c_statistics;
392395

396+
tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
397+
return {
398+
COLUMN_IDENTIFIER_FILE_INDEX,
399+
// COLUMN_IDENTIFIER_FILE_ROW_NUMBER
400+
};
401+
};
402+
393403
tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
394-
return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
404+
return {
405+
{COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
406+
{COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
407+
//{COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}}
408+
};
395409
};
396410

397411
tf.arguments.resize(vtab->parameter_count);

vortex-duckdb/src/datasource.rs

Lines changed: 65 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,21 @@ use crate::duckdb::DataChunkRef;
6767
use crate::duckdb::DuckdbStringMapRef;
6868
use crate::duckdb::ExpressionRef;
6969
use crate::duckdb::LogicalType;
70+
use crate::duckdb::PartitionData;
7071
use crate::duckdb::TableFilterSetRef;
7172
use crate::duckdb::TableFunction;
7273
use crate::duckdb::TableInitInput;
74+
use crate::duckdb::Value;
7375
use crate::exporter::ArrayExporter;
7476
use crate::exporter::ConversionCache;
7577

76-
/// Taken from
77-
/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/common/constants.hpp#L44
78-
///
79-
/// If DuckDB requests a zero-column projection from read_vortex like count(*),
80-
/// its planner tries to get any column:
81-
/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/planner/operator/logical_get.cpp#L149
82-
///
83-
/// If you define COLUMN_IDENTIFIER_EMPTY, planner takes it, otherwise the
84-
/// first column. As we don't want to fill the output chunk and we can leave
85-
/// it uninitialized in this case, we define COLUMN_IDENTIFIER_EMPTY as a
86-
/// virtual column.
87-
/// See virtual_columns in vortex-duckdb/cpp/table_function.cpp
88-
static EMPTY_COLUMN_IDX: u64 = 18446744073709551614;
78+
/// File index virtual column, may be requested either by user or optimizer.
79+
static FILE_INDEX_COLUMN_IDX: u64 = 9223372036854775810;
80+
81+
/// See duckdb/src/common/constants.cpp
82+
fn is_virtual_column(id: u64) -> bool {
83+
return id >= 9223372036854775808u64;
84+
}
8985

9086
/// A trait for table functions that resolve to a [`DataSourceRef`].
9187
///
@@ -141,22 +137,24 @@ impl Debug for DataSourceBindData {
141137
}
142138
}
143139

144-
type DataSourceIterator = ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>;
140+
type DataSourceIterator = ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>, usize)>>;
145141

146142
/// Global scan state for driving a `DataSource` scan through DuckDB.
147143
pub struct DataSourceGlobal {
148144
iterator: DataSourceIterator,
149145
batch_id: AtomicU64,
150146
bytes_total: Arc<AtomicU64>,
151147
bytes_read: AtomicU64,
148+
/// "file_index" column position if requested
149+
file_idx_pos: Option<usize>,
152150
}
153151

154152
/// Per-thread local scan state.
155153
pub struct DataSourceLocal {
156154
iterator: DataSourceIterator,
157155
exporter: Option<ArrayExporter>,
158-
/// The unique batch id of the last chunk exported via scan().
159-
batch_id: Option<u64>,
156+
batch_id: u64,
157+
file_idx: usize,
160158
}
161159

162160
/// Returns scan progress as a percentage (0.0–100.0).
@@ -281,7 +279,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
281279
let column_ids = init_input.column_ids();
282280
let projection_ids = init_input.projection_ids();
283281

284-
let projection_expr =
282+
let (projection_expr, file_idx_pos) =
285283
extract_projection_expr(projection_ids, column_ids, &bind_data.column_fields);
286284
let filter_expr = extract_table_filter_expr(
287285
init_input.table_filter_set(),
@@ -317,7 +315,8 @@ impl<T: DataSourceTableFunction> TableFunction for T {
317315
// first available array chunk.
318316
let stream = scan
319317
.partitions()
320-
.map(move |partition| {
318+
.enumerate()
319+
.map(move |(file_idx, partition)| {
321320
// We create a new conversion cache scoped to the partition, since there's no point
322321
// caching anything across partitions.
323322
let cache = Arc::new(ConversionCache::default());
@@ -333,7 +332,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
333332
};
334333
while let Some(item) = stream.next().await {
335334
if tx
336-
.send(item.map(|a| (a, Arc::clone(&cache))))
335+
.send(item.map(|a| (a, Arc::clone(&cache), file_idx)))
337336
.await
338337
.is_err()
339338
{
@@ -356,6 +355,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
356355
batch_id: AtomicU64::new(0),
357356
bytes_total: Arc::new(AtomicU64::new(0)),
358357
bytes_read: AtomicU64::new(0),
358+
file_idx_pos,
359359
})
360360
}
361361

@@ -381,7 +381,8 @@ impl<T: DataSourceTableFunction> TableFunction for T {
381381
Ok(DataSourceLocal {
382382
iterator: global.iterator.clone(),
383383
exporter: None,
384-
batch_id: None,
384+
batch_id: 0,
385+
file_idx: 0,
385386
})
386387
}
387388

@@ -398,7 +399,8 @@ impl<T: DataSourceTableFunction> TableFunction for T {
398399
let Some(result) = local_state.iterator.next() else {
399400
return Ok(());
400401
};
401-
let (array_result, conversion_cache) = result?;
402+
let (array_result, conversion_cache, file_idx) = result?;
403+
local_state.file_idx = file_idx;
402404
let array_result = array_result.optimize_recursive()?;
403405

404406
let array_result: StructArray = if let Some(array) = array_result.as_opt::<Struct>()
@@ -423,30 +425,36 @@ impl<T: DataSourceTableFunction> TableFunction for T {
423425
ctx,
424426
)?);
425427
// Relaxed since there is no intra-instruction ordering required.
426-
local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed));
428+
local_state.batch_id = global_state.batch_id.fetch_add(1, Ordering::Relaxed);
427429
}
428430

429431
let exporter = local_state
430432
.exporter
431433
.as_mut()
432434
.vortex_expect("error: exporter missing");
435+
let has_more_data = exporter.export(chunk, global_state.file_idx_pos)?;
433436

434-
let has_more_data = exporter.export(chunk)?;
435437
global_state
436438
.bytes_read
437439
.fetch_add(chunk.len(), Ordering::Relaxed);
438440

439441
if !has_more_data {
440442
// This exporter is fully consumed.
441443
local_state.exporter = None;
442-
local_state.batch_id = None;
444+
local_state.batch_id = 0;
443445
} else {
444446
break;
445447
}
446448
}
447449

448450
assert!(!chunk.is_empty());
449451

452+
if let Some(pos) = global_state.file_idx_pos {
453+
chunk
454+
.get_vector_mut(pos)
455+
.reference_value(&Value::new_u64(local_state.file_idx as u64));
456+
}
457+
450458
Ok(())
451459
}
452460

@@ -518,10 +526,11 @@ impl<T: DataSourceTableFunction> TableFunction for T {
518526
_bind_data: &Self::BindData,
519527
_global_init_data: &Self::GlobalState,
520528
local_init_data: &mut Self::LocalState,
521-
) -> VortexResult<u64> {
522-
local_init_data
523-
.batch_id
524-
.ok_or_else(|| vortex_err!("batch id missing, no batches exported"))
529+
) -> PartitionData {
530+
PartitionData {
531+
batch_index: local_init_data.batch_id,
532+
file_index: local_init_data.file_idx,
533+
}
525534
}
526535

527536
fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef) {
@@ -557,39 +566,46 @@ fn extract_schema_from_dtype(dtype: &DType) -> VortexResult<Vec<DuckdbField>> {
557566
Ok(fields)
558567
}
559568

560-
/// Creates a projection expression from raw projection/column ID slices and column names.
569+
/// Creates a projection expression from raw projection/column ID slices and
570+
/// column names.
571+
/// If FILE_INDEX_COLUMN_IDX is present, returns its position as the second
572+
/// element of the returned tuple.
561573
fn extract_projection_expr(
562574
projection_ids: Option<&[u64]>,
563575
column_ids: &[u64],
564576
column_fields: &[DuckdbField],
565-
) -> Expression {
566-
// Projection ids may be empty, in which case you need to use projection_ids
577+
) -> (Expression, Option<usize>) {
578+
// If projection ids are empty, use column_ids.
567579
// See duckdb/src/planner/operator/logical_get.cpp#L168
568-
let (projection_ids, has_projection_ids) = match projection_ids {
580+
let (ids, has_projection_ids) = match projection_ids {
569581
Some(ids) => (ids, true),
570582
None => (column_ids, false),
571583
};
572584

573-
// duckdb index is u64 (size_t) but in Rust u64 and usize are different things.
585+
let mut file_idx_pos = None;
586+
574587
#[expect(clippy::cast_possible_truncation)]
575-
let names = projection_ids
588+
let names = ids
576589
.iter()
577-
.filter(|p| **p != EMPTY_COLUMN_IDX)
578-
.map(|mut idx| {
579-
if has_projection_ids {
580-
idx = &column_ids[*idx as usize];
590+
.enumerate()
591+
.map(|(column_pos, &column_id)| {
592+
let column_id = if has_projection_ids {
593+
column_ids[column_id as usize]
594+
} else {
595+
column_id
596+
};
597+
598+
if column_id == FILE_INDEX_COLUMN_IDX {
599+
file_idx_pos = Some(column_pos);
581600
}
582601

583-
#[expect(clippy::cast_possible_truncation)]
584-
&column_fields
585-
.get(*idx as usize)
586-
.vortex_expect("prune idx in column names")
587-
.name
602+
column_id
588603
})
589-
.map(|s| Arc::from(s.as_str()))
604+
.filter(|&col_id| !is_virtual_column(col_id))
605+
.map(|col_id| Arc::from(column_fields[col_id as usize].name.as_str()))
590606
.collect::<FieldNames>();
591607

592-
select(names, root())
608+
(select(names, root()), file_idx_pos)
593609
}
594610

595611
/// Creates a table filter expression from the table filter set, column metadata, additional
@@ -604,6 +620,10 @@ fn extract_table_filter_expr(
604620
let mut table_filter_exprs: HashSet<Expression> = if let Some(filter) = table_filter_set {
605621
filter
606622
.into_iter()
623+
.filter(|(idx, _)| {
624+
let idx_u: usize = idx.as_();
625+
!is_virtual_column(column_ids[idx_u].as_())
626+
})
607627
.map(|(idx, ex)| {
608628
let idx_u: usize = idx.as_();
609629
let col_idx: usize = column_ids[idx_u].as_();

vortex-duckdb/src/duckdb/table_function/mod.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ use crate::duckdb::table_function::statistics::statistics;
3636
use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback;
3737
use crate::duckdb_try;
3838

39+
pub struct PartitionData {
40+
pub batch_index: u64,
41+
pub file_index: usize,
42+
}
43+
3944
#[derive(Debug, Default)]
4045
pub struct ColumnStatistics {
4146
pub min: Option<Value>,
@@ -137,10 +142,10 @@ pub trait TableFunction: Sized + Debug {
137142
/// Returns the partition data (batch index and file index) for the partition
138143
/// currently being processed by a local thread. The batch index *must* be globally unique.
139144
fn partition_data(
140-
_bind_data: &Self::BindData,
141-
_global_init_data: &Self::GlobalState,
142-
_local_init_data: &mut Self::LocalState,
143-
) -> VortexResult<u64>;
145+
bind_data: &Self::BindData,
146+
global_init_data: &Self::GlobalState,
147+
local_init_data: &mut Self::LocalState,
148+
) -> PartitionData;
144149

145150
/// Returns a vector of key-value pairs for EXPLAIN output
146151
fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef);

0 commit comments

Comments
 (0)