From b70fa7e95a5a1884ab12a8db4dccc18fab988a11 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Tue, 28 Apr 2026 15:00:26 +0100 Subject: [PATCH 01/11] fix Signed-off-by: Joe Isaacs --- encodings/runend/src/compute/filter.rs | 3 +- vortex-array/public-api.lock | 6 + vortex-array/src/array/erased.rs | 71 +++- vortex-array/src/array/mod.rs | 91 ++++- vortex-array/src/array/typed.rs | 8 + vortex-array/src/arrays/chunked/array.rs | 57 +-- vortex-array/src/arrays/chunked/tests.rs | 171 +++++++++ vortex-array/src/arrays/chunked/vtable/mod.rs | 27 +- vortex-array/src/executor.rs | 345 ++++++++++++------ vortex-array/src/optimizer/mod.rs | 23 +- 10 files changed, 618 insertions(+), 184 deletions(-) diff --git a/encodings/runend/src/compute/filter.rs b/encodings/runend/src/compute/filter.rs index e4c7a298430..d537845ee2b 100644 --- a/encodings/runend/src/compute/filter.rs +++ b/encodings/runend/src/compute/filter.rs @@ -158,13 +158,14 @@ mod tests { /// Filter unwrap one layer at a time so RunEnd's FilterKernel can fire. #[test] fn filter_sliced_run_end_preserves_encoding() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + // 4 runs of 32 each = 128 rows. Large enough that FilterKernel takes // the run-preserving path (true_count >= 25). let values: Vec = [10, 20, 30, 40] .iter() .flat_map(|&v| std::iter::repeat_n(v, 32)) .collect(); - let mut ctx = LEGACY_SESSION.create_execution_ctx(); let arr = RunEnd::encode(PrimitiveArray::from_iter(values).into_array(), &mut ctx)?; // Slice off the first 16 rows. Slice(RunEnd), 112 rows, 4 runs. diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index dd7f4c0bea0..ca4c0117e87 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -21466,6 +21466,8 @@ pub vortex_array::ColumnarView::Constant(vortex_array::ArrayView<'a, vortex_arra pub enum vortex_array::ExecutionStep +pub vortex_array::ExecutionStep::AppendChild(usize) + pub vortex_array::ExecutionStep::Done pub vortex_array::ExecutionStep::ExecuteSlot(usize, vortex_array::DonePredicate) @@ -22586,6 +22588,8 @@ pub struct vortex_array::ExecutionResult impl vortex_array::ExecutionResult +pub fn vortex_array::ExecutionResult::append_child(array: impl vortex_array::IntoArray, slot_idx: usize) -> Self + pub fn vortex_array::ExecutionResult::array(&self) -> &vortex_array::ArrayRef pub fn vortex_array::ExecutionResult::done(result: impl vortex_array::IntoArray) -> Self @@ -25176,6 +25180,8 @@ pub fn vortex_session::VortexSession::create_execution_ctx(&self) -> vortex_arra pub fn vortex_array::child_to_validity(child: &core::option::Option, nullability: vortex_array::dtype::Nullability) -> vortex_array::validity::Validity +pub fn vortex_array::execute_into_builder(array: vortex_array::ArrayRef, builder: alloc::boxed::Box, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + pub fn vortex_array::patches_child(patches: &vortex_array::patches::Patches, idx: usize) -> vortex_array::ArrayRef pub fn vortex_array::patches_child_name(idx: usize) -> &'static str diff --git a/vortex-array/src/array/erased.rs b/vortex-array/src/array/erased.rs index 3046e2ba57c..7fe8ec953f8 100644 --- a/vortex-array/src/array/erased.rs +++ b/vortex-array/src/array/erased.rs @@ -23,6 +23,7 @@ use crate::ArrayHash; use crate::ArrayView; use crate::Canonical; use crate::ExecutionCtx; +use crate::ExecutionResult; use crate::IntoArray; use crate::LEGACY_SESSION; use crate::VTable; @@ -93,6 +94,12 @@ impl ArrayRef { &self.0 } + /// Returns a reference to the inner Arc. + #[inline(always)] + pub(crate) fn inner_mut(&mut self) -> &mut Arc { + &mut self.0 + } + /// Consumes the array reference, returning the owned backing allocation. #[inline(always)] pub(crate) fn into_inner(self) -> Arc { @@ -431,10 +438,13 @@ impl ArrayRef { self.with_slots(slots) } - /// Take a slot for executor-owned physical rewrites. This has the result that the array may - /// either be taken or cloned from the parent. + /// Take a slot for executor-owned physical rewrites. /// - /// The array can be put back with [`put_slot_unchecked`]. + /// On return the produced parent has the taken slot set to `None` + /// callers must put the slot back (typically via [`put_slot_unchecked`]) before the parent is + /// returned from the execution loop. + /// + /// When the `Arc` was shared this allocates a fresh parent. /// /// # Safety /// The caller must put back a slot with the same logical dtype and length before exposing the @@ -443,18 +453,27 @@ impl ArrayRef { mut self, slot_idx: usize, ) -> VortexResult<(ArrayRef, ArrayRef)> { - let child = if let Some(inner) = Arc::get_mut(&mut self.0) { - // # Safety: ensured by the caller. - unsafe { inner.slots_mut()[slot_idx].take() } - .vortex_expect("take_slot_unchecked cannot take an absent slot") - } else { - self.slots()[slot_idx] - .as_ref() - .vortex_expect("take_slot_unchecked cannot take an absent slot") - .clone() - }; + if let Some(inner) = Arc::get_mut(&mut self.0) { + // SAFETY: ensured by the caller. + let child = unsafe { inner.slots_mut()[slot_idx].take() } + .vortex_expect("take_slot_unchecked cannot take an absent slot"); + return Ok((self, child)); + } + + // Arc is shared: clone the child out and build a fresh parent with slot_idx = None, + // bypassing encoding-level validation so the absent slot does not panic `V::validate`. + let child = self.slots()[slot_idx] + .as_ref() + .vortex_expect("take_slot_unchecked cannot take an absent slot") + .clone(); + + let mut new_slots = self.slots().to_vec(); + new_slots[slot_idx] = None; - Ok((self, child)) + // SAFETY: ensured by the caller — the None slot is either put back or driven to completion + // via the builder path before the parent escapes the executor. + let new_parent = unsafe { self.0.with_slots_unchecked(&self, new_slots) }; + Ok((new_parent, child)) } /// Puts an array into `slot_idx` by either, cloning the inner array if the Arc is not exclusive @@ -532,12 +551,28 @@ impl ArrayRef { self.0.reduce_parent(self, parent, child_idx) } - pub(crate) fn execute_encoding( + pub(crate) fn execute_encoding(self, ctx: &mut ExecutionCtx) -> VortexResult { + let inner = Arc::as_ptr(&self.0); + // Safety the Arc outline the DynArray function call + unsafe { (&*inner).execute(self, ctx) } + } + + /// Execute a single encoding step without applying `Done`-result postconditions. + /// + /// This is for the iterative executor only. It may operate on suspended executor-private + /// arrays whose slots temporarily contain `None`, so the executor itself must interpret + /// `Done`, enforce any `len`/`dtype` invariants, and transfer statistics. + pub(crate) fn execute_encoding_unchecked( self, ctx: &mut ExecutionCtx, - ) -> VortexResult { - let inner = Arc::clone(&self.0); - inner.execute(self, ctx) + ) -> VortexResult { + let inner = Arc::as_ptr(&self.0); + // Safety the Arc outline the DynArray function call + let inner = unsafe { &*inner }; + // SAFETY: `inner` points at the allocation owned by `self.0`. `self` stays alive for the + // duration of the call, so the pointee remains valid. Avoiding an extra `Arc` clone here + // preserves uniqueness so execute-time metadata cursors can use `Arc::get_mut`. + unsafe { inner.execute_unchecked(self, ctx) } } pub fn execute_parent( diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index 888e7bf6893..8bf3bd01f22 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -56,6 +56,9 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug { /// Returns the array as a reference to a generic [`Any`] trait object. fn as_any(&self) -> &dyn Any; + /// Returns the array as a mutable reference to a generic [`Any`] trait object. + fn as_any_mut(&mut self) -> &mut dyn Any; + /// Converts an owned array allocation into an owned [`Any`] allocation for downcasting. fn into_any_arc(self: std::sync::Arc) -> std::sync::Arc; @@ -143,6 +146,24 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug { /// Returns a new array with the given slots. fn with_slots(&self, this: ArrayRef, slots: Vec>) -> VortexResult; + /// Returns a new array with the given slots, bypassing encoding-level validation. + /// + /// Used by the executor to temporarily carry an array that has had one of its child slots + /// taken out (leaving `None`) without panicking `V::validate`. The caller must ensure the + /// missing slot is filled back in (via `put_slot_unchecked`) or driven to completion by the + /// builder path before the array becomes externally observable. + /// + /// # Safety + /// + /// The array returned may have slots whose content does not match the encoding's normal + /// invariants. Callers must re-establish those invariants before handing the array to + /// anything outside the executor. + unsafe fn with_slots_unchecked( + &self, + this: &ArrayRef, + slots: Vec>, + ) -> ArrayRef; + /// Attempt to reduce the array to a simpler representation. fn reduce(&self, this: &ArrayRef) -> VortexResult>; @@ -155,8 +176,30 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug { ) -> VortexResult>; /// Execute the array by taking a single encoding-specific execution step. + /// + /// This is the checked entry point. If the encoding reports + /// [`ExecutionStep::Done`](crate::ExecutionStep::Done), implementations must validate that the + /// returned array preserves this array's logical `len` and `dtype`, and must transfer this + /// array's statistics to the returned array. fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult; + /// Execute the array by taking a single encoding-specific execution step without applying + /// `Done`-result postconditions. + /// + /// This exists for the iterative executor, which may call into `execute` on suspended + /// executor-private arrays whose slots temporarily contain `None`. In that mode the executor + /// itself is responsible for deciding when a `Done` result represents a real logical array, + /// enforcing any `len`/`dtype` invariants, and transferring statistics. + /// + /// # Safety + /// The `array` returned should have it's `DType` and len checked + /// (optionally it should have its stats propagated from `this`). + unsafe fn execute_unchecked( + &self, + this: ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult; + /// Attempt to execute the parent of this array. fn execute_parent( &self, @@ -203,6 +246,10 @@ impl DynArray for ArrayInner { self } + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + fn into_any_arc(self: std::sync::Arc) -> std::sync::Arc { self } @@ -387,6 +434,26 @@ impl DynArray for ArrayInner { .into_array()) } + unsafe fn with_slots_unchecked( + &self, + this: &ArrayRef, + slots: Vec>, + ) -> ArrayRef { + // SAFETY: we intentionally skip `V::validate` here. Caller guarantees that the resulting + // array is either repaired or not externally observed. + let inner = unsafe { + ArrayInner::::from_data_unchecked( + self.vtable.clone(), + this.dtype().clone(), + self.len, + self.data.clone(), + slots, + self.stats.clone(), + ) + }; + ArrayRef::from_inner(std::sync::Arc::new(inner)) + } + fn reduce(&self, this: &ArrayRef) -> VortexResult> { let view = unsafe { ArrayView::new_unchecked(this, &self.data) }; let Some(reduced) = V::reduce(view)? else { @@ -437,12 +504,8 @@ impl DynArray for ArrayInner { fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { let len = this.len(); let dtype = this.dtype().clone(); - let stats = this.statistics().to_owned(); - - let typed = Array::::try_from_array_ref(this) - .map_err(|_| vortex_err!("Failed to downcast array for execute")) - .vortex_expect("Failed to downcast array for execute"); - let result = V::execute(typed, ctx)?; + let stats = this.statistics().to_array_stats(); + let result = self.execute_unchecked(this, ctx)?; if matches!(result.step(), ExecutionStep::Done) { if cfg!(debug_assertions) { @@ -458,12 +521,26 @@ impl DynArray for ArrayInner { ); } - result.array().statistics().set_iter(stats.into_iter()); + result + .array() + .statistics() + .set_iter(crate::stats::StatsSet::from(stats).into_iter()); } Ok(result) } + fn execute_unchecked( + &self, + this: ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let typed = Array::::try_from_array_ref(this) + .map_err(|_| vortex_err!("Failed to downcast array for execute")) + .vortex_expect("Failed to downcast array for execute"); + V::execute(typed, ctx) + } + fn execute_parent( &self, this: &ArrayRef, diff --git a/vortex-array/src/array/typed.rs b/vortex-array/src/array/typed.rs index 49de1f7f17c..ffdbcc6f161 100644 --- a/vortex-array/src/array/typed.rs +++ b/vortex-array/src/array/typed.rs @@ -280,6 +280,14 @@ impl Array { &self.downcast_inner().data } + /// Try to fetch a mut ref to the inner ArrayData. + pub fn data_mut(&mut self) -> Option<&mut V::ArrayData> { + let m = self.inner.inner_mut(); + let inner = Arc::get_mut(m)?; + let array_inner = inner.as_any_mut().downcast_mut::>(); + Some(&mut array_inner?.data) + } + /// Returns the full typed array construction parts if this handle owns the allocation. pub fn try_into_parts(self) -> Result, Self> { let Self { inner, _phantom } = self; diff --git a/vortex-array/src/arrays/chunked/array.rs b/vortex-array/src/arrays/chunked/array.rs index e66d46a071c..6ec93d3aefb 100644 --- a/vortex-array/src/arrays/chunked/array.rs +++ b/vortex-array/src/arrays/chunked/array.rs @@ -37,6 +37,8 @@ pub(super) const CHUNKS_OFFSET: usize = 1; #[derive(Clone, Debug)] pub struct ChunkedData { pub(super) chunk_offsets: Vec, + /// This is used to find the next child to execute when in executing into a builder. + pub(super) next_builder_slot: usize, } impl Display for ChunkedData { @@ -114,6 +116,13 @@ pub trait ChunkedArrayExt: TypedArrayRef { impl> ChunkedArrayExt for T {} impl ChunkedData { + pub(super) fn new(chunk_offsets: Vec) -> Self { + Self { + chunk_offsets, + next_builder_slot: CHUNKS_OFFSET, + } + } + pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec { let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1); chunk_offsets.push(0); @@ -159,24 +168,31 @@ impl ChunkedData { } impl Array { + pub(super) fn with_next_builder_slot(mut self, next_builder_slot: usize) -> Self { + if let Some(data) = self.data_mut() { + data.next_builder_slot = next_builder_slot; + return self; + } + // This is the slow path that will be hit at most once per execution since the second one + // *MUST* have execlusive access due to this copy. + let stats = self.statistics().to_owned(); + let mut data = self.data().clone(); + data.next_builder_slot = next_builder_slot; + // SAFETY: we only modified next_builder_slot which doesn't affect array invariants. + unsafe { + Array::from_parts_unchecked( + ArrayParts::new(Chunked, self.dtype().clone(), self.len(), data) + .with_slots(self.slots().to_vec()), + ) + } + .with_stats_set(stats) + } + /// Constructs a new `ChunkedArray`. pub fn try_new(chunks: Vec, dtype: DType) -> VortexResult { ChunkedData::validate(&chunks, &dtype)?; - let len = chunks.iter().map(|chunk| chunk.len()).sum(); - let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks); - Ok(unsafe { - Array::from_parts_unchecked( - ArrayParts::new( - Chunked, - dtype, - len, - ChunkedData { - chunk_offsets: chunk_offsets.clone(), - }, - ) - .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)), - ) - }) + // SAFETY just validated on previous line. + Ok(unsafe { Self::new_unchecked(chunks, dtype) }) } pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult { @@ -238,15 +254,8 @@ impl Array { let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks); unsafe { Array::from_parts_unchecked( - ArrayParts::new( - Chunked, - dtype, - len, - ChunkedData { - chunk_offsets: chunk_offsets.clone(), - }, - ) - .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)), + ArrayParts::new(Chunked, dtype, len, ChunkedData::new(chunk_offsets.clone())) + .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)), ) } } diff --git a/vortex-array/src/arrays/chunked/tests.rs b/vortex-array/src/arrays/chunked/tests.rs index 6cc2049ec64..8bd0504a9a1 100644 --- a/vortex-array/src/arrays/chunked/tests.rs +++ b/vortex-array/src/arrays/chunked/tests.rs @@ -17,6 +17,7 @@ use crate::arrays::PrimitiveArray; use crate::arrays::StructArray; use crate::arrays::VarBinViewArray; use crate::arrays::chunked::ChunkedArrayExt; +use crate::arrays::dict_test::gen_dict_primitive_chunks; use crate::arrays::struct_::StructArrayExt; use crate::assert_arrays_eq; #[expect(deprecated)] @@ -39,6 +40,176 @@ fn chunked_array() -> ChunkedArray { .unwrap() } +#[test] +fn builder_kernel_path_canonicalizes_primitive_chunks() { + use crate::builders::builder_with_capacity; + use crate::executor::execute_into_builder; + + let array = chunked_array().into_array(); + let dtype = array.dtype().clone(); + let len = array.len(); + + let builder = builder_with_capacity(&dtype, len); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + // Clone the array into the builder path — the test also holds `array` so refcount > 1 on + // entry, which previously caused `take_slot_unchecked` to silently keep slots populated. + let mut builder = execute_into_builder(array.clone(), builder, &mut ctx).unwrap(); + let output = builder.finish(); + drop(array); + + assert_arrays_eq!( + output, + PrimitiveArray::from_iter([1u64, 2, 3, 4, 5, 6, 7, 8, 9]) + ); +} + +#[test] +fn builder_kernel_nested_chunked_of_chunked() { + use crate::builders::builder_with_capacity; + use crate::executor::execute_into_builder; + + let inner_1 = ChunkedArray::try_new( + vec![buffer![1u64, 2].into_array(), buffer![3u64].into_array()], + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap() + .into_array(); + let inner_2 = ChunkedArray::try_new( + vec![buffer![4u64, 5, 6].into_array()], + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap() + .into_array(); + let outer = ChunkedArray::try_new( + vec![inner_1, inner_2], + DType::Primitive(PType::U64, Nullability::NonNullable), + ) + .unwrap() + .into_array(); + + let dtype = outer.dtype().clone(); + let len = outer.len(); + let builder = builder_with_capacity(&dtype, len); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut builder = execute_into_builder(outer, builder, &mut ctx).unwrap(); + let output = builder.finish(); + + assert_arrays_eq!(output, PrimitiveArray::from_iter([1u64, 2, 3, 4, 5, 6])); +} + +#[test] +fn builder_kernel_path_repeated_shared_chunked_dict_execution() { + use crate::builders::builder_with_capacity; + use crate::executor::execute_into_builder; + + let array = gen_dict_primitive_chunks::(8, 3, 3); + let keep_alive = array.clone(); + let dtype = array.dtype().clone(); + let len = array.len(); + + let mut expected_ctx = LEGACY_SESSION.create_execution_ctx(); + let expected = array + .clone() + .execute::(&mut expected_ctx) + .unwrap() + .into_array(); + + let mut first_ctx = LEGACY_SESSION.create_execution_ctx(); + let first = { + let builder = builder_with_capacity(&dtype, len); + let mut builder = execute_into_builder(array.clone(), builder, &mut first_ctx).unwrap(); + builder.finish() + }; + + let mut second_ctx = LEGACY_SESSION.create_execution_ctx(); + let second = { + let builder = builder_with_capacity(&dtype, len); + let mut builder = execute_into_builder(array, builder, &mut second_ctx).unwrap(); + builder.finish() + }; + + drop(keep_alive); + + assert_arrays_eq!(first, expected); + assert_arrays_eq!(second, expected); +} + +#[test] +fn execute_path_repeated_shared_chunked_dict_execution() { + let array = gen_dict_primitive_chunks::(8, 3, 3); + let keep_alive = array.clone(); + + let expected_source = gen_dict_primitive_chunks::(8, 3, 3); + let mut expected_ctx = LEGACY_SESSION.create_execution_ctx(); + let expected = expected_source + .execute::(&mut expected_ctx) + .unwrap() + .into_array(); + + let mut first_ctx = LEGACY_SESSION.create_execution_ctx(); + let first = array + .clone() + .execute::(&mut first_ctx) + .unwrap() + .into_array(); + + let mut second_ctx = LEGACY_SESSION.create_execution_ctx(); + let second = array + .execute::(&mut second_ctx) + .unwrap() + .into_array(); + + drop(keep_alive); + + assert_arrays_eq!(first, expected); + assert_arrays_eq!(second, expected); +} + +#[test] +fn execute_path_nested_chunked_dict_of_dict_into_canonical() { + use crate::builders::builder_with_capacity; + + let inner_1 = gen_dict_primitive_chunks::(8, 3, 2); + let inner_2 = gen_dict_primitive_chunks::(8, 3, 3); + let outer = ChunkedArray::try_new( + vec![inner_1.clone(), inner_2.clone()], + inner_1.dtype().clone(), + ) + .unwrap() + .into_array(); + let keep_alive = outer.clone(); + + let expected = { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut builder = builder_with_capacity(outer.dtype(), outer.len()); + inner_1 + .append_to_builder(builder.as_mut(), &mut ctx) + .unwrap(); + inner_2 + .append_to_builder(builder.as_mut(), &mut ctx) + .unwrap(); + builder.finish() + }; + + let mut first_ctx = LEGACY_SESSION.create_execution_ctx(); + let first = outer + .clone() + .execute::(&mut first_ctx) + .unwrap() + .into_array(); + + let mut second_ctx = LEGACY_SESSION.create_execution_ctx(); + let second = outer + .execute::(&mut second_ctx) + .unwrap() + .into_array(); + + drop(keep_alive); + + assert_arrays_eq!(first, expected); + assert_arrays_eq!(second, expected); +} + #[test] fn with_slot_rewrites_chunk_and_offsets() { let array = chunked_array().into_array(); diff --git a/vortex-array/src/arrays/chunked/vtable/mod.rs b/vortex-array/src/arrays/chunked/vtable/mod.rs index 5e9c4a4d817..535ba0eea65 100644 --- a/vortex-array/src/arrays/chunked/vtable/mod.rs +++ b/vortex-array/src/arrays/chunked/vtable/mod.rs @@ -44,6 +44,7 @@ use crate::serde::ArrayChildren; mod canonical; mod operations; mod validity; + /// A [`Chunked`]-encoded Vortex array. pub type ChunkedArray = Array; @@ -213,9 +214,7 @@ impl VTable for Chunked { self.clone(), dtype.clone(), len, - ChunkedData { - chunk_offsets: chunk_offsets_usize, - }, + ChunkedData::new(chunk_offsets_usize), ) .with_slots(slots)) } @@ -239,7 +238,27 @@ impl VTable for Chunked { } fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { - Ok(ExecutionResult::done(_canonicalize(array.as_view(), ctx)?)) + match array.dtype() { + // Struct and List need special swizzling logic, use the existing canonicalize path. + DType::Struct(..) | DType::List(..) => { + // TODO(joe)[#7674]: iterative exectuion here too + Ok(ExecutionResult::done(_canonicalize(array.as_view(), ctx)?)) + } + // For all other types, use the builder path via AppendChild. + _ => { + let slot_idx = array.next_builder_slot.max(CHUNKS_OFFSET); + if slot_idx < array.slots().len() { + Ok(ExecutionResult::append_child( + array.with_next_builder_slot(slot_idx + 1), + slot_idx, + )) + } else { + Ok(ExecutionResult::done( + Canonical::empty(array.dtype()).into_array(), + )) + } + } + } } fn execute_parent( diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 1ca27bc1de5..7c6e9ed9148 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -26,6 +26,7 @@ use std::sync::LazyLock; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; use vortex_error::vortex_panic; use vortex_session::VortexSession; @@ -33,21 +34,26 @@ use crate::AnyCanonical; use crate::ArrayRef; use crate::Canonical; use crate::IntoArray; +use crate::array::ArrayId; +use crate::builders::ArrayBuilder; +use crate::builders::builder_with_capacity_in; use crate::dtype::DType; use crate::matcher::Matcher; use crate::memory::HostAllocatorRef; use crate::memory::MemorySessionExt; use crate::optimizer::ArrayOptimizer; +use crate::stats::ArrayStats; +use crate::stats::StatsSet; /// Returns the maximum number of iterations to attempt when executing an array before giving up and returning -/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 128. +/// an error, can be by the `VORTEX_MAX_ITERATIONS` env variables, otherwise defaults to 2^22. pub(crate) fn max_iterations() -> usize { static MAX_ITERATIONS: LazyLock = LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") { Ok(val) => val.parse::().unwrap_or_else(|e| { vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}") }), - Err(VarError::NotPresent) => 128, + Err(VarError::NotPresent) => 2 << 21, // 2 ^ 22 Err(VarError::NotUnicode(_)) => { vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string") } @@ -87,23 +93,6 @@ impl ArrayRef { /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work /// stack. /// - /// Each iteration proceeds through three steps in order: - /// - /// 1. **Done / canonical check** - if `current` satisfies the active done predicate or is - /// canonical, splice it back into the stacked parent (if any) and continue, or return. - /// 2. **`execute_parent` on children** - try each child's `execute_parent` against `current` - /// as the parent (e.g. `Filter(RunEnd)` → `FilterExecuteAdaptor` fires from RunEnd). - /// If there is a stacked parent frame, the rewritten child is spliced back into it so - /// that optimize and further `execute_parent` can fire on the reconstructed parent - /// (e.g. `Slice(RunEnd)` → `RunEnd` spliced into stacked `Filter` → `Filter(RunEnd)` - /// whose `FilterExecuteAdaptor` fires on the next iteration). - /// 3. **`execute`** - call the encoding's own execute step, which either returns `Done` or - /// `ExecuteSlot(i)` to push a child onto the stack for focused execution. - /// - /// Optimizer calls in this loop use [`ExecutionCtx::session`], so kernels registered on the - /// session's [`ArrayKernels`](crate::optimizer::kernels::ArrayKernels) are visible between - /// execution steps. - /// /// Note: the returned array may not match `M`. If execution converges to a canonical form /// that does not match `M`, the canonical array is returned since no further execution /// progress is possible. @@ -111,117 +100,145 @@ impl ArrayRef { /// For safety, we will error when the number of execution iterations reaches a configurable /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`). pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { - let mut current = self; + let mut current_array = self; + let mut current_builder: Option> = None; let mut stack: Vec = Vec::new(); + let max_iterations = max_iterations(); - for _ in 0..max_iterations() { - // Step 1: done / canonical - splice back into stacked parent or return. + for _ in 0..max_iterations { let is_done = stack .last() .map_or(M::matches as DonePredicate, |frame| frame.done); - if is_done(¤t) || AnyCanonical::matches(¤t) { + + if is_done(¤t_array) || AnyCanonical::matches(¤t_array) { match stack.pop() { None => { - ctx.log(format_args!("-> {}", current)); - return Ok(current); + debug_assert!( + current_builder.is_none(), + "root activation should not retain a builder" + ); + ctx.log(format_args!("-> {}", current_array)); + return Ok(current_array); } Some(frame) => { - current = frame.put_back(current)?.optimize_ctx(ctx.session())?; + (current_array, current_builder) = pop_frame(frame, current_array)?; continue; } } } - // Step 2: execute_parent on children (current is the parent). - // If there is a stacked parent frame, splice the rewritten child back into it - // so that optimize and execute_parent can fire naturally on the reconstructed parent - // (e.g. Slice(RunEnd) -RunEndSliceKernel-> RunEnd, spliced back into Filter gives - // Filter(RunEnd), whose FilterExecuteAdaptor fires on the next iteration). - if let Some(rewritten) = try_execute_parent(¤t, ctx)? { + // ── Step 2a: execute_parent against stack parent ─────────────────── + // + // When executing a child for ExecuteSlot, try execute_parent against + // the suspended parent on the stack. This lets kernels like RunEnd's + // FilterKernel fire before the child is forced to canonical. + if current_builder.is_none() + && let Some(frame) = stack.last() + { + if let Some(result) = + current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)? + { + ctx.log(format_args!( + "execute_parent (stack) rewrote {} -> {}", + current_array, result + )); + let frame = stack.pop().vortex_expect("just peeked"); + current_array = result.optimize_ctx(ctx.session())?; + current_builder = frame.parent_builder; + continue; + } + } + + // ── Step 2b: execute_parent ───────────────────────────────────────── + // + // Skip execute_parent with child. + if let Some(rewritten) = try_execute_parent(¤t_array, ctx)? { ctx.log(format_args!( "execute_parent rewrote {} -> {}", - current, rewritten + current_array, rewritten )); - current = rewritten.optimize_ctx(ctx.session())?; - if let Some(frame) = stack.pop() { - current = frame.put_back(current)?.optimize_ctx(ctx.session())?; - } + current_array = rewritten.optimize_ctx(ctx.session())?; continue; } - // Step 4: execute the encoding's own step. - let result = execute_step(current, ctx)?; + // ── Step 3: execute step ─────────────────────────────────────────── + let expected_len = current_array.len(); + let expected_dtype = current_array.dtype().clone(); + let stats = current_array.statistics().to_array_stats(); + let encoding_id = current_array.encoding_id(); + let result = current_array.execute_encoding_unchecked(ctx)?; let (array, step) = result.into_parts(); match step { ExecutionStep::ExecuteSlot(i, done) => { - // SAFETY: we record the child's dtype and len, and assert they are preserved - // when the slot is put back via `put_slot_unchecked`. let (parent, child) = unsafe { array.take_slot_unchecked(i) }?; ctx.log(format_args!( "ExecuteSlot({i}): pushing {}, focusing on {}", parent, child )); - let frame = StackFrame::new(parent, i, done, &child); - stack.push(frame); - current = child.optimize_ctx(ctx.session())?; + stack.push(StackFrame { + parent_array: parent, + parent_builder: current_builder.take(), + slot_idx: i, + done, + original_dtype: child.dtype().clone(), + original_len: child.len(), + }); + current_array = child; + current_builder = None; + } + ExecutionStep::AppendChild(i) => { + if current_builder.is_none() { + current_builder = Some(builder_with_capacity_in( + ctx.allocator(), + array.dtype(), + array.len(), + )); + } + let (parent, child) = unsafe { array.take_slot_unchecked(i) }?; + ctx.log(format_args!( + "AppendChild({i}): appending {} into builder", + child + )); + // TODO(perf): replace with a builder kernel registry so we don't + // need to go through the VTable append_to_builder indirection. + child.append_to_builder( + current_builder + .as_deref_mut() + .vortex_expect("builder must exist"), + ctx, + )?; + current_array = parent; } ExecutionStep::Done => { ctx.log(format_args!("Done: {}", array)); - current = array; + (current_array, current_builder) = finalize_done( + array, + current_builder, + expected_len, + expected_dtype, + stats, + encoding_id, + )?; } } } vortex_bail!( "Exceeded maximum execution iterations ({}) while executing array", - max_iterations(), + max_iterations, ) } } -/// A stack frame for the iterative executor, tracking the parent array whose slot is being -/// executed and the original child's dtype/len for validation on put-back. struct StackFrame { - parent: ArrayRef, + parent_array: ArrayRef, + parent_builder: Option>, slot_idx: usize, done: DonePredicate, original_dtype: DType, original_len: usize, } -impl StackFrame { - fn new(parent: ArrayRef, slot_idx: usize, done: DonePredicate, child: &ArrayRef) -> Self { - Self { - parent, - slot_idx, - done, - original_dtype: child.dtype().clone(), - original_len: child.len(), - } - } - - fn put_back(self, replacement: ArrayRef) -> VortexResult { - debug_assert_eq!( - replacement.dtype(), - &self.original_dtype, - "slot {} dtype changed from {} to {} during execution", - self.slot_idx, - self.original_dtype, - replacement.dtype() - ); - debug_assert_eq!( - replacement.len(), - self.original_len, - "slot {} len changed from {} to {} during execution", - self.slot_idx, - self.original_len, - replacement.len() - ); - // SAFETY: we assert above that dtype and len are preserved. - unsafe { self.parent.put_slot_unchecked(self.slot_idx, replacement) } - } -} - /// Execution context for batch CPU compute. #[derive(Debug, Clone)] pub struct ExecutionCtx { @@ -306,42 +323,33 @@ impl Drop for ExecutionCtx { } } -/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex. -/// -/// It attempts to take the smallest possible step of execution such that the returned array -/// is incrementally more "executed" than the input array. In other words, it is closer to becoming -/// a canonical array. +/// Single-step execution: takes one step toward canonical form. /// -/// The execution steps are as follows: -/// 0. Check for canonical. -/// 1. Attempt to `reduce` the array with metadata-only optimizations. -/// 2. Attempt to call `reduce_parent` on each child. -/// 3. Attempt to call `execute_parent` on each child. -/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]). +/// Steps through reduce, reduce_parent, execute_parent, then execute. For `ExecuteSlot`, +/// only a single child execution step is performed — the child is executed once and put back, +/// making this a lightweight, bounded operation. /// -/// Most users will not call this method directly, instead preferring to specify an executable -/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as -/// [`crate::arrays::PrimitiveArray`]). +/// **However**, if `execute_step` returns [`ExecutionStep::AppendChild`], this implementation +/// drives the *entire* array to completion via [`execute_into_builder`] in a single call. +/// This can do substantially more work than a normal step because it creates a builder and +/// fully decodes the array into that builder before returning. Callers should be aware that a +/// single `.execute::(ctx)` call may perform O(n_children * decode_cost) work when +/// `AppendChild` is returned. impl Executable for ArrayRef { fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { - // 0. Check for canonical if let Some(canonical) = array.as_opt::() { ctx.log(format_args!("-> canonical {}", array)); return Ok(Canonical::from(canonical).into_array()); } - // 1. reduce (metadata-only rewrites) if let Some(reduced) = array.reduce()? { ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced)); reduced.statistics().inherit_from(array.statistics()); return Ok(reduced); } - // 2. reduce_parent (child-driven metadata-only rewrites) for (slot_idx, slot) in array.slots().iter().enumerate() { - let Some(child) = slot else { - continue; - }; + let Some(child) = slot else { continue }; if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? { ctx.log(format_args!( "reduce_parent: slot[{}]({}) rewrote {} -> {}", @@ -355,11 +363,8 @@ impl Executable for ArrayRef { } } - // 3. execute_parent (child-driven optimized execution) for (slot_idx, slot) in array.slots().iter().enumerate() { - let Some(child) = slot else { - continue; - }; + let Some(child) = slot else { continue }; if let Some(executed_parent) = child.execute_parent(&array, slot_idx, ctx)? { ctx.log(format_args!( "execute_parent: slot[{}]({}) rewrote {} -> {}", @@ -375,9 +380,8 @@ impl Executable for ArrayRef { } } - // 4. execute (returns an ExecutionResult) ctx.log(format_args!("executing {}", array)); - let result = execute_step(array, ctx)?; + let result = execute_step_checked(array, ctx)?; let (array, step) = result.into_parts(); match step { ExecutionStep::Done => { @@ -385,23 +389,96 @@ impl Executable for ArrayRef { Ok(array) } ExecutionStep::ExecuteSlot(i, _) => { - // For single-step execution, handle ExecuteSlot by executing the slot, - // replacing it, and returning the updated array. let child = array.slots()[i].clone().vortex_expect("valid slot index"); let executed_child = child.execute::(ctx)?; array.with_slot(i, executed_child) } + ExecutionStep::AppendChild(_) => { + // Single-step: build the entire parent via the builder path. + let builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len()); + let mut builder = execute_into_builder(array, builder, ctx)?; + Ok(builder.finish()) + } } } } +/// Execute `array` into the given `builder`. +/// +/// This uses the encoding's [`crate::array::VTable::append_to_builder`] implementation. Most +/// encodings use the default path of `execute::` followed by `builder.extend_from_array`, +/// while encodings like `Chunked` can override that to append child-by-child without materializing +/// the entire parent. +/// +/// The builder must have a [`DType`] that is a nullability-superset of `array.dtype()`. +pub fn execute_into_builder( + array: ArrayRef, + mut builder: Box, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + array.append_to_builder(builder.as_mut(), ctx)?; + Ok(builder) +} + +/// Pop a stack frame, restoring the parent with the finished child in its slot. +fn pop_frame( + frame: StackFrame, + child: ArrayRef, +) -> VortexResult<(ArrayRef, Option>)> { + debug_assert_eq!( + child.dtype(), + &frame.original_dtype, + "child dtype changed during execution" + ); + debug_assert_eq!( + child.len(), + frame.original_len, + "child len changed during execution" + ); + let parent_array = unsafe { frame.parent_array.put_slot_unchecked(frame.slot_idx, child) }?; + Ok((parent_array, frame.parent_builder)) +} + /// Execute a single step on an array, consuming it. /// /// Extracts the vtable before consuming the array to avoid borrow conflicts. -fn execute_step(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { +fn execute_step_checked(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { array.execute_encoding(ctx) } +fn finalize_done( + result: ArrayRef, + mut builder: Option>, + expected_len: usize, + expected_dtype: DType, + stats: ArrayStats, + encoding_id: ArrayId, +) -> VortexResult<(ArrayRef, Option>)> { + let output = if let Some(mut builder) = builder.take() { + builder.finish() + } else { + result + }; + + if cfg!(debug_assertions) { + vortex_ensure!( + output.len() == expected_len, + "Result length mismatch for {:?}", + encoding_id + ); + vortex_ensure!( + output.dtype() == &expected_dtype, + "Executed canonical dtype mismatch for {:?}", + encoding_id + ); + } + + output + .statistics() + .set_iter(StatsSet::from(stats).into_iter()); + Ok((output, None)) +} + /// Try execute_parent on each occupied slot of the array. fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult> { for (slot_idx, slot) in array.slots().iter().enumerate() { @@ -424,6 +501,30 @@ pub type DonePredicate = fn(&ArrayRef) -> bool; /// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the /// scheduler what to do next. This enables the scheduler to manage execution iteratively using /// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions. +/// +/// # Semantics +/// +/// Each variant describes a different execution strategy with distinct cost profiles: +/// +/// - [`Done`](ExecutionStep::Done): The encoding has finished its work in this step. The +/// returned array is the result. The scheduler may continue executing if the target form +/// (e.g. canonical) has not yet been reached. +/// +/// - [`ExecuteSlot`](ExecutionStep::ExecuteSlot): The encoding needs one of its children +/// decoded before it can make further progress. The scheduler takes ownership of the child, +/// executes it until the [`DonePredicate`] matches, puts it back, and re-enters the parent. +/// Between steps the optimizer runs reduce/reduce_parent rules to fixpoint, enabling +/// cross-step optimization (e.g. pushing scalar functions through newly-decoded children). +/// This is a cooperative yield — the encoding does a bounded amount of work per step. +/// +/// - [`AppendChild`](ExecutionStep::AppendChild): The encoding needs one child executed to +/// canonical form and then appended into a builder owned by the current activation. The +/// scheduler suspends the parent, executes the child, appends the finished child into the +/// parent's builder, and then resumes the same parent so it can continue with more +/// `AppendChild` or `ExecuteSlot` steps. **Important:** in the single-step executor +/// ([`Executable`] for [`ArrayRef`]), returning `AppendChild` still causes the executor to +/// drive the *entire* array to completion via [`execute_into_builder`] in one call — this can +/// do significantly more work than a single `ExecuteSlot` step. pub enum ExecutionStep { /// Request that the scheduler execute the slot at the given index, using the provided /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this @@ -435,6 +536,18 @@ pub enum ExecutionStep { /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly. ExecuteSlot(usize, DonePredicate), + /// Execute the slot at the given index to canonical form, then append it into a canonical + /// builder owned by the current activation. + /// + /// The parent activation remains suspended with its builder while the child executes. Once + /// the child reaches canonical form, the scheduler appends it into the parent builder and + /// resumes the same parent activation. + /// + /// **Note:** In the single-step executor ([`Executable`] for [`ArrayRef`]), this variant + /// drives the entire parent to completion in one call via [`execute_into_builder`], which + /// may perform substantially more work than a single `ExecuteSlot` step. + AppendChild(usize), + /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result. /// The scheduler will continue executing if it has not yet reached the target form. Done, @@ -444,6 +557,7 @@ impl fmt::Debug for ExecutionStep { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ExecutionStep::ExecuteSlot(idx, _) => f.debug_tuple("ExecuteSlot").field(idx).finish(), + ExecutionStep::AppendChild(idx) => f.debug_tuple("AppendChild").field(idx).finish(), ExecutionStep::Done => write!(f, "Done"), } } @@ -477,6 +591,15 @@ impl ExecutionResult { } } + /// Request that the child slot at `slot_idx` be executed and appended into the current + /// activation's canonical builder. + pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self { + Self { + array: array.into_array(), + step: ExecutionStep::AppendChild(slot_idx), + } + } + /// Returns a reference to the array. pub fn array(&self) -> &ArrayRef { &self.array diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 70a041bcc18..95d3b19817d 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -15,8 +15,6 @@ //! - [`ArrayOptimizer::optimize_recursive`] applies the session-aware optimizer to the root and //! every descendant. -use std::sync::Arc; - use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_session::SessionExt; @@ -24,7 +22,6 @@ use vortex_session::VortexSession; use crate::ArrayRef; use crate::optimizer::kernels::ArrayKernels; -use crate::optimizer::kernels::ReduceParentFn; pub mod kernels; pub mod rules; @@ -65,26 +62,13 @@ impl ArrayOptimizer for ArrayRef { } } -/// Resolve a session-registered [`ReduceParentFn`] for the `(parent, child)` pair. -/// -/// The returned [`Arc`] is owned so the caller can drop the [`ArrayKernels`] borrow before -/// invoking the function. -fn plugin_reduce_parent( - session: &VortexSession, - parent: &ArrayRef, - child: &ArrayRef, -) -> Option> { - session - .get_opt::() - .and_then(|s| s.find_reduce_parent(parent.encoding_id(), child.encoding_id())) -} - fn try_optimize( array: &ArrayRef, session: Option<&VortexSession>, ) -> VortexResult> { let mut current_array = array.clone(); let mut any_optimizations = false; + let array_ref = session.and_then(|s| s.get_opt::()); // Apply reduction rules to the current array until no more rules apply. let mut loop_counter = 0; @@ -106,8 +90,9 @@ fn try_optimize( let Some(child) = slot else { continue }; // Session kernels take precedence over the child encoding's static PARENT_RULES. - if let Some(session) = session - && let Some(plugins) = plugin_reduce_parent(session, ¤t_array, child) + if let Some(ref array_ref) = array_ref + && let Some(plugins) = + array_ref.find_reduce_parent(current_array.encoding_id(), child.encoding_id()) { for plugin in plugins.as_ref() { if let Some(new_array) = plugin(child, ¤t_array, slot_idx)? { From 81c30dd4251f6f4da14c45d1ba8c6e1341383ead Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Tue, 28 Apr 2026 15:21:07 +0100 Subject: [PATCH 02/11] fix Signed-off-by: Joe Isaacs --- vortex-array/src/array/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index 8bf3bd01f22..1ade2a5a083 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -505,7 +505,7 @@ impl DynArray for ArrayInner { let len = this.len(); let dtype = this.dtype().clone(); let stats = this.statistics().to_array_stats(); - let result = self.execute_unchecked(this, ctx)?; + let result = unsafe { self.execute_unchecked(this, ctx)? }; if matches!(result.step(), ExecutionStep::Done) { if cfg!(debug_assertions) { @@ -530,7 +530,7 @@ impl DynArray for ArrayInner { Ok(result) } - fn execute_unchecked( + unsafe fn execute_unchecked( &self, this: ArrayRef, ctx: &mut ExecutionCtx, From 20c0ffb102620d8a64f592be2d40d26df07e3b1a Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Tue, 28 Apr 2026 15:23:27 +0100 Subject: [PATCH 03/11] fix Signed-off-by: Joe Isaacs --- vortex-array/src/executor.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 7c6e9ed9148..ac23f957f6d 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -381,7 +381,7 @@ impl Executable for ArrayRef { } ctx.log(format_args!("executing {}", array)); - let result = execute_step_checked(array, ctx)?; + let result = array.execute_encoding(ctx)?; let (array, step) = result.into_parts(); match step { ExecutionStep::Done => { @@ -439,13 +439,6 @@ fn pop_frame( Ok((parent_array, frame.parent_builder)) } -/// Execute a single step on an array, consuming it. -/// -/// Extracts the vtable before consuming the array to avoid borrow conflicts. -fn execute_step_checked(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { - array.execute_encoding(ctx) -} - fn finalize_done( result: ArrayRef, mut builder: Option>, From f8096d69303a797405f246aa147586e11e8f5d92 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Tue, 28 Apr 2026 15:26:04 +0100 Subject: [PATCH 04/11] fix Signed-off-by: Joe Isaacs --- vortex-array/src/arrays/chunked/vtable/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-array/src/arrays/chunked/vtable/mod.rs b/vortex-array/src/arrays/chunked/vtable/mod.rs index 535ba0eea65..046fdea1caa 100644 --- a/vortex-array/src/arrays/chunked/vtable/mod.rs +++ b/vortex-array/src/arrays/chunked/vtable/mod.rs @@ -241,7 +241,7 @@ impl VTable for Chunked { match array.dtype() { // Struct and List need special swizzling logic, use the existing canonicalize path. DType::Struct(..) | DType::List(..) => { - // TODO(joe)[#7674]: iterative exectuion here too + // TODO(joe)[#7674]: iterative execution here too Ok(ExecutionResult::done(_canonicalize(array.as_view(), ctx)?)) } // For all other types, use the builder path via AppendChild. From 178f4a2582dec1eb84e417b40ed4fa8329c5d34e Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Tue, 28 Apr 2026 17:08:04 +0100 Subject: [PATCH 05/11] fix Signed-off-by: Joe Isaacs --- vortex-array/src/executor.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index ac23f957f6d..1cab02b7cf0 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -132,9 +132,7 @@ impl ArrayRef { // When executing a child for ExecuteSlot, try execute_parent against // the suspended parent on the stack. This lets kernels like RunEnd's // FilterKernel fire before the child is forced to canonical. - if current_builder.is_none() - && let Some(frame) = stack.last() - { + if let Some(frame) = stack.last() { if let Some(result) = current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)? { @@ -152,7 +150,9 @@ impl ArrayRef { // ── Step 2b: execute_parent ───────────────────────────────────────── // // Skip execute_parent with child. - if let Some(rewritten) = try_execute_parent(¤t_array, ctx)? { + if current_builder.is_none() + && let Some(rewritten) = try_execute_parent(¤t_array, ctx)? + { ctx.log(format_args!( "execute_parent rewrote {} -> {}", current_array, rewritten From d3d43a0a427eb4ecd1eac696007737a316a3579b Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 29 Apr 2026 11:00:21 +0100 Subject: [PATCH 06/11] fix Signed-off-by: Joe Isaacs --- vortex-array/src/executor.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 1cab02b7cf0..845737e36a0 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -132,7 +132,12 @@ impl ArrayRef { // When executing a child for ExecuteSlot, try execute_parent against // the suspended parent on the stack. This lets kernels like RunEnd's // FilterKernel fire before the child is forced to canonical. - if let Some(frame) = stack.last() { + // + // Skip when a builder is active: the current array has been partially + // consumed by AppendChild (some slots are already in the builder), so + // a parent rewrite would see inconsistent state and the builder data + // would be lost when we restore frame.parent_builder. + if current_builder.is_none() && let Some(frame) = stack.last() { if let Some(result) = current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)? { From 3b16c6c9793ff2bff8704de43e8894e57a4dd296 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 29 Apr 2026 12:09:18 +0100 Subject: [PATCH 07/11] fix Signed-off-by: Joe Isaacs --- vortex-array/public-api.lock | 2 + vortex-array/src/executor.rs | 86 +++++++++++++++++++++++++++++------- 2 files changed, 73 insertions(+), 15 deletions(-) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index ca4c0117e87..a50cd88cda9 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -21836,6 +21836,8 @@ pub fn vortex_array::Array::as_view(&self) -> vortex_array::ArrayView<'_, V> pub fn vortex_array::Array::data(&self) -> &::ArrayData +pub fn vortex_array::Array::data_mut(&mut self) -> core::option::Option<&mut ::ArrayData> + pub fn vortex_array::Array::dtype(&self) -> &vortex_array::dtype::DType pub fn vortex_array::Array::encoding_id(&self) -> vortex_array::ArrayId diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 845737e36a0..affc3586484 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -99,6 +99,62 @@ impl ArrayRef { /// /// For safety, we will error when the number of execution iterations reaches a configurable /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`). + /// + /// # Execution model + /// + /// The executor maintains two pieces of mutable state: `current_array` (the array + /// being worked on) and `stack` (suspended parent frames from `ExecuteSlot`). + /// When an `AppendChild` step is active, a `current_builder` accumulates results. + /// + /// Consider a tree where `ExecuteSlot` has pushed a parent onto the stack: + /// + /// ```text + /// stack[0].parent_array: + /// RunEnd <-- PARENT (suspended on stack) + /// +-- slot 0: ends + /// +-- slot 1: _ (detached) + /// + /// current_array: + /// DictEncoding <-- CHILD (executor focus) + /// +-- slot 0: codes + /// +-- slot 1: dictionary + /// ``` + /// + /// Each iteration tries these steps in order on `current_array`: + /// + /// **Step 1 — done check**: if `current_array` satisfies the matcher (or is + /// canonical), pop the stack frame and reassemble the parent with the resolved + /// child via `put_slot_unchecked`. + /// + /// **Step 2a — `execute_parent` (stack)**: ask `current_array` (the child) whether + /// it can rewrite the suspended parent on the stack. For example, a child that has + /// been filtered can tell a RunEnd parent to fuse the filter, replacing both parent + /// and child with a single new array. Skipped when a builder is active (the current + /// array has been partially consumed by `AppendChild`). + /// + /// ```text + /// current_array.execute_parent(&stack_parent, slot_idx) + /// child looks UP at suspended parent + /// DictEncoding ---> "can I rewrite RunEnd?" + /// ``` + /// + /// **Step 2b — `execute_parent` (self)**: iterate `current_array`'s own children + /// and ask each whether it can rewrite `current_array` (their parent). Skipped + /// when a builder is active for the same reason as 2a. + /// + /// ```text + /// for child in current_array.children(): + /// child.execute_parent(¤t_array, slot_idx) + /// children look UP at current_array + /// codes -------> "can I rewrite DictEncoding?" + /// dictionary --> "can I rewrite DictEncoding?" + /// ``` + /// + /// **Step 3 — `execute`**: call the encoding's own decode step. Returns one of: + /// - `ExecuteSlot(i)`: push `current_array` onto the stack, focus on child `i`. + /// - `AppendChild(i)`: detach child `i`, append it into the builder, keep + /// `current_array` as the parent for the next iteration. + /// - `Done`: finalize (builder if present, otherwise the returned array). pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { let mut current_array = self; let mut current_builder: Option> = None; @@ -127,7 +183,7 @@ impl ArrayRef { } } - // ── Step 2a: execute_parent against stack parent ─────────────────── + // execute_parent against stack parent // // When executing a child for ExecuteSlot, try execute_parent against // the suspended parent on the stack. This lets kernels like RunEnd's @@ -137,22 +193,22 @@ impl ArrayRef { // consumed by AppendChild (some slots are already in the builder), so // a parent rewrite would see inconsistent state and the builder data // would be lost when we restore frame.parent_builder. - if current_builder.is_none() && let Some(frame) = stack.last() { - if let Some(result) = + if current_builder.is_none() + && let Some(frame) = stack.last() + && let Some(result) = current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)? - { - ctx.log(format_args!( - "execute_parent (stack) rewrote {} -> {}", - current_array, result - )); - let frame = stack.pop().vortex_expect("just peeked"); - current_array = result.optimize_ctx(ctx.session())?; - current_builder = frame.parent_builder; - continue; - } + { + ctx.log(format_args!( + "execute_parent (stack) rewrote {} -> {}", + current_array, result + )); + let frame = stack.pop().vortex_expect("just peeked"); + current_array = result.optimize_ctx(ctx.session())?; + current_builder = frame.parent_builder; + continue; } - // ── Step 2b: execute_parent ───────────────────────────────────────── + // execute_parent // // Skip execute_parent with child. if current_builder.is_none() @@ -166,7 +222,7 @@ impl ArrayRef { continue; } - // ── Step 3: execute step ─────────────────────────────────────────── + // execute step let expected_len = current_array.len(); let expected_dtype = current_array.dtype().clone(); let stats = current_array.statistics().to_array_stats(); From 671b39985fa53393989a80274cf854a3463157e0 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 30 Apr 2026 10:28:05 +0100 Subject: [PATCH 08/11] fix Signed-off-by: Joe Isaacs --- docs/developer-guide/internals/execution.md | 262 ++++++++++++-------- vortex-array/src/executor.rs | 163 ++++++------ 2 files changed, 243 insertions(+), 182 deletions(-) diff --git a/docs/developer-guide/internals/execution.md b/docs/developer-guide/internals/execution.md index 07eb88875b5..be92e29ceef 100644 --- a/docs/developer-guide/internals/execution.md +++ b/docs/developer-guide/internals/execution.md @@ -60,44 +60,31 @@ support constant vectors directly, avoiding unnecessary expansion. ## Execution Overview -The `execute_until` method on `ArrayRef` drives execution. The scheduler is -iterative: it rewrites and executes arrays in small steps until the current array matches the -requested target form. +Execution has two closely related entry points: -At a high level, each iteration works like this: +- `ArrayRef::execute::` is the single-step executor. It tries `reduce`, + `reduce_parent`, `execute_parent`, then `execute` once. +- `ArrayRef::execute_until` is the matcher-driven loop used by `Canonical`, `Columnar`, + and other target executors. It repeatedly interprets `ExecutionStep` until the current + activation matches `M` or no further progress is possible. -1. `optimize(current)` runs metadata-only rewrites to fixpoint: - `reduce` lets an array simplify itself, and `reduce_parent` lets a child rewrite its parent. -2. If optimization does not finish execution, each child gets a chance to `execute_parent`, - meaning "execute my parent's operation using my representation". -3. If no child can do that, the array's own `execute` method returns the next `ExecutionStep`. +`VTable::execute` never recursively descends into children on its own. Instead it returns an +`ExecutionResult` containing an `ExecutionStep` that tells `execute_until` what to do next. -This keeps execution iterative rather than recursive, and it gives optimization rules another -chance to fire after every structural or computational step. +The loop carries three mutable pieces of state: -## The Four Layers - -The execution model has four layers, but they are not all invoked in the same way. Layers 1 and -2 make up `optimize`, which runs to fixpoint before and after execution steps. Layers 3 and 4 -run only after optimization has stalled. - -``` -execute_until(root): - current = optimize(root) # Layers 1-2 to fixpoint +- `current_array: ArrayRef` -- the array currently in focus. +- `current_builder: Option>` -- active only for the builder path. + `AppendChild` appends detached children here, and `Done` finalizes the builder. +- `stack: Vec` -- suspended parents from `ExecuteSlot`, including the detached + slot index, its `DonePredicate`, and the parent builder that was active before focus moved + into the child. - loop: - if current matches target: - return / reattach to parent +## The Four Layers - Layer 3: try execute_parent on each child - if one succeeds: - current = optimize(result) - continue - - Layer 4: call execute(current) - ExecuteChild(i, pred) -> focus child[i], then optimize - Done -> current = optimize(result) -``` +Encodings can contribute logic in four places. The single-step executor can touch all four. +The iterative `execute_until` loop revisits Layers 3 and 4 directly, using `ExecuteSlot`, +`AppendChild`, and `Done` to move focus around the tree. ### Layer 1: `reduce` -- self-rewrite rules @@ -162,66 +149,92 @@ containing an `ExecutionStep` that tells the scheduler what to do next: ```rust pub enum ExecutionStep { - /// Ask the scheduler to execute child[idx] until it matches the predicate, - /// then replace the child and re-enter execution for this array. - ExecuteChild(usize, DonePredicate), + /// Push the parent onto the stack, focus a single child, and resume the + /// parent once that child matches the predicate. + ExecuteSlot(usize, DonePredicate), + + /// Detach a child, append it into the current activation's builder, and + /// keep the parent as current_array for the next iteration. + AppendChild(usize), - /// Execution is complete. The array in the ExecutionResult is the result. + /// Execution is complete. If a builder is active, it is finalized here. Done, } ``` +- `ExecuteSlot(i, pred)` detaches slot `i`, pushes the parent onto `stack`, and makes that + child the new `current_array` until `pred` says it is done. +- `AppendChild(i)` detaches slot `i`, appends that child into `current_builder`, and keeps + the returned parent as `current_array` for the next iteration. +- `Done` finishes the current activation. If `current_builder` is active, the builder is + finalized and its finished array becomes the result of this activation. + ## The Execution Loop -The full `execute_until` loop uses an explicit work stack to manage -parent-child relationships without recursion: +The full `execute_until` loop uses an explicit work stack and an optional builder +to manage parent-child relationships without recursion. ``` execute_until(root): + current_array = root + current_builder = None stack = [] - current = optimize(root) loop: - ┌─────────────────────────────────────────────────────┐ - │ Is current "done"? │ - │ (matches M if at root, or matches the stack │ - │ frame's DonePredicate if inside a child) │ - ├──────────────────────┬──────────────────────────────┘ + ┌──────────────────────────────────────────────────────────────┐ + │ Step 1: is current_array "done"? │ + │ (matches M at the root, or the stack frame's │ + │ DonePredicate inside ExecuteSlot) │ + ├──────────────────────┬───────────────────────────────────────┘ │ yes │ no │ │ - │ stack empty? │ Already canonical? - │ ├─ yes → return │ ├─ yes → pop stack (can't make more progress) - │ └─ no → pop frame, │ └─ no → continue to execution steps - │ replace child, │ - │ optimize, loop │ - │ ▼ - │ ┌────────────────────────────────────┐ - │ │ Try execute_parent on each child │ - │ │ (Layer 3 parent kernels) │ - │ ├────────┬───────────────────────────┘ - │ │ Some │ None - │ │ │ - │ │ ▼ - │ │ ┌─────────────────────────────────┐ - │ │ │ Call execute (Layer 4) │ - │ │ │ Returns ExecutionResult │ - │ │ ├────────┬────────────────────────┘ - │ │ │ │ - │ │ │ ExecuteChild(i, pred)? - │ │ │ ├─ yes → push (array, i, pred) - │ │ │ │ current = child[i] - │ │ │ │ optimize, loop - │ │ │ └─ Done → current = result - │ │ │ loop - │ │ │ - │ ▼ ▼ - │ optimize result, loop - └────────────────────────── + │ stack empty? │ current_builder active? + │ ├─ yes → return │ ├─ yes → skip Step 2a / 2b + │ └─ no → pop frame, │ └─ no + │ reattach child, │ + │ restore builder, │ + │ loop ▼ + │ ┌────────────────────────────────────────────┐ + │ │ Step 2a: current_array.execute_parent( │ + │ │ stack.top.parent_array ) │ + │ │ child looks UP at the suspended parent │ + │ ├────────────┬───────────────────────────────┘ + │ │ Some │ None + │ │ │ + │ │ ▼ + │ │ ┌─────────────────────────────────────────┐ + │ │ │ Step 2b: each child.execute_parent( │ + │ │ │ current_array ) │ + │ │ │ children look UP at current_array │ + │ │ ├──────────┬──────────────────────────────┘ + │ │ │ Some │ None + │ │ │ │ + │ │ │ ▼ + │ │ │ ┌──────────────────────────────────────┐ + │ │ │ │ Step 3: current_array.execute() │ + │ │ │ ├──────────────┬───────────────────────┘ + │ │ │ │ │ + │ │ │ │ ExecuteSlot(i, pred) + │ │ │ │ -> push parent + builder + │ │ │ │ -> current_array = child[i] + │ │ │ │ -> current_builder = None + │ │ │ │ + │ │ │ │ AppendChild(i) + │ │ │ │ -> ensure current_builder + │ │ │ │ -> child.append_to_builder(...) + │ │ │ │ -> current_array = parent + │ │ │ │ + │ │ │ │ Done + │ │ │ │ -> finish current_builder if present + │ │ │ │ -> otherwise use returned array + │ ▼ ▼ ▼ + │ continue loop with rewritten or finished array + └────────────────────────────────────────────────────── ``` -Note that `optimize` runs after every transformation. This is what enables cross-step -optimizations: after a child is decoded, new `reduce_parent` rules may now match that were -previously blocked. +Step 2a and Step 2b are skipped while `current_builder` is active. `AppendChild` partially +consumes `current_array`: some slots already live in the builder, so a parent rewrite would +observe inconsistent state and could discard accumulated builder data. ## Incremental Execution @@ -242,8 +255,9 @@ If execution jumped straight to canonicalizing the dict's children, it would exp codes through the slice, missing the Dict-RLE optimization entirely. Incremental execution avoids this: -1. First iteration: the slice `execute_parent` (parent kernel on RunEnd for Slice) performs a - binary search on run ends, returning a new `RunEndArray` with adjusted offsets. +1. First iteration: the slice `execute` returns `ExecuteSlot` for its `RunEndArray` child. + Once that child is in focus, Step 2a gives it a chance to rewrite the suspended slice + parent before the child is forced toward canonical form. 2. Second iteration: the `RunEndArray` codes child now matches the Dict-RLE pattern. Its `execute_parent` provides a fused kernel that expands runs while performing dictionary @@ -259,44 +273,96 @@ Input: RunEndArray { ends: [3, 7, 10], values: [A, B, C], len: 10 } Goal: Canonical (PrimitiveArray or similar) Iteration 1: - reduce? → None (no self-rewrite rules match) - reduce_parent? → None (no parent, this is root) - execute_parent? → None (no parent) - execute → ends are not Primitive yet? - ExecuteChild(0, Primitive::matches) + Step 1 → not done + Step 2a → skipped (root, no stacked parent) + Step 2b → None + Step 3 → ends are not Primitive yet? + ExecuteSlot(0, Primitive::matches) Stack: [(RunEnd, child_idx=0, Primitive::matches)] Focus on: ends + current_builder = None Iteration 2: - Current: ends array - Already Primitive? → yes, done. - Pop stack → replace child 0 in RunEnd, optimize. + Step 1 → done (ends already match Primitive) + Pop stack → replace child 0 in RunEnd Iteration 3: - reduce? → None - reduce_parent? → None - execute_parent? → None - execute → values are not Canonical yet? - ExecuteChild(1, AnyCanonical::matches) + Step 1 → not done + Step 2a → skipped (root again after the pop) + Step 2b → None + Step 3 → values are not Canonical yet? + ExecuteSlot(1, AnyCanonical::matches) Stack: [(RunEnd, child_idx=1, AnyCanonical::matches)] Focus on: values Iteration 4: - Current: values array - Already Canonical? → yes, done. - Pop stack → replace child 1 in RunEnd, optimize. + Step 1 → done (values already match AnyCanonical) + Pop stack → replace child 1 in RunEnd Iteration 5: - reduce? → None - reduce_parent? → None - execute_parent? → None - execute → all children ready, decode runs: + Step 1 → not done + Step 2a → skipped (root) + Step 2b → None + Step 3 → all children ready, decode runs: [A, A, A, B, B, B, B, C, C, C] Done → return PrimitiveArray → Result: PrimitiveArray [A, A, A, B, B, B, B, C, C, C] ``` +## Walkthrough: Executing a Chunked Bool Array via `AppendChild` + +`Chunked` uses the builder path for most dtypes. Instead of focusing one child as the new +`current_array`, it detaches one chunk at a time, appends it into `current_builder`, and keeps +the `ChunkedArray` itself as the active parent: + +``` +Input: Chunked { + chunks[0] = Bool[true, false], + chunks[1] = Bool[false], + chunks[2] = Bool[true, true], + } +Goal: Canonical BoolArray + +Iteration 1: + Step 1 → not done + Step 2a → skipped (root, no stacked parent) + Step 2b → None + Step 3 → AppendChild(1) + create current_builder = BoolBuilder [] + append chunks[0] + current_array = Chunked(next_builder_slot = 2) + current_builder = BoolBuilder [true, false] + +Iteration 2: + Step 1 → not done + Step 2a / 2b → skipped (builder active; current_array is partially consumed) + Step 3 → AppendChild(2) + append chunks[1] + current_array = Chunked(next_builder_slot = 3) + current_builder = BoolBuilder [true, false, false] + +Iteration 3: + Step 1 → not done + Step 2a / 2b → skipped + Step 3 → AppendChild(3) + append chunks[2] + current_array = Chunked(next_builder_slot = 4) + current_builder = BoolBuilder [true, false, false, true, true] + +Iteration 4: + Step 1 → not done + Step 2a / 2b → skipped + Step 3 → Done + finish current_builder + result = BoolArray [true, false, false, true, true] + +→ Result: BoolArray [true, false, false, true, true] +``` + +When `current_builder` is active, the array returned alongside `Done` is just the signal that +the parent activation has finished. The actual result comes from finalizing the builder. + ## Implementing an Encoding: Where Does My Logic Go? When adding a new encoding or optimizing an existing one, the key question is whether the @@ -317,7 +383,7 @@ Rules of thumb: knows how to handle its parent's operation more efficiently than the parent knows how to handle the child. - Treat `execute` as the fallback. If no reduce rule or parent kernel applies, the encoding - decodes itself and uses `ExecuteChild` to request child execution when needed. + decodes itself and uses `ExecuteSlot` or `AppendChild` to tell the scheduler what to do next. ## Future Work diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index affc3586484..31d475822dc 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -1,22 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! The execution engine: iteratively transforms arrays toward canonical form. +//! Iterative array execution. //! -//! Execution proceeds through four layers tried in order on each iteration: +//! The single-step [`Executable`] implementation for [`ArrayRef`] tries `reduce`, +//! `reduce_parent`, `execute_parent`, then `execute` once. The matcher-driven +//! [`ArrayRef::execute_until`] loop interprets [`ExecutionStep::ExecuteSlot`], +//! [`ExecutionStep::AppendChild`], and [`ExecutionStep::Done`] using an explicit stack plus an +//! optional builder, so encodings can advance without recursive descent. //! -//! 1. **`reduce`** -- metadata-only self-rewrite (cheapest). -//! 2. **`reduce_parent`** -- metadata-only child-driven parent rewrite. -//! 3. **`execute_parent`** -- child-driven fused execution (may read buffers). -//! 4. **`execute`** -- the encoding's own decode step (most expensive). -//! -//! The main entry point is [`ArrayRef::execute_until`], which uses an explicit work stack -//! to drive execution iteratively without recursion. Between steps, the optimizer runs -//! reduce/reduce_parent rules to fixpoint using the active [`ExecutionCtx`] session, so -//! session-registered optimizer kernels participate during execution. -//! -//! See for a full description -//! of the model. +//! See for the full execution +//! narrative, diagrams, and walkthroughs. use std::env::VarError; use std::fmt; @@ -91,70 +85,72 @@ impl ArrayRef { } /// Iteratively execute this array until the [`Matcher`] matches, using an explicit work - /// stack. + /// stack plus an optional builder for `AppendChild`. /// /// Note: the returned array may not match `M`. If execution converges to a canonical form /// that does not match `M`, the canonical array is returned since no further execution /// progress is possible. /// - /// For safety, we will error when the number of execution iterations reaches a configurable - /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`). + /// For safety, this errors once execution reaches a configurable maximum number of + /// iterations (default `2^22`, override with `VORTEX_MAX_ITERATIONS`). /// - /// # Execution model + /// # Loop state /// - /// The executor maintains two pieces of mutable state: `current_array` (the array - /// being worked on) and `stack` (suspended parent frames from `ExecuteSlot`). - /// When an `AppendChild` step is active, a `current_builder` accumulates results. + /// - `current_array: ArrayRef` -- the array currently in focus. + /// - `current_builder: Option>` -- active only for builder-mode + /// execution. `AppendChild` appends detached children here. `Done` finishes the builder + /// and turns it back into the next `current_array`. + /// - `stack: Vec` -- suspended parents from `ExecuteSlot`, including the + /// detached slot index, its [`DonePredicate`], and the parent builder that was active + /// before focus moved into the child. /// - /// Consider a tree where `ExecuteSlot` has pushed a parent onto the stack: + /// Example after `ExecuteSlot(1, pred)` has focused slot 1 of a parent: /// /// ```text - /// stack[0].parent_array: - /// RunEnd <-- PARENT (suspended on stack) + /// stack[top].parent_array: + /// RunEnd <-- suspended parent /// +-- slot 0: ends /// +-- slot 1: _ (detached) /// /// current_array: - /// DictEncoding <-- CHILD (executor focus) + /// DictEncoding <-- focused child /// +-- slot 0: codes /// +-- slot 1: dictionary + /// + /// current_builder: + /// None /// ``` /// - /// Each iteration tries these steps in order on `current_array`: + /// Each loop iteration works like this: /// - /// **Step 1 — done check**: if `current_array` satisfies the matcher (or is - /// canonical), pop the stack frame and reassemble the parent with the resolved - /// child via `put_slot_unchecked`. + /// ```text + /// loop: + /// Step 1: done(current_array)? + /// - root activation -> return current_array + /// - ExecuteSlot frame -> pop, reattach child, resume parent /// - /// **Step 2a — `execute_parent` (stack)**: ask `current_array` (the child) whether - /// it can rewrite the suspended parent on the stack. For example, a child that has - /// been filtered can tell a RunEnd parent to fuse the filter, replacing both parent - /// and child with a single new array. Skipped when a builder is active (the current - /// array has been partially consumed by `AppendChild`). + /// Step 2: current_builder active? + /// - yes -> skip Step 2a / 2b + /// - no -> try parent kernels /// - /// ```text - /// current_array.execute_parent(&stack_parent, slot_idx) - /// child looks UP at suspended parent - /// DictEncoding ---> "can I rewrite RunEnd?" - /// ``` + /// Step 2a: current_array.execute_parent(stack.top.parent_array) + /// child looks up at the suspended parent from ExecuteSlot /// - /// **Step 2b — `execute_parent` (self)**: iterate `current_array`'s own children - /// and ask each whether it can rewrite `current_array` (their parent). Skipped - /// when a builder is active for the same reason as 2a. + /// Step 2b: for child in current_array.children(): + /// child.execute_parent(current_array) + /// each child looks up at current_array /// - /// ```text - /// for child in current_array.children(): - /// child.execute_parent(¤t_array, slot_idx) - /// children look UP at current_array - /// codes -------> "can I rewrite DictEncoding?" - /// dictionary --> "can I rewrite DictEncoding?" + /// Step 3: match current_array.execute() + /// ExecuteSlot(i, pred) -> push parent on stack, focus child `i` + /// AppendChild(i) -> detach child `i`, append it into current_builder, + /// keep parent as current_array + /// Done -> finish current_builder if present, else use returned array /// ``` /// - /// **Step 3 — `execute`**: call the encoding's own decode step. Returns one of: - /// - `ExecuteSlot(i)`: push `current_array` onto the stack, focus on child `i`. - /// - `AppendChild(i)`: detach child `i`, append it into the builder, keep - /// `current_array` as the parent for the next iteration. - /// - `Done`: finalize (builder if present, otherwise the returned array). + /// Step 2a and Step 2b are skipped while `current_builder` is active. `AppendChild` + /// partially consumes `current_array`: some slots already live in the builder, so a + /// parent rewrite would observe inconsistent state and could discard accumulated builder + /// data. pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { let mut current_array = self; let mut current_builder: Option> = None; @@ -183,7 +179,7 @@ impl ArrayRef { } } - // execute_parent against stack parent + // Step 2a: execute_parent against the suspended parent from ExecuteSlot. // // When executing a child for ExecuteSlot, try execute_parent against // the suspended parent on the stack. This lets kernels like RunEnd's @@ -208,9 +204,7 @@ impl ArrayRef { continue; } - // execute_parent - // - // Skip execute_parent with child. + // Step 2b: execute_parent against current_array's own children. if current_builder.is_none() && let Some(rewritten) = try_execute_parent(¤t_array, ctx)? { @@ -550,59 +544,59 @@ fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult< /// A predicate that determines when an array has reached a desired form during execution. pub type DonePredicate = fn(&ArrayRef) -> bool; -/// Metadata-only step indicator returned alongside an array in [`ExecutionResult`]. +/// Scheduler step indicator returned alongside an array in [`ExecutionResult`]. /// /// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the /// scheduler what to do next. This enables the scheduler to manage execution iteratively using -/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions. +/// an explicit work stack plus an optional builder. /// /// # Semantics /// /// Each variant describes a different execution strategy with distinct cost profiles: /// -/// - [`Done`](ExecutionStep::Done): The encoding has finished its work in this step. The -/// returned array is the result. The scheduler may continue executing if the target form -/// (e.g. canonical) has not yet been reached. +/// - [`Done`](ExecutionStep::Done): The current activation has finished its work. If no builder +/// is active, the returned array is the result. If a builder is active, the scheduler ignores +/// the placeholder array and finishes the builder instead. The scheduler may continue +/// executing if the target form (e.g. canonical) has not yet been reached. /// /// - [`ExecuteSlot`](ExecutionStep::ExecuteSlot): The encoding needs one of its children -/// decoded before it can make further progress. The scheduler takes ownership of the child, -/// executes it until the [`DonePredicate`] matches, puts it back, and re-enters the parent. -/// Between steps the optimizer runs reduce/reduce_parent rules to fixpoint, enabling -/// cross-step optimization (e.g. pushing scalar functions through newly-decoded children). -/// This is a cooperative yield — the encoding does a bounded amount of work per step. +/// decoded before it can make further progress. The scheduler detaches that child, pushes +/// the parent onto the explicit stack, executes the child until the [`DonePredicate`] +/// matches, puts it back, and re-enters the parent. This is a cooperative yield: the +/// encoding does a bounded amount of work per step while the loop tracks the parent-child +/// relationship explicitly. /// /// - [`AppendChild`](ExecutionStep::AppendChild): The encoding needs one child executed to /// canonical form and then appended into a builder owned by the current activation. The -/// scheduler suspends the parent, executes the child, appends the finished child into the -/// parent's builder, and then resumes the same parent so it can continue with more -/// `AppendChild` or `ExecuteSlot` steps. **Important:** in the single-step executor -/// ([`Executable`] for [`ArrayRef`]), returning `AppendChild` still causes the executor to -/// drive the *entire* array to completion via [`execute_into_builder`] in one call — this can -/// do significantly more work than a single `ExecuteSlot` step. +/// scheduler detaches that child, lazily creates `current_builder` if needed, appends the +/// child into it, and keeps the parent as `current_array` for the next iteration. While the +/// builder is active, parent-kernel rewrites are skipped because the parent is partially +/// consumed. **Important:** in the single-step executor ([`Executable`] for [`ArrayRef`]), +/// returning `AppendChild` still causes the executor to drive the *entire* array to +/// completion via [`execute_into_builder`] in one call — this can do significantly more +/// work than a single `ExecuteSlot` step. pub enum ExecutionStep { /// Request that the scheduler execute the slot at the given index, using the provided /// [`DonePredicate`] to determine when the slot is "done", then replace the slot in this /// array and re-enter execution. /// - /// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling - /// cross-step optimization (e.g., pushing scalar functions through newly-decoded children). - /// /// Use [`ExecutionResult::execute_slot`] instead of constructing this variant directly. ExecuteSlot(usize, DonePredicate), - /// Execute the slot at the given index to canonical form, then append it into a canonical - /// builder owned by the current activation. + /// Detach the slot at the given index, append that child into the current activation's + /// canonical builder, and keep the returned parent as `current_array`. /// - /// The parent activation remains suspended with its builder while the child executes. Once - /// the child reaches canonical form, the scheduler appends it into the parent builder and - /// resumes the same parent activation. + /// `Done` finalizes that builder and turns it into the result of the activation. /// /// **Note:** In the single-step executor ([`Executable`] for [`ArrayRef`]), this variant /// drives the entire parent to completion in one call via [`execute_into_builder`], which /// may perform substantially more work than a single `ExecuteSlot` step. AppendChild(usize), - /// Execution is complete. The array in the accompanying [`ExecutionResult`] is the result. + /// Execution is complete. If no builder is active, the array in the accompanying + /// [`ExecutionResult`] is the result. Otherwise, the scheduler finalizes the active + /// builder and uses that finished array instead. + /// /// The scheduler will continue executing if it has not yet reached the target form. Done, } @@ -645,8 +639,9 @@ impl ExecutionResult { } } - /// Request that the child slot at `slot_idx` be executed and appended into the current - /// activation's canonical builder. + /// Request that the child slot at `slot_idx` be detached, appended into the current + /// activation's canonical builder, and leave the returned parent as the next + /// `current_array`. pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self { Self { array: array.into_array(), From 53b48c3207b6a2616e28a5618f8ad96793430cb1 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 30 Apr 2026 10:40:33 +0100 Subject: [PATCH 09/11] fix Signed-off-by: Joe Isaacs --- vortex-array/src/arrays/chunked/tests.rs | 43 +++++++++++------------- vortex-array/src/executor.rs | 2 +- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/vortex-array/src/arrays/chunked/tests.rs b/vortex-array/src/arrays/chunked/tests.rs index 8bd0504a9a1..0a8ede24b89 100644 --- a/vortex-array/src/arrays/chunked/tests.rs +++ b/vortex-array/src/arrays/chunked/tests.rs @@ -2,12 +2,14 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::sync::Arc; +use std::sync::LazyLock; use vortex_buffer::Buffer; use vortex_buffer::buffer; +use vortex_session::VortexSession; +use crate::Canonical; use crate::IntoArray; -use crate::LEGACY_SESSION; use crate::VortexSessionExecute; use crate::accessor::ArrayAccessor; use crate::arrays::Chunked; @@ -20,14 +22,20 @@ use crate::arrays::chunked::ChunkedArrayExt; use crate::arrays::dict_test::gen_dict_primitive_chunks; use crate::arrays::struct_::StructArrayExt; use crate::assert_arrays_eq; +use crate::builders::builder_with_capacity; #[expect(deprecated)] use crate::canonical::ToCanonical as _; use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; use crate::dtype::PType::I32; +use crate::executor::execute_into_builder; +use crate::session::ArraySession; use crate::validity::Validity; +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + fn chunked_array() -> ChunkedArray { ChunkedArray::try_new( vec![ @@ -42,15 +50,13 @@ fn chunked_array() -> ChunkedArray { #[test] fn builder_kernel_path_canonicalizes_primitive_chunks() { - use crate::builders::builder_with_capacity; - use crate::executor::execute_into_builder; + let mut ctx = SESSION.create_execution_ctx(); let array = chunked_array().into_array(); let dtype = array.dtype().clone(); let len = array.len(); let builder = builder_with_capacity(&dtype, len); - let mut ctx = LEGACY_SESSION.create_execution_ctx(); // Clone the array into the builder path — the test also holds `array` so refcount > 1 on // entry, which previously caused `take_slot_unchecked` to silently keep slots populated. let mut builder = execute_into_builder(array.clone(), builder, &mut ctx).unwrap(); @@ -65,8 +71,7 @@ fn builder_kernel_path_canonicalizes_primitive_chunks() { #[test] fn builder_kernel_nested_chunked_of_chunked() { - use crate::builders::builder_with_capacity; - use crate::executor::execute_into_builder; + let mut ctx = SESSION.create_execution_ctx(); let inner_1 = ChunkedArray::try_new( vec![buffer![1u64, 2].into_array(), buffer![3u64].into_array()], @@ -90,7 +95,6 @@ fn builder_kernel_nested_chunked_of_chunked() { let dtype = outer.dtype().clone(); let len = outer.len(); let builder = builder_with_capacity(&dtype, len); - let mut ctx = LEGACY_SESSION.create_execution_ctx(); let mut builder = execute_into_builder(outer, builder, &mut ctx).unwrap(); let output = builder.finish(); @@ -99,18 +103,16 @@ fn builder_kernel_nested_chunked_of_chunked() { #[test] fn builder_kernel_path_repeated_shared_chunked_dict_execution() { - use crate::builders::builder_with_capacity; - use crate::executor::execute_into_builder; + let mut expected_ctx = SESSION.create_execution_ctx(); let array = gen_dict_primitive_chunks::(8, 3, 3); let keep_alive = array.clone(); let dtype = array.dtype().clone(); let len = array.len(); - let mut expected_ctx = LEGACY_SESSION.create_execution_ctx(); let expected = array .clone() - .execute::(&mut expected_ctx) + .execute::(&mut expected_ctx) .unwrap() .into_array(); @@ -142,20 +144,20 @@ fn execute_path_repeated_shared_chunked_dict_execution() { let expected_source = gen_dict_primitive_chunks::(8, 3, 3); let mut expected_ctx = LEGACY_SESSION.create_execution_ctx(); let expected = expected_source - .execute::(&mut expected_ctx) + .execute::(&mut expected_ctx) .unwrap() .into_array(); let mut first_ctx = LEGACY_SESSION.create_execution_ctx(); let first = array .clone() - .execute::(&mut first_ctx) + .execute::(&mut first_ctx) .unwrap() .into_array(); let mut second_ctx = LEGACY_SESSION.create_execution_ctx(); let second = array - .execute::(&mut second_ctx) + .execute::(&mut second_ctx) .unwrap() .into_array(); @@ -167,8 +169,7 @@ fn execute_path_repeated_shared_chunked_dict_execution() { #[test] fn execute_path_nested_chunked_dict_of_dict_into_canonical() { - use crate::builders::builder_with_capacity; - + let mut ctx = LEGACY_SESSION.create_execution_ctx(); let inner_1 = gen_dict_primitive_chunks::(8, 3, 2); let inner_2 = gen_dict_primitive_chunks::(8, 3, 3); let outer = ChunkedArray::try_new( @@ -180,7 +181,6 @@ fn execute_path_nested_chunked_dict_of_dict_into_canonical() { let keep_alive = outer.clone(); let expected = { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); let mut builder = builder_with_capacity(outer.dtype(), outer.len()); inner_1 .append_to_builder(builder.as_mut(), &mut ctx) @@ -191,18 +191,13 @@ fn execute_path_nested_chunked_dict_of_dict_into_canonical() { builder.finish() }; - let mut first_ctx = LEGACY_SESSION.create_execution_ctx(); let first = outer .clone() - .execute::(&mut first_ctx) + .execute::(&mut ctx) .unwrap() .into_array(); - let mut second_ctx = LEGACY_SESSION.create_execution_ctx(); - let second = outer - .execute::(&mut second_ctx) - .unwrap() - .into_array(); + let second = outer.execute::(&mut ctx).unwrap().into_array(); drop(keep_alive); diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 31d475822dc..694fd25c3c6 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -254,7 +254,7 @@ impl ArrayRef { "AppendChild({i}): appending {} into builder", child )); - // TODO(perf): replace with a builder kernel registry so we don't + // TODO(joe)[7674]: replace with a builder kernel registry so we don't // need to go through the VTable append_to_builder indirection. child.append_to_builder( current_builder From 0bbeb0ab6b6d5e8bfd7bd18b0db219453a5ac48d Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 30 Apr 2026 10:44:05 +0100 Subject: [PATCH 10/11] fix Signed-off-by: Joe Isaacs --- vortex-array/src/arrays/chunked/tests.rs | 40 +++++++++--------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/vortex-array/src/arrays/chunked/tests.rs b/vortex-array/src/arrays/chunked/tests.rs index 0a8ede24b89..00d113f273a 100644 --- a/vortex-array/src/arrays/chunked/tests.rs +++ b/vortex-array/src/arrays/chunked/tests.rs @@ -103,7 +103,7 @@ fn builder_kernel_nested_chunked_of_chunked() { #[test] fn builder_kernel_path_repeated_shared_chunked_dict_execution() { - let mut expected_ctx = SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let array = gen_dict_primitive_chunks::(8, 3, 3); let keep_alive = array.clone(); @@ -112,21 +112,19 @@ fn builder_kernel_path_repeated_shared_chunked_dict_execution() { let expected = array .clone() - .execute::(&mut expected_ctx) + .execute::(&mut ctx) .unwrap() .into_array(); - let mut first_ctx = LEGACY_SESSION.create_execution_ctx(); let first = { let builder = builder_with_capacity(&dtype, len); - let mut builder = execute_into_builder(array.clone(), builder, &mut first_ctx).unwrap(); + let mut builder = execute_into_builder(array.clone(), builder, &mut ctx).unwrap(); builder.finish() }; - let mut second_ctx = LEGACY_SESSION.create_execution_ctx(); let second = { let builder = builder_with_capacity(&dtype, len); - let mut builder = execute_into_builder(array, builder, &mut second_ctx).unwrap(); + let mut builder = execute_into_builder(array, builder, &mut ctx).unwrap(); builder.finish() }; @@ -138,28 +136,23 @@ fn builder_kernel_path_repeated_shared_chunked_dict_execution() { #[test] fn execute_path_repeated_shared_chunked_dict_execution() { + let mut ctx = SESSION.create_execution_ctx(); let array = gen_dict_primitive_chunks::(8, 3, 3); let keep_alive = array.clone(); let expected_source = gen_dict_primitive_chunks::(8, 3, 3); - let mut expected_ctx = LEGACY_SESSION.create_execution_ctx(); let expected = expected_source - .execute::(&mut expected_ctx) + .execute::(&mut ctx) .unwrap() .into_array(); - let mut first_ctx = LEGACY_SESSION.create_execution_ctx(); let first = array .clone() - .execute::(&mut first_ctx) + .execute::(&mut ctx) .unwrap() .into_array(); - let mut second_ctx = LEGACY_SESSION.create_execution_ctx(); - let second = array - .execute::(&mut second_ctx) - .unwrap() - .into_array(); + let second = array.execute::(&mut ctx).unwrap().into_array(); drop(keep_alive); @@ -169,7 +162,7 @@ fn execute_path_repeated_shared_chunked_dict_execution() { #[test] fn execute_path_nested_chunked_dict_of_dict_into_canonical() { - let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); let inner_1 = gen_dict_primitive_chunks::(8, 3, 2); let inner_2 = gen_dict_primitive_chunks::(8, 3, 3); let outer = ChunkedArray::try_new( @@ -372,6 +365,7 @@ pub fn pack_nested_structs() { #[test] pub fn pack_nested_lists() { + let mut ctx = SESSION.create_execution_ctx(); let l1 = ListArray::try_new( buffer![1, 2, 3, 4].into_array(), buffer![0, 3].into_array(), @@ -398,17 +392,11 @@ pub fn pack_nested_lists() { let canon_values = chunked_list.unwrap().as_array().to_listview(); assert_eq!( - l1.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx()) - .unwrap(), - canon_values - .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx()) - .unwrap() + l1.execute_scalar(0, &mut ctx).unwrap(), + canon_values.execute_scalar(0, &mut ctx).unwrap() ); assert_eq!( - l2.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx()) - .unwrap(), - canon_values - .execute_scalar(1, &mut LEGACY_SESSION.create_execution_ctx()) - .unwrap() + l2.execute_scalar(0, &mut ctx).unwrap(), + canon_values.execute_scalar(1, &mut ctx).unwrap() ); } From cbafa66289fc7db3ca3d450a940e494d91946c4f Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Thu, 30 Apr 2026 11:44:04 +0100 Subject: [PATCH 11/11] fix Signed-off-by: Joe Isaacs --- vortex-array/src/optimizer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 95d3b19817d..f991dbb2c1d 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -90,7 +90,7 @@ fn try_optimize( let Some(child) = slot else { continue }; // Session kernels take precedence over the child encoding's static PARENT_RULES. - if let Some(ref array_ref) = array_ref + if let Some(array_ref) = &array_ref && let Some(plugins) = array_ref.find_reduce_parent(current_array.encoding_id(), child.encoding_id()) {