diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 8ee3c97f17b..a6f622e85df 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -19042,6 +19042,10 @@ pub fn vortex_array::validity::Validity::union_nullability(self, nullability: vo impl vortex_array::validity::Validity +pub fn vortex_array::validity::Validity::concat(validities: alloc::vec::Vec<(vortex_array::validity::Validity, usize)>) -> core::option::Option + +impl vortex_array::validity::Validity + pub fn vortex_array::validity::Validity::execute(self, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult impl vortex_array::validity::Validity @@ -21628,6 +21632,10 @@ pub fn vortex_array::Array::project(&self, project pub fn vortex_array::Array::remove_column(&self, name: impl core::convert::Into) -> core::option::Option<(Self, vortex_array::ArrayRef)> +pub fn vortex_array::Array::remove_column_owned(&self, name: impl core::convert::Into) -> core::option::Option<(Self, vortex_array::ArrayRef)> + +pub fn vortex_array::Array::try_concat(chunks: impl core::iter::traits::collect::IntoIterator) -> vortex_error::VortexResult where T: core::borrow::Borrow> + pub fn vortex_array::Array::try_from_iter, A: vortex_array::IntoArray, T: core::iter::traits::collect::IntoIterator>(iter: T) -> vortex_error::VortexResult pub fn vortex_array::Array::try_from_iter_with_validity, A: vortex_array::IntoArray, T: core::iter::traits::collect::IntoIterator>(iter: T, validity: vortex_array::validity::Validity) -> vortex_error::VortexResult @@ -21636,15 +21644,11 @@ pub fn vortex_array::Array::try_new(names: vortex_ pub fn vortex_array::Array::try_new_with_dtype(fields: impl core::convert::Into>, dtype: vortex_array::dtype::StructFields, length: usize, validity: vortex_array::validity::Validity) -> vortex_error::VortexResult -impl vortex_array::Array - -pub fn vortex_array::Array::into_record_batch_with_schema(self, schema: impl core::convert::AsRef) -> 
vortex_error::VortexResult +pub fn vortex_array::Array::with_column(&self, name: impl core::convert::Into, array: vortex_array::ArrayRef) -> vortex_error::VortexResult impl vortex_array::Array -pub fn vortex_array::Array::remove_column_owned(&self, name: impl core::convert::Into) -> core::option::Option<(Self, vortex_array::ArrayRef)> - -pub fn vortex_array::Array::with_column(&self, name: impl core::convert::Into, array: vortex_array::ArrayRef) -> vortex_error::VortexResult +pub fn vortex_array::Array::into_record_batch_with_schema(self, schema: impl core::convert::AsRef) -> vortex_error::VortexResult impl vortex_array::Array diff --git a/vortex-array/src/arrays/chunked/vtable/canonical.rs b/vortex-array/src/arrays/chunked/vtable/canonical.rs index 0cae05a78d4..42e0b5d2073 100644 --- a/vortex-array/src/arrays/chunked/vtable/canonical.rs +++ b/vortex-array/src/arrays/chunked/vtable/canonical.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use itertools::Itertools as _; use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -18,13 +19,11 @@ use crate::arrays::StructArray; use crate::arrays::chunked::ChunkedArrayExt; use crate::arrays::listview::ListViewArrayExt; use crate::arrays::listview::ListViewRebuildMode; -use crate::arrays::struct_::StructArrayExt; use crate::builders::builder_with_capacity_in; use crate::builtins::ArrayBuiltins; use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; -use crate::dtype::StructFields; use crate::memory::HostAllocatorExt; use crate::validity::Validity; @@ -41,9 +40,8 @@ pub(super) fn _canonicalize( let owned_chunks: Vec = array.iter_chunks().cloned().collect(); Ok(match array.dtype() { - DType::Struct(struct_dtype, _) => { - let struct_array = - pack_struct_chunks(&owned_chunks, array.array().validity()?, struct_dtype, ctx)?; + DType::Struct(..) 
=> { + let struct_array = pack_struct_chunks(owned_chunks, ctx)?; Canonical::Struct(struct_array) } DType::List(elem_dtype, _) => Canonical::List(swizzle_list_chunks( @@ -64,36 +62,11 @@ pub(super) fn _canonicalize( /// field is a [`ChunkedArray`]. /// /// The caller guarantees there are at least 2 chunks. -fn pack_struct_chunks( - chunks: &[ArrayRef], - validity: Validity, - struct_dtype: &StructFields, - ctx: &mut ExecutionCtx, -) -> VortexResult { - let len = chunks.iter().map(|chunk| chunk.len()).sum(); - let mut field_arrays = Vec::new(); - - let executed_chunks: Vec = chunks - .iter() - .map(|c| c.clone().execute::(ctx)) - .collect::>()?; - - for (field_idx, field_dtype) in struct_dtype.fields().enumerate() { - let mut field_chunks = Vec::with_capacity(chunks.len()); - for struct_array in &executed_chunks { - let field = struct_array.unmasked_field(field_idx).clone(); - field_chunks.push(field); - } - - // SAFETY: field_chunks are extracted from valid StructArrays with matching dtypes. - // Each chunk's field array is guaranteed to be valid for field_dtype. - let field_array = unsafe { ChunkedArray::new_unchecked(field_chunks, field_dtype.clone()) }; - field_arrays.push(field_array.into_array()); - } - - // SAFETY: field_arrays are built from corresponding chunks of same length, dtypes match by - // construction. - Ok(unsafe { StructArray::new_unchecked(field_arrays, struct_dtype.clone(), len, validity) }) +fn pack_struct_chunks(chunks: Vec, ctx: &mut ExecutionCtx) -> VortexResult { + chunks + .into_iter() + .map(|c| c.execute::(ctx)) + .process_results(|iter| StructArray::try_concat(iter))? } /// Packs [`ListViewArray`]s together into a chunked `ListViewArray`. 
diff --git a/vortex-array/src/arrays/chunked/vtable/validity.rs b/vortex-array/src/arrays/chunked/vtable/validity.rs index 5f5c11c7832..265c2843ee1 100644 --- a/vortex-array/src/arrays/chunked/vtable/validity.rs +++ b/vortex-array/src/arrays/chunked/vtable/validity.rs @@ -4,52 +4,24 @@ use itertools::Itertools; use vortex_error::VortexResult; -use crate::IntoArray; use crate::array::ArrayView; use crate::array::ValidityVTable; use crate::arrays::Chunked; -use crate::arrays::ChunkedArray; use crate::arrays::chunked::ChunkedArrayExt; -use crate::dtype::DType; -use crate::dtype::Nullability; use crate::validity::Validity; impl ValidityVTable for Chunked { fn validity(array: ArrayView<'_, Chunked>) -> VortexResult { - let validities: Vec = - array.chunks().iter().map(|c| c.validity()).try_collect()?; + let validities = array + .chunks() + .iter() + .map(|chunk| chunk.validity().map(|v| (v, chunk.len()))) + .try_collect()?; + let Some(validity) = Validity::concat(validities) else { + // If there are no chunks: + return Ok(array.dtype().nullability().into()); + }; - match validities.first() { - // If there are no chunks, return the array's dtype nullability - None => return Ok(array.dtype().nullability().into()), - // If all chunks have the same non-array validity, return that validity directly - // We skip Validity::Array since equality is very expensive. 
- Some(first) if !matches!(first, Validity::Array(_)) => { - let target = std::mem::discriminant(first); - if validities - .iter() - .all(|v| std::mem::discriminant(v) == target) - { - return Ok(first.clone()); - } - } - _ => { - // Array validity or mixed validities, proceed to build the validity array - } - } - - Ok(Validity::Array( - unsafe { - ChunkedArray::new_unchecked( - validities - .into_iter() - .zip(array.iter_chunks()) - .map(|(v, chunk)| v.to_array(chunk.len())) - .collect(), - DType::Bool(Nullability::NonNullable), - ) - } - .into_array(), - )) + Ok(validity) } } diff --git a/vortex-array/src/arrays/struct_/array.rs b/vortex-array/src/arrays/struct_/array.rs index 8e3192728e0..8e0a2e42e49 100644 --- a/vortex-array/src/arrays/struct_/array.rs +++ b/vortex-array/src/arrays/struct_/array.rs @@ -1,11 +1,14 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::borrow::Borrow; use std::iter::once; use std::sync::Arc; +use itertools::Itertools; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_bail; use vortex_error::vortex_err; use crate::ArrayRef; @@ -16,6 +19,7 @@ use crate::array::EmptyArrayData; use crate::array::TypedArrayRef; use crate::array::child_to_validity; use crate::array::validity_to_child; +use crate::arrays::ChunkedArray; use crate::arrays::Struct; use crate::dtype::DType; use crate::dtype::FieldName; @@ -430,9 +434,7 @@ impl Array { }; Some((new_array, field)) } -} -impl Array { pub fn with_column(&self, name: impl Into, array: ArrayRef) -> VortexResult { let name = name.into(); let struct_dtype = self.struct_fields(); @@ -453,4 +455,70 @@ impl Array { pub fn remove_column_owned(&self, name: impl Into) -> Option<(Self, ArrayRef)> { self.remove_column(name) } + + pub fn try_concat(chunks: impl IntoIterator) -> VortexResult + where + T: Borrow>, + { + let mut it = chunks.into_iter(); + let Some(first) = it.next() else { + vortex_bail!("cannot 
concat empty iterator of arrays"); + }; + let first_dtype = first.borrow().dtype().clone(); + let struct_fields = first_dtype.as_struct_fields().clone(); + let names = struct_fields.names(); + + let it = [first].into_iter().chain(it); + let (field_arrays_per_chunk, validities) = it + .map(|chunk| { + let chunk = chunk.borrow(); + if &first_dtype != chunk.dtype() { + vortex_bail!( + "cannot concatenate struct arrays with differing dtypes: {}, {}", + first_dtype, + chunk.dtype(), + ); + } + + let fields = names + .iter() + .map(|name| { + chunk + .unmasked_field_by_name(name) + .vortex_expect("field exists because it is in dtype") + .clone() + }) + .collect::>(); + let validity = chunk.validity()?; + + Ok((fields, (validity, chunk.len()))) + }) + .process_results(|iter| iter.unzip::<_, _, Vec<_>, Vec<_>>())?; + + let field_arrays = struct_fields + .fields() + .enumerate() + .map(|(i, dtype)| { + // SAFETY: We establish above that every array has the same type. + let chunks = field_arrays_per_chunk + .iter() + .map(|x| x[i].clone()) + .collect(); + unsafe { ChunkedArray::new_unchecked(chunks, dtype) }.into_array() + }) + .collect::>(); + let len = validities.iter().map(|(_v, len)| len).sum(); + let validity = Validity::concat(validities).vortex_expect("verified non-empty above"); + + // SAFETY: + // + // 1. The field arrays, by construction, have the type specified in fields. + // + // 2. Each Array has a valid len, therefore the sum of those lens should be valid + // for the concatenation of each field. + // + // 3. Each Array has a valid validity, so the concatenation of those validities has + // the correct length and dtype harmony. 
+ Ok(unsafe { Array::::new_unchecked(field_arrays, struct_fields, len, validity) }) + } } diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 66a92f66e73..0389566bbba 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -6,6 +6,7 @@ use std::fmt::Debug; use std::ops::Range; +use itertools::Itertools as _; use vortex_buffer::BitBuffer; use vortex_error::VortexExpect as _; use vortex_error::VortexResult; @@ -22,6 +23,7 @@ use crate::IntoArray; use crate::LEGACY_SESSION; use crate::VortexSessionExecute; use crate::arrays::BoolArray; +use crate::arrays::ChunkedArray; use crate::arrays::ConstantArray; use crate::arrays::scalar_fn::ScalarFnFactoryExt; use crate::builtins::ArrayBuiltins; @@ -447,6 +449,45 @@ impl From<&Nullability> for Validity { } } +impl Validity { + /// Concatenate one or more validities together. + /// + /// Returns None if the vector is empty. + pub fn concat(validities: Vec<(Validity, usize)>) -> Option { + let mut validity_kinds = validities + .iter() + .map(|(v, _)| std::mem::discriminant(v)) + .unique(); + let validity_kind = validity_kinds.next()?; + if validity_kinds.next().is_none() { + // If there is only one kind of validity and it's not Validity::Array, avoid constructing + // a Validity::Array. + if validity_kind == std::mem::discriminant(&Validity::AllValid) { + return Some(Validity::AllValid); + } + if validity_kind == std::mem::discriminant(&Validity::AllInvalid) { + return Some(Validity::AllInvalid); + } + if validity_kind == std::mem::discriminant(&Validity::NonNullable) { + return Some(Validity::NonNullable); + } + } + + Some(Validity::Array( + unsafe { + ChunkedArray::new_unchecked( + validities + .into_iter() + .map(|(v, len)| v.to_array(len)) + .collect(), + DType::Bool(Nullability::NonNullable), + ) + } + .into_array(), + )) + } +} + impl Validity { pub fn from_bit_buffer(buffer: BitBuffer, nullability: Nullability) -> Self { if buffer.true_count() == buffer.len() {