Skip to content

Commit 41adb8e

Browse files
joseph-isaacs and robert3005
authored and committed
Revert "Late materialization support for duckdb" (#7719)
Reverts #7631
1 parent 0e5c154 commit 41adb8e

19 files changed

Lines changed: 109 additions & 595 deletions

File tree

vortex-duckdb/cpp/include/duckdb_vx/table_function.h

Lines changed: 4 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -12,11 +12,6 @@
1212
#include "error.h"
1313
#include "table_filter.h"
1414
#include "duckdb_vx/data.h"
15-
#include <stdint.h>
16-
17-
#ifdef __cplusplus
18-
static_assert(sizeof(idx_t) == 8);
19-
#endif
2015

2116
#ifdef __cplusplus
2217
extern "C" {
@@ -88,16 +83,6 @@ typedef struct {
8883
bool has_null;
8984
} duckdb_column_statistics;
9085

91-
const idx_t INVALID_IDX = UINT64_MAX;
92-
93-
typedef struct {
94-
idx_t partition_index;
95-
// Either INVALID_IDX or position of column in output for file_index column
96-
size_t file_index_column_pos;
97-
// File index for the exported partition.
98-
size_t file_index;
99-
} duckdb_vx_partition_data;
100-
10186
// vtable mimicking subset of TableFunction.
10287
// See duckdb/include/function/tfunc.hpp
10388
typedef struct {
@@ -138,10 +123,10 @@ typedef struct {
138123

139124
double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);
140125

141-
void (*get_partition_data)(const void *bind_data,
142-
void *init_global_data,
143-
void *init_local_data,
144-
duckdb_vx_partition_data *partition_data_out);
126+
idx_t (*get_partition_data)(const void *bind_data,
127+
void *init_global_data,
128+
void *init_local_data,
129+
duckdb_vx_error *error_out);
145130
} duckdb_vx_tfunc_vtab_t;
146131

147132
// A single function for configuring the DuckDB table function vtable.

vortex-duckdb/cpp/table_function.cpp

Lines changed: 15 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,6 @@ DUCKDB_INCLUDES_BEGIN
1010
#include "duckdb.h"
1111
#include "duckdb/catalog/catalog.hpp"
1212
#include "duckdb/common/insertion_order_preserving_map.hpp"
13-
#include "duckdb/common/multi_file/multi_file_reader.hpp"
1413
#include "duckdb/function/table_function.hpp"
1514
#include "duckdb/main/capi/capi_internal.hpp"
1615
#include "duckdb/main/connection.hpp"
@@ -20,8 +19,6 @@ DUCKDB_INCLUDES_END
2019
using namespace duckdb;
2120
using vortex::CData;
2221
using vortex::IntoErrString;
23-
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
24-
constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;
2522

2623
struct CTableFunctionInfo final : TableFunctionInfo {
2724
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
@@ -337,50 +334,21 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu
337334
result.return_types.emplace_back(logical_type);
338335
}
339336

340-
/**
341-
* Called at planning time to determine whether data is partitioned by a
342-
* given set of columns. Requested columns are GROUP BY parameters i.e. columns
343-
* over which the query aggregates.
344-
*/
345-
TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInput &input) {
346-
const vector<column_t> &ids = input.partition_ids;
347-
// Our data is partitioned by array exporters. Each exporter processes a
348-
// single Array which belongs to a single file. If data is partitioned only
349-
// by file_index, there is one unique value for an Array. Otherwise there
350-
// may be multiple values.
351-
return (ids.size() == 1 && ids[0] == COLUMN_IDENTIFIER_FILE_INDEX)
352-
? TablePartitionInfo::SINGLE_VALUE_PARTITIONS
353-
: TablePartitionInfo::NOT_PARTITIONED;
354-
}
355-
356-
/**
357-
* Duckdb requests this function after exporting the chunk. We answer with
358-
* partition_index we have exported as well as information about constant
359-
* columns in this partition. As data is partitioned by array exporters, in
360-
* each partition ~ exported array file_index is constant.
361-
*/
362-
OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
337+
OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
338+
if (input.partition_info.RequiresPartitionColumns()) {
339+
throw InternalException("TableScan::GetPartitionData: partition columns not supported");
340+
}
363341
auto &bind = input.bind_data->Cast<CTableBindData>();
364342
void *const ffi_bind = bind.ffi_data->DataPtr();
365343
void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
366344
void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
367-
duckdb_vx_partition_data partition_data;
368-
bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);
369-
370-
OperatorPartitionData out(partition_data.partition_index);
371-
372-
// file_index_column_pos may be INVALID_IDX, but column_index will never
373-
// be INVALID_IDX, so we can compare directly
374-
for (const column_t column_index : input.partition_info.partition_columns) {
375-
if (column_index == partition_data.file_index_column_pos) {
376-
out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index));
377-
} else {
378-
throw InternalException(StringUtil::Format(
379-
"get_partition_data: requested column_index %d is not constant for given partition",
380-
column_index));
381-
}
345+
346+
duckdb_vx_error error_out = nullptr;
347+
const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out);
348+
if (error_out) {
349+
throw InvalidInputException(IntoErrString(error_out));
382350
}
383-
return out;
351+
return OperatorPartitionData(batch_index);
384352
}
385353

386354
extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
@@ -409,30 +377,21 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
409377

410378
tf.projection_pushdown = true;
411379
tf.filter_pushdown = true;
380+
// We can prune out filter columns that are unused in the remainder of the query plan.
381+
// e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function.
412382
tf.filter_prune = true;
413383
tf.sampling_pushdown = false;
384+
tf.late_materialization = false;
414385

415386
tf.pushdown_complex_filter = c_pushdown_complex_filter;
416387
tf.cardinality = c_cardinality;
417-
tf.get_partition_info = get_partition_info;
418-
tf.get_partition_data = get_partition_data;
388+
tf.get_partition_data = c_get_partition_data;
419389
tf.to_string = c_to_string;
420390
tf.table_scan_progress = c_table_scan_progress;
421391
tf.statistics = c_statistics;
422392

423-
tf.late_materialization = true;
424-
// Columns that uniquely identify a row for deferred re-fetch in a multi
425-
// file scan: (file index, row number in file).
426-
tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
427-
return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER};
428-
};
429-
430393
tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
431-
return {
432-
{COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
433-
{COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
434-
{COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}},
435-
};
394+
return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
436395
};
437396

438397
tf.arguments.resize(vtab->parameter_count);

vortex-duckdb/src/convert/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,5 +11,4 @@ pub use dtype::from_duckdb_table;
1111
pub use expr::try_from_bound_expression;
1212
pub use scalar::*;
1313
pub use table_filter::try_from_table_filter;
14-
pub use table_filter::try_from_virtual_column_filter;
1514
pub use vector::data_chunk_to_vortex;
Lines changed: 0 additions & 99 deletions
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,14 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use std::ops::Range;
54
use std::sync::Arc;
65

76
use itertools::Itertools;
8-
use vortex::buffer::Buffer;
97
use vortex::dtype::DType;
108
use vortex::dtype::Nullability;
119
use vortex::error::VortexExpect;
1210
use vortex::error::VortexResult;
1311
use vortex::error::vortex_bail;
14-
use vortex::error::vortex_err;
1512
use vortex::expr::Expression;
1613
use vortex::expr::and_collect;
1714
use vortex::expr::get_item;
@@ -24,13 +21,10 @@ use vortex::scalar::Scalar;
2421
use vortex::scalar_fn::ScalarFnVTableExt;
2522
use vortex::scalar_fn::fns::binary::Binary;
2623
use vortex::scalar_fn::fns::operators::CompareOperator;
27-
use vortex::scan::selection::Selection;
2824

2925
use crate::cpp::DUCKDB_VX_EXPR_TYPE;
30-
use crate::duckdb::ExtractedValue;
3126
use crate::duckdb::TableFilterClass;
3227
use crate::duckdb::TableFilterRef;
33-
use crate::duckdb::ValueRef;
3428

3529
pub fn try_from_table_filter(
3630
value: &TableFilterRef,
@@ -131,96 +125,3 @@ pub fn try_from_table_filter(
131125
}
132126
}))
133127
}
134-
135-
fn nonnegative_number_from_value(value: &ValueRef) -> VortexResult<u64> {
136-
match value.extract() {
137-
ExtractedValue::BigInt(i) => {
138-
u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}"))
139-
}
140-
ExtractedValue::Integer(i) => {
141-
u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}"))
142-
}
143-
ExtractedValue::UBigInt(u) => Ok(u),
144-
ExtractedValue::UInteger(u) => Ok(u64::from(u)),
145-
_ => vortex_bail!("unexpected value type"),
146-
}
147-
}
148-
149-
fn intersect_sorted(left: &[u64], right: &[u64]) -> Vec<u64> {
150-
let mut result = Vec::new();
151-
let (mut i, mut j) = (0, 0);
152-
while i < left.len() && j < right.len() {
153-
match left[i].cmp(&right[j]) {
154-
std::cmp::Ordering::Equal => {
155-
result.push(left[i]);
156-
i += 1;
157-
j += 1;
158-
}
159-
std::cmp::Ordering::Less => i += 1,
160-
std::cmp::Ordering::Greater => j += 1,
161-
}
162-
}
163-
result
164-
}
165-
166-
/// For constant comparison on IN filters over file_index or file_row_number
167-
/// virtual column, create a selection and a range covering the same range as
168-
/// expressions do.
169-
pub fn try_from_virtual_column_filter(
170-
filter: &TableFilterRef,
171-
) -> VortexResult<(Selection, Option<Range<u64>>)> {
172-
match filter.as_class() {
173-
TableFilterClass::InFilter(values) => {
174-
let indices = values
175-
.iter()
176-
.map(nonnegative_number_from_value)
177-
.collect::<VortexResult<Vec<u64>>>()?;
178-
Ok((Selection::IncludeByIndex(Buffer::from_iter(indices)), None))
179-
}
180-
TableFilterClass::ConstantComparison(const_) => {
181-
let n = nonnegative_number_from_value(const_.value)?;
182-
let range = match const_.operator {
183-
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_EQUAL => Some(n..n + 1),
184-
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHANOREQUALTO => {
185-
Some(n..u64::MAX)
186-
}
187-
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHAN => {
188-
Some(n.saturating_add(1)..u64::MAX)
189-
}
190-
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHANOREQUALTO => {
191-
Some(0..n.saturating_add(1))
192-
}
193-
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHAN => Some(0..n),
194-
_ => None,
195-
};
196-
Ok((Selection::All, range))
197-
}
198-
TableFilterClass::ConjunctionAnd(conj) => {
199-
let mut start = 0u64;
200-
let mut end = u64::MAX;
201-
let mut indices: Option<Vec<u64>> = None;
202-
for child in conj.children() {
203-
let (sel, range) = try_from_virtual_column_filter(child)?;
204-
if let Selection::IncludeByIndex(buf) = sel {
205-
indices = Some(match indices {
206-
None => buf.iter().copied().collect(),
207-
Some(existing) => intersect_sorted(&existing, buf.as_ref()),
208-
});
209-
}
210-
if let Some(r) = range {
211-
start = start.max(r.start);
212-
end = end.min(r.end);
213-
}
214-
}
215-
let range = (start < end).then_some(start..end);
216-
let sel = indices
217-
.map(|v| Selection::IncludeByIndex(Buffer::from_iter(v)))
218-
.unwrap_or(Selection::All);
219-
Ok((sel, range))
220-
}
221-
TableFilterClass::Optional(child) => {
222-
try_from_virtual_column_filter(child).or_else(|_| Ok((Selection::All, None)))
223-
}
224-
_ => Ok((Selection::All, None)),
225-
}
226-
}

0 commit comments

Comments
 (0)