Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion encodings/runend/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,14 @@ mod tests {
/// Filter unwrap one layer at a time so RunEnd's FilterKernel can fire.
#[test]
fn filter_sliced_run_end_preserves_encoding() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();

// 4 runs of 32 each = 128 rows. Large enough that FilterKernel takes
// the run-preserving path (true_count >= 25).
let values: Vec<i32> = [10, 20, 30, 40]
.iter()
.flat_map(|&v| std::iter::repeat_n(v, 32))
.collect();
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let arr = RunEnd::encode(PrimitiveArray::from_iter(values).into_array(), &mut ctx)?;

// Slice off the first 16 rows. Slice(RunEnd), 112 rows, 4 runs.
Expand Down
6 changes: 6 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21466,6 +21466,8 @@ pub vortex_array::ColumnarView::Constant(vortex_array::ArrayView<'a, vortex_arra

pub enum vortex_array::ExecutionStep

pub vortex_array::ExecutionStep::AppendChild(usize)

pub vortex_array::ExecutionStep::Done

pub vortex_array::ExecutionStep::ExecuteSlot(usize, vortex_array::DonePredicate)
Expand Down Expand Up @@ -22586,6 +22588,8 @@ pub struct vortex_array::ExecutionResult

impl vortex_array::ExecutionResult

pub fn vortex_array::ExecutionResult::append_child(array: impl vortex_array::IntoArray, slot_idx: usize) -> Self

pub fn vortex_array::ExecutionResult::array(&self) -> &vortex_array::ArrayRef

pub fn vortex_array::ExecutionResult::done(result: impl vortex_array::IntoArray) -> Self
Expand Down Expand Up @@ -25176,6 +25180,8 @@ pub fn vortex_session::VortexSession::create_execution_ctx(&self) -> vortex_arra

pub fn vortex_array::child_to_validity(child: &core::option::Option<vortex_array::ArrayRef>, nullability: vortex_array::dtype::Nullability) -> vortex_array::validity::Validity

pub fn vortex_array::execute_into_builder(array: vortex_array::ArrayRef, builder: alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>>

pub fn vortex_array::patches_child(patches: &vortex_array::patches::Patches, idx: usize) -> vortex_array::ArrayRef

pub fn vortex_array::patches_child_name(idx: usize) -> &'static str
Expand Down
71 changes: 53 additions & 18 deletions vortex-array/src/array/erased.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::ArrayHash;
use crate::ArrayView;
use crate::Canonical;
use crate::ExecutionCtx;
use crate::ExecutionResult;
use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VTable;
Expand Down Expand Up @@ -93,6 +94,12 @@ impl ArrayRef {
&self.0
}

/// Returns a mutable reference to the inner `Arc`.
///
/// Crate-internal: a `&mut Arc<dyn DynArray>` lets the caller attempt `Arc::get_mut` on the
/// backing allocation, which only succeeds while no other strong or weak reference exists.
#[inline(always)]
pub(crate) fn inner_mut(&mut self) -> &mut Arc<dyn DynArray> {
&mut self.0
}

/// Consumes the array reference, returning the owned backing allocation.
#[inline(always)]
pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
Expand Down Expand Up @@ -431,10 +438,13 @@ impl ArrayRef {
self.with_slots(slots)
}

/// Take a slot for executor-owned physical rewrites. This has the result that the array may
/// either be taken or cloned from the parent.
/// Take a slot for executor-owned physical rewrites.
///
/// The array can be put back with [`put_slot_unchecked`].
/// On return, the produced parent has the taken slot set to `None`;
/// callers must put the slot back (typically via [`put_slot_unchecked`]) before the parent is
/// returned from the execution loop.
///
/// When the `Arc` was shared this allocates a fresh parent.
///
/// # Safety
/// The caller must put back a slot with the same logical dtype and length before exposing the
Expand All @@ -443,18 +453,27 @@ impl ArrayRef {
mut self,
slot_idx: usize,
) -> VortexResult<(ArrayRef, ArrayRef)> {
let child = if let Some(inner) = Arc::get_mut(&mut self.0) {
// # Safety: ensured by the caller.
unsafe { inner.slots_mut()[slot_idx].take() }
.vortex_expect("take_slot_unchecked cannot take an absent slot")
} else {
self.slots()[slot_idx]
.as_ref()
.vortex_expect("take_slot_unchecked cannot take an absent slot")
.clone()
};
if let Some(inner) = Arc::get_mut(&mut self.0) {
// SAFETY: ensured by the caller.
let child = unsafe { inner.slots_mut()[slot_idx].take() }
.vortex_expect("take_slot_unchecked cannot take an absent slot");
return Ok((self, child));
}

// Arc is shared: clone the child out and build a fresh parent with slot_idx = None,
// bypassing encoding-level validation so the absent slot does not panic `V::validate`.
let child = self.slots()[slot_idx]
.as_ref()
.vortex_expect("take_slot_unchecked cannot take an absent slot")
.clone();

let mut new_slots = self.slots().to_vec();
new_slots[slot_idx] = None;

Ok((self, child))
// SAFETY: ensured by the caller — the None slot is either put back or driven to completion
// via the builder path before the parent escapes the executor.
let new_parent = unsafe { self.0.with_slots_unchecked(&self, new_slots) };
Ok((new_parent, child))
}

/// Puts an array into `slot_idx` by either, cloning the inner array if the Arc is not exclusive
Expand Down Expand Up @@ -532,12 +551,28 @@ impl ArrayRef {
self.0.reduce_parent(self, parent, child_idx)
}

pub(crate) fn execute_encoding(
/// Execute a single encoding step through the checked `DynArray::execute` entry point.
///
/// The receiver is reached through a raw pointer obtained with `Arc::as_ptr` instead of an
/// `Arc` clone, so no additional strong reference is created while `self` is handed by value
/// to `execute`.
pub(crate) fn execute_encoding(self, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
let inner = Arc::as_ptr(&self.0);
// SAFETY: `inner` points into the allocation owned by `self.0`, and `self` is moved into the
// call below, so the allocation is expected to outlive the `DynArray` function call.
// NOTE(review): this relies on `execute` not touching its `&self` receiver after the last
// strong reference to `self` has been dropped — confirm against the implementations.
unsafe { (&*inner).execute(self, ctx) }
}

/// Execute a single encoding step without applying `Done`-result postconditions.
///
/// This is for the iterative executor only. It may operate on suspended executor-private
/// arrays whose slots temporarily contain `None`, so the executor itself must interpret
/// `Done`, enforce any `len`/`dtype` invariants, and transfer statistics.
pub(crate) fn execute_encoding_unchecked(
self,
ctx: &mut ExecutionCtx,
) -> VortexResult<crate::ExecutionResult> {
let inner = Arc::clone(&self.0);
inner.execute(self, ctx)
) -> VortexResult<ExecutionResult> {
let inner = Arc::as_ptr(&self.0);
// SAFETY: the `Arc` held in `self.0` outlives the `DynArray` function call below.
let inner = unsafe { &*inner };
// SAFETY: `inner` points at the allocation owned by `self.0`. `self` stays alive for the
// duration of the call, so the pointee remains valid. Avoiding an extra `Arc` clone here
// preserves uniqueness so execute-time metadata cursors can use `Arc::get_mut`.
unsafe { inner.execute_unchecked(self, ctx) }
}

pub fn execute_parent(
Expand Down
91 changes: 84 additions & 7 deletions vortex-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
/// Returns the array as a reference to a generic [`Any`] trait object.
fn as_any(&self) -> &dyn Any;

/// Returns the array as a mutable reference to a generic [`Any`] trait object.
fn as_any_mut(&mut self) -> &mut dyn Any;

/// Converts an owned array allocation into an owned [`Any`] allocation for downcasting.
fn into_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn Any + Send + Sync>;

Expand Down Expand Up @@ -143,6 +146,24 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
/// Returns a new array with the given slots.
fn with_slots(&self, this: ArrayRef, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef>;

/// Returns a new array with the given slots, bypassing encoding-level validation.
///
/// Used by the executor to temporarily carry an array that has had one of its child slots
/// taken out (leaving `None`) without panicking `V::validate`. The caller must ensure the
/// missing slot is filled back in (via `put_slot_unchecked`) or driven to completion by the
/// builder path before the array becomes externally observable.
///
/// # Safety
///
/// The array returned may have slots whose content does not match the encoding's normal
/// invariants. Callers must re-establish those invariants before handing the array to
/// anything outside the executor.
unsafe fn with_slots_unchecked(
&self,
this: &ArrayRef,
slots: Vec<Option<ArrayRef>>,
) -> ArrayRef;

/// Attempt to reduce the array to a simpler representation.
fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>>;

Expand All @@ -155,8 +176,30 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
) -> VortexResult<Option<ArrayRef>>;

/// Execute the array by taking a single encoding-specific execution step.
///
/// This is the checked entry point. If the encoding reports
/// [`ExecutionStep::Done`](crate::ExecutionStep::Done), implementations must validate that the
/// returned array preserves this array's logical `len` and `dtype`, and must transfer this
/// array's statistics to the returned array.
fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult>;

/// Execute the array by taking a single encoding-specific execution step without applying
/// `Done`-result postconditions.
///
/// This exists for the iterative executor, which may call into `execute` on suspended
/// executor-private arrays whose slots temporarily contain `None`. In that mode the executor
/// itself is responsible for deciding when a `Done` result represents a real logical array,
/// enforcing any `len`/`dtype` invariants, and transferring statistics.
///
/// # Safety
/// The caller must check the `DType` and `len` of the returned array against those of `this`
/// before treating it as the result, and should propagate the statistics of `this` onto it
/// where appropriate; this method performs neither step.
unsafe fn execute_unchecked(
&self,
this: ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<ExecutionResult>;

/// Attempt to execute the parent of this array.
fn execute_parent(
&self,
Expand Down Expand Up @@ -203,6 +246,10 @@ impl<V: VTable> DynArray for ArrayInner<V> {
self
}

// Exposes the concrete `ArrayInner<V>` as `&mut dyn Any` so callers can recover the typed
// value with `downcast_mut`.
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}

fn into_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn Any + Send + Sync> {
self
}
Expand Down Expand Up @@ -387,6 +434,26 @@ impl<V: VTable> DynArray for ArrayInner<V> {
.into_array())
}

unsafe fn with_slots_unchecked(
    &self,
    this: &ArrayRef,
    slots: Vec<Option<ArrayRef>>,
) -> ArrayRef {
    // Everything except the slots is carried over unchanged: the vtable, data and stats come
    // from this inner value, the dtype is read through `this`.
    let vtable = self.vtable.clone();
    let dtype = this.dtype().clone();
    let data = self.data.clone();
    let stats = self.stats.clone();

    // SAFETY: `V::validate` is skipped on purpose. The caller guarantees that the rebuilt
    // array is either repaired or never observed outside the executor.
    let rebuilt = unsafe {
        ArrayInner::<V>::from_data_unchecked(vtable, dtype, self.len, data, slots, stats)
    };

    // The rebuilt inner value always lives in a fresh allocation.
    ArrayRef::from_inner(std::sync::Arc::new(rebuilt))
}

fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>> {
let view = unsafe { ArrayView::new_unchecked(this, &self.data) };
let Some(reduced) = V::reduce(view)? else {
Expand Down Expand Up @@ -437,12 +504,8 @@ impl<V: VTable> DynArray for ArrayInner<V> {
fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
let len = this.len();
let dtype = this.dtype().clone();
let stats = this.statistics().to_owned();

let typed = Array::<V>::try_from_array_ref(this)
.map_err(|_| vortex_err!("Failed to downcast array for execute"))
.vortex_expect("Failed to downcast array for execute");
let result = V::execute(typed, ctx)?;
let stats = this.statistics().to_array_stats();
let result = unsafe { self.execute_unchecked(this, ctx)? };

if matches!(result.step(), ExecutionStep::Done) {
if cfg!(debug_assertions) {
Expand All @@ -458,12 +521,26 @@ impl<V: VTable> DynArray for ArrayInner<V> {
);
}

result.array().statistics().set_iter(stats.into_iter());
result
.array()
.statistics()
.set_iter(crate::stats::StatsSet::from(stats).into_iter());
}

Ok(result)
}

unsafe fn execute_unchecked(
    &self,
    this: ArrayRef,
    ctx: &mut ExecutionCtx,
) -> VortexResult<ExecutionResult> {
    // Recover the typed handle for `V`. A failed downcast is handled through `vortex_expect`
    // rather than being returned to the caller as an error.
    let downcast = Array::<V>::try_from_array_ref(this)
        .map_err(|_| vortex_err!("Failed to downcast array for execute"));
    let typed = downcast.vortex_expect("Failed to downcast array for execute");

    // Hand the raw step result back untouched: no `len`/`dtype` check and no statistics
    // transfer happen here.
    V::execute(typed, ctx)
}

fn execute_parent(
&self,
this: &ArrayRef,
Expand Down
8 changes: 8 additions & 0 deletions vortex-array/src/array/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ impl<V: VTable> Array<V> {
&self.downcast_inner().data
}

/// Try to fetch a mut ref to the inner ArrayData.
pub fn data_mut(&mut self) -> Option<&mut V::ArrayData> {
let m = self.inner.inner_mut();
let inner = Arc::get_mut(m)?;
let array_inner = inner.as_any_mut().downcast_mut::<ArrayInner<V>>();
Some(&mut array_inner?.data)
}

/// Returns the full typed array construction parts if this handle owns the allocation.
pub fn try_into_parts(self) -> Result<ArrayParts<V>, Self> {
let Self { inner, _phantom } = self;
Expand Down
Loading
Loading