Skip to content

Commit e8c4ae9

Browse files
authored
Late materialization support for duckdb (#7721)
Add file_index and file_row_number virtual columns. Add file-based filtering (range, selection) to ScanRequest. Add partition index method. Add late materialization support and row id columns support in duckdb. Attempt 1 was accidentally merged at #7631 and then reverted. Signed-off-by: Mikhail Kot <to@myrrc.dev>
1 parent e2062a7 commit e8c4ae9

19 files changed

Lines changed: 614 additions & 112 deletions

File tree

vortex-duckdb/cpp/include/duckdb_vx/table_function.h

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
#include "error.h"
1313
#include "table_filter.h"
1414
#include "duckdb_vx/data.h"
15+
#include <stdint.h>
16+
17+
#ifdef __cplusplus
18+
static_assert(sizeof(idx_t) == 8);
19+
#endif
1520

1621
#ifdef __cplusplus
1722
extern "C" {
@@ -83,6 +88,16 @@ typedef struct {
8388
bool has_null;
8489
} duckdb_column_statistics;
8590

91+
// Sentinel meaning "no valid position"; equal to the largest idx_t value.
// Declared static so that every C translation unit including this header gets
// its own private copy instead of a clashing external definition. In C++ a
// namespace-scope const object already has internal linkage, so nothing
// changes there.
static const idx_t INVALID_IDX = UINT64_MAX;

// Partition description filled in by the get_partition_data vtable callback
// through its partition_data_out parameter.
typedef struct {
    // Partition index reported back to DuckDB for the exported chunk.
    idx_t partition_index;
    // Position of the file_index column in the output, or INVALID_IDX.
    // NOTE(review): this field is size_t while INVALID_IDX is idx_t; the two
    // only have the same width on 64-bit targets -- confirm that is the only
    // supported configuration.
    size_t file_index_column_pos;
    // File index for the exported partition.
    size_t file_index;
} duckdb_vx_partition_data;
100+
86101
// vtable mimicking subset of TableFunction.
87102
// See duckdb/include/function/tfunc.hpp
88103
typedef struct {
@@ -123,10 +138,10 @@ typedef struct {
123138

124139
double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);
125140

126-
idx_t (*get_partition_data)(const void *bind_data,
127-
void *init_global_data,
128-
void *init_local_data,
129-
duckdb_vx_error *error_out);
141+
void (*get_partition_data)(const void *bind_data,
142+
void *init_global_data,
143+
void *init_local_data,
144+
duckdb_vx_partition_data *partition_data_out);
130145
} duckdb_vx_tfunc_vtab_t;
131146

132147
// A single function for configuring the DuckDB table function vtable.

vortex-duckdb/cpp/table_function.cpp

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ DUCKDB_INCLUDES_BEGIN
1010
#include "duckdb.h"
1111
#include "duckdb/catalog/catalog.hpp"
1212
#include "duckdb/common/insertion_order_preserving_map.hpp"
13+
#include "duckdb/common/multi_file/multi_file_reader.hpp"
1314
#include "duckdb/function/table_function.hpp"
1415
#include "duckdb/main/capi/capi_internal.hpp"
1516
#include "duckdb/main/connection.hpp"
@@ -19,6 +20,8 @@ DUCKDB_INCLUDES_END
1920
using namespace duckdb;
2021
using vortex::CData;
2122
using vortex::IntoErrString;
23+
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
24+
constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;
2225

2326
struct CTableFunctionInfo final : TableFunctionInfo {
2427
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
@@ -191,6 +194,11 @@ struct CTableBindResult {
191194
vector<string> &names;
192195
};
193196

197+
/**
198+
* Called for every new query. For example, if there is a VIEW over *.vortex,
199+
* and another file matching the glob is added after a query, then for the second query
200+
* bind() will be called again.
201+
*/
194202
unique_ptr<FunctionData> c_bind(ClientContext &context,
195203
TableFunctionBindInput &input,
196204
vector<LogicalType> &return_types,
@@ -334,21 +342,50 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu
334342
result.return_types.emplace_back(logical_type);
335343
}
336344

337-
OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
338-
if (input.partition_info.RequiresPartitionColumns()) {
339-
throw InternalException("TableScan::GetPartitionData: partition columns not supported");
340-
}
345+
/**
346+
* Called at planning time to determine whether data is partitioned by a
347+
* given set of columns. Requested columns are GROUP BY parameters i.e. columns
348+
* over which the query aggregates.
349+
*/
350+
TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInput &input) {
351+
const vector<column_t> &ids = input.partition_ids;
352+
// Our data is partitioned by array exporters. Each exporter processes a
353+
// single Array which belongs to a single file. If data is partitioned only
354+
// by file_index, there is one unique value for an Array. Otherwise there
355+
// may be multiple values.
356+
return (ids.size() == 1 && ids[0] == COLUMN_IDENTIFIER_FILE_INDEX)
357+
? TablePartitionInfo::SINGLE_VALUE_PARTITIONS
358+
: TablePartitionInfo::NOT_PARTITIONED;
359+
}
360+
361+
/**
362+
* Duckdb requests this function after exporting the chunk. We answer with
363+
* partition_index we have exported as well as information about constant
364+
* columns in this partition. As data is partitioned by array exporters, in
365+
* each partition ~ exported array file_index is constant.
366+
*/
367+
OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
341368
auto &bind = input.bind_data->Cast<CTableBindData>();
342369
void *const ffi_bind = bind.ffi_data->DataPtr();
343370
void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
344371
void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
345-
346-
duckdb_vx_error error_out = nullptr;
347-
const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out);
348-
if (error_out) {
349-
throw InvalidInputException(IntoErrString(error_out));
372+
duckdb_vx_partition_data partition_data;
373+
bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);
374+
375+
OperatorPartitionData out(partition_data.partition_index);
376+
377+
// file_index_column_pos may be INVALID_IDX, but column_index will never
378+
// be INVALID_IDX, so we can compare directly
379+
for (const column_t column_index : input.partition_info.partition_columns) {
380+
if (column_index == partition_data.file_index_column_pos) {
381+
out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index));
382+
} else {
383+
throw InternalException(StringUtil::Format(
384+
"get_partition_data: requested column_index %d is not constant for given partition",
385+
column_index));
386+
}
350387
}
351-
return OperatorPartitionData(batch_index);
388+
return out;
352389
}
353390

354391
extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
@@ -377,21 +414,30 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
377414

378415
tf.projection_pushdown = true;
379416
tf.filter_pushdown = true;
380-
// We can prune out filter columns that are unused in the remainder of the query plan.
381-
// e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function.
382417
tf.filter_prune = true;
383418
tf.sampling_pushdown = false;
384-
tf.late_materialization = false;
385419

386420
tf.pushdown_complex_filter = c_pushdown_complex_filter;
387421
tf.cardinality = c_cardinality;
388-
tf.get_partition_data = c_get_partition_data;
422+
tf.get_partition_info = get_partition_info;
423+
tf.get_partition_data = get_partition_data;
389424
tf.to_string = c_to_string;
390425
tf.table_scan_progress = c_table_scan_progress;
391426
tf.statistics = c_statistics;
392427

428+
tf.late_materialization = true;
429+
// Columns that uniquely identify a row for deferred re-fetch in a multi
430+
// file scan: (file index, row number in file).
431+
tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
432+
return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER};
433+
};
434+
393435
tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
394-
return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
436+
return {
437+
{COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
438+
{COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
439+
{COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}},
440+
};
395441
};
396442

397443
tf.arguments.resize(vtab->parameter_count);

vortex-duckdb/src/convert/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ pub use dtype::from_duckdb_table;
1111
pub use expr::try_from_bound_expression;
1212
pub use scalar::*;
1313
pub use table_filter::try_from_table_filter;
14+
pub use table_filter::try_from_virtual_column_filter;
1415
pub use vector::data_chunk_to_vortex;

vortex-duckdb/src/convert/table_filter.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::ops::Range;
45
use std::sync::Arc;
56

67
use itertools::Itertools;
8+
use vortex::buffer::Buffer;
79
use vortex::dtype::DType;
810
use vortex::dtype::Nullability;
911
use vortex::error::VortexExpect;
1012
use vortex::error::VortexResult;
1113
use vortex::error::vortex_bail;
14+
use vortex::error::vortex_err;
1215
use vortex::expr::Expression;
1316
use vortex::expr::and_collect;
1417
use vortex::expr::get_item;
@@ -21,10 +24,13 @@ use vortex::scalar::Scalar;
2124
use vortex::scalar_fn::ScalarFnVTableExt;
2225
use vortex::scalar_fn::fns::binary::Binary;
2326
use vortex::scalar_fn::fns::operators::CompareOperator;
27+
use vortex::scan::selection::Selection;
2428

2529
use crate::cpp::DUCKDB_VX_EXPR_TYPE;
30+
use crate::duckdb::ExtractedValue;
2631
use crate::duckdb::TableFilterClass;
2732
use crate::duckdb::TableFilterRef;
33+
use crate::duckdb::ValueRef;
2834

2935
pub fn try_from_table_filter(
3036
value: &TableFilterRef,
@@ -125,3 +131,96 @@ pub fn try_from_table_filter(
125131
}
126132
}))
127133
}
134+
135+
fn nonnegative_number_from_value(value: &ValueRef) -> VortexResult<u64> {
136+
match value.extract() {
137+
ExtractedValue::BigInt(i) => {
138+
u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}"))
139+
}
140+
ExtractedValue::Integer(i) => {
141+
u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}"))
142+
}
143+
ExtractedValue::UBigInt(u) => Ok(u),
144+
ExtractedValue::UInteger(u) => Ok(u64::from(u)),
145+
_ => vortex_bail!("unexpected value type"),
146+
}
147+
}
148+
149+
/// Returns the elements common to two ascending slices, in ascending order.
///
/// Both inputs must already be sorted; the slices are walked in lockstep and
/// the side holding the smaller head is advanced. A value is emitted once for
/// every position at which both heads are equal.
fn intersect_sorted(left: &[u64], right: &[u64]) -> Vec<u64> {
    use std::cmp::Ordering;

    let mut common = Vec::new();
    let (mut lhs, mut rhs) = (left, right);
    while let (Some((&a, lhs_tail)), Some((&b, rhs_tail))) = (lhs.split_first(), rhs.split_first())
    {
        match a.cmp(&b) {
            Ordering::Less => lhs = lhs_tail,
            Ordering::Greater => rhs = rhs_tail,
            Ordering::Equal => {
                common.push(a);
                lhs = lhs_tail;
                rhs = rhs_tail;
            }
        }
    }
    common
}
165+
166+
/// For constant comparison on IN filters over file_index or file_row_number
167+
/// virtual column, create a selection and a range covering the same range as
168+
/// expressions do.
169+
pub fn try_from_virtual_column_filter(
170+
filter: &TableFilterRef,
171+
) -> VortexResult<(Selection, Option<Range<u64>>)> {
172+
match filter.as_class() {
173+
TableFilterClass::InFilter(values) => {
174+
let indices = values
175+
.iter()
176+
.map(nonnegative_number_from_value)
177+
.collect::<VortexResult<Vec<u64>>>()?;
178+
Ok((Selection::IncludeByIndex(Buffer::from_iter(indices)), None))
179+
}
180+
TableFilterClass::ConstantComparison(const_) => {
181+
let n = nonnegative_number_from_value(const_.value)?;
182+
let range = match const_.operator {
183+
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_EQUAL => Some(n..n + 1),
184+
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHANOREQUALTO => {
185+
Some(n..u64::MAX)
186+
}
187+
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHAN => {
188+
Some(n.saturating_add(1)..u64::MAX)
189+
}
190+
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHANOREQUALTO => {
191+
Some(0..n.saturating_add(1))
192+
}
193+
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHAN => Some(0..n),
194+
_ => None,
195+
};
196+
Ok((Selection::All, range))
197+
}
198+
TableFilterClass::ConjunctionAnd(conj) => {
199+
let mut start = 0u64;
200+
let mut end = u64::MAX;
201+
let mut indices: Option<Vec<u64>> = None;
202+
for child in conj.children() {
203+
let (sel, range) = try_from_virtual_column_filter(child)?;
204+
if let Selection::IncludeByIndex(buf) = sel {
205+
indices = Some(match indices {
206+
None => buf.iter().copied().collect(),
207+
Some(existing) => intersect_sorted(&existing, buf.as_ref()),
208+
});
209+
}
210+
if let Some(r) = range {
211+
start = start.max(r.start);
212+
end = end.min(r.end);
213+
}
214+
}
215+
let range = (start < end).then_some(start..end);
216+
let sel = indices
217+
.map(|v| Selection::IncludeByIndex(Buffer::from_iter(v)))
218+
.unwrap_or(Selection::All);
219+
Ok((sel, range))
220+
}
221+
TableFilterClass::Optional(child) => {
222+
try_from_virtual_column_filter(child).or_else(|_| Ok((Selection::All, None)))
223+
}
224+
_ => Ok((Selection::All, None)),
225+
}
226+
}

0 commit comments

Comments
 (0)