Skip to content

Commit 1838a7a

Browse files
authored
zoned cleanup (#7634)
## Summary Cleans up `ZonedLayout`. This is just a cosmetic change as I was trying to understand it. Also adds some tests that I thought would be helpful. ## Testing N/A Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 7c2df2c commit 1838a7a

9 files changed

Lines changed: 598 additions & 480 deletions

File tree

docs/concepts/scanning.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@ independently. The scan tracks the selectivity of each conjunct using a probabil
6868
and dynamically reorders them so that the most selective predicates are evaluated first. This
6969
means that as a scan progresses, it learns the most efficient evaluation order for the filter.
7070

71-
Filters are evaluated in two stages. First, pruning evaluation uses statistics stored in
72-
`ZonedLayout` (such as per-zone min/max values) to eliminate entire regions without reading any
73-
data. Second, filter evaluation materializes only the filter-referenced columns and computes a
74-
row mask of matching rows.
71+
Filters are evaluated in two stages. First, pruning evaluation uses statistics stored in a
72+
`ZonedLayout`'s auxiliary `zones` child to eliminate entire row zones without reading the underlying
73+
data child. These pruning predicates are falsification checks derived from the original filter, for
74+
example by comparing a zone's min/max values against the requested predicate. Second, filter
75+
evaluation materializes only the filter-referenced columns and computes a row mask of matching
76+
rows.
7577

7678
## Projection Pushdown
7779

@@ -89,4 +91,3 @@ Query engines integrate with the Scan API by translating their internal plan rep
8991
scan requests and consuming the resulting array stream in their preferred format. Integrations
9092
exist for DuckDB, DataFusion, Spark, and Trino, with each engine converting its native filter
9193
and projection representations into Vortex [expressions](expressions.md).
92-

vortex-layout/src/layouts/file_stats.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use vortex_error::VortexResult;
2121
use vortex_error::vortex_panic;
2222
use vortex_session::VortexSession;
2323

24-
use crate::layouts::zoned::zone_map::StatsAccumulator;
24+
use crate::layouts::zoned::StatsAccumulator;
2525
use crate::sequence::SendableSequentialStream;
2626
use crate::sequence::SequenceId;
2727
use crate::sequence::SequentialStreamAdapter;

vortex-layout/src/layouts/zoned/builder.rs

Lines changed: 229 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,137 @@
1+
//! Write-time accumulation and builders for zoned layout stats tables.
2+
13
// SPDX-License-Identifier: Apache-2.0
24
// SPDX-FileCopyrightText: Copyright the Vortex contributors
35

46
use std::marker::PhantomData;
57

8+
use itertools::Itertools;
69
use vortex_array::ArrayRef;
10+
use vortex_array::ExecutionCtx;
711
use vortex_array::IntoArray;
812
use vortex_array::LEGACY_SESSION;
913
use vortex_array::VortexSessionExecute;
1014
use vortex_array::arrays::ConstantArray;
15+
use vortex_array::arrays::StructArray;
1116
use vortex_array::builders::ArrayBuilder;
1217
use vortex_array::builders::BoolBuilder;
1318
use vortex_array::builders::builder_with_capacity;
1419
use vortex_array::dtype::DType;
1520
use vortex_array::dtype::FieldName;
1621
use vortex_array::dtype::Nullability;
22+
use vortex_array::expr::stats::Precision;
1723
use vortex_array::expr::stats::Stat;
24+
use vortex_array::expr::stats::StatsProvider;
1825
use vortex_array::scalar::Scalar;
1926
use vortex_array::scalar::ScalarTruncation;
2027
use vortex_array::scalar::lower_bound;
2128
use vortex_array::scalar::upper_bound;
29+
use vortex_array::validity::Validity;
2230
use vortex_buffer::BufferString;
2331
use vortex_buffer::ByteBuffer;
32+
use vortex_error::VortexExpect;
2433
use vortex_error::VortexResult;
2534

26-
pub const MAX_IS_TRUNCATED: &str = "max_is_truncated";
27-
pub const MIN_IS_TRUNCATED: &str = "min_is_truncated";
35+
use crate::layouts::zoned::schema::MAX_IS_TRUNCATED;
36+
use crate::layouts::zoned::schema::MIN_IS_TRUNCATED;
37+
use crate::layouts::zoned::zone_map::ZoneMap;
38+
39+
/// Accumulates write-time statistics for each logical zone.
40+
pub struct StatsAccumulator {
41+
/// One stats builder per requested stat; `new` skips stats that have no dtype for the column.
builders: Vec<Box<dyn StatsArrayBuilder>>,
42+
/// Number of chunks pushed so far; used as the row count of the finished stats table.
length: usize,
43+
}
44+
45+
impl StatsAccumulator {
46+
/// Creates an empty accumulator with one stats builder per applicable entry in `stats`.
///
/// A stat is skipped when `stat.dtype(dtype)` returns `None`. Every remaining builder is
/// created for the nullable form of the stat's dtype so that a missing value can be recorded
/// as null. `max_variable_length_statistics_size` is forwarded unchanged to
/// `stats_builder_with_capacity`.
pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
47+
let builders = stats
48+
.iter()
49+
.filter_map(|&stat| {
50+
stat.dtype(dtype).map(|stat_dtype| {
51+
stats_builder_with_capacity(
52+
stat,
53+
&stat_dtype.as_nullable(),
54+
// Initial capacity hint passed to each builder.
1024,
55+
max_variable_length_statistics_size,
56+
)
57+
})
58+
})
59+
.collect::<Vec<_>>();
60+
61+
Self {
62+
builders,
63+
length: 0,
64+
}
65+
}
66+
67+
/// Records one chunk using only the statistics already present on `array`.
///
/// For each builder, the stat is looked up with `statistics().get`. A value is appended
/// (cast to the nullable form of its dtype) only when it is `Precision::Exact`; a missing or
/// non-exact value is recorded as null.
///
/// # Errors
///
/// Returns an error if casting the stat value or appending it to a builder fails.
pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> {
68+
for builder in &mut self.builders {
69+
if let Some(Precision::Exact(value)) = array.statistics().get(builder.stat()) {
70+
builder.append_scalar(value.cast(&value.dtype().as_nullable())?)?;
71+
} else {
72+
builder.append_null();
73+
}
74+
}
75+
// Exactly one row is added to every builder per chunk.
self.length += 1;
76+
Ok(())
77+
}
2878

29-
pub fn stats_builder_with_capacity(
79+
/// Records one chunk, requesting each stat from `array` via `compute_stat`.
///
/// For each builder, the value returned by `compute_stat` is appended (cast to the nullable
/// form of its dtype); when `compute_stat` returns `None`, null is appended instead.
///
/// # Errors
///
/// Returns an error if computing a stat, casting its value, or appending it to a builder
/// fails.
pub fn push_chunk(&mut self, array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<()> {
80+
for builder in &mut self.builders {
81+
if let Some(value) = array.statistics().compute_stat(builder.stat(), ctx)? {
82+
builder.append_scalar(value.cast(&value.dtype().as_nullable())?)?;
83+
} else {
84+
builder.append_null();
85+
}
86+
}
87+
// Exactly one row is added to every builder per chunk.
self.length += 1;
88+
Ok(())
89+
}
90+
91+
/// Finishes the accumulator into a [`ZoneMap`].
92+
///
93+
/// Returns `None` if none of the requested statistics can be computed, for example because
94+
/// they are not applicable to the column's data type.
///
/// Calls `finish` on every builder (hence `&mut self`), visiting them in `Stat` order, and
/// skips any stat whose values are all null. The resulting struct array has one row per
/// chunk pushed so far.
///
/// # Errors
///
/// Returns an error if checking a stat column for all-null values fails.
///
/// # Panics
///
/// Panics (via `vortex_expect`) if the collected fields cannot be assembled into a
/// `StructArray`.
95+
pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
96+
let mut names = Vec::new();
97+
let mut fields = Vec::new();
98+
let mut stats = Vec::new();
99+
100+
for builder in self
101+
.builders
102+
.iter_mut()
103+
// We sort the stats so the DType is deterministic based on which stats are present.
104+
.sorted_unstable_by_key(|builder| builder.stat())
105+
{
106+
let values = builder.finish();
107+
108+
// We drop any all-null stats columns.
109+
if values.all_invalid()? {
110+
continue;
111+
}
112+
113+
// A single stat may contribute more than one named column (see `NamedArrays`).
stats.push(builder.stat());
114+
names.extend(values.names);
115+
fields.extend(values.arrays);
116+
}
117+
118+
// Every stat column was dropped (or there were no builders), so there is no table to build.
if names.is_empty() {
119+
return Ok(None);
120+
}
121+
122+
// One row per pushed chunk; the struct itself is non-nullable.
let array = StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
123+
.vortex_expect("Failed to create zone map");
124+
let stats = stats.into();
125+
126+
// SAFETY: `StatsAccumulator` builds the struct fields from `stats_builder_with_capacity`
127+
// using the same field-ordering and truncation-column rules as `stats_table_dtype`.
128+
// The `stats` list is collected in that same sorted order, so the resulting struct array
129+
// matches the expected zoned stats-table dtype by construction.
130+
Ok(Some(unsafe { ZoneMap::new_unchecked(array, stats) }))
131+
}
132+
}
133+
134+
fn stats_builder_with_capacity(
30135
stat: Stat,
31136
dtype: &DType,
32137
capacity: usize,
@@ -64,21 +169,20 @@ pub fn stats_builder_with_capacity(
64169
}
65170
}
66171

67-
/// Arrays with their associated names, reduced version of a StructArray
68-
pub struct NamedArrays {
69-
pub names: Vec<FieldName>,
70-
pub arrays: Vec<ArrayRef>,
172+
/// Arrays with their associated names, reduced version of a `StructArray`.
173+
struct NamedArrays {
174+
names: Vec<FieldName>,
175+
arrays: Vec<ArrayRef>,
71176
}
72177

73178
impl NamedArrays {
74-
pub fn all_invalid(&self) -> VortexResult<bool> {
75-
// by convention we assume that the first array is the one we care about for logical validity
179+
fn all_invalid(&self) -> VortexResult<bool> {
180+
// By convention the first array is the logical validity signal for the stat column.
76181
self.arrays[0].all_invalid(&mut LEGACY_SESSION.create_execution_ctx())
77182
}
78183
}
79184

80-
/// Minimal array builder interface for use by StatsTable for building stats arrays
81-
pub trait StatsArrayBuilder: Send {
185+
trait StatsArrayBuilder: Send {
82186
fn stat(&self) -> Stat;
83187

84188
fn append_scalar(&mut self, value: Scalar) -> VortexResult<()>;
@@ -88,13 +192,13 @@ pub trait StatsArrayBuilder: Send {
88192
fn finish(&mut self) -> NamedArrays;
89193
}
90194

91-
pub struct StatNameArrayBuilder {
195+
struct StatNameArrayBuilder {
92196
stat: Stat,
93197
builder: Box<dyn ArrayBuilder>,
94198
}
95199

96200
impl StatNameArrayBuilder {
97-
pub fn new(stat: Stat, builder: Box<dyn ArrayBuilder>) -> Self {
201+
fn new(stat: Stat, builder: Box<dyn ArrayBuilder>) -> Self {
98202
Self { stat, builder }
99203
}
100204
}
@@ -140,7 +244,7 @@ struct TruncatedMaxBinaryStatsBuilder<T: ScalarTruncation> {
140244
}
141245

142246
impl<T: ScalarTruncation> TruncatedMaxBinaryStatsBuilder<T> {
143-
pub fn new(
247+
fn new(
144248
values: Box<dyn ArrayBuilder>,
145249
is_truncated: BoolBuilder,
146250
max_value_length: usize,
@@ -162,7 +266,7 @@ struct TruncatedMinBinaryStatsBuilder<T: ScalarTruncation> {
162266
}
163267

164268
impl<T: ScalarTruncation> TruncatedMinBinaryStatsBuilder<T> {
165-
pub fn new(
269+
fn new(
166270
values: Box<dyn ArrayBuilder>,
167271
is_truncated: BoolBuilder,
168272
max_value_length: usize,
@@ -194,7 +298,6 @@ impl<T: ScalarTruncation> StatsArrayBuilder for TruncatedMaxBinaryStatsBuilder<T
194298
Ok(())
195299
}
196300

197-
#[inline]
198301
fn append_null(&mut self) {
199302
ArrayBuilder::append_null(self.values.as_mut());
200303
self.is_truncated.append_value(false);
@@ -229,7 +332,6 @@ impl<T: ScalarTruncation> StatsArrayBuilder for TruncatedMinBinaryStatsBuilder<T
229332
Ok(())
230333
}
231334

232-
#[inline]
233335
fn append_null(&mut self) {
234336
ArrayBuilder::append_null(self.values.as_mut());
235337
self.is_truncated.append_value(false);
@@ -245,3 +347,113 @@ impl<T: ScalarTruncation> StatsArrayBuilder for TruncatedMinBinaryStatsBuilder<T
245347
}
246348
}
247349
}
350+
351+
#[cfg(test)]
352+
mod tests {
353+
use rstest::rstest;
354+
use vortex_array::IntoArray;
355+
use vortex_array::LEGACY_SESSION;
356+
use vortex_array::VortexSessionExecute;
357+
use vortex_array::arrays::BoolArray;
358+
use vortex_array::arrays::bool::BoolArrayExt;
359+
use vortex_array::arrays::struct_::StructArrayExt;
360+
use vortex_array::builders::ArrayBuilder;
361+
use vortex_array::builders::VarBinViewBuilder;
362+
use vortex_array::dtype::DType;
363+
use vortex_array::dtype::Nullability;
364+
use vortex_array::expr::stats::Stat;
365+
use vortex_buffer::BitBuffer;
366+
use vortex_buffer::buffer;
367+
use vortex_error::VortexExpect;
368+
369+
use super::*;
370+
371+
// Pushes two two-element string/binary chunks through an accumulator created with a
// variable-length stats limit of 12, then checks the resulting column names and the
// per-chunk `*_is_truncated` flags for max and min.
#[rstest]
372+
#[case(DType::Utf8(Nullability::NonNullable))]
373+
#[case(DType::Binary(Nullability::NonNullable))]
374+
fn truncates_accumulated_stats(#[case] dtype: DType) {
375+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
376+
let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
377+
builder.append_value("Value to be truncated");
378+
builder.append_value("untruncated");
379+
let mut builder2 = VarBinViewBuilder::with_capacity(dtype, 2);
380+
builder2.append_value("Another");
381+
builder2.append_value("wait a minute");
382+
let mut acc =
383+
StatsAccumulator::new(builder.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
384+
acc.push_chunk(&builder.finish(), &mut ctx)
385+
.vortex_expect("push_chunk should succeed for test data");
386+
acc.push_chunk(&builder2.finish(), &mut ctx)
387+
.vortex_expect("push_chunk should succeed for test data");
388+
let stats_table = acc
389+
.as_stats_table()
390+
.unwrap()
391+
.expect("Must have stats table");
392+
// `Stat::Sum` was requested but is expected to be absent from the table for these dtypes.
assert_eq!(
393+
stats_table.array().names().as_ref(),
394+
&[
395+
Stat::Max.name(),
396+
MAX_IS_TRUNCATED,
397+
Stat::Min.name(),
398+
MIN_IS_TRUNCATED,
399+
]
400+
);
401+
// Column 1 is `max_is_truncated`: only the second chunk's max is expected to be truncated.
let field1_bool = stats_table
402+
.array()
403+
.unmasked_field(1)
404+
.clone()
405+
.execute::<BoolArray>(&mut ctx)
406+
.unwrap();
407+
assert_eq!(
408+
field1_bool.to_bit_buffer(),
409+
BitBuffer::from(vec![false, true])
410+
);
411+
// Column 3 is `min_is_truncated`: only the first chunk's min is expected to be truncated.
let field3_bool = stats_table
412+
.array()
413+
.unmasked_field(3)
414+
.clone()
415+
.execute::<BoolArray>(&mut ctx)
416+
.unwrap();
417+
assert_eq!(
418+
field3_bool.to_bit_buffer(),
419+
BitBuffer::from(vec![true, false])
420+
);
421+
}
422+
423+
// Pushes a single chunk built from `buffer![0, 1, 2]` and checks that the stats table still
// contains the `*_is_truncated` columns next to max and min (all `false`), plus a sum column.
#[test]
424+
fn always_adds_is_truncated_column() {
425+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
426+
let array = buffer![0, 1, 2].into_array();
427+
let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
428+
acc.push_chunk(&array, &mut ctx)
429+
.vortex_expect("push_chunk should succeed for test array");
430+
let stats_table = acc
431+
.as_stats_table()
432+
.unwrap()
433+
.expect("Must have stats table");
434+
assert_eq!(
435+
stats_table.array().names().as_ref(),
436+
&[
437+
Stat::Max.name(),
438+
MAX_IS_TRUNCATED,
439+
Stat::Min.name(),
440+
MIN_IS_TRUNCATED,
441+
Stat::Sum.name(),
442+
]
443+
);
444+
// Column 1 is `max_is_truncated`; one chunk was pushed, so there is a single flag.
let field1_bool = stats_table
445+
.array()
446+
.unmasked_field(1)
447+
.clone()
448+
.execute::<BoolArray>(&mut ctx)
449+
.unwrap();
450+
assert_eq!(field1_bool.to_bit_buffer(), BitBuffer::from(vec![false]));
451+
// Column 3 is `min_is_truncated`.
let field3_bool = stats_table
452+
.array()
453+
.unmasked_field(3)
454+
.clone()
455+
.execute::<BoolArray>(&mut ctx)
456+
.unwrap();
457+
assert_eq!(field3_bool.to_bit_buffer(), BitBuffer::from(vec![false]));
458+
}
459+
}

0 commit comments

Comments
 (0)