Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use log::debug;
use num_enum::{IntoPrimitive, TryFromPrimitive};
pub use stats_set::*;
use vortex_dtype::Nullability::{NonNullable, Nullable};
use vortex_dtype::{DECIMAL256_MAX_PRECISION, DType, DecimalDType, PType};
use vortex_dtype::{DType, DecimalDType, NativeDecimalType, PType, i256};

mod array;
mod bound;
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Stat {
// - https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala#L66
// - https://github.com/apache/datafusion/blob/4153adf2c0f6e317ef476febfdc834208bd46622/datafusion/functions-aggregate/src/sum.rs#L188
let precision =
u8::min(DECIMAL256_MAX_PRECISION, decimal_dtype.precision() + 10);
u8::min(i256::MAX_PRECISION, decimal_dtype.precision() + 10);
DType::Decimal(
DecimalDType::new(precision, decimal_dtype.scale()),
Nullable,
Expand Down
7 changes: 6 additions & 1 deletion vortex-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ workspace = true
[dependencies]
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-mask = { workspace = true }
vortex-vector = { workspace = true }

arrow-array = { workspace = true, optional = true }
arrow-buffer = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
num-traits = { workspace = true }

[features]
default = ["arithmetic", "comparison", "filter", "logical", "mask"]
default = ["arithmetic", "arrow", "comparison", "filter", "logical", "mask"]

arithmetic = []
arrow = ["dep:arrow-array", "dep:arrow-buffer", "dep:arrow-schema"]
comparison = []
filter = []
logical = []
Expand Down
20 changes: 20 additions & 0 deletions vortex-compute/src/arrow/bool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray};
use vortex_error::VortexResult;
use vortex_vector::BoolVector;

use crate::arrow::IntoArrow;

impl IntoArrow<ArrayRef> for BoolVector {
fn into_arrow(self) -> VortexResult<ArrayRef> {
let (bits, validity) = self.into_parts();
Ok(Arc::new(BooleanArray::new(
bits.into(),
validity.into_arrow()?,
)))
}
}
81 changes: 81 additions & 0 deletions vortex-compute/src/arrow/decimal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use arrow_array::types::{Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type};
use arrow_array::{ArrayRef, PrimitiveArray};
use vortex_buffer::Buffer;
use vortex_dtype::i256;
use vortex_error::VortexResult;
use vortex_vector::{DVector, DecimalVector};

use crate::arrow::IntoArrow;

impl IntoArrow<ArrayRef> for DecimalVector {
fn into_arrow(self) -> VortexResult<ArrayRef> {
match self {
DecimalVector::D8(v) => v.into_arrow(),
DecimalVector::D16(v) => v.into_arrow(),
DecimalVector::D32(v) => v.into_arrow(),
DecimalVector::D64(v) => v.into_arrow(),
DecimalVector::D128(v) => v.into_arrow(),
DecimalVector::D256(v) => v.into_arrow(),
}
}
}

macro_rules! impl_decimal_upcast_i32 {
($T:ty) => {
impl IntoArrow<ArrayRef> for DVector<$T> {
fn into_arrow(self) -> VortexResult<ArrayRef> {
let (_, elements, validity) = self.into_parts();
// Upcast the DVector to Arrow's smallest decimal type (Decimal32)
let elements =
Buffer::<i32>::from_trusted_len_iter(elements.iter().map(|i| *i as i32));
Ok(Arc::new(PrimitiveArray::<Decimal32Type>::new(
elements.into_arrow_scalar_buffer(),
validity.into_arrow()?,
)))
}
}
};
}

impl_decimal_upcast_i32!(i8);
impl_decimal_upcast_i32!(i16);

/// Direct Arrow conversion for vectors that map directly to Arrow decimal types.
macro_rules! impl_decimal {
($T:ty, $A:ty) => {
impl IntoArrow<ArrayRef> for DVector<$T> {
fn into_arrow(self) -> VortexResult<ArrayRef> {
let (_, elements, validity) = self.into_parts();
Ok(Arc::new(PrimitiveArray::<$A>::new(
elements.into_arrow_scalar_buffer(),
validity.into_arrow()?,
)))
}
}
};
}

impl_decimal!(i32, Decimal32Type);
impl_decimal!(i64, Decimal64Type);
impl_decimal!(i128, Decimal128Type);

impl IntoArrow<ArrayRef> for DVector<i256> {
fn into_arrow(self) -> VortexResult<ArrayRef> {
let (_, elements, validity) = self.into_parts();

// Transmute the elements from our i256 to Arrow's.
// SAFETY: we use Arrow's type internally for our layout.
let elements =
unsafe { std::mem::transmute::<Buffer<i256>, Buffer<arrow_buffer::i256>>(elements) };

Ok(Arc::new(PrimitiveArray::<Decimal256Type>::new(
elements.into_arrow_scalar_buffer(),
validity.into_arrow()?,
)))
}
}
26 changes: 26 additions & 0 deletions vortex-compute/src/arrow/mask.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use arrow_buffer::NullBuffer;
use vortex_error::VortexResult;
use vortex_mask::Mask;

use crate::arrow::IntoArrow;

impl IntoArrow<Option<NullBuffer>> for Mask {
fn into_arrow(self) -> VortexResult<Option<NullBuffer>> {
Ok(match self {
Mask::AllTrue(_) => None,
Mask::AllFalse(len) => Some(NullBuffer::new_null(len)),
Mask::Values(values) => {
// SAFETY: we maintain our own validated true count.
Some(unsafe {
NullBuffer::new_unchecked(
values.bit_buffer().clone().into(),
values.len() - values.true_count(),
)
})
}
})
}
}
21 changes: 21 additions & 0 deletions vortex-compute/src/arrow/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Conversion logic from Vortex vector types to Arrow types.

use vortex_error::VortexResult;

mod bool;
mod decimal;
mod mask;
mod null;
mod primitive;
mod struct_;
mod varbin;
mod vector;

/// Trait for converting Vortex vector types into Arrow types.
pub trait IntoArrow<Output> {
/// Convert the Vortex type into an Arrow type.
fn into_arrow(self) -> VortexResult<Output>;
}
16 changes: 16 additions & 0 deletions vortex-compute/src/arrow/null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use arrow_array::{ArrayRef, NullArray};
use vortex_error::VortexResult;
use vortex_vector::{NullVector, VectorOps};

use crate::arrow::IntoArrow;

impl IntoArrow<ArrayRef> for NullVector {
fn into_arrow(self) -> VortexResult<ArrayRef> {
Ok(Arc::new(NullArray::new(self.len())))
}
}
47 changes: 47 additions & 0 deletions vortex-compute/src/arrow/primitive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use arrow_array::types::{
Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type,
UInt16Type, UInt32Type, UInt64Type,
};
use arrow_array::{ArrayRef, PrimitiveArray};
use vortex_dtype::half::f16;
use vortex_error::VortexResult;
use vortex_vector::{PVector, PrimitiveVector, match_each_pvector};

use crate::arrow::IntoArrow;

impl IntoArrow<ArrayRef> for PrimitiveVector {
fn into_arrow(self) -> VortexResult<ArrayRef> {
match_each_pvector!(self, |v| { v.into_arrow() })
}
}

macro_rules! impl_primitive {
($T:ty, $A:ty) => {
impl IntoArrow<ArrayRef> for PVector<$T> {
fn into_arrow(self) -> VortexResult<ArrayRef> {
let (elements, validity) = self.into_parts();
Ok(Arc::new(PrimitiveArray::<$A>::new(
elements.into_arrow_scalar_buffer(),
validity.into_arrow()?,
)))
}
}
};
}

impl_primitive!(u8, UInt8Type);
impl_primitive!(u16, UInt16Type);
impl_primitive!(u32, UInt32Type);
impl_primitive!(u64, UInt64Type);
impl_primitive!(i8, Int8Type);
impl_primitive!(i16, Int16Type);
impl_primitive!(i32, Int32Type);
impl_primitive!(i64, Int64Type);
impl_primitive!(f16, Float16Type);
impl_primitive!(f32, Float32Type);
impl_primitive!(f64, Float64Type);
34 changes: 34 additions & 0 deletions vortex-compute/src/arrow/struct_.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use arrow_array::{ArrayRef, StructArray};
use arrow_schema::{Field, Fields};
use vortex_error::VortexResult;
use vortex_vector::StructVector;

use crate::arrow::IntoArrow;

impl IntoArrow<ArrayRef> for StructVector {
fn into_arrow(self) -> VortexResult<ArrayRef> {
let (fields, validity) = self.into_parts();
let arrow_fields = fields
.iter()
.map(|field| field.clone().into_arrow())
.collect::<VortexResult<Vec<ArrayRef>>>()?;

// We need to make up the field names since vectors are unnamed.
let fields = Fields::from(
(0..arrow_fields.len())
.map(|i| Field::new(i.to_string(), arrow_fields[i].data_type().clone(), true))
.collect::<Vec<Field>>(),
);

Ok(Arc::new(StructArray::new(
fields,
arrow_fields,
validity.into_arrow()?,
)))
}
}
42 changes: 42 additions & 0 deletions vortex-compute/src/arrow/varbin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use arrow_array::{ArrayRef, GenericByteViewArray};
use vortex_buffer::Buffer;
use vortex_error::VortexResult;
use vortex_vector::{BinaryType, StringType, VarBinVector};

use crate::arrow::IntoArrow;

macro_rules! impl_varbin {
($T:ty, $A:ty) => {
impl IntoArrow<ArrayRef> for VarBinVector<$T> {
fn into_arrow(self) -> VortexResult<ArrayRef> {
let (views, buffers, validity) = self.into_parts();

let views = Buffer::<u128>::from_byte_buffer(views.into_byte_buffer())
.into_arrow_scalar_buffer();
let buffers: Vec<_> = buffers
.iter()
.cloned()
.map(|b| b.into_arrow_buffer())
.collect();

// SAFETY: our own guarantees are the same as Arrow's guarantees for BinaryViewArray
let array = unsafe {
GenericByteViewArray::<$A>::new_unchecked(
views,
buffers,
validity.into_arrow()?,
)
};
Ok(Arc::new(array))
}
}
};
}

impl_varbin!(BinaryType, arrow_array::types::BinaryViewType);
impl_varbin!(StringType, arrow_array::types::StringViewType);
14 changes: 14 additions & 0 deletions vortex-compute/src/arrow/vector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use arrow_array::ArrayRef;
use vortex_error::VortexResult;
use vortex_vector::{Vector, match_each_vector};

use crate::arrow::IntoArrow;

impl IntoArrow<ArrayRef> for Vector {
fn into_arrow(self) -> VortexResult<ArrayRef> {
match_each_vector!(self, |v| { v.into_arrow() })
}
}
2 changes: 2 additions & 0 deletions vortex-compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#[cfg(feature = "arithmetic")]
pub mod arithmetic;
#[cfg(feature = "arrow")]
pub mod arrow;
#[cfg(feature = "comparison")]
pub mod comparison;
#[cfg(feature = "filter")]
Expand Down
Loading
Loading