diff --git a/docs/api/python/expr.rst b/docs/api/python/expr.rst index 490d789b7dd..51fcc2f7495 100644 --- a/docs/api/python/expr.rst +++ b/docs/api/python/expr.rst @@ -7,14 +7,16 @@ the following expression represents the set of rows for which the `age` column l .. doctest:: - >>> import vortex.expr - >>> age = vortex.expr.column("age") + >>> import vortex as vx + >>> age = vx.col("age") >>> (23 > age) & (age < 55) # doctest: +SKIP .. autosummary:: :nosignatures: ~vortex.expr.column + ~vortex.expr.col + ~vortex.expr.plan ~vortex.expr.Expr .. raw:: html @@ -23,10 +25,20 @@ the following expression represents the set of rows for which the `age` column l .. autofunction:: vortex.expr.column +.. autofunction:: vortex.expr.col + .. autofunction:: vortex.expr.not_ .. autofunction:: vortex.expr.and_ +.. autofunction:: vortex.expr.cast + +.. autofunction:: vortex.expr.is_null + +.. autofunction:: vortex.expr.is_not_null + +.. autofunction:: vortex.expr.plan + .. autofunction:: vortex.expr.root .. autofunction:: vortex.expr.literal @@ -64,4 +76,3 @@ the following expression represents the set of rows for which the `age` column l ... .to_pylist() ... ) [{'x': 1, 'y': {'yy': 'a'}}] - diff --git a/docs/getting-started/python.rst b/docs/getting-started/python.rst index 357607cfb22..d642190eb7a 100644 --- a/docs/getting-started/python.rst +++ b/docs/getting-started/python.rst @@ -56,7 +56,7 @@ Use :func:`~vortex.open` to open and read the Vortex array from disk: .. doctest:: >>> import vortex as vx - >>> cvtx = vx.open("example.vortex").scan().read_all() # doctest: +SKIP + >>> table = vx.open("example.vortex").to_table() # doctest: +SKIP Vortex is architected to achieve fast random access, in many cases hundreds of times faster diff --git a/docs/project/changelog/index.md b/docs/project/changelog/index.md index ff63243e512..14f80a21172 100644 --- a/docs/project/changelog/index.md +++ b/docs/project/changelog/index.md @@ -1,9 +1,3 @@ # Changelog -For older releases, see the full [release history on GitHub](https://github.com/vortex-data/vortex/releases). - -```{toctree} ---- -maxdepth: 1 ---- -``` +See the full [release history on GitHub](https://github.com/vortex-data/vortex/releases). diff --git a/docs/user-guide/pyarrow.md b/docs/user-guide/pyarrow.md index 7907848fa1d..e85fc4cf69c 100644 --- a/docs/user-guide/pyarrow.md +++ b/docs/user-guide/pyarrow.md @@ -29,25 +29,50 @@ Use {func}`~vortex.open` to lazily open a Vortex file: ### As an Arrow Table -{meth}`.VortexFile.to_arrow` returns a {class}`pyarrow.RecordBatchReader`. Call -{meth}`~pyarrow.RecordBatchReader.read_all` to collect into a {class}`pyarrow.Table`: +{meth}`.VortexFile.to_table` collects the scan into a {class}`pyarrow.Table`: ```{doctest} pycon ->>> table = f.to_arrow().read_all() +>>> table = f.to_table() >>> table.num_rows 1000 ``` +{meth}`.VortexFile.to_arrow` returns a streaming {class}`pyarrow.RecordBatchReader`. + ### Column Projection Read only the columns you need: ```{doctest} pycon ->>> table = f.to_arrow(['tip_amount', 'fare_amount']).read_all() +>>> table = f.to_table(columns=['tip_amount', 'fare_amount']) >>> table.column_names ['tip_amount', 'fare_amount'] ``` +### Filters + +Vortex expressions are the stable pushdown API. PyVortex plans them against the file schema before +the scan runs: + +```{doctest} pycon +>>> table = f.to_table(columns=['tip_amount'], filter=vx.col('tip_amount') > 10) +>>> table.num_rows > 0 +True +``` + +PyArrow compute expressions are accepted as compatibility input. PyVortex converts them through +Substrait, then runs the same Vortex planner: + +```{doctest} pycon +>>> import pyarrow.compute as pc +>>> table = f.to_table(columns=['tip_amount'], filter=pc.field('tip_amount') > 10) +>>> table.num_rows > 0 +True +``` + +Use `filter_policy="pushdown"` to raise when a PyArrow expression cannot be pushed into Vortex. Use +`filter_policy="fallback"` to read the rows and apply the PyArrow filter after the scan. + ### Streaming Record Batches Iterate over record batches for streaming processing: diff --git a/docs/user-guide/vortex-python.md b/docs/user-guide/vortex-python.md index 0977ff54844..7267e48bdbe 100644 --- a/docs/user-guide/vortex-python.md +++ b/docs/user-guide/vortex-python.md @@ -117,9 +117,8 @@ Available types: {func}`~vortex.null`, {func}`~vortex.bool_`, ## Expressions -The `vortex.expr` module provides expressions for filtering and projecting. These -are primarily used with {meth}`.VortexFile.scan` and {meth}`.VortexFile.to_arrow` but can also be -applied directly: +The `vortex.expr` module provides expressions for filtering and projecting. Use `vx.col` or +`vortex.expr.col` to build the stable predicate DSL for pushdown: ```{doctest} pycon >>> import vortex.expr as ve @@ -128,11 +127,21 @@ applied directly: ... {'name': 'Bob', 'age': 25}, ... {'name': 'Carol', 'age': 35}, ... ]) ->>> expr = ve.column('age') > 28 +>>> expr = vx.col('age') > 28 >>> arr.apply(expr).to_arrow_array().to_pylist() [True, False, True] ``` +When a filter is used to read a file, PyVortex plans it against the file schema. Planning inserts +the casts required by the Vortex expression engine, simplifies the expression, and validates that +filters return Boolean values. You can run the same step directly: + +```{doctest} pycon +>>> planned = ve.plan(vx.col('age') > 28, schema=arr.dtype.to_arrow_schema(), kind="filter") +>>> isinstance(planned, ve.Expr) +True +``` + ## VortexFile {func}`~vortex.open` lazily opens a Vortex file for reading: @@ -146,19 +155,16 @@ applied directly: 1000 ``` -Use {meth}`.VortexFile.scan` to read data with optional projection, filtering, and limit: +Use {meth}`.VortexFile.to_table` or {meth}`.VortexFile.to_arrow` to read Arrow data with optional +column projection, filtering, and limit: ```{doctest} pycon ->>> result = f.scan(['tip_amount'], limit=3).read_all() ->>> result.to_arrow_array() - --- is_valid: all not null --- child 0 type: double - [ - 0, - 5.1, - 16.54 - ] +>>> table = f.to_table(columns=['tip_amount'], limit=3) +>>> table.to_pydict() +{'tip_amount': [0.0, 5.1, 16.54]} +>>> filtered = f.to_table(columns=['tip_amount'], filter=vx.col('tip_amount') > 10) +>>> filtered.num_rows > 0 +True ``` ## ArrayIterator diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index b388e7e3a21..445cdaf138f 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -12402,6 +12402,10 @@ pub fn vortex_array::expr::or_collect(iter: I) -> core::option::Option, vortex_array::expr::Expression)>, nullability: vortex_array::dtype::Nullability) -> vortex_array::expr::Expression +pub fn vortex_array::expr::plan_expression(expr: vortex_array::expr::Expression, scope: &vortex_array::dtype::DType) -> vortex_error::VortexResult + +pub fn vortex_array::expr::plan_filter_expression(expr: vortex_array::expr::Expression, scope: &vortex_array::dtype::DType) -> vortex_error::VortexResult + pub fn vortex_array::expr::root() -> vortex_array::expr::Expression pub fn vortex_array::expr::select(field_names: impl core::convert::Into, child: vortex_array::expr::Expression) -> vortex_array::expr::Expression diff --git a/vortex-array/src/expr/mod.rs b/vortex-array/src/expr/mod.rs index 9d973a01cae..922477790e9 100644 --- a/vortex-array/src/expr/mod.rs +++ b/vortex-array/src/expr/mod.rs @@ -33,6 +33,7 @@ mod exprs; pub(crate) mod field; pub mod forms; mod optimize; +mod plan; pub mod proto; pub mod pruning; pub mod stats; @@ -42,6 +43,7 @@ pub mod traversal; pub use analysis::*; pub use expression::*; pub use exprs::*; +pub use plan::*; pub use pruning::StatsCatalog; pub trait VortexExprExt { diff --git a/vortex-array/src/expr/plan.rs b/vortex-array/src/expr/plan.rs new file mode 100644 index 00000000000..463b545445a --- /dev/null +++ b/vortex-array/src/expr/plan.rs @@ -0,0 +1,172 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Expression planning against an input scope. + +use vortex_error::VortexResult; +use vortex_error::vortex_bail; + +use crate::dtype::DType; +use crate::expr::Expression; +use crate::expr::transform::coerce_expression; + +/// Plan an expression against an input [`DType`]. +/// +/// Planning typeifies the expression by inserting casts required by scalar functions, simplifies +/// the resulting tree, and verifies that the planned expression has a valid return type for the +/// provided scope. +pub fn plan_expression(expr: Expression, scope: &DType) -> VortexResult { + let expr = coerce_expression(expr, scope)?; + let expr = expr.optimize_recursive(scope)?; + expr.return_dtype(scope)?; + Ok(expr) +} + +/// Plan a filter expression against an input [`DType`]. +/// +/// This performs the same planning pass as [`plan_expression`] and then requires the expression to +/// return a Boolean value. +pub fn plan_filter_expression(expr: Expression, scope: &DType) -> VortexResult { + let expr = plan_expression(expr, scope)?; + let dtype = expr.return_dtype(scope)?; + if !matches!(dtype, DType::Bool(_)) { + vortex_bail!("filter expression must return bool, got {}", dtype); + } + Ok(expr) +} + +#[cfg(test)] +mod tests { + use vortex_error::VortexResult; + + use crate::dtype::DType; + use crate::dtype::Nullability::NonNullable; + use crate::dtype::Nullability::Nullable; + use crate::dtype::PType; + use crate::dtype::StructFields; + use crate::expr::col; + use crate::expr::lit; + use crate::expr::plan_expression; + use crate::expr::plan_filter_expression; + use crate::scalar::Scalar; + use crate::scalar_fn::ScalarFnVTableExt; + use crate::scalar_fn::fns::binary::Binary; + use crate::scalar_fn::fns::cast::Cast; + use crate::scalar_fn::fns::operators::Operator; + + fn scope() -> DType { + DType::Struct( + StructFields::new( + ["i32", "i64", "u8", "flag"].into(), + vec![ + DType::Primitive(PType::I32, NonNullable), + DType::Primitive(PType::I64, NonNullable), + DType::Primitive(PType::U8, NonNullable), + DType::Bool(NonNullable), + ], + ), + NonNullable, + ) + } + + #[test] + fn mixed_numeric_comparison_inserts_cast() -> VortexResult<()> { + let scope = scope(); + let expr = Binary.new_expr(Operator::Lt, [col("i32"), col("i64")]); + + let planned = plan_filter_expression(expr, &scope)?; + + assert!(planned.child(0).is::()); + assert_eq!( + planned.child(0).return_dtype(&scope)?, + DType::Primitive(PType::I64, NonNullable) + ); + assert!(!planned.child(1).is::()); + Ok(()) + } + + #[test] + fn mixed_numeric_arithmetic_inserts_casts() -> VortexResult<()> { + let scope = scope(); + let expr = Binary.new_expr(Operator::Add, [col("u8"), col("i32")]); + + let planned = plan_expression(expr, &scope)?; + + assert!(planned.child(0).is::()); + assert_eq!( + planned.return_dtype(&scope)?, + DType::Primitive(PType::I64, NonNullable) + ); + Ok(()) + } + + #[test] + fn literal_values_are_coerced_against_column_types() -> VortexResult<()> { + let scope = scope(); + let expr = Binary.new_expr(Operator::Eq, [col("i32"), lit(1i64)]); + + let planned = plan_filter_expression(expr, &scope)?; + + assert!(!planned.child(0).is::()); + assert!(planned.child(1).is::()); + assert_eq!( + planned.child(1).return_dtype(&scope)?, + DType::Primitive(PType::I32, NonNullable) + ); + Ok(()) + } + + #[test] + fn null_literals_are_typed_from_context() -> VortexResult<()> { + let scope = scope(); + let expr = Binary.new_expr(Operator::Eq, [col("i32"), lit(Scalar::null(DType::Null))]); + + let planned = plan_filter_expression(expr, &scope)?; + + assert!(planned.child(1).is::()); + assert_eq!( + planned.child(1).return_dtype(&scope)?, + DType::Primitive(PType::I32, Nullable) + ); + Ok(()) + } + + #[test] + fn boolean_and_preserves_boolean_inputs() -> VortexResult<()> { + let scope = scope(); + let expr = Binary.new_expr(Operator::And, [col("flag"), col("flag")]); + + let planned = plan_filter_expression(expr, &scope)?; + + assert_eq!(planned.return_dtype(&scope)?, DType::Bool(NonNullable)); + assert!(!planned.child(0).is::()); + assert!(!planned.child(1).is::()); + Ok(()) + } + + #[test] + fn filter_planning_rejects_non_boolean_outputs() { + let scope = scope(); + let expr = Binary.new_expr(Operator::Add, [col("i32"), lit(1i32)]); + + let err = plan_filter_expression(expr, &scope).unwrap_err(); + + assert!( + err.to_string() + .contains("filter expression must return bool") + ); + } + + #[test] + fn logical_operators_reject_non_boolean_inputs() { + let scope = scope(); + let expr = Binary.new_expr(Operator::And, [col("i32"), col("i64")]); + + let err = plan_filter_expression(expr, &scope).unwrap_err(); + + assert!( + err.to_string() + .contains("logical operation requires boolean operands") + ); + } +} diff --git a/vortex-array/src/expr/transform/coerce.rs b/vortex-array/src/expr/transform/coerce.rs index 1b6e9acd661..41e13d7aec5 100644 --- a/vortex-array/src/expr/transform/coerce.rs +++ b/vortex-array/src/expr/transform/coerce.rs @@ -10,6 +10,7 @@ use crate::expr::Expression; use crate::expr::cast; use crate::expr::traversal::NodeExt; use crate::expr::traversal::Transformed; +use crate::scalar_fn::fns::binary::Binary; use crate::scalar_fn::fns::literal::Literal; use crate::scalar_fn::fns::root::Root; @@ -35,7 +36,17 @@ pub fn coerce_expression(expr: Expression, scope: &DType) -> VortexResult>()?; // Ask the scalar function what types it wants. - let coerced_dtypes = node.scalar_fn().coerce_args(&child_dtypes)?; + let coerced_dtypes = if node + .as_opt::() + .is_some_and(|operator| operator.is_comparison()) + { + match comparison_literal_coercions(&node, &child_dtypes) { + Some(coerced_dtypes) => coerced_dtypes, + None => node.scalar_fn().coerce_args(&child_dtypes)?, + } + } else { + node.scalar_fn().coerce_args(&child_dtypes)? + }; // If nothing changed, skip. if child_dtypes == coerced_dtypes { @@ -65,6 +76,44 @@ pub fn coerce_expression(expr: Expression, scope: &DType) -> VortexResult Option> { + let lhs_literal = node.child(0).is::(); + let rhs_literal = node.child(1).is::(); + if lhs_literal == rhs_literal { + return None; + } + + let literal_idx = usize::from(rhs_literal); + let context_idx = usize::from(lhs_literal); + let literal_dtype = &child_dtypes[literal_idx]; + let context_dtype = &child_dtypes[context_idx]; + + if !can_coerce_comparison_literal(context_dtype, literal_dtype) { + if context_dtype.is_extension() || literal_dtype.is_extension() { + return Some(child_dtypes.to_vec()); + } + return None; + } + + let mut coerced_dtypes = child_dtypes.to_vec(); + coerced_dtypes[literal_idx] = + context_dtype.with_nullability(context_dtype.nullability() | literal_dtype.nullability()); + Some(coerced_dtypes) +} + +fn can_coerce_comparison_literal(context_dtype: &DType, literal_dtype: &DType) -> bool { + if matches!(literal_dtype, DType::Null) { + return true; + } + + if context_dtype.is_extension() || literal_dtype.is_extension() { + return context_dtype.eq_ignore_nullability(literal_dtype); + } + + context_dtype.can_coerce_from(literal_dtype) + || literal_dtype.is_numeric() && context_dtype.is_numeric() +} + #[cfg(test)] mod tests { use vortex_error::VortexResult; @@ -76,6 +125,9 @@ mod tests { use crate::expr::col; use crate::expr::lit; use crate::expr::transform::coerce::coerce_expression; + use crate::extension::datetime::TimeUnit; + use crate::extension::datetime::Timestamp; + use crate::extension::datetime::TimestampOptions; use crate::scalar::Scalar; use crate::scalar_fn::ScalarFnVTableExt; use crate::scalar_fn::fns::binary::Binary; @@ -191,4 +243,47 @@ mod tests { ); Ok(()) } + + #[test] + fn comparison_literal_coerces_to_column_type() -> VortexResult<()> { + let scope = DType::Struct( + StructFields::new( + ["x"].into(), + vec![DType::Primitive(PType::I32, NonNullable)], + ), + NonNullable, + ); + let expr = Binary.new_expr(Operator::Eq, [col("x"), lit(1i64)]); + let coerced = coerce_expression(expr, &scope)?; + + assert!(!coerced.child(0).is::()); + assert!(coerced.child(1).is::()); + assert_eq!( + coerced.child(1).return_dtype(&scope)?, + DType::Primitive(PType::I32, NonNullable) + ); + Ok(()) + } + + #[test] + fn comparison_literal_does_not_coerce_extension_units() -> VortexResult<()> { + let scope = DType::Extension(Timestamp::new(TimeUnit::Milliseconds, NonNullable).erased()); + let expr = Binary.new_expr( + Operator::Gt, + [ + crate::expr::root(), + lit(Scalar::extension::( + TimestampOptions { + unit: TimeUnit::Seconds, + tz: None, + }, + Scalar::from(1704153600i64), + )), + ], + ); + let coerced = coerce_expression(expr, &scope)?; + + assert!(!coerced.child(1).is::()); + Ok(()) + } } diff --git a/vortex-array/src/expr/transform/partition.rs b/vortex-array/src/expr/transform/partition.rs index ef852a6cd7f..147c2a01cf1 100644 --- a/vortex-array/src/expr/transform/partition.rs +++ b/vortex-array/src/expr/transform/partition.rs @@ -212,7 +212,7 @@ mod tests { use crate::dtype::PType::I32; use crate::dtype::StructFields; use crate::expr::analysis::make_free_field_annotator; - use crate::expr::and; + use crate::expr::checked_add; use crate::expr::col; use crate::expr::get_item; use crate::expr::lit; @@ -299,7 +299,7 @@ mod tests { fn test_expr_top_level_ref_get_item_add(dtype: DType) { let fields = dtype.as_struct_fields_opt().unwrap(); - let expr = and(get_item("y", get_item("a", root())), lit(1)); + let expr = checked_add(get_item("y", get_item("a", root())), lit(1)); let partitioned = partition(expr, &dtype, make_free_field_annotator(fields)).unwrap(); // Whole expr is a single split @@ -310,7 +310,7 @@ mod tests { fn test_expr_top_level_ref_get_item_add_cannot_split(dtype: DType) { let fields = dtype.as_struct_fields_opt().unwrap(); - let expr = and(get_item("y", get_item("a", root())), get_item("b", root())); + let expr = checked_add(get_item("y", get_item("a", root())), get_item("b", root())); let partitioned = partition(expr, &dtype, make_free_field_annotator(fields)).unwrap(); // One for id.a and id.b diff --git a/vortex-array/src/scalar_fn/fns/binary/mod.rs b/vortex-array/src/scalar_fn/fns/binary/mod.rs index af81ef0ca28..7237298e778 100644 --- a/vortex-array/src/scalar_fn/fns/binary/mod.rs +++ b/vortex-array/src/scalar_fn/fns/binary/mod.rs @@ -135,6 +135,16 @@ impl ScalarFnVTable for Binary { vortex_bail!("Cannot compare different DTypes {} and {}", lhs, rhs); } + if matches!(operator, Operator::And | Operator::Or) + && (!lhs.is_boolean() || !rhs.is_boolean()) + { + vortex_bail!( + "logical operation requires boolean operands, got {} and {}", + lhs, + rhs + ); + } + Ok(DType::Bool((lhs.is_nullable() || rhs.is_nullable()).into())) } diff --git a/vortex-layout/src/scan/scan_builder.rs b/vortex-layout/src/scan/scan_builder.rs index e1b652631ec..9acc10ef598 100644 --- a/vortex-layout/src/scan/scan_builder.rs +++ b/vortex-layout/src/scan/scan_builder.rs @@ -21,6 +21,8 @@ use vortex_array::dtype::FieldName; use vortex_array::dtype::FieldPath; use vortex_array::expr::Expression; use vortex_array::expr::analysis::immediate_access::immediate_scope_access; +use vortex_array::expr::plan_expression; +use vortex_array::expr::plan_filter_expression; use vortex_array::expr::root; use vortex_array::iter::ArrayIterator; use vortex_array::iter::ArrayIteratorAdapter; @@ -205,7 +207,8 @@ impl ScanBuilder { /// The [`DType`] returned by the scan, after applying the projection. pub fn dtype(&self) -> VortexResult { - self.projection.return_dtype(self.layout_reader.dtype()) + plan_expression(self.projection.clone(), self.layout_reader.dtype())? + .return_dtype(self.layout_reader.dtype()) } /// The session used by the scan. @@ -238,8 +241,6 @@ impl ScanBuilder { } pub fn prepare(self) -> VortexResult> { - let dtype = self.dtype()?; - if self.filter.is_some() && self.limit.is_some() { vortex_bail!("Vortex doesn't support scans with both a filter and a limit") } @@ -258,13 +259,15 @@ impl ScanBuilder { )); // Normalize and simplify the expressions. - let projection = self.projection.optimize_recursive(layout_reader.dtype())?; + let projection = plan_expression(self.projection, layout_reader.dtype())?; let filter = self .filter - .map(|f| f.optimize_recursive(layout_reader.dtype())) + .map(|f| plan_filter_expression(f, layout_reader.dtype())) .transpose()?; + let dtype = projection.return_dtype(layout_reader.dtype())?; + // Construct field masks and compute the row splits of the scan. let (filter_mask, projection_mask) = filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?; diff --git a/vortex-python/clickbench.py b/vortex-python/clickbench.py index 57fd1e04db9..20c4e270ab3 100755 --- a/vortex-python/clickbench.py +++ b/vortex-python/clickbench.py @@ -632,7 +632,7 @@ def _iter(): arr = vx.Array.from_arrow(arr) yield arr - it = vx.ArrayIterator.from_iter(vx.DType.from_arrow(pf.schema_arrow, non_nullable=True), _iter()) + it = vx.ArrayIterator.from_iter(vx.DType.from_arrow_schema(pf.schema_arrow), _iter()) vx.io.write(it, vx_path) lf = vx.open(vx_path).to_polars() diff --git a/vortex-python/python/vortex/__init__.py b/vortex-python/python/vortex/__init__.py index 5396f50b7a0..17e6e34c9f5 100644 --- a/vortex-python/python/vortex/__init__.py +++ b/vortex-python/python/vortex/__init__.py @@ -81,6 +81,7 @@ _unpickle_array, # pyright: ignore[reportPrivateUsage] array, ) +from .expr import col from .file import VortexFile, open from .scan import RepeatedScan @@ -109,6 +110,7 @@ # --- Objects and Functions --- "array", "compress", + "col", # Arrays "Array", "PyArray", diff --git a/vortex-python/python/vortex/_lib/dtype.pyi b/vortex-python/python/vortex/_lib/dtype.pyi index bbbfb9fcf6d..803a39347cf 100644 --- a/vortex-python/python/vortex/_lib/dtype.pyi +++ b/vortex-python/python/vortex/_lib/dtype.pyi @@ -9,7 +9,9 @@ class DType: def to_arrow_schema(self) -> pa.Schema: ... def to_arrow_type(self) -> pa.DataType: ... @classmethod - def from_arrow(cls, arrow_dtype: pa.DataType | pa.Schema, *, non_nullable: bool = False) -> DType: ... + def from_arrow(cls, arrow_dtype: pa.DataType, *, non_nullable: bool = False) -> DType: ... + @classmethod + def from_arrow_schema(cls, arrow_schema: pa.Schema) -> DType: ... @final class NullDType(DType): ... diff --git a/vortex-python/python/vortex/_lib/expr.pyi b/vortex-python/python/vortex/_lib/expr.pyi index c69307266de..c3207701c8f 100644 --- a/vortex-python/python/vortex/_lib/expr.pyi +++ b/vortex-python/python/vortex/_lib/expr.pyi @@ -36,3 +36,4 @@ def and_(left: Expr, right: Expr) -> Expr: ... def cast(child: Expr, dtype: DType) -> Expr: ... def is_null(child: Expr) -> Expr: ... def is_not_null(child: Expr) -> Expr: ... +def plan(expr: Expr, scope: DType, *, kind: str = "expr") -> Expr: ... diff --git a/vortex-python/python/vortex/_lib/file.pyi b/vortex-python/python/vortex/_lib/file.pyi index e2eab367134..49a35b1a651 100644 --- a/vortex-python/python/vortex/_lib/file.pyi +++ b/vortex-python/python/vortex/_lib/file.pyi @@ -45,6 +45,7 @@ class VortexFile: *, expr: Expr | None = None, limit: int | None = None, + indices: Array | None = None, batch_size: int | None = None, ) -> pa.RecordBatchReader: ... def to_dataset(self) -> VortexDataset: ... diff --git a/vortex-python/python/vortex/arrow/expression.py b/vortex-python/python/vortex/arrow/expression.py index 92c7e87638f..4de685da8cf 100644 --- a/vortex-python/python/vortex/arrow/expression.py +++ b/vortex-python/python/vortex/arrow/expression.py @@ -11,21 +11,24 @@ from vortex._lib.expr import Expr # pyright: ignore[reportMissingModuleSource] +from ..expr import plan as plan_expression from ..substrait import extended_expression @overload -def ensure_vortex_expression(expression: None, *, schema: pa.Schema) -> None: ... +def ensure_vortex_expression(expression: None, *, schema: pa.Schema, kind: str = "filter") -> None: ... @overload -def ensure_vortex_expression(expression: pc.Expression | Expr, *, schema: pa.Schema) -> Expr: ... +def ensure_vortex_expression(expression: pc.Expression | Expr, *, schema: pa.Schema, kind: str = "filter") -> Expr: ... -def ensure_vortex_expression(expression: pc.Expression | Expr | None, *, schema: pa.Schema) -> Expr | None: +def ensure_vortex_expression( + expression: pc.Expression | Expr | None, *, schema: pa.Schema, kind: str = "filter" +) -> Expr | None: if expression is None: return None if isinstance(expression, pc.Expression): - return arrow_to_vortex(expression, schema) - return expression + expression = arrow_to_vortex(expression, schema) + return plan_expression(expression, schema=schema, kind=kind) def _schema_for_substrait(schema: pa.Schema) -> pa.Schema: @@ -51,6 +54,6 @@ def arrow_to_vortex(arrow_expression: pc.Expression, schema: pa.Schema) -> Expr: expressions = extended_expression(substrait_object) - if len(expressions) < 0 or len(expressions) > 1: + if len(expressions) != 1: raise ValueError("arrow_to_vortex: extended expression must have exactly one child") return expressions[0] diff --git a/vortex-python/python/vortex/expr.py b/vortex-python/python/vortex/expr.py index a14643a8463..9b31eeecbc0 100644 --- a/vortex-python/python/vortex/expr.py +++ b/vortex-python/python/vortex/expr.py @@ -1,7 +1,65 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors +from __future__ import annotations -from ._lib.expr import Expr, and_, cast, column, literal, not_, root # pyright: ignore[reportMissingModuleSource] +from typing import Protocol +from typing import cast as typing_cast -__all__ = ["Expr", "column", "literal", "root", "not_", "and_", "cast"] +import pyarrow as pa + +from ._lib import expr as _expr # pyright: ignore[reportMissingModuleSource] +from ._lib.dtype import DType # pyright: ignore[reportMissingModuleSource] + + +class _HasDType(Protocol): + @property + def dtype(self) -> DType: ... + + +Expr = _expr.Expr +and_ = _expr.and_ +cast = _expr.cast +column = _expr.column +col = column +is_null = _expr.is_null +is_not_null = _expr.is_not_null +literal = _expr.literal +not_ = _expr.not_ +root = _expr.root + + +def plan( + expr: Expr, + *, + schema: pa.Schema | None = None, + file: object | None = None, + kind: str = "expr", +) -> Expr: + """Plan an expression against an Arrow schema or Vortex file.""" + if schema is not None and file is not None: + raise ValueError("exactly one of schema or file must be provided") + if schema is None and file is None: + raise ValueError("exactly one of schema or file must be provided") + + if schema is not None: + scope = DType.from_arrow_schema(schema) + else: + assert file is not None + scope = typing_cast(_HasDType, file).dtype + return _expr.plan(expr, scope, kind=kind) + + +__all__ = [ + "Expr", + "column", + "col", + "literal", + "root", + "not_", + "and_", + "cast", + "is_null", + "is_not_null", + "plan", +] diff --git a/vortex-python/python/vortex/file.py b/vortex-python/python/vortex/file.py index f3e5249e749..b1522b13226 100644 --- a/vortex-python/python/vortex/file.py +++ b/vortex-python/python/vortex/file.py @@ -4,15 +4,18 @@ from __future__ import annotations from collections.abc import Iterator -from typing import TYPE_CHECKING, final +from typing import TYPE_CHECKING, cast, final import pyarrow as pa +import pyarrow.compute as pc +import pyarrow.dataset as ds from ._lib import file as _file # pyright: ignore[reportMissingModuleSource] from ._lib.arrays import Array # pyright: ignore[reportMissingModuleSource] from ._lib.dtype import DType # pyright: ignore[reportMissingModuleSource] from ._lib.expr import Expr # pyright: ignore[reportMissingModuleSource] from ._lib.iter import ArrayIterator # pyright: ignore[reportMissingModuleSource] +from .arrow.expression import ensure_vortex_expression from .dataset import VortexDataset from .scan import RepeatedScan from .store import ( @@ -75,6 +78,11 @@ def dtype(self) -> DType: """The dtype of the file.""" return self._file.dtype + @property + def schema(self) -> pa.Schema: + """The Arrow schema of the file.""" + return self.dtype.to_arrow_schema() + def splits(self) -> list[tuple[int, int]]: return self._file.splits() @@ -197,26 +205,148 @@ def to_repeated_scan( def to_arrow( self, - projection: IntoProjection = None, + columns: IntoProjection = None, *, + projection: IntoProjection = None, + filter: pc.Expression | Expr | None = None, limit: int | None = None, expr: Expr | None = None, + indices: Array | None = None, batch_size: int | None = None, + filter_policy: str = "pushdown", ) -> RecordBatchReader: """Scan the Vortex file as a :class:`pyarrow.RecordBatchReader`. Parameters ---------- - projection : :class:`vortex.Expr` | list[str] | None + columns : :class:`vortex.Expr` | list[str] | None Either an expression over the columns of the file (only referenced columns will be read from the file) or an explicit list of desired columns. - expr : :class:`vortex.Expr` | None + filter : :class:`vortex.Expr` | ``pyarrow.compute.Expression`` | None The predicate used to filter rows. The filter columns need not appear in the projection. + limit : :class:`int` | None + The maximum number of rows to read after filtering. If None, read all rows. + indices : :class:`vortex.Array` | None + The indices of the rows to read. Must be sorted and non-null. batch_size : :class:`int` | None The number of rows to read per chunk. + filter_policy : :class:`str` + ``"pushdown"`` raises if a PyArrow filter cannot be pushed into Vortex. ``"fallback"`` + reads the requested rows and applies the PyArrow filter with Arrow after the scan. """ - return self._file.to_arrow(projection, expr=expr, limit=limit, batch_size=batch_size) + columns = self._resolve_columns(columns, projection) + filter = self._resolve_filter(filter, expr) + self._check_filter_policy(filter_policy) + + planned_filter: Expr | None = None + if filter is not None: + if isinstance(filter, pc.Expression): + try: + planned_filter = ensure_vortex_expression(filter, schema=self.schema) + except Exception: + if filter_policy == "fallback": + return self._to_arrow_with_arrow_filter_fallback( + columns, + filter, + limit=limit, + indices=indices, + batch_size=batch_size, + ) + raise + else: + planned_filter = ensure_vortex_expression(filter, schema=self.schema) + + return self._file.to_arrow(columns, expr=planned_filter, limit=limit, indices=indices, batch_size=batch_size) + + def to_table( + self, + columns: IntoProjection = None, + *, + projection: IntoProjection = None, + filter: pc.Expression | Expr | None = None, + limit: int | None = None, + expr: Expr | None = None, + indices: Array | None = None, + batch_size: int | None = None, + filter_policy: str = "pushdown", + ) -> pa.Table: + """Scan the Vortex file as a :class:`pyarrow.Table`.""" + return self.to_arrow( + columns, + projection=projection, + filter=filter, + limit=limit, + expr=expr, + indices=indices, + batch_size=batch_size, + filter_policy=filter_policy, + ).read_all() + + @staticmethod + def _resolve_columns(columns: IntoProjection, projection: IntoProjection) -> IntoProjection: + if projection is not None: + if columns is not None: + raise ValueError("use either columns or projection, not both") + return projection + return columns + + @staticmethod + def _resolve_filter( + filter: pc.Expression | Expr | None, + expr: Expr | None, + ) -> pc.Expression | Expr | None: + if expr is not None: + if filter is not None: + raise ValueError("use either filter or expr, not both") + return expr + return filter + + @staticmethod + def _check_filter_policy(filter_policy: str) -> None: + if filter_policy not in {"pushdown", "fallback"}: + raise ValueError("filter_policy must be 'pushdown' or 'fallback'") + + def _to_arrow_with_arrow_filter_fallback( + self, + columns: IntoProjection, + filter: pc.Expression, + *, + limit: int | None, + indices: Array | None, + batch_size: int | None, + ) -> RecordBatchReader: + if columns is not None and not isinstance(columns, list): + raise ValueError("filter_policy='fallback' only supports list[str] column selections") + + table = self._file.to_arrow(None, expr=None, limit=None, indices=indices, batch_size=batch_size).read_all() + table = self._arrow_filter_compatible_table(table) + table = ds.dataset(table).to_table(filter=filter) # pyright: ignore[reportUnknownMemberType] + if limit is not None: + table = table.slice(0, limit) + if columns is not None: + table = table.select(columns) + + batches = table.to_batches(max_chunksize=batch_size) if batch_size is not None else table.to_batches() + return pa.RecordBatchReader.from_batches(table.schema, batches) + + @staticmethod + def _arrow_filter_compatible_table(table: pa.Table) -> pa.Table: + fields: list[pa.Field[pa.DataType]] = [] + changed = False + for schema_field in table.schema: # pyright: ignore[reportUnknownVariableType] + field = cast("pa.Field[pa.DataType]", schema_field) + if field.type == pa.string_view(): + fields.append(field.with_type(pa.string())) + changed = True + elif field.type == pa.binary_view(): + fields.append(field.with_type(pa.binary())) + changed = True + else: + fields.append(field) + if not changed: + return table + return table.cast(pa.schema(fields)) def to_dataset(self) -> VortexDataset: """Scan the Vortex file using the :class:`pyarrow.dataset.Dataset` API.""" @@ -239,7 +369,7 @@ def _io_source( ) -> Iterator[pl.DataFrame]: vx_predicate: Expr | None = None if predicate is None else polars_to_vortex(predicate) - reader = self.to_arrow(projection=with_columns, expr=vx_predicate, limit=n_rows) + reader = self.to_arrow(columns=with_columns, filter=vx_predicate, limit=n_rows) for batch in reader: batch = pl.DataFrame._from_arrow(batch, rechunk=False) # pyright: ignore[reportPrivateUsage] diff --git a/vortex-python/src/dtype/mod.rs b/vortex-python/src/dtype/mod.rs index 1f739e7ab94..c7f1c56bba5 100644 --- a/vortex-python/src/dtype/mod.rs +++ b/vortex-python/src/dtype/mod.rs @@ -18,6 +18,7 @@ use std::ops::Deref; use arrow_schema::DataType; use arrow_schema::Field; +use arrow_schema::Schema; pub(crate) use ptype::*; use pyo3::Bound; use pyo3::Py; @@ -196,8 +197,21 @@ impl PyDType { DType::from_arrow(&Field::new("_", arrow_dtype, !non_nullable)), ) } + + /// Construct a Vortex data type from an Arrow schema. + #[classmethod] + fn from_arrow_schema<'py>( + cls: &'py Bound<'py, PyType>, + #[pyo3(from_py_with = import_arrow_schema)] arrow_schema: Schema, + ) -> PyResult> { + Self::init(cls.py(), DType::from_arrow(&arrow_schema)) + } } fn import_arrow_dtype(obj: &Bound) -> PyResult { DataType::from_pyarrow(&obj.as_borrowed()) } + +fn import_arrow_schema(obj: &Bound) -> PyResult { + Schema::from_pyarrow(&obj.as_borrowed()) +} diff --git a/vortex-python/src/expr/mod.rs b/vortex-python/src/expr/mod.rs index 17b342d7db5..0861ea94e75 100644 --- a/vortex-python/src/expr/mod.rs +++ b/vortex-python/src/expr/mod.rs @@ -20,6 +20,7 @@ use vortex::scalar_fn::fns::get_item::GetItem; use vortex::scalar_fn::fns::operators::Operator; use crate::dtype::PyDType; +use crate::error::PyVortexResult; use crate::install_module; use crate::scalar::factory::scalar_helper; @@ -36,6 +37,7 @@ pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { m.add_function(wrap_pyfunction!(cast, &m)?)?; m.add_function(wrap_pyfunction!(is_null, &m)?)?; m.add_function(wrap_pyfunction!(is_not_null, &m)?)?; + m.add_function(wrap_pyfunction!(plan, &m)?)?; m.add_class::()?; Ok(()) @@ -406,6 +408,24 @@ pub fn cast(child: PyExpr, dtype: PyDType) -> PyResult { }) } +/// Plan an expression against a Vortex dtype. +#[pyfunction] +#[pyo3(signature = (expr, scope, *, kind = "expr"))] +pub fn plan(expr: PyExpr, scope: &Bound, kind: &str) -> PyVortexResult { + let scope = scope.borrow().inner().clone(); + let planned = match kind { + "expr" => expr::plan_expression(expr.into_inner(), &scope)?, + "filter" => expr::plan_filter_expression(expr.into_inner(), &scope)?, + other => { + return Err(PyValueError::new_err(format!( + "kind must be 'expr' or 'filter', got {other:?}" + )) + .into()); + } + }; + Ok(PyExpr { inner: planned }) +} + /// Checks which elements of its child are null. /// /// Parameters @@ -416,7 +436,6 @@ pub fn cast(child: PyExpr, dtype: PyDType) -> PyResult { /// Returns /// ------- /// :class:`vortex.Expr` -/// ``` #[pyfunction] pub fn is_null(child: PyExpr) -> PyResult { Ok(PyExpr { diff --git a/vortex-python/src/file.rs b/vortex-python/src/file.rs index 458736db0e5..d1f4da43bcb 100644 --- a/vortex-python/src/file.rs +++ b/vortex-python/src/file.rs @@ -147,30 +147,24 @@ impl PyVortexFile { }) } - #[pyo3(signature = (projection = None, *, expr = None, limit = None, batch_size = None))] + #[pyo3(signature = (projection = None, *, expr = None, limit = None, indices = None, batch_size = None))] fn to_arrow( slf: Bound, projection: Option, expr: Option, limit: Option, + indices: Option, batch_size: Option, ) -> PyVortexResult> { - let vxf = slf.get().vxf.clone(); + let builder = slf.get().scan_builder( + projection.map(|p| p.0), + expr.map(|e| e.into_inner()), + limit, + indices.map(|i| i.into_inner()), + batch_size, + )?; let reader = slf.py().detach(|| { - let mut builder = vxf - .scan()? - .with_some_filter(expr.map(|e| e.into_inner())) - .with_projection(projection.map(|p| p.0).unwrap_or_else(root)); - - if let Some(limit) = limit { - builder = builder.with_limit(limit); - } - - if let Some(batch_size) = batch_size { - builder = builder.with_split_by(SplitBy::RowCount(batch_size)); - } - let schema = Arc::new(builder.dtype()?.to_arrow_schema()?); builder.into_record_batch_reader(schema, &*RUNTIME) })?; diff --git a/vortex-python/test/test_expression.py b/vortex-python/test/test_expression.py index 3d15d1daaa8..a31d8069980 100644 --- a/vortex-python/test/test_expression.py +++ b/vortex-python/test/test_expression.py @@ -152,3 +152,28 @@ def test_substrait_typed_null_literal(): expected = ve.literal(vx.int_(64, nullable=True), None) assert str(actual) == str(expected) + + +def test_plan_with_arrow_schema_returns_expr(): + schema = pa.schema([("x", pa.int32())]) + + planned = ve.plan(ve.column("x") > 10, schema=schema, kind="filter") + + assert isinstance(planned, ve.Expr) + + +def test_plan_with_arrow_schema_rejects_non_boolean_filter(): + schema = pa.schema([("x", pa.int32())]) + + with pytest.raises(RuntimeError, match="filter expression must return bool"): + _ = ve.plan(ve.column("x") + 1, schema=schema, kind="filter") + + +def test_plan_requires_exactly_one_scope(): + expr = ve.column("x") > 10 + + with pytest.raises(ValueError, match="exactly one"): + _ = ve.plan(expr) + + with pytest.raises(ValueError, match="exactly one"): + _ = ve.plan(expr, schema=pa.schema([("x", pa.int32())]), file=object()) diff --git a/vortex-python/test/test_file.py b/vortex-python/test/test_file.py index 2b706e3dd18..a9c8ef50519 100644 --- a/vortex-python/test/test_file.py +++ b/vortex-python/test/test_file.py @@ -3,8 +3,10 @@ import math import os +from pathlib import Path import pyarrow as pa +import pyarrow.compute as pc import pytest import vortex as vx @@ -36,6 +38,12 @@ def test_dtype(vxf: VortexFile): ) +def test_schema(vxf: VortexFile): + assert vxf.schema == pa.schema( + [("bool", pa.bool_()), ("float", pa.float64()), ("index", pa.int64()), ("string", pa.string_view())] + ) + + def test_row_count(vxf: VortexFile): assert len(vxf) == 1_000_000 @@ -62,6 +70,35 @@ def test_to_arrow_columns(vxf: VortexFile): assert rbr.schema == pa.schema([("string", pa.string_view()), ("bool", pa.bool_())]) +def test_to_table_with_vortex_filter_plans(tmp_path: Path): + path = tmp_path / "filter.vortex" + vx.io.write( + pa.table({"x": pa.array([1, 2, 3], type=pa.int32()), "name": ["a", "b", "c"]}), + str(path), + ) + + actual = vx.open(str(path)).to_table(filter=vx.col("x") > 1) + + assert actual.to_pylist() == [{"x": 2, "name": "b"}, {"x": 3, "name": "c"}] + + +def test_to_table_with_pyarrow_filter_fallback(tmp_path: Path): + path = tmp_path / "filter.vortex" + vx.io.write( + pa.table({"x": pa.array([1, 2, 3], type=pa.int32()), "name": ["a", "b", "c"]}), + str(path), + ) + vxf = vx.open(str(path)) + filter_expr = pc.field("x").isin([1, 3]) # pyright: ignore[reportUnknownMemberType] + + with pytest.raises(Exception): + _ = vxf.to_table(filter=filter_expr) + + actual = vxf.to_table(columns=["name"], filter=filter_expr, filter_policy="fallback") + + assert actual.to_pylist() == [{"name": "a"}, {"name": "c"}] + + def test_empty_file(tmpdir_factory): # pyright: ignore[reportUnknownParameterType, reportMissingParameterType] # test for writing empty files with null columns # create an empty table with schema `empty: null`