Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -19042,6 +19042,10 @@ pub fn vortex_array::validity::Validity::union_nullability(self, nullability: vo

impl vortex_array::validity::Validity

pub fn vortex_array::validity::Validity::concat(validities: alloc::vec::Vec<(vortex_array::validity::Validity, usize)>) -> core::option::Option<Self>

impl vortex_array::validity::Validity

pub fn vortex_array::validity::Validity::execute(self, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::validity::Validity>

impl vortex_array::validity::Validity
Expand Down Expand Up @@ -21628,6 +21632,10 @@ pub fn vortex_array::Array<vortex_array::arrays::Struct>::project(&self, project

pub fn vortex_array::Array<vortex_array::arrays::Struct>::remove_column(&self, name: impl core::convert::Into<vortex_array::dtype::FieldName>) -> core::option::Option<(Self, vortex_array::ArrayRef)>

pub fn vortex_array::Array<vortex_array::arrays::Struct>::remove_column_owned(&self, name: impl core::convert::Into<vortex_array::dtype::FieldName>) -> core::option::Option<(Self, vortex_array::ArrayRef)>

pub fn vortex_array::Array<vortex_array::arrays::Struct>::try_concat<T>(chunks: impl core::iter::traits::collect::IntoIterator<Item = T>) -> vortex_error::VortexResult<Self> where T: core::borrow::Borrow<vortex_array::Array<vortex_array::arrays::Struct>>

pub fn vortex_array::Array<vortex_array::arrays::Struct>::try_from_iter<N: core::convert::AsRef<str>, A: vortex_array::IntoArray, T: core::iter::traits::collect::IntoIterator<Item = (N, A)>>(iter: T) -> vortex_error::VortexResult<Self>

pub fn vortex_array::Array<vortex_array::arrays::Struct>::try_from_iter_with_validity<N: core::convert::AsRef<str>, A: vortex_array::IntoArray, T: core::iter::traits::collect::IntoIterator<Item = (N, A)>>(iter: T, validity: vortex_array::validity::Validity) -> vortex_error::VortexResult<Self>
Expand All @@ -21636,15 +21644,11 @@ pub fn vortex_array::Array<vortex_array::arrays::Struct>::try_new(names: vortex_

pub fn vortex_array::Array<vortex_array::arrays::Struct>::try_new_with_dtype(fields: impl core::convert::Into<alloc::sync::Arc<[vortex_array::ArrayRef]>>, dtype: vortex_array::dtype::StructFields, length: usize, validity: vortex_array::validity::Validity) -> vortex_error::VortexResult<Self>

impl vortex_array::Array<vortex_array::arrays::Struct>

pub fn vortex_array::Array<vortex_array::arrays::Struct>::into_record_batch_with_schema(self, schema: impl core::convert::AsRef<arrow_schema::schema::Schema>) -> vortex_error::VortexResult<arrow_array::record_batch::RecordBatch>
pub fn vortex_array::Array<vortex_array::arrays::Struct>::with_column(&self, name: impl core::convert::Into<vortex_array::dtype::FieldName>, array: vortex_array::ArrayRef) -> vortex_error::VortexResult<Self>

impl vortex_array::Array<vortex_array::arrays::Struct>

pub fn vortex_array::Array<vortex_array::arrays::Struct>::remove_column_owned(&self, name: impl core::convert::Into<vortex_array::dtype::FieldName>) -> core::option::Option<(Self, vortex_array::ArrayRef)>

pub fn vortex_array::Array<vortex_array::arrays::Struct>::with_column(&self, name: impl core::convert::Into<vortex_array::dtype::FieldName>, array: vortex_array::ArrayRef) -> vortex_error::VortexResult<Self>
pub fn vortex_array::Array<vortex_array::arrays::Struct>::into_record_batch_with_schema(self, schema: impl core::convert::AsRef<arrow_schema::schema::Schema>) -> vortex_error::VortexResult<arrow_array::record_batch::RecordBatch>

impl vortex_array::Array<vortex_array::arrays::VarBin>

Expand Down
43 changes: 8 additions & 35 deletions vortex-array/src/arrays/chunked/vtable/canonical.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use itertools::Itertools as _;
use vortex_buffer::Buffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
Expand All @@ -18,13 +19,11 @@ use crate::arrays::StructArray;
use crate::arrays::chunked::ChunkedArrayExt;
use crate::arrays::listview::ListViewArrayExt;
use crate::arrays::listview::ListViewRebuildMode;
use crate::arrays::struct_::StructArrayExt;
use crate::builders::builder_with_capacity_in;
use crate::builtins::ArrayBuiltins;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;
use crate::dtype::StructFields;
use crate::memory::HostAllocatorExt;
use crate::validity::Validity;

Expand All @@ -41,9 +40,8 @@ pub(super) fn _canonicalize(

let owned_chunks: Vec<ArrayRef> = array.iter_chunks().cloned().collect();
Ok(match array.dtype() {
DType::Struct(struct_dtype, _) => {
let struct_array =
pack_struct_chunks(&owned_chunks, array.array().validity()?, struct_dtype, ctx)?;
DType::Struct(..) => {
let struct_array = pack_struct_chunks(owned_chunks, ctx)?;
Canonical::Struct(struct_array)
}
DType::List(elem_dtype, _) => Canonical::List(swizzle_list_chunks(
Expand All @@ -64,36 +62,11 @@ pub(super) fn _canonicalize(
/// field is a [`ChunkedArray`].
///
/// The caller guarantees there are at least 2 chunks.
fn pack_struct_chunks(
chunks: &[ArrayRef],
validity: Validity,
struct_dtype: &StructFields,
ctx: &mut ExecutionCtx,
) -> VortexResult<StructArray> {
let len = chunks.iter().map(|chunk| chunk.len()).sum();
let mut field_arrays = Vec::new();

let executed_chunks: Vec<StructArray> = chunks
.iter()
.map(|c| c.clone().execute::<StructArray>(ctx))
.collect::<VortexResult<_>>()?;

for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
let mut field_chunks = Vec::with_capacity(chunks.len());
for struct_array in &executed_chunks {
let field = struct_array.unmasked_field(field_idx).clone();
field_chunks.push(field);
}

// SAFETY: field_chunks are extracted from valid StructArrays with matching dtypes.
// Each chunk's field array is guaranteed to be valid for field_dtype.
let field_array = unsafe { ChunkedArray::new_unchecked(field_chunks, field_dtype.clone()) };
field_arrays.push(field_array.into_array());
}

// SAFETY: field_arrays are built from corresponding chunks of same length, dtypes match by
// construction.
Ok(unsafe { StructArray::new_unchecked(field_arrays, struct_dtype.clone(), len, validity) })
fn pack_struct_chunks(chunks: Vec<ArrayRef>, ctx: &mut ExecutionCtx) -> VortexResult<StructArray> {
chunks
.into_iter()
.map(|c| c.execute::<StructArray>(ctx))
.process_results(|iter| StructArray::try_concat(iter))?
}

/// Packs [`ListViewArray`]s together into a chunked `ListViewArray`.
Expand Down
48 changes: 10 additions & 38 deletions vortex-array/src/arrays/chunked/vtable/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,24 @@
use itertools::Itertools;
use vortex_error::VortexResult;

use crate::IntoArray;
use crate::array::ArrayView;
use crate::array::ValidityVTable;
use crate::arrays::Chunked;
use crate::arrays::ChunkedArray;
use crate::arrays::chunked::ChunkedArrayExt;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::validity::Validity;

impl ValidityVTable<Chunked> for Chunked {
fn validity(array: ArrayView<'_, Chunked>) -> VortexResult<Validity> {
let validities: Vec<Validity> =
array.chunks().iter().map(|c| c.validity()).try_collect()?;
let validities = array
.chunks()
.iter()
.map(|chunk| chunk.validity().map(|v| (v, chunk.len())))
.try_collect()?;
let Some(validity) = Validity::concat(validities) else {
// If there are no chunks:
return Ok(array.dtype().nullability().into());
};

match validities.first() {
// If there are no chunks, return the array's dtype nullability
None => return Ok(array.dtype().nullability().into()),
// If all chunks have the same non-array validity, return that validity directly
// We skip Validity::Array since equality is very expensive.
Some(first) if !matches!(first, Validity::Array(_)) => {
let target = std::mem::discriminant(first);
if validities
.iter()
.all(|v| std::mem::discriminant(v) == target)
{
return Ok(first.clone());
}
}
_ => {
// Array validity or mixed validities, proceed to build the validity array
}
}

Ok(Validity::Array(
unsafe {
ChunkedArray::new_unchecked(
validities
.into_iter()
.zip(array.iter_chunks())
.map(|(v, chunk)| v.to_array(chunk.len()))
.collect(),
DType::Bool(Nullability::NonNullable),
)
}
.into_array(),
))
Ok(validity)
}
}
72 changes: 70 additions & 2 deletions vortex-array/src/arrays/struct_/array.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::borrow::Borrow;
use std::iter::once;
use std::sync::Arc;

use itertools::Itertools;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;

use crate::ArrayRef;
Expand All @@ -16,6 +19,7 @@ use crate::array::EmptyArrayData;
use crate::array::TypedArrayRef;
use crate::array::child_to_validity;
use crate::array::validity_to_child;
use crate::arrays::ChunkedArray;
use crate::arrays::Struct;
use crate::dtype::DType;
use crate::dtype::FieldName;
Expand Down Expand Up @@ -430,9 +434,7 @@ impl Array<Struct> {
};
Some((new_array, field))
}
}

impl Array<Struct> {
pub fn with_column(&self, name: impl Into<FieldName>, array: ArrayRef) -> VortexResult<Self> {
let name = name.into();
let struct_dtype = self.struct_fields();
Expand All @@ -453,4 +455,70 @@ impl Array<Struct> {
pub fn remove_column_owned(&self, name: impl Into<FieldName>) -> Option<(Self, ArrayRef)> {
self.remove_column(name)
}

pub fn try_concat<T>(chunks: impl IntoIterator<Item = T>) -> VortexResult<Self>
where
T: Borrow<Array<Struct>>,
{
let mut it = chunks.into_iter();
let Some(first) = it.next() else {
vortex_bail!("cannot concat empty iterator of arrays");
};
let first_dtype = first.borrow().dtype().clone();
let struct_fields = first_dtype.as_struct_fields().clone();
let names = struct_fields.names();

let it = [first].into_iter().chain(it);
let (field_arrays_per_chunk, validities) = it
.map(|chunk| {
let chunk = chunk.borrow();
if &first_dtype != chunk.dtype() {
vortex_bail!(
"cannot concatenate struct arrays with differing dtypes: {}, {}",
first_dtype,
chunk.dtype(),
);
}

let fields = names
.iter()
.map(|name| {
chunk
.unmasked_field_by_name(name)
.vortex_expect("field exists because it is in dtype")
.clone()
})
.collect::<Vec<_>>();
let validity = chunk.validity()?;

Ok((fields, (validity, chunk.len())))
})
.process_results(|iter| iter.unzip::<_, _, Vec<_>, Vec<_>>())?;

let field_arrays = struct_fields
.fields()
.enumerate()
.map(|(i, dtype)| {
// SAFETY: We establish above that every array has the same type.
let chunks = field_arrays_per_chunk
.iter()
.map(|x| x[i].clone())
.collect();
unsafe { ChunkedArray::new_unchecked(chunks, dtype) }.into_array()
})
.collect::<Vec<_>>();
let len = validities.iter().map(|(_v, len)| len).sum();
let validity = Validity::concat(validities).vortex_expect("verified non-empty above");

// SAFETY:
//
// 1. The field arrays, by construction, have the type specified in fields.
//
// 2. Each Array<Struct> has a valid len, therefore the sum of those lens should be valid
// for the concatenation of each field.
//
// 3. Each Array<Struct> has a valid validity, so the concatenation of those validities has
// the correct length and dtype harmony.
Ok(unsafe { Array::<Struct>::new_unchecked(field_arrays, struct_fields, len, validity) })
}
}
41 changes: 41 additions & 0 deletions vortex-array/src/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use std::fmt::Debug;
use std::ops::Range;

use itertools::Itertools as _;
use vortex_buffer::BitBuffer;
use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
Expand All @@ -22,6 +23,7 @@ use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::arrays::BoolArray;
use crate::arrays::ChunkedArray;
use crate::arrays::ConstantArray;
use crate::arrays::scalar_fn::ScalarFnFactoryExt;
use crate::builtins::ArrayBuiltins;
Expand Down Expand Up @@ -447,6 +449,45 @@ impl From<&Nullability> for Validity {
}
}

impl Validity {
/// Concatenate one or more validities together.
///
/// Returns None if the vector is empty.
pub fn concat(validities: Vec<(Validity, usize)>) -> Option<Self> {
let mut validity_kinds = validities
.iter()
.map(|(v, _)| std::mem::discriminant(v))
.unique();
let validity_kind = validity_kinds.next()?;
if validity_kinds.next().is_none() {
// If there is only one kind of validity and its not Validity::Array, avoid constructing
// a Validity::Array.
if validity_kind == std::mem::discriminant(&Validity::AllValid) {
return Some(Validity::AllValid);
}
if validity_kind == std::mem::discriminant(&Validity::AllInvalid) {
return Some(Validity::AllInvalid);
}
if validity_kind == std::mem::discriminant(&Validity::NonNullable) {
return Some(Validity::NonNullable);
}
}

Some(Validity::Array(
unsafe {
ChunkedArray::new_unchecked(
validities
.into_iter()
.map(|(v, len)| v.to_array(len))
.collect(),
DType::Bool(Nullability::NonNullable),
)
}
.into_array(),
))
}
}

impl Validity {
pub fn from_bit_buffer(buffer: BitBuffer, nullability: Nullability) -> Self {
if buffer.true_count() == buffer.len() {
Expand Down
Loading