Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ typedef struct {
bool has_null;
} duckdb_column_statistics;

// Out-parameter filled by the vtable's `get_partition_data` callback and
// consumed by the C++ `c_get_partition_data` wrapper.
typedef struct {
// Batch index used to construct the returned OperatorPartitionData.
idx_t batch_index;
// Column position compared against each requested partition column to
// recognise the file-index column; only consulted when
// `has_file_index_column_pos` is true.
size_t file_index_column_pos;
// Column position compared against each requested partition column to
// recognise the file-row-number column; only consulted when
// `has_file_row_number_column_pos` is true.
size_t file_row_number_column_pos;
// Presence flag for `file_index_column_pos`.
bool has_file_index_column_pos;
// Presence flag for `file_row_number_column_pos`.
bool has_file_row_number_column_pos;
} duckdb_vx_partition_data;

// vtable mimicking subset of TableFunction.
// See duckdb/include/function/tfunc.hpp
typedef struct {
Expand Down Expand Up @@ -123,10 +131,10 @@ typedef struct {

double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);

idx_t (*get_partition_data)(const void *bind_data,
void *init_global_data,
void *init_local_data,
duckdb_vx_error *error_out);
void (*get_partition_data)(const void *bind_data,
void *init_global_data,
void *init_local_data,
duckdb_vx_partition_data *partition_data_out);
} duckdb_vx_tfunc_vtab_t;

// A single function for configuring the DuckDB table function vtable.
Expand Down
41 changes: 29 additions & 12 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ DUCKDB_INCLUDES_BEGIN
#include "duckdb.h"
#include "duckdb/catalog/catalog.hpp"
#include "duckdb/common/insertion_order_preserving_map.hpp"
#include "duckdb/common/multi_file/multi_file_reader.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/capi/capi_internal.hpp"
#include "duckdb/main/connection.hpp"
Expand All @@ -19,6 +20,8 @@ DUCKDB_INCLUDES_END
using namespace duckdb;
using vortex::CData;
using vortex::IntoErrString;
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;

struct CTableFunctionInfo final : TableFunctionInfo {
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
Expand Down Expand Up @@ -335,20 +338,26 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu
}

OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
if (input.partition_info.RequiresPartitionColumns()) {
throw InternalException("TableScan::GetPartitionData: partition columns not supported");
}
auto &bind = input.bind_data->Cast<CTableBindData>();
void *const ffi_bind = bind.ffi_data->DataPtr();
void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();

duckdb_vx_error error_out = nullptr;
const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out);
if (error_out) {
throw InvalidInputException(IntoErrString(error_out));
duckdb_vx_partition_data partition_data;
bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);

OperatorPartitionData out(partition_data.batch_index);
for (idx_t col : input.partition_info.partition_columns) {
if (partition_data.has_file_index_column_pos && col == partition_data.file_index_column_pos) {
out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index_column_pos));
} else if (partition_data.has_file_row_number_column_pos &&
col == partition_data.file_row_number_column_pos) {
out.partition_data.emplace_back(Value::BIGINT(partition_data.file_row_number_column_pos));
} else {
throw InternalException("get_partition_data - did not find constant for the given partition");
}
}
return OperatorPartitionData(batch_index);
return out;
}

extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
Expand Down Expand Up @@ -377,11 +386,9 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d

tf.projection_pushdown = true;
tf.filter_pushdown = true;
// We can prune out filter columns that are unused in the remainder of the query plan.
// e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function.
tf.filter_prune = true;
tf.late_materialization = true;
tf.sampling_pushdown = false;
tf.late_materialization = false;

tf.pushdown_complex_filter = c_pushdown_complex_filter;
tf.cardinality = c_cardinality;
Expand All @@ -390,8 +397,18 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
tf.table_scan_progress = c_table_scan_progress;
tf.statistics = c_statistics;

// What columns uniquely identify a row for deferred re-fetch in a
// multi-file scan: (file index, row index in file).
tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER};
};

tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
return {
{COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
{COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
{COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}},
};
};

tf.arguments.resize(vtab->parameter_count);
Expand Down
1 change: 1 addition & 0 deletions vortex-duckdb/src/convert/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pub use dtype::from_duckdb_table;
pub use expr::try_from_bound_expression;
pub use scalar::*;
pub use table_filter::try_from_table_filter;
pub use table_filter::try_from_virtual_column_filter;
pub use vector::data_chunk_to_vortex;
99 changes: 99 additions & 0 deletions vortex-duckdb/src/convert/table_filter.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
use std::sync::Arc;

use itertools::Itertools;
use vortex::buffer::Buffer;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_bail;
use vortex::error::vortex_err;
use vortex::expr::Expression;
use vortex::expr::and_collect;
use vortex::expr::get_item;
Expand All @@ -21,10 +24,13 @@ use vortex::scalar::Scalar;
use vortex::scalar_fn::ScalarFnVTableExt;
use vortex::scalar_fn::fns::binary::Binary;
use vortex::scalar_fn::fns::operators::CompareOperator;
use vortex::scan::selection::Selection;

use crate::cpp::DUCKDB_VX_EXPR_TYPE;
use crate::duckdb::ExtractedValue;
use crate::duckdb::TableFilterClass;
use crate::duckdb::TableFilterRef;
use crate::duckdb::ValueRef;

pub fn try_from_table_filter(
value: &TableFilterRef,
Expand Down Expand Up @@ -125,3 +131,96 @@ pub fn try_from_table_filter(
}
}))
}

/// Reads an integer DuckDB value as a `u64`.
///
/// Unsigned variants are returned directly (widened where needed). Signed
/// variants are accepted only when non-negative.
///
/// # Errors
///
/// Fails with `negative value: ..` for a negative signed integer and with
/// `unexpected value type` for any other extracted variant.
fn nonnegative_number_from_value(value: &ValueRef) -> VortexResult<u64> {
    // Unsigned inputs can be answered immediately; signed inputs are funnelled
    // into one shared sign check below.
    let signed: i64 = match value.extract() {
        ExtractedValue::UBigInt(u) => return Ok(u),
        ExtractedValue::UInteger(u) => return Ok(u64::from(u)),
        ExtractedValue::BigInt(i) => i,
        ExtractedValue::Integer(i) => i64::from(i),
        _ => vortex_bail!("unexpected value type"),
    };
    u64::try_from(signed).map_err(|_| vortex_err!("negative value: {signed}"))
}

/// Returns the values present in both `left` and `right`.
///
/// Both slices must be sorted in ascending order; the walk advances whichever
/// side currently holds the smaller head, so unsorted input yields an
/// incomplete result. A value repeated on both sides is emitted once per
/// matched pair.
fn intersect_sorted(left: &[u64], right: &[u64]) -> Vec<u64> {
    use std::cmp::Ordering;

    let mut common = Vec::new();
    // Shrinking views over the not-yet-consumed tail of each input.
    let (mut lhs, mut rhs) = (left, right);
    while let (Some((&a, lhs_tail)), Some((&b, rhs_tail))) =
        (lhs.split_first(), rhs.split_first())
    {
        match a.cmp(&b) {
            Ordering::Less => lhs = lhs_tail,
            Ordering::Greater => rhs = rhs_tail,
            Ordering::Equal => {
                common.push(a);
                lhs = lhs_tail;
                rhs = rhs_tail;
            }
        }
    }
    common
}

/// Translates a DuckDB table filter over the `file_index` or `file_row_number`
/// virtual column into a [`Selection`] plus an optional half-open index range.
///
/// Supported shapes:
/// - `IN (..)` becomes `Selection::IncludeByIndex` holding the listed values in
///   ascending order without duplicates, and no range.
/// - A comparison against a constant (`=`, `>=`, `>`, `<=`, `<`) becomes
///   `Selection::All` with the matching range; other operators yield no range.
/// - `AND` intersects the index lists and the ranges of its children. If the
///   combined range is empty the filter is unsatisfiable, which is reported as
///   an empty `Selection::IncludeByIndex` with no range.
/// - An optional filter is translated on a best-effort basis: a child that
///   fails to convert is treated as "no restriction".
/// - Every other filter class yields `(Selection::All, None)`.
///
/// # Errors
///
/// Fails when a filter constant is negative or not an integer type (optional
/// filters excepted, as described above).
pub fn try_from_virtual_column_filter(
    filter: &TableFilterRef,
) -> VortexResult<(Selection, Option<Range<u64>>)> {
    match filter.as_class() {
        TableFilterClass::InFilter(values) => {
            let mut indices = values
                .iter()
                .map(nonnegative_number_from_value)
                .collect::<VortexResult<Vec<u64>>>()?;
            // The IN-list arrives in whatever order the query wrote it, but
            // `intersect_sorted` (used by the conjunction arm) only works on
            // ascending input, so normalise here.
            // NOTE(review): `Selection::IncludeByIndex` is understood to expect
            // sorted indices as well; confirm against the Vortex docs.
            indices.sort_unstable();
            indices.dedup();
            Ok((Selection::IncludeByIndex(Buffer::from_iter(indices)), None))
        }
        TableFilterClass::ConstantComparison(const_) => {
            let n = nonnegative_number_from_value(const_.value)?;
            let range = match const_.operator {
                // Saturate so that `n == u64::MAX` cannot overflow.
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_EQUAL => {
                    Some(n..n.saturating_add(1))
                }
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHANOREQUALTO => {
                    Some(n..u64::MAX)
                }
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHAN => {
                    Some(n.saturating_add(1)..u64::MAX)
                }
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHANOREQUALTO => {
                    Some(0..n.saturating_add(1))
                }
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHAN => Some(0..n),
                _ => None,
            };
            Ok((Selection::All, range))
        }
        TableFilterClass::ConjunctionAnd(conj) => {
            let mut start = 0u64;
            let mut end = u64::MAX;
            let mut indices: Option<Vec<u64>> = None;
            for child in conj.children() {
                let (sel, range) = try_from_virtual_column_filter(child)?;
                if let Selection::IncludeByIndex(buf) = sel {
                    indices = Some(match indices {
                        None => buf.iter().copied().collect(),
                        Some(existing) => intersect_sorted(&existing, buf.as_ref()),
                    });
                }
                if let Some(r) = range {
                    start = start.max(r.start);
                    end = end.min(r.end);
                }
            }
            if start >= end {
                // Contradictory bounds (e.g. `x > 10 AND x < 5`): nothing can
                // match. Returning "no range" here would silently widen the
                // filter to every row, so report an empty selection instead.
                let nothing = Selection::IncludeByIndex(Buffer::from_iter(Vec::<u64>::new()));
                return Ok((nothing, None));
            }
            let sel = indices
                .map(|v| Selection::IncludeByIndex(Buffer::from_iter(v)))
                .unwrap_or(Selection::All);
            Ok((sel, Some(start..end)))
        }
        TableFilterClass::Optional(child) => {
            try_from_virtual_column_filter(child).or_else(|_| Ok((Selection::All, None)))
        }
        _ => Ok((Selection::All, None)),
    }
}
Loading
Loading