Skip to content

Commit 2bdb707

Browse files
committed
fix
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent c0c6fb9 commit 2bdb707

10 files changed

Lines changed: 611 additions & 178 deletions

File tree

encodings/runend/src/compute/filter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,14 @@ mod tests {
158158
/// Filter unwrap one layer at a time so RunEnd's FilterKernel can fire.
159159
#[test]
160160
fn filter_sliced_run_end_preserves_encoding() -> VortexResult<()> {
161+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
162+
161163
// 4 runs of 32 each = 128 rows. Large enough that FilterKernel takes
162164
// the run-preserving path (true_count >= 25).
163165
let values: Vec<i32> = [10, 20, 30, 40]
164166
.iter()
165167
.flat_map(|&v| std::iter::repeat_n(v, 32))
166168
.collect();
167-
let mut ctx = LEGACY_SESSION.create_execution_ctx();
168169
let arr = RunEnd::encode(PrimitiveArray::from_iter(values).into_array(), &mut ctx)?;
169170

170171
// Slice off the first 16 rows. Slice(RunEnd), 112 rows, 4 runs.

vortex-array/public-api.lock

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21466,6 +21466,8 @@ pub vortex_array::ColumnarView::Constant(vortex_array::ArrayView<'a, vortex_arra
2146621466

2146721467
pub enum vortex_array::ExecutionStep
2146821468

21469+
pub vortex_array::ExecutionStep::AppendChild(usize)
21470+
2146921471
pub vortex_array::ExecutionStep::Done
2147021472

2147121473
pub vortex_array::ExecutionStep::ExecuteSlot(usize, vortex_array::DonePredicate)
@@ -22586,6 +22588,8 @@ pub struct vortex_array::ExecutionResult
2258622588

2258722589
impl vortex_array::ExecutionResult
2258822590

22591+
pub fn vortex_array::ExecutionResult::append_child(array: impl vortex_array::IntoArray, slot_idx: usize) -> Self
22592+
2258922593
pub fn vortex_array::ExecutionResult::array(&self) -> &vortex_array::ArrayRef
2259022594

2259122595
pub fn vortex_array::ExecutionResult::done(result: impl vortex_array::IntoArray) -> Self
@@ -25176,6 +25180,8 @@ pub fn vortex_session::VortexSession::create_execution_ctx(&self) -> vortex_arra
2517625180

2517725181
pub fn vortex_array::child_to_validity(child: &core::option::Option<vortex_array::ArrayRef>, nullability: vortex_array::dtype::Nullability) -> vortex_array::validity::Validity
2517825182

25183+
pub fn vortex_array::execute_into_builder(array: vortex_array::ArrayRef, builder: alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::builders::ArrayBuilder>>
25184+
2517925185
pub fn vortex_array::patches_child(patches: &vortex_array::patches::Patches, idx: usize) -> vortex_array::ArrayRef
2518025186

2518125187
pub fn vortex_array::patches_child_name(idx: usize) -> &'static str

vortex-array/src/array/erased.rs

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::ArrayHash;
2323
use crate::ArrayView;
2424
use crate::Canonical;
2525
use crate::ExecutionCtx;
26+
use crate::ExecutionResult;
2627
use crate::IntoArray;
2728
use crate::LEGACY_SESSION;
2829
use crate::VTable;
@@ -93,6 +94,12 @@ impl ArrayRef {
9394
&self.0
9495
}
9596

97+
/// Returns a reference to the inner Arc.
98+
#[inline(always)]
99+
pub(crate) fn inner_mut(&mut self) -> &mut Arc<dyn DynArray> {
100+
&mut self.0
101+
}
102+
96103
/// Consumes the array reference, returning the owned backing allocation.
97104
#[inline(always)]
98105
pub(crate) fn into_inner(self) -> Arc<dyn DynArray> {
@@ -431,10 +438,12 @@ impl ArrayRef {
431438
self.with_slots(slots)
432439
}
433440

434-
/// Take a slot for executor-owned physical rewrites. This has the result that the array may
435-
/// either be taken or cloned from the parent.
441+
/// Take a slot for executor-owned physical rewrites.
436442
///
437-
/// The array can be put back with [`put_slot_unchecked`].
443+
/// On return the produced parent has the taken slot set to `None`, regardless of whether the
444+
/// `Arc` was unique. When the `Arc` was shared this allocates a fresh parent via
445+
/// [`DynArray::with_slots_unchecked`], which bypasses `V::validate` — callers must put the
446+
/// slot back (typically via [`put_slot_unchecked`]) before the parent is observed externally.
438447
///
439448
/// # Safety
440449
/// The caller must put back a slot with the same logical dtype and length before exposing the
@@ -443,18 +452,27 @@ impl ArrayRef {
443452
mut self,
444453
slot_idx: usize,
445454
) -> VortexResult<(ArrayRef, ArrayRef)> {
446-
let child = if let Some(inner) = Arc::get_mut(&mut self.0) {
447-
// # Safety: ensured by the caller.
448-
unsafe { inner.slots_mut()[slot_idx].take() }
449-
.vortex_expect("take_slot_unchecked cannot take an absent slot")
450-
} else {
451-
self.slots()[slot_idx]
452-
.as_ref()
453-
.vortex_expect("take_slot_unchecked cannot take an absent slot")
454-
.clone()
455-
};
455+
if let Some(inner) = Arc::get_mut(&mut self.0) {
456+
// SAFETY: ensured by the caller.
457+
let child = unsafe { inner.slots_mut()[slot_idx].take() }
458+
.vortex_expect("take_slot_unchecked cannot take an absent slot");
459+
return Ok((self, child));
460+
}
461+
462+
// Arc is shared: clone the child out and build a fresh parent with slot_idx = None,
463+
// bypassing encoding-level validation so the absent slot does not panic `V::validate`.
464+
let child = self.slots()[slot_idx]
465+
.as_ref()
466+
.vortex_expect("take_slot_unchecked cannot take an absent slot")
467+
.clone();
468+
469+
let mut new_slots = self.slots().to_vec();
470+
new_slots[slot_idx] = None;
456471

457-
Ok((self, child))
472+
// SAFETY: ensured by the caller — the None slot is either put back or driven to completion
473+
// via the builder path before the parent escapes the executor.
474+
let new_parent = unsafe { self.0.with_slots_unchecked(&self, new_slots) };
475+
Ok((new_parent, child))
458476
}
459477

460478
/// Puts an array into `slot_idx` by either, cloning the inner array if the Arc is not exclusive
@@ -532,14 +550,27 @@ impl ArrayRef {
532550
self.0.reduce_parent(self, parent, child_idx)
533551
}
534552

535-
pub(crate) fn execute_encoding(
536-
self,
537-
ctx: &mut ExecutionCtx,
538-
) -> VortexResult<crate::ExecutionResult> {
553+
pub(crate) fn execute_encoding(self, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
539554
let inner = Arc::clone(&self.0);
540555
inner.execute(self, ctx)
541556
}
542557

558+
/// Execute a single encoding step without applying `Done`-result postconditions.
559+
///
560+
/// This is for the iterative executor only. It may operate on suspended executor-private
561+
/// arrays whose slots temporarily contain `None`, so the executor itself must interpret
562+
/// `Done`, enforce any `len`/`dtype` invariants, and transfer statistics.
563+
pub(crate) fn execute_encoding_unchecked(
564+
self,
565+
ctx: &mut ExecutionCtx,
566+
) -> VortexResult<ExecutionResult> {
567+
let inner = Arc::as_ptr(&self.0);
568+
// SAFETY: `inner` points at the allocation owned by `self.0`. `self` stays alive for the
569+
// duration of the call, so the pointee remains valid. Avoiding an extra `Arc` clone here
570+
// preserves uniqueness so execute-time metadata cursors can use `Arc::get_mut`.
571+
unsafe { (&*inner).execute_unchecked(self, ctx) }
572+
}
573+
543574
pub fn execute_parent(
544575
&self,
545576
parent: &ArrayRef,

vortex-array/src/array/mod.rs

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
5656
/// Returns the array as a reference to a generic [`Any`] trait object.
5757
fn as_any(&self) -> &dyn Any;
5858

59+
/// Returns the array as a mutable reference to a generic [`Any`] trait object.
60+
fn as_any_mut(&mut self) -> &mut dyn Any;
61+
5962
/// Converts an owned array allocation into an owned [`Any`] allocation for downcasting.
6063
fn into_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn Any + Send + Sync>;
6164

@@ -143,6 +146,24 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
143146
/// Returns a new array with the given slots.
144147
fn with_slots(&self, this: ArrayRef, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef>;
145148

149+
/// Returns a new array with the given slots, bypassing encoding-level validation.
150+
///
151+
/// Used by the executor to temporarily carry an array that has had one of its child slots
152+
/// taken out (leaving `None`) without panicking `V::validate`. The caller must ensure the
153+
/// missing slot is filled back in (via `put_slot_unchecked`) or driven to completion by the
154+
/// builder path before the array becomes externally observable.
155+
///
156+
/// # Safety
157+
///
158+
/// The array returned may have slots whose content does not match the encoding's normal
159+
/// invariants. Callers must re-establish those invariants before handing the array to
160+
/// anything outside the executor.
161+
unsafe fn with_slots_unchecked(
162+
&self,
163+
this: &ArrayRef,
164+
slots: Vec<Option<ArrayRef>>,
165+
) -> ArrayRef;
166+
146167
/// Attempt to reduce the array to a simpler representation.
147168
fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>>;
148169

@@ -155,8 +176,26 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
155176
) -> VortexResult<Option<ArrayRef>>;
156177

157178
/// Execute the array by taking a single encoding-specific execution step.
179+
///
180+
/// This is the checked entry point. If the encoding reports
181+
/// [`ExecutionStep::Done`](crate::ExecutionStep::Done), implementations must validate that the
182+
/// returned array preserves this array's logical `len` and `dtype`, and must transfer this
183+
/// array's statistics to the returned array.
158184
fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult>;
159185

186+
/// Execute the array by taking a single encoding-specific execution step without applying
187+
/// `Done`-result postconditions.
188+
///
189+
/// This exists for the iterative executor, which may call into `execute` on suspended
190+
/// executor-private arrays whose slots temporarily contain `None`. In that mode the executor
191+
/// itself is responsible for deciding when a `Done` result represents a real logical array,
192+
/// enforcing any `len`/`dtype` invariants, and transferring statistics.
193+
fn execute_unchecked(
194+
&self,
195+
this: ArrayRef,
196+
ctx: &mut ExecutionCtx,
197+
) -> VortexResult<ExecutionResult>;
198+
160199
/// Attempt to execute the parent of this array.
161200
fn execute_parent(
162201
&self,
@@ -203,6 +242,10 @@ impl<V: VTable> DynArray for ArrayInner<V> {
203242
self
204243
}
205244

245+
fn as_any_mut(&mut self) -> &mut dyn Any {
246+
self
247+
}
248+
206249
fn into_any_arc(self: std::sync::Arc<Self>) -> std::sync::Arc<dyn Any + Send + Sync> {
207250
self
208251
}
@@ -387,6 +430,26 @@ impl<V: VTable> DynArray for ArrayInner<V> {
387430
.into_array())
388431
}
389432

433+
unsafe fn with_slots_unchecked(
434+
&self,
435+
this: &ArrayRef,
436+
slots: Vec<Option<ArrayRef>>,
437+
) -> ArrayRef {
438+
// SAFETY: we intentionally skip `V::validate` here. Caller guarantees that the resulting
439+
// array is either repaired or not externally observed.
440+
let inner = unsafe {
441+
ArrayInner::<V>::from_data_unchecked(
442+
self.vtable.clone(),
443+
this.dtype().clone(),
444+
self.len,
445+
self.data.clone(),
446+
slots,
447+
self.stats.clone(),
448+
)
449+
};
450+
ArrayRef::from_inner(std::sync::Arc::new(inner))
451+
}
452+
390453
fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>> {
391454
let view = unsafe { ArrayView::new_unchecked(this, &self.data) };
392455
let Some(reduced) = V::reduce(view)? else {
@@ -437,12 +500,8 @@ impl<V: VTable> DynArray for ArrayInner<V> {
437500
fn execute(&self, this: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
438501
let len = this.len();
439502
let dtype = this.dtype().clone();
440-
let stats = this.statistics().to_owned();
441-
442-
let typed = Array::<V>::try_from_array_ref(this)
443-
.map_err(|_| vortex_err!("Failed to downcast array for execute"))
444-
.vortex_expect("Failed to downcast array for execute");
445-
let result = V::execute(typed, ctx)?;
503+
let stats = this.statistics().to_array_stats();
504+
let result = self.execute_unchecked(this, ctx)?;
446505

447506
if matches!(result.step(), ExecutionStep::Done) {
448507
if cfg!(debug_assertions) {
@@ -458,12 +517,26 @@ impl<V: VTable> DynArray for ArrayInner<V> {
458517
);
459518
}
460519

461-
result.array().statistics().set_iter(stats.into_iter());
520+
result
521+
.array()
522+
.statistics()
523+
.set_iter(crate::stats::StatsSet::from(stats).into_iter());
462524
}
463525

464526
Ok(result)
465527
}
466528

529+
fn execute_unchecked(
530+
&self,
531+
this: ArrayRef,
532+
ctx: &mut ExecutionCtx,
533+
) -> VortexResult<ExecutionResult> {
534+
let typed = Array::<V>::try_from_array_ref(this)
535+
.map_err(|_| vortex_err!("Failed to downcast array for execute"))
536+
.vortex_expect("Failed to downcast array for execute");
537+
V::execute(typed, ctx)
538+
}
539+
467540
fn execute_parent(
468541
&self,
469542
this: &ArrayRef,

vortex-array/src/array/typed.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,14 @@ impl<V: VTable> Array<V> {
280280
&self.downcast_inner().data
281281
}
282282

283+
/// Try to fetch a mut ref to the inner ArrayData.
284+
pub fn data_mut(&mut self) -> Option<&mut V::ArrayData> {
285+
let m = self.inner.inner_mut();
286+
let inner = Arc::get_mut(m)?;
287+
let array_inner = inner.as_any_mut().downcast_mut::<ArrayInner<V>>();
288+
Some(&mut array_inner?.data)
289+
}
290+
283291
/// Returns the full typed array construction parts if this handle owns the allocation.
284292
pub fn try_into_parts(self) -> Result<ArrayParts<V>, Self> {
285293
let Self { inner, _phantom } = self;

vortex-array/src/arrays/chunked/array.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ pub(super) const CHUNKS_OFFSET: usize = 1;
3737
#[derive(Clone, Debug)]
3838
pub struct ChunkedData {
3939
pub(super) chunk_offsets: Vec<usize>,
40+
/// This is used to find the next child to execute when in executing into a builder.
41+
pub(super) next_builder_slot: usize,
4042
}
4143

4244
impl Display for ChunkedData {
@@ -114,6 +116,13 @@ pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
114116
impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
115117

116118
impl ChunkedData {
119+
pub(super) fn new(chunk_offsets: Vec<usize>) -> Self {
120+
Self {
121+
chunk_offsets,
122+
next_builder_slot: CHUNKS_OFFSET,
123+
}
124+
}
125+
117126
pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
118127
let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
119128
chunk_offsets.push(0);
@@ -159,22 +168,35 @@ impl ChunkedData {
159168
}
160169

161170
impl Array<Chunked> {
171+
pub(super) fn with_next_builder_slot(mut self, next_builder_slot: usize) -> Self {
172+
if let Some(data) = self.data_mut() {
173+
data.next_builder_slot = next_builder_slot;
174+
return self;
175+
}
176+
// This is the slow path that will be hit at most once per execution since the second one
177+
// *MUST* have execlusive access due to this copy.
178+
let stats = self.statistics().to_owned();
179+
let mut data = self.data().clone();
180+
data.next_builder_slot = next_builder_slot;
181+
// SAFETY: we only modified next_builder_slot which doesn't affect array invariants.
182+
unsafe {
183+
Array::from_parts_unchecked(
184+
ArrayParts::new(Chunked, self.dtype().clone(), self.len(), data)
185+
.with_slots(self.slots().to_vec()),
186+
)
187+
}
188+
.with_stats_set(stats)
189+
}
190+
162191
/// Constructs a new `ChunkedArray`.
163192
pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
164193
ChunkedData::validate(&chunks, &dtype)?;
165194
let len = chunks.iter().map(|chunk| chunk.len()).sum();
166195
let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
167196
Ok(unsafe {
168197
Array::from_parts_unchecked(
169-
ArrayParts::new(
170-
Chunked,
171-
dtype,
172-
len,
173-
ChunkedData {
174-
chunk_offsets: chunk_offsets.clone(),
175-
},
176-
)
177-
.with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
198+
ArrayParts::new(Chunked, dtype, len, ChunkedData::new(chunk_offsets.clone()))
199+
.with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
178200
)
179201
})
180202
}
@@ -238,15 +260,8 @@ impl Array<Chunked> {
238260
let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
239261
unsafe {
240262
Array::from_parts_unchecked(
241-
ArrayParts::new(
242-
Chunked,
243-
dtype,
244-
len,
245-
ChunkedData {
246-
chunk_offsets: chunk_offsets.clone(),
247-
},
248-
)
249-
.with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
263+
ArrayParts::new(Chunked, dtype, len, ChunkedData::new(chunk_offsets.clone()))
264+
.with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
250265
)
251266
}
252267
}

0 commit comments

Comments
 (0)