Skip to content

Commit b70fa7e

Browse files
committed
fix
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent c0c6fb9 commit b70fa7e

10 files changed

Lines changed: 618 additions & 184 deletions

File tree

encodings/runend/src/compute/filter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,14 @@ mod tests {
158158
/// Filter unwrap one layer at a time so RunEnd's FilterKernel can fire.
159159
#[test]
160160
fn filter_sliced_run_end_preserves_encoding() -> VortexResult<()> {
161+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
162+
161163
// 4 runs of 32 each = 128 rows. Large enough that FilterKernel takes
162164
// the run-preserving path (true_count >= 25).
163165
let values: Vec<i32> = [10, 20, 30, 40]
164166
.iter()
165167
.flat_map(|&v| std::iter::repeat_n(v, 32))
166168
.collect();
167-
let mut ctx = LEGACY_SESSION.create_execution_ctx();
168169
let arr = RunEnd::encode(PrimitiveArray::from_iter(values).into_array(), &mut ctx)?;
169170

170171
// Slice off the first 16 rows. Slice(RunEnd), 112 rows, 4 runs.

vortex-array/public-api.lock

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21466,6 +21466,8 @@ pub vortex_array::ColumnarView::Constant(vortex_array::ArrayView<'a, vortex_arra
2146621466

2146721467
pub enum vortex_array::ExecutionStep
2146821468

21469+
pub vortex_array::ExecutionStep::AppendChild(usize)
21470+
2146921471
pub vortex_array::ExecutionStep::Done
2147021472

2147121473
pub vortex_array::ExecutionStep::ExecuteSlot(usize, vortex_array::DonePredicate)
@@ -22586,6 +22588,8 @@ pub struct vortex_array::ExecutionResult
2258622588

2258722589
impl vortex_array::ExecutionResult
2258822590

22591+
pub fn vortex_array::ExecutionResult::append_child(array: impl vortex_array::IntoArray, slot_idx: usize) -> Self
22592+
2258922593
pub fn vortex_array::ExecutionResult::array(&self) -> &vortex_array::ArrayRef
2259022594

2259122595
pub fn vortex_array::ExecutionResult::done(result: impl vortex_array::IntoArray) -> Self
@@ -25176,6 +25180,8 @@ pub fn vortex_session::VortexSession::create_execution_ctx(&self) -> vortex_arra
2517625180

2517725181
pub fn vortex_array::child_to_validity(child: &core::option::Option<vortex_array::ArrayRef>, nullability: vortex_array::dtype::Nullability) -> vortex_array::validity::Validity
2517825182

25183+
pub fn vortex_array::execute_into_builder(array: vortex_array::ArrayRef, builder: alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>>
25184+
2517925185
pub fn vortex_array::patches_child(patches: &vortex_array::patches::Patches, idx: usize) -> vortex_array::ArrayRef
2518025186

2518125187
pub fn vortex_array::patches_child_name(idx: usize) -> &'static str

vortex-array/src/array/erased.rs

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::ArrayHash;
2323
use crate::ArrayView;
2424
use crate::Canonical;
2525
use crate::ExecutionCtx;
26+
use crate::ExecutionResult;
2627
use crate::IntoArray;
2728
use crate::LEGACY_SESSION;
2829
use crate::VTable;
@@ -93,6 +94,12 @@ impl ArrayRef {
9394
&self.0
9495
}
9596

97+
/// Returns a reference to the inner Arc.
98+
#[inline(always)]
99+
pub(crate) fn inner_mut(&mut self) -> &mut Arc<dyn DynArray> {
100+
&mut self.0
101+
}
102+
96103
/// Consumes the array reference, returning the owned backing allocation.
97104
#[inline(always)]
98105
pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
@@ -431,10 +438,13 @@ impl ArrayRef {
431438
self.with_slots(slots)
432439
}
433440

434-
/// Take a slot for executor-owned physical rewrites. This has the result that the array may
435-
/// either be taken or cloned from the parent.
441+
/// Take a slot for executor-owned physical rewrites.
436442
///
437-
/// The array can be put back with [`put_slot_unchecked`].
443+
/// On return the produced parent has the taken slot set to `None`
444+
/// callers must put the slot back (typically via [`put_slot_unchecked`]) before the parent is
445+
/// returned from the execution loop.
446+
///
447+
/// When the `Arc` was shared this allocates a fresh parent.
438448
///
439449
/// # Safety
440450
/// The caller must put back a slot with the same logical dtype and length before exposing the
@@ -443,18 +453,27 @@ impl ArrayRef {
443453
mut self,
444454
slot_idx: usize,
445455
) -> VortexResult<(ArrayRef, ArrayRef)> {
446-
let child = if let Some(inner) = Arc::get_mut(&mut self.0) {
447-
// # Safety: ensured by the caller.
448-
unsafe { inner.slots_mut()[slot_idx].take() }
449-
.vortex_expect("take_slot_unchecked cannot take an absent slot")
450-
} else {
451-
self.slots()[slot_idx]
452-
.as_ref()
453-
.vortex_expect("take_slot_unchecked cannot take an absent slot")
454-
.clone()
455-
};
456+
if let Some(inner) = Arc::get_mut(&mut self.0) {
457+
// SAFETY: ensured by the caller.
458+
let child = unsafe { inner.slots_mut()[slot_idx].take() }
459+
.vortex_expect("take_slot_unchecked cannot take an absent slot");
460+
return Ok((self, child));
461+
}
462+
463+
// Arc is shared: clone the child out and build a fresh parent with slot_idx = None,
464+
// bypassing encoding-level validation so the absent slot does not panic `V::validate`.
465+
let child = self.slots()[slot_idx]
466+
.as_ref()
467+
.vortex_expect("take_slot_unchecked cannot take an absent slot")
468+
.clone();
469+
470+
let mut new_slots = self.slots().to_vec();
471+
new_slots[slot_idx] = None;
456472

457-
Ok((self, child))
473+
// SAFETY: ensured by the caller — the None slot is either put back or driven to completion
474+
// via the builder path before the parent escapes the executor.
475+
let new_parent = unsafe { self.0.with_slots_unchecked(&self, new_slots) };
476+
Ok((new_parent, child))
458477
}
459478

460479
/// Puts an array into `slot_idx` by either, cloning the inner array if the Arc is not exclusive
@@ -532,12 +551,28 @@ impl ArrayRef {
532551
self.0.reduce_parent(self, parent, child_idx)
533552
}
534553

535-
pub(crate) fn execute_encoding(
554+
pub(crate) fn execute_encoding(self, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
555+
let inner = Arc::as_ptr(&self.0);
556+
// Safety the Arc outline the DynArray function call
557+
unsafe { (&*inner).execute(self, ctx) }
558+
}
559+
560+
/// Execute a single encoding step without applying `Done`-result postconditions.
561+
///
562+
/// This is for the iterative executor only. It may operate on suspended executor-private
563+
/// arrays whose slots temporarily contain `None`, so the executor itself must interpret
564+
/// `Done`, enforce any `len`/`dtype` invariants, and transfer statistics.
565+
pub(crate) fn execute_encoding_unchecked(
536566
self,
537567
ctx: &mut ExecutionCtx,
538-
) -> VortexResult<crate::ExecutionResult> {
539-
let inner = Arc::clone(&self.0);
540-
inner.execute(self, ctx)
568+
) -> VortexResult<ExecutionResult> {
569+
let inner = Arc::as_ptr(&self.0);
570+
// Safety the Arc outline the DynArray function call
571+
let inner = unsafe { &*inner };
572+
// SAFETY: `inner` points at the allocation owned by `self.0`. `self` stays alive for the
573+
// duration of the call, so the pointee remains valid. Avoiding an extra `Arc` clone here
574+
// preserves uniqueness so execute-time metadata cursors can use `Arc::get_mut`.
575+
unsafe { inner.execute_unchecked(self, ctx) }
541576
}
542577

543578
pub fn execute_parent(

vortex-array/src/array/mod.rs

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
5656
/// Returns the array as a reference to a generic [`Any`] trait object.
5757
fn as_any(&self) -> &dyn Any;
5858

59+
/// Returns the array as a mutable reference to a generic [`Any`] trait object.
60+
fn as_any_mut(&mut self) -> &mut dyn Any;
61+
5962
/// Converts an owned array allocation into an owned [`Any`] allocation for downcasting.
6063
fn into_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn Any + Send + Sync>;
6164

@@ -143,6 +146,24 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
143146
/// Returns a new array with the given slots.
144147
fn with_slots(&self, this: ArrayRef, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef>;
145148

149+
/// Returns a new array with the given slots, bypassing encoding-level validation.
150+
///
151+
/// Used by the executor to temporarily carry an array that has had one of its child slots
152+
/// taken out (leaving `None`) without panicking `V::validate`. The caller must ensure the
153+
/// missing slot is filled back in (via `put_slot_unchecked`) or driven to completion by the
154+
/// builder path before the array becomes externally observable.
155+
///
156+
/// # Safety
157+
///
158+
/// The array returned may have slots whose content does not match the encoding's normal
159+
/// invariants. Callers must re-establish those invariants before handing the array to
160+
/// anything outside the executor.
161+
unsafe fn with_slots_unchecked(
162+
&self,
163+
this: &ArrayRef,
164+
slots: Vec<Option<ArrayRef>>,
165+
) -> ArrayRef;
166+
146167
/// Attempt to reduce the array to a simpler representation.
147168
fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>>;
148169

@@ -155,8 +176,30 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
155176
) -> VortexResult<Option<ArrayRef>>;
156177

157178
/// Execute the array by taking a single encoding-specific execution step.
179+
///
180+
/// This is the checked entry point. If the encoding reports
181+
/// [`ExecutionStep::Done`](crate::ExecutionStep::Done), implementations must validate that the
182+
/// returned array preserves this array's logical `len` and `dtype`, and must transfer this
183+
/// array's statistics to the returned array.
158184
fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult>;
159185

186+
/// Execute the array by taking a single encoding-specific execution step without applying
187+
/// `Done`-result postconditions.
188+
///
189+
/// This exists for the iterative executor, which may call into `execute` on suspended
190+
/// executor-private arrays whose slots temporarily contain `None`. In that mode the executor
191+
/// itself is responsible for deciding when a `Done` result represents a real logical array,
192+
/// enforcing any `len`/`dtype` invariants, and transferring statistics.
193+
///
194+
/// # Safety
195+
/// The `array` returned should have it's `DType` and len checked
196+
/// (optionally it should have its stats propagated from `this`).
197+
unsafe fn execute_unchecked(
198+
&self,
199+
this: ArrayRef,
200+
ctx: &mut ExecutionCtx,
201+
) -> VortexResult<ExecutionResult>;
202+
160203
/// Attempt to execute the parent of this array.
161204
fn execute_parent(
162205
&self,
@@ -203,6 +246,10 @@ impl<V: VTable> DynArray for ArrayInner<V> {
203246
self
204247
}
205248

249+
fn as_any_mut(&mut self) -> &mut dyn Any {
250+
self
251+
}
252+
206253
fn into_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn Any + Send + Sync> {
207254
self
208255
}
@@ -387,6 +434,26 @@ impl<V: VTable> DynArray for ArrayInner<V> {
387434
.into_array())
388435
}
389436

437+
unsafe fn with_slots_unchecked(
438+
&self,
439+
this: &ArrayRef,
440+
slots: Vec<Option<ArrayRef>>,
441+
) -> ArrayRef {
442+
// SAFETY: we intentionally skip `V::validate` here. Caller guarantees that the resulting
443+
// array is either repaired or not externally observed.
444+
let inner = unsafe {
445+
ArrayInner::<V>::from_data_unchecked(
446+
self.vtable.clone(),
447+
this.dtype().clone(),
448+
self.len,
449+
self.data.clone(),
450+
slots,
451+
self.stats.clone(),
452+
)
453+
};
454+
ArrayRef::from_inner(std::sync::Arc::new(inner))
455+
}
456+
390457
fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>> {
391458
let view = unsafe { ArrayView::new_unchecked(this, &self.data) };
392459
let Some(reduced) = V::reduce(view)? else {
@@ -437,12 +504,8 @@ impl<V: VTable> DynArray for ArrayInner<V> {
437504
fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
438505
let len = this.len();
439506
let dtype = this.dtype().clone();
440-
let stats = this.statistics().to_owned();
441-
442-
let typed = Array::<V>::try_from_array_ref(this)
443-
.map_err(|_| vortex_err!("Failed to downcast array for execute"))
444-
.vortex_expect("Failed to downcast array for execute");
445-
let result = V::execute(typed, ctx)?;
507+
let stats = this.statistics().to_array_stats();
508+
let result = self.execute_unchecked(this, ctx)?;
446509

447510
if matches!(result.step(), ExecutionStep::Done) {
448511
if cfg!(debug_assertions) {
@@ -458,12 +521,26 @@ impl<V: VTable> DynArray for ArrayInner<V> {
458521
);
459522
}
460523

461-
result.array().statistics().set_iter(stats.into_iter());
524+
result
525+
.array()
526+
.statistics()
527+
.set_iter(crate::stats::StatsSet::from(stats).into_iter());
462528
}
463529

464530
Ok(result)
465531
}
466532

533+
fn execute_unchecked(
534+
&self,
535+
this: ArrayRef,
536+
ctx: &mut ExecutionCtx,
537+
) -> VortexResult<ExecutionResult> {
538+
let typed = Array::<V>::try_from_array_ref(this)
539+
.map_err(|_| vortex_err!("Failed to downcast array for execute"))
540+
.vortex_expect("Failed to downcast array for execute");
541+
V::execute(typed, ctx)
542+
}
543+
467544
fn execute_parent(
468545
&self,
469546
this: &ArrayRef,

vortex-array/src/array/typed.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,14 @@ impl<V: VTable> Array<V> {
280280
&self.downcast_inner().data
281281
}
282282

283+
/// Try to fetch a mut ref to the inner ArrayData.
284+
pub fn data_mut(&mut self) -> Option<&mut V::ArrayData> {
285+
let m = self.inner.inner_mut();
286+
let inner = Arc::get_mut(m)?;
287+
let array_inner = inner.as_any_mut().downcast_mut::<ArrayInner<V>>();
288+
Some(&mut array_inner?.data)
289+
}
290+
283291
/// Returns the full typed array construction parts if this handle owns the allocation.
284292
pub fn try_into_parts(self) -> Result<ArrayParts<V>, Self> {
285293
let Self { inner, _phantom } = self;

0 commit comments

Comments
 (0)