17 changes: 14 additions & 3 deletions docs/api/python/expr.rst
@@ -7,14 +7,16 @@ the following expression represents the set of rows for which the `age` column l

.. doctest::

>>> import vortex.expr
>>> age = vortex.expr.column("age")
>>> import vortex as vx
>>> age = vx.col("age")
>>> (23 < age) & (age < 55) # doctest: +SKIP

.. autosummary::
:nosignatures:

~vortex.expr.column
~vortex.expr.col
~vortex.expr.plan
~vortex.expr.Expr

.. raw:: html
@@ -23,10 +25,20 @@ the following expression represents the set of rows for which the `age` column l

.. autofunction:: vortex.expr.column

.. autofunction:: vortex.expr.col

.. autofunction:: vortex.expr.not_

.. autofunction:: vortex.expr.and_

.. autofunction:: vortex.expr.cast

.. autofunction:: vortex.expr.is_null

.. autofunction:: vortex.expr.is_not_null

.. autofunction:: vortex.expr.plan

.. autofunction:: vortex.expr.root

.. autofunction:: vortex.expr.literal
@@ -64,4 +76,3 @@ the following expression represents the set of rows for which the `age` column l
... .to_pylist()
... )
[{'x': 1, 'y': {'yy': 'a'}}]

2 changes: 1 addition & 1 deletion docs/getting-started/python.rst
@@ -56,7 +56,7 @@ Use :func:`~vortex.open` to open and read the Vortex array from disk:
.. doctest::

>>> import vortex as vx
>>> cvtx = vx.open("example.vortex").scan().read_all() # doctest: +SKIP
>>> table = vx.open("example.vortex").to_table() # doctest: +SKIP


Vortex is architected to achieve fast random access, in many cases hundreds of times faster
8 changes: 1 addition & 7 deletions docs/project/changelog/index.md
@@ -1,9 +1,3 @@
# Changelog

For older releases, see the full [release history on GitHub](https://github.com/vortex-data/vortex/releases).

```{toctree}
---
maxdepth: 1
---
```
See the full [release history on GitHub](https://github.com/vortex-data/vortex/releases).
33 changes: 29 additions & 4 deletions docs/user-guide/pyarrow.md
@@ -29,25 +29,50 @@ Use {func}`~vortex.open` to lazily open a Vortex file:

### As an Arrow Table

{meth}`.VortexFile.to_arrow` returns a {class}`pyarrow.RecordBatchReader`. Call
{meth}`~pyarrow.RecordBatchReader.read_all` to collect into a {class}`pyarrow.Table`:
{meth}`.VortexFile.to_table` collects the scan into a {class}`pyarrow.Table`:

```{doctest} pycon
>>> table = f.to_arrow().read_all()
>>> table = f.to_table()
>>> table.num_rows
1000
```

{meth}`.VortexFile.to_arrow` returns a streaming {class}`pyarrow.RecordBatchReader`.

### Column Projection

Read only the columns you need:

```{doctest} pycon
>>> table = f.to_arrow(['tip_amount', 'fare_amount']).read_all()
>>> table = f.to_table(columns=['tip_amount', 'fare_amount'])
>>> table.column_names
['tip_amount', 'fare_amount']
```

### Filters

Vortex expressions are the stable pushdown API. PyVortex plans them against the file schema before
the scan runs:

```{doctest} pycon
>>> table = f.to_table(columns=['tip_amount'], filter=vx.col('tip_amount') > 10)
>>> table.num_rows > 0
True
```

PyArrow compute expressions are accepted as compatibility input. PyVortex converts them through
Substrait, then runs the same Vortex planner:

```{doctest} pycon
>>> import pyarrow.compute as pc
>>> table = f.to_table(columns=['tip_amount'], filter=pc.field('tip_amount') > 10)
>>> table.num_rows > 0
True
```

Use `filter_policy="pushdown"` to raise when a PyArrow expression cannot be pushed into Vortex. Use
`filter_policy="fallback"` to read the rows and apply the PyArrow filter after the scan.
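
The difference between the two policies can be illustrated with a small pure-Python model. This is only a sketch: `scan`, `read`, `PushdownError`, and the `pushable` flag are hypothetical names that do not exist in PyVortex, and the model assumes both policies behave identically whenever pushdown succeeds. Because the text above does not state a default, the sketch requires `filter_policy` to be passed explicitly.

```python
class PushdownError(ValueError):
    """Raised by this toy model when a predicate cannot be pushed down."""


def scan(rows, predicate=None):
    """Stand-in for a file scan; a pushed-down predicate is applied while reading."""
    for row in rows:
        if predicate is None or predicate(row):
            yield row


def read(rows, predicate, *, pushable, filter_policy):
    """Model the two policies for a predicate that may or may not be pushable."""
    if filter_policy not in ("pushdown", "fallback"):
        raise ValueError(f"unknown filter_policy: {filter_policy!r}")
    if pushable:
        # Assumption: when pushdown succeeds, both policies behave the same.
        return list(scan(rows, predicate))
    if filter_policy == "pushdown":
        raise PushdownError("predicate cannot be pushed into the scan")
    # "fallback": read every row, then apply the filter after the scan.
    return [row for row in scan(rows) if predicate(row)]


# Made-up sample data for illustration.
rows = [{"tip_amount": 5.0}, {"tip_amount": 12.5}]
over_ten = lambda row: row["tip_amount"] > 10

fallback_rows = read(rows, over_ten, pushable=False, filter_policy="fallback")
print(fallback_rows)  # [{'tip_amount': 12.5}]
```

In this model, `"pushdown"` trades availability for predictability: a query either runs with the filter inside the scan or fails loudly, while `"fallback"` always returns a result at the cost of reading rows that are later discarded.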
Review comment (Contributor) on lines +73 to +74:

What is the default?


### Streaming Record Batches

Iterate over record batches for streaming processing:
36 changes: 21 additions & 15 deletions docs/user-guide/vortex-python.md
@@ -117,9 +117,8 @@ Available types: {func}`~vortex.null`, {func}`~vortex.bool_`,

## Expressions

The `vortex.expr` module provides expressions for filtering and projecting. These
are primarily used with {meth}`.VortexFile.scan` and {meth}`.VortexFile.to_arrow` but can also be
applied directly:
The `vortex.expr` module provides expressions for filtering and projecting. Use `vx.col` or
`vortex.expr.col` to build the stable predicate DSL for pushdown:

```{doctest} pycon
>>> import vortex.expr as ve
@@ -128,11 +127,21 @@ applied directly:
... {'name': 'Bob', 'age': 25},
... {'name': 'Carol', 'age': 35},
... ])
>>> expr = ve.column('age') > 28
>>> expr = vx.col('age') > 28
>>> arr.apply(expr).to_arrow_array().to_pylist()
[True, False, True]
```
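
As a rough illustration of how a predicate DSL of this shape can work, the following pure-Python sketch builds an expression tree through operator overloading and evaluates it row by row. `Node`, `Col`, `Lit`, `Gt`, and `evaluate` are hypothetical names invented for this example; they are not the `vortex.expr` classes, and the real engine evaluates expressions over columnar arrays rather than Python dictionaries.

```python
from dataclasses import dataclass
from typing import Any


class Node:
    """Base class: `>` builds a tree node instead of comparing immediately."""

    def __gt__(self, other: Any) -> "Gt":
        return Gt(self, _wrap(other))


@dataclass(frozen=True)
class Col(Node):
    name: str


@dataclass(frozen=True)
class Lit(Node):
    value: Any


@dataclass(frozen=True)
class Gt(Node):
    left: Node
    right: Node


def _wrap(value: Any) -> Node:
    """Turn plain Python values into literal nodes."""
    return value if isinstance(value, Node) else Lit(value)


def evaluate(expr: Node, row: dict) -> Any:
    """Interpret the tree against a single row."""
    if isinstance(expr, Col):
        return row[expr.name]
    if isinstance(expr, Lit):
        return expr.value
    if isinstance(expr, Gt):
        return evaluate(expr.left, row) > evaluate(expr.right, row)
    raise TypeError(f"unsupported node: {expr!r}")


# Made-up sample rows for illustration.
rows = [{"age": 30}, {"age": 25}, {"age": 35}]
expr = Col("age") > 28
mask = [evaluate(expr, row) for row in rows]
print(mask)  # [True, False, True]
```

Because the comparison returns a tree rather than a Boolean, the same `expr` object can be inspected, rewritten, or handed to a reader for pushdown before any data is touched.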

When a filter is used to read a file, PyVortex plans it against the file schema. Planning inserts
the casts required by the Vortex expression engine, simplifies the expression, and validates that
filters return Boolean values. You can run the same step directly:

```{doctest} pycon
>>> planned = ve.plan(vx.col('age') > 28, schema=arr.dtype.to_arrow_schema(), kind="filter")
>>> isinstance(planned, ve.Expr)
True
```
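
The three planning steps named above (insert casts, simplify, validate) can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not how `ve.plan` is implemented: the node classes, the two-entry `WIDTH` table, and the string dtypes are all hypothetical, and treating any `kind` other than `"filter"` as a general expression is an assumption of the sketch.

```python
from dataclasses import dataclass

# Toy bit widths for two integer types; real dtypes are far richer.
WIDTH = {"i32": 32, "i64": 64}


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Cast:
    child: object
    dtype: str


@dataclass(frozen=True)
class Binary:
    op: str  # "gt" (comparison) or "add" (arithmetic)
    left: object
    right: object


def return_dtype(expr, schema):
    """Infer the result type; binary operands must already agree."""
    if isinstance(expr, Col):
        return schema[expr.name]
    if isinstance(expr, Cast):
        return expr.dtype
    left = return_dtype(expr.left, schema)
    right = return_dtype(expr.right, schema)
    if left != right:
        raise TypeError(f"operand types differ: {left} vs {right}")
    return "bool" if expr.op == "gt" else left


def coerce(expr, schema):
    """Step 1: cast the narrower operand of each binary node to the wider type."""
    if not isinstance(expr, Binary):
        return expr
    left, right = coerce(expr.left, schema), coerce(expr.right, schema)
    lt, rt = return_dtype(left, schema), return_dtype(right, schema)
    if lt != rt:
        wide = lt if WIDTH[lt] > WIDTH[rt] else rt
        left = left if lt == wide else Cast(left, wide)
        right = right if rt == wide else Cast(right, wide)
    return Binary(expr.op, left, right)


def simplify(expr, schema):
    """Step 2: drop casts whose child already has the target type."""
    if isinstance(expr, Cast):
        child = simplify(expr.child, schema)
        if return_dtype(child, schema) == expr.dtype:
            return child
        return Cast(child, expr.dtype)
    if isinstance(expr, Binary):
        return Binary(expr.op, simplify(expr.left, schema), simplify(expr.right, schema))
    return expr


def plan(expr, schema, kind):
    """Coerce, simplify, then validate the result type."""
    planned = simplify(coerce(expr, schema), schema)
    dtype = return_dtype(planned, schema)  # Step 3: raises if the tree is ill-typed.
    if kind == "filter" and dtype != "bool":
        raise ValueError(f"filter expression must return bool, got {dtype}")
    return planned


schema = {"a": "i32", "b": "i64"}
planned = plan(Binary("gt", Col("a"), Col("b")), schema, kind="filter")
print(planned)
```

Running validation last means the type check sees the tree that will actually execute, so an error message refers to the coerced expression rather than to what the user originally typed.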

## VortexFile

{func}`~vortex.open` lazily opens a Vortex file for reading:
@@ -146,19 +155,16 @@ applied directly:
1000
```

Use {meth}`.VortexFile.scan` to read data with optional projection, filtering, and limit:
Use {meth}`.VortexFile.to_table` or {meth}`.VortexFile.to_arrow` to read Arrow data with optional
column projection, filtering, and limit:

```{doctest} pycon
>>> result = f.scan(['tip_amount'], limit=3).read_all()
>>> result.to_arrow_array()
<pyarrow.lib.StructArray object at ...>
-- is_valid: all not null
-- child 0 type: double
[
0,
5.1,
16.54
]
>>> table = f.to_table(columns=['tip_amount'], limit=3)
>>> table.to_pydict()
{'tip_amount': [0.0, 5.1, 16.54]}
>>> filtered = f.to_table(columns=['tip_amount'], filter=vx.col('tip_amount') > 10)
>>> filtered.num_rows > 0
True
```

## ArrayIterator
4 changes: 4 additions & 0 deletions vortex-array/public-api.lock
@@ -12402,6 +12402,10 @@ pub fn vortex_array::expr::or_collect<I>(iter: I) -> core::option::Option<vortex

pub fn vortex_array::expr::pack(elements: impl core::iter::traits::collect::IntoIterator<Item = (impl core::convert::Into<vortex_array::dtype::FieldName>, vortex_array::expr::Expression)>, nullability: vortex_array::dtype::Nullability) -> vortex_array::expr::Expression

pub fn vortex_array::expr::plan_expression(expr: vortex_array::expr::Expression, scope: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::expr::Expression>

pub fn vortex_array::expr::plan_filter_expression(expr: vortex_array::expr::Expression, scope: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::expr::Expression>

pub fn vortex_array::expr::root() -> vortex_array::expr::Expression

pub fn vortex_array::expr::select(field_names: impl core::convert::Into<vortex_array::dtype::FieldNames>, child: vortex_array::expr::Expression) -> vortex_array::expr::Expression
2 changes: 2 additions & 0 deletions vortex-array/src/expr/mod.rs
@@ -33,6 +33,7 @@ mod exprs;
pub(crate) mod field;
pub mod forms;
mod optimize;
mod plan;
pub mod proto;
pub mod pruning;
pub mod stats;
@@ -42,6 +43,7 @@ pub mod traversal;
pub use analysis::*;
pub use expression::*;
pub use exprs::*;
pub use plan::*;
pub use pruning::StatsCatalog;

pub trait VortexExprExt {
172 changes: 172 additions & 0 deletions vortex-array/src/expr/plan.rs
@@ -0,0 +1,172 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Expression planning against an input scope.

use vortex_error::VortexResult;
use vortex_error::vortex_bail;

use crate::dtype::DType;
use crate::expr::Expression;
use crate::expr::transform::coerce_expression;

/// Plan an expression against an input [`DType`].
///
/// Planning resolves the expression's types by inserting the casts required by scalar functions,
/// simplifies the resulting tree, and verifies that the planned expression has a valid return type
/// for the provided scope.
pub fn plan_expression(expr: Expression, scope: &DType) -> VortexResult<Expression> {
    let expr = coerce_expression(expr, scope)?;
    let expr = expr.optimize_recursive(scope)?;
    expr.return_dtype(scope)?;
    Ok(expr)
}

/// Plan a filter expression against an input [`DType`].
///
/// This performs the same planning pass as [`plan_expression`] and then requires the expression to
/// return a Boolean value.
pub fn plan_filter_expression(expr: Expression, scope: &DType) -> VortexResult<Expression> {
    let expr = plan_expression(expr, scope)?;
    let dtype = expr.return_dtype(scope)?;
    if !matches!(dtype, DType::Bool(_)) {
        vortex_bail!("filter expression must return bool, got {}", dtype);
    }
    Ok(expr)
}

#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::dtype::DType;
    use crate::dtype::Nullability::NonNullable;
    use crate::dtype::Nullability::Nullable;
    use crate::dtype::PType;
    use crate::dtype::StructFields;
    use crate::expr::col;
    use crate::expr::lit;
    use crate::expr::plan_expression;
    use crate::expr::plan_filter_expression;
    use crate::scalar::Scalar;
    use crate::scalar_fn::ScalarFnVTableExt;
    use crate::scalar_fn::fns::binary::Binary;
    use crate::scalar_fn::fns::cast::Cast;
    use crate::scalar_fn::fns::operators::Operator;

    fn scope() -> DType {
        DType::Struct(
            StructFields::new(
                ["i32", "i64", "u8", "flag"].into(),
                vec![
                    DType::Primitive(PType::I32, NonNullable),
                    DType::Primitive(PType::I64, NonNullable),
                    DType::Primitive(PType::U8, NonNullable),
                    DType::Bool(NonNullable),
                ],
            ),
            NonNullable,
        )
    }

    #[test]
    fn mixed_numeric_comparison_inserts_cast() -> VortexResult<()> {
        let scope = scope();
        let expr = Binary.new_expr(Operator::Lt, [col("i32"), col("i64")]);

        let planned = plan_filter_expression(expr, &scope)?;

        assert!(planned.child(0).is::<Cast>());
        assert_eq!(
            planned.child(0).return_dtype(&scope)?,
            DType::Primitive(PType::I64, NonNullable)
        );
        assert!(!planned.child(1).is::<Cast>());
        Ok(())
    }

    #[test]
    fn mixed_numeric_arithmetic_inserts_casts() -> VortexResult<()> {
        let scope = scope();
        let expr = Binary.new_expr(Operator::Add, [col("u8"), col("i32")]);

        let planned = plan_expression(expr, &scope)?;

        assert!(planned.child(0).is::<Cast>());
        assert_eq!(
            planned.return_dtype(&scope)?,
            DType::Primitive(PType::I64, NonNullable)
        );
        Ok(())
    }

    #[test]
    fn literal_values_are_coerced_against_column_types() -> VortexResult<()> {
        let scope = scope();
        let expr = Binary.new_expr(Operator::Eq, [col("i32"), lit(1i64)]);

        let planned = plan_filter_expression(expr, &scope)?;

        assert!(!planned.child(0).is::<Cast>());
        assert!(planned.child(1).is::<Cast>());
        assert_eq!(
            planned.child(1).return_dtype(&scope)?,
            DType::Primitive(PType::I32, NonNullable)
        );
        Ok(())
    }

    #[test]
    fn null_literals_are_typed_from_context() -> VortexResult<()> {
        let scope = scope();
        let expr = Binary.new_expr(Operator::Eq, [col("i32"), lit(Scalar::null(DType::Null))]);

        let planned = plan_filter_expression(expr, &scope)?;

        assert!(planned.child(1).is::<Cast>());
        assert_eq!(
            planned.child(1).return_dtype(&scope)?,
            DType::Primitive(PType::I32, Nullable)
        );
        Ok(())
    }

    #[test]
    fn boolean_and_preserves_boolean_inputs() -> VortexResult<()> {
        let scope = scope();
        let expr = Binary.new_expr(Operator::And, [col("flag"), col("flag")]);

        let planned = plan_filter_expression(expr, &scope)?;

        assert_eq!(planned.return_dtype(&scope)?, DType::Bool(NonNullable));
        assert!(!planned.child(0).is::<Cast>());
        assert!(!planned.child(1).is::<Cast>());
        Ok(())
    }

    #[test]
    fn filter_planning_rejects_non_boolean_outputs() {
        let scope = scope();
        let expr = Binary.new_expr(Operator::Add, [col("i32"), lit(1i32)]);

        let err = plan_filter_expression(expr, &scope).unwrap_err();

        assert!(
            err.to_string()
                .contains("filter expression must return bool")
        );
    }

    #[test]
    fn logical_operators_reject_non_boolean_inputs() {
        let scope = scope();
        let expr = Binary.new_expr(Operator::And, [col("i32"), col("i64")]);

        let err = plan_filter_expression(expr, &scope).unwrap_err();

        assert!(
            err.to_string()
                .contains("logical operation requires boolean operands")
        );
    }
}
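
The comparison cases asserted by these tests can be restated as a small Python model, which may help when reasoning about which operand receives the cast. The rules below are inferred only from the test assertions (column versus column widens the narrower side, a literal adapts to the column, a null literal yields a nullable target); they are not taken from `coerce_expression` itself, and `DType`, `Col`, `Lit`, `WIDTH`, and `cast_targets` are hypothetical names. The arithmetic case (`u8 + i32` returning `I64`) is deliberately left out because the tests alone do not pin down its rule.

```python
from dataclasses import dataclass

# Toy bit widths; only the two integer types used below are modelled.
WIDTH = {"i32": 32, "i64": 64}


@dataclass(frozen=True)
class DType:
    name: str  # "i32", "i64", or "null"
    nullable: bool = False


@dataclass(frozen=True)
class Col:
    dtype: DType


@dataclass(frozen=True)
class Lit:
    dtype: DType


def cast_targets(left, right):
    """Return (left_target, right_target) for a comparison; None means no cast."""
    if left.dtype == right.dtype:
        return None, None
    if isinstance(left, Col) and isinstance(right, Lit):
        # The literal adapts to the column; a null literal makes the target nullable.
        nullable = left.dtype.nullable or right.dtype.name == "null"
        return None, DType(left.dtype.name, nullable)
    if isinstance(left, Col) and isinstance(right, Col):
        # Two columns: the narrower side is widened to match the wider one.
        wide = max(left.dtype, right.dtype, key=lambda d: WIDTH[d.name])
        return (
            None if left.dtype == wide else wide,
            None if right.dtype == wide else wide,
        )
    raise NotImplementedError("case not covered by the tests this sketch mirrors")


i32, i64 = DType("i32"), DType("i64")
null = DType("null", nullable=True)

print(cast_targets(Col(i32), Col(i64)))   # left side is cast to i64
print(cast_targets(Col(i32), Lit(i64)))   # literal is cast to i32
print(cast_targets(Col(i32), Lit(null)))  # literal is cast to nullable i32
```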