Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,19 @@ void c_function(ClientContext &context, TableFunctionInput &input, DataChunk &ou
}
}

/*
* Table filter pushdown is used twice in duckdb:
*
* 1. Planning time: duckdb uses file metadata (filename, hive_partitioning
* options in MultiFileReader) to prune files based on filename or hive
* partition data i.e. month, year, etc. This happens before any file IO.
* We don't use this because we have our own file-level pruning in
* FileStatsLayoutReader.
*
* 2. Scan time. As we have filter_pushdown = true, filter expressions are
* converted to TableFilterSet and pushed down to Vortex. We convert them to
* vortex expressions and use them as filter options while initializing the scan.
*/
void c_pushdown_complex_filter(ClientContext &,
LogicalGet &,
FunctionData *bind_data,
Expand All @@ -300,8 +313,6 @@ void c_pushdown_complex_filter(ClientContext &,
if (error_out) {
throw BinderException(IntoErrString(error_out));
}

// If the pushdown complex filter returns true, we can remove the filter from the list.
iter = pushed ? filters.erase(iter) : std::next(iter);
}
}
Expand Down Expand Up @@ -440,6 +451,10 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
};
};

tf.pushdown_expression = [](auto &, auto &, auto &) {
return true;
};

tf.arguments.resize(vtab->parameter_count);
for (size_t i = 0; i < vtab->parameter_count; i++) {
tf.arguments[i] = *reinterpret_cast<LogicalType *>(vtab->parameters[i]);
Expand Down
192 changes: 129 additions & 63 deletions vortex-duckdb/src/convert/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use vortex::error::VortexError;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_bail;
use vortex::error::vortex_ensure;
use vortex::error::vortex_err;
use vortex::expr::Expression;
use vortex::expr::and_collect;
use vortex::expr::col;
use vortex::expr::get_item;
use vortex::expr::is_not_null;
use vortex::expr::is_null;
use vortex::expr::list_contains;
Expand All @@ -32,48 +34,116 @@ use vortex::scalar_fn::fns::operators::Operator;

use crate::cpp::DUCKDB_VX_EXPR_TYPE;
use crate::duckdb;
use crate::duckdb::BoundFunction;
use crate::duckdb::BoundOperator;

const DUCKDB_FUNCTION_NAME_CONTAINS: &str = "contains";

fn like_pattern_str(value: &duckdb::ExpressionRef) -> VortexResult<Option<String>> {
fn from_bound_str(value: &duckdb::ExpressionRef) -> VortexResult<String> {
match value.as_class().vortex_expect("unknown class") {
duckdb::ExpressionClass::BoundConstant(constant) => {
Ok(Some(format!("%{}%", constant.value.as_string().as_str())))
Ok(constant.value.as_string().as_str().to_owned())
}
_ => Ok(None),
_ => vortex_bail!("Expected string expression, got {:?}", value.as_class_id()),
}
}

/// Translates a DuckDB bound scalar function call into a Vortex [`Expression`].
///
/// Recognised functions:
/// * `struct_extract(parent, field)` becomes a `get_item` on the converted parent.
/// * `contains(value, pattern)` becomes a `Like` against the literal `%pattern%`.
/// * `~~` / `!~~` become a case-sensitive `Like`, negated for `!~~`.
///
/// Any other function is logged at debug level and yields `Ok(None)`, as does
/// any argument that `try_from_expression_inner` cannot convert. `col_sub` is
/// forwarded unchanged to every recursive conversion.
///
/// # Errors
///
/// Fails when a recognised function does not have exactly two children, or when
/// converting one of its children fails.
fn try_from_bound_function(
    func: &BoundFunction,
    col_sub: Option<&Expression>,
) -> VortexResult<Option<Expression>> {
    match func.scalar_function.name() {
        "struct_extract" => {
            let children: Vec<_> = func.children().collect();
            vortex_ensure!(children.len() == 2);
            let parent = match try_from_expression_inner(children[0], col_sub)? {
                Some(converted) => converted,
                None => return Ok(None),
            };
            // The second argument names the field to extract.
            let field_name = from_bound_str(children[1])?;
            Ok(Some(get_item(field_name, parent)))
        }
        "contains" => {
            let children: Vec<_> = func.children().collect();
            vortex_ensure!(children.len() == 2);
            let haystack = match try_from_expression_inner(children[0], col_sub)? {
                Some(converted) => converted,
                None => return Ok(None),
            };
            // Substring search is expressed as LIKE with a wildcard on both sides.
            let needle = from_bound_str(children[1])?;
            let wildcard_pattern = lit(format!("%{needle}%"));
            Ok(Some(Like.new_expr(
                LikeOptions::default(),
                [haystack, wildcard_pattern],
            )))
        }
        op_name @ ("~~" | "!~~") => {
            let children: Vec<_> = func.children().collect();
            vortex_ensure!(children.len() == 2);
            let subject = match try_from_expression_inner(children[0], col_sub)? {
                Some(converted) => converted,
                None => return Ok(None),
            };
            let pattern = match try_from_expression_inner(children[1], col_sub)? {
                Some(converted) => converted,
                None => return Ok(None),
            };
            let is_negated = op_name == "!~~";
            let options = LikeOptions {
                negated: is_negated,
                case_insensitive: false,
            };
            Ok(Some(Like.new_expr(options, [subject, pattern])))
        }
        _ => {
            debug!("bound function {}", func.scalar_function.name());
            Ok(None)
        }
    }
}

/// Converts a DuckDB bound expression into a Vortex [`Expression`].
///
/// Delegates to `try_from_expression_inner` with no column substitute, so a
/// `BoundRef` node reached during conversion produces an error.
///
/// Returns `Ok(None)` when the expression has no recognised class or contains
/// an operator or function that is not converted.
pub fn try_from_bound_expression(
value: &duckdb::ExpressionRef,
) -> VortexResult<Option<Expression>> {
try_from_expression_inner(value, None)
}

/// Converts a DuckDB bound expression into a Vortex [`Expression`], using a
/// clone of `col_sub` wherever the expression contains a `BoundRef` node.
///
/// Delegates to `try_from_expression_inner`. Returns `Ok(None)` when the
/// expression has no recognised class or contains an operator or function
/// that is not converted.
pub(super) fn try_from_bound_expression_with_col_sub(
value: &duckdb::ExpressionRef,
col_sub: &Expression,
) -> VortexResult<Option<Expression>> {
try_from_expression_inner(value, Some(col_sub))
}

fn try_from_expression_inner(
value: &duckdb::ExpressionRef,
col_sub: Option<&Expression>,
) -> VortexResult<Option<Expression>> {
let Some(value) = value.as_class() else {
debug!("no expression class id {:?}", value.as_class_id());
return Ok(None);
};
Ok(Some(match value {
duckdb::ExpressionClass::BoundRef => {
let Some(col) = col_sub else {
vortex_bail!("BoundRef requested but no column supplied");
};
col.clone()
}
duckdb::ExpressionClass::BoundColumnRef(col_ref) => col(col_ref.name.as_ref()),
duckdb::ExpressionClass::BoundConstant(const_) => lit(Scalar::try_from(const_.value)?),
duckdb::ExpressionClass::BoundComparison(compare) => {
let operator: Operator = compare.op.try_into()?;

let Some(left) = try_from_bound_expression(compare.left)? else {
let Some(left) = try_from_expression_inner(compare.left, col_sub)? else {
return Ok(None);
};
let Some(right) = try_from_bound_expression(compare.right)? else {
let Some(right) = try_from_expression_inner(compare.right, col_sub)? else {
return Ok(None);
};

Binary.new_expr(operator, [left, right])
}
duckdb::ExpressionClass::BoundBetween(between) => {
let Some(array) = try_from_bound_expression(between.input)? else {
let Some(array) = try_from_expression_inner(between.input, col_sub)? else {
return Ok(None);
};
let Some(lower) = try_from_bound_expression(between.lower)? else {
let Some(lower) = try_from_expression_inner(between.lower, col_sub)? else {
return Ok(None);
};
let Some(upper) = try_from_bound_expression(between.upper)? else {
let Some(upper) = try_from_expression_inner(between.upper, col_sub)? else {
return Ok(None);
};
Between.new_expr(
Expand All @@ -98,7 +168,7 @@ pub fn try_from_bound_expression(
| DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_OPERATOR_IS_NOT_NULL => {
let children: Vec<_> = operator.children().collect();
assert_eq!(children.len(), 1);
let Some(child) = try_from_bound_expression(children[0])? else {
let Some(child) = try_from_expression_inner(children[0], col_sub)? else {
return Ok(None);
};
match operator.op {
Expand All @@ -111,67 +181,23 @@ pub fn try_from_bound_expression(
}
}
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_IN => {
// First child is element, rest form the list.
let children: Vec<_> = operator.children().collect();
assert!(children.len() >= 2);
let Some(element) = try_from_bound_expression(children[0])? else {
return Ok(None);
};

let Some(list_elements) = children
.iter()
.skip(1)
.map(|c| {
let Some(value) = try_from_bound_expression(c)? else {
return Ok(None);
};
Ok(Some(
value
.as_opt::<Literal>()
.ok_or_else(|| {
vortex_err!("cannot have a non literal in a in_list")
})?
.clone(),
))
})
.collect::<VortexResult<Option<Vec<_>>>>()?
else {
return Ok(None);
};
let list = Scalar::list(
Arc::new(list_elements[0].dtype().clone()),
list_elements,
Nullability::Nullable,
);
list_contains(lit(list), element)
}
_ => {
debug!(op=?operator.op, "cannot be pushed down");
return Ok(None);
return try_from_compare_in(operator, col_sub, false);
}
},
duckdb::ExpressionClass::BoundFunction(func) => match func.scalar_function.name() {
DUCKDB_FUNCTION_NAME_CONTAINS => {
let children: Vec<_> = func.children().collect();
assert_eq!(children.len(), 2);
let Some(value) = try_from_bound_expression(children[0])? else {
return Ok(None);
};
let Some(pattern_lit) = like_pattern_str(children[1])? else {
vortex_bail!("expected pattern to be bound string")
};
let pattern = lit(pattern_lit);
Like.new_expr(LikeOptions::default(), [value, pattern])
DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_NOT_IN => {
return try_from_compare_in(operator, col_sub, true);
}
_ => {
debug!("bound function {}", func.scalar_function.name());
debug!(op=?operator.op, "cannot be pushed down");
return Ok(None);
}
},
duckdb::ExpressionClass::BoundFunction(func) => {
return try_from_bound_function(&func, col_sub);
}
duckdb::ExpressionClass::BoundConjunction(conj) => {
let Some(children) = conj
.children()
.map(try_from_bound_expression)
.map(|c| try_from_expression_inner(c, col_sub))
.collect::<VortexResult<Option<Vec<_>>>>()?
else {
return Ok(None);
Expand All @@ -189,6 +215,46 @@ pub fn try_from_bound_expression(
}))
}

/// Converts a DuckDB `IN` / `NOT IN` operator into a Vortex list-membership expression.
///
/// The first child of `operator` is the element being tested; the remaining
/// children form the candidate list and must each convert to a literal.
///
/// * `col_sub` - forwarded to `try_from_expression_inner` for every child.
/// * `not_in` - when `true`, the membership test is wrapped in `not(..)`.
///
/// Returns `Ok(None)` if any child cannot be converted.
///
/// # Errors
///
/// Returns an error if the operator has fewer than two children, if a list
/// member does not convert to a literal, or if converting a child fails.
fn try_from_compare_in(
    operator: BoundOperator,
    col_sub: Option<&Expression>,
    not_in: bool,
) -> VortexResult<Option<Expression>> {
    // First child is element, rest form the list.
    let children: Vec<_> = operator.children().collect();
    // Report a malformed operator as an error instead of panicking: this
    // function already returns a `VortexResult`, so callers can handle it.
    if children.len() < 2 {
        return Err(vortex_err!(
            "IN expression needs an element and at least one list value, got {} children",
            children.len()
        ));
    }
    let Some(element) = try_from_expression_inner(children[0], col_sub)? else {
        return Ok(None);
    };

    let Some(list_elements) = children
        .iter()
        .skip(1)
        .map(|c| {
            let Some(value) = try_from_expression_inner(c, col_sub)? else {
                return Ok(None);
            };
            Ok(Some(
                value
                    .as_opt::<Literal>()
                    .ok_or_else(|| vortex_err!("cannot have a non literal in a in_list"))?
                    .clone(),
            ))
        })
        .collect::<VortexResult<Option<Vec<_>>>>()?
    else {
        return Ok(None);
    };
    // The arity check above guarantees at least one list member, so indexing
    // the first element for the list's element dtype cannot go out of bounds.
    let list = Scalar::list(
        Arc::new(list_elements[0].dtype().clone()),
        list_elements,
        Nullability::Nullable,
    );

    let expr = list_contains(lit(list), element);
    Ok(Some(if not_in { not(expr) } else { expr }))
}

impl TryFrom<DUCKDB_VX_EXPR_TYPE> for Operator {
type Error = VortexError;

Expand Down
6 changes: 4 additions & 2 deletions vortex-duckdb/src/convert/table_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ pub fn try_from_table_filter(
)
}
TableFilterClass::ExpressionRef(expr) => {
// TODO(ngates): figure out which column ID DuckDB is using for the expression.
vortex_bail!("expression table filter is not supported: {}", expr);
match super::expr::try_from_bound_expression_with_col_sub(expr, col)? {
Some(expression) => expression,
None => return Ok(None),
}
}
TableFilterClass::Bloom => {
vortex_bail!("bloom filter table filter is not supported")
Expand Down
5 changes: 5 additions & 0 deletions vortex-duckdb/src/duckdb/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ impl ExpressionRef {
bind_info: out.bind_info,
})
}
cpp::DUCKDB_VX_EXPR_CLASS::DUCKDB_VX_EXPR_CLASS_BOUND_REF => {
ExpressionClass::BoundRef
}
_ => {
return None;
}
Expand All @@ -155,6 +158,8 @@ pub enum ExpressionClass<'a> {
BoundBetween(BoundBetween<'a>),
BoundOperator(BoundOperator<'a>),
BoundFunction(BoundFunction<'a>),
/// Column inside ExpressionFilter for expression pushed down to Vortex.
BoundRef,
}

pub struct BoundColumnRef {
Expand Down
Loading