diff --git a/encodings/fastlanes/public-api.lock b/encodings/fastlanes/public-api.lock index 79c2a0c111d..5f7fadeb151 100644 --- a/encodings/fastlanes/public-api.lock +++ b/encodings/fastlanes/public-api.lock @@ -186,6 +186,14 @@ impl vortex_array::arrays::slice::SliceReduce for vortex_fastlanes::BitPacked pub fn vortex_fastlanes::BitPacked::slice(vortex_array::array::view::ArrayView<'_, Self>, core::ops::range::Range) -> vortex_error::VortexResult> +impl vortex_array::scalar_fn::fns::between::kernel::BetweenKernel for vortex_fastlanes::BitPacked + +pub fn vortex_fastlanes::BitPacked::between(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::array::erased::ArrayRef, &vortex_array::array::erased::ArrayRef, &vortex_array::scalar_fn::fns::between::BetweenOptions, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + +impl vortex_array::scalar_fn::fns::binary::compare::CompareKernel for vortex_fastlanes::BitPacked + +pub fn vortex_fastlanes::BitPacked::compare(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::array::erased::ArrayRef, vortex_array::scalar_fn::fns::operators::CompareOperator, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + impl vortex_array::scalar_fn::fns::cast::kernel::CastKernel for vortex_fastlanes::BitPacked pub fn vortex_fastlanes::BitPacked::cast(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::dtype::DType, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> diff --git a/encodings/fastlanes/src/bitpacking/array/unpack_iter.rs b/encodings/fastlanes/src/bitpacking/array/unpack_iter.rs index a02dfb6b998..5e0673186e3 100644 --- a/encodings/fastlanes/src/bitpacking/array/unpack_iter.rs +++ b/encodings/fastlanes/src/bitpacking/array/unpack_iter.rs @@ -3,6 +3,7 @@ use std::mem; use std::mem::MaybeUninit; +use std::ops::Range; use fastlanes::BitPacking; use lending_iterator::gat; @@ -215,33 +216,35 @@ impl> UnpackedChunks { mut f: impl FnMut(T) -> U, ) { debug_assert_eq!(output.len(), self.len); + + self.for_each_unpacked_chunk(|chunk, range| { + write_map(chunk, &mut output[range], &mut f); + }); + } + + /// Walk every unpacked chunk in array order, reusing the internal scratch buffer. + pub(crate) fn for_each_unpacked_chunk(&mut self, mut f: F) + where + F: FnMut(&mut [T], Range), + { let mut local_idx = 0; if let Some(initial) = self.initial() { let chunk_len = initial.len(); - write_map(initial, &mut output[..chunk_len], &mut f); + f(initial, local_idx..local_idx + chunk_len); local_idx += chunk_len; } - if self.num_chunks != 1 { - let first_chunk_is_sliced = self.first_chunk_is_sliced(); - let last_chunk_is_sliced = self.last_chunk_is_sliced(); - let full_chunks_range = - (first_chunk_is_sliced as usize)..(self.num_chunks - last_chunk_is_sliced as usize); - + if self.num_chunks > 1 { let packed_slice: &[T::Physical] = buffer_as_slice(&self.packed); let elems_per_chunk = self.elems_per_chunk(); - for i in full_chunks_range { + for i in self.full_chunks_range() { let chunk = &packed_slice[i * elems_per_chunk..][..elems_per_chunk]; unsafe { let dst: &mut [T::Physical] = mem::transmute(&mut self.buffer[..]); self.strategy.unpack_chunk(self.bit_width, chunk, dst); - let unpacked: &[T] = mem::transmute(&self.buffer[..]); - write_map( - unpacked, - &mut output[local_idx..local_idx + CHUNK_SIZE], - &mut f, - ); + let unpacked: &mut [T] = mem::transmute(&mut self.buffer[..]); + f(unpacked, local_idx..local_idx + CHUNK_SIZE); } local_idx += CHUNK_SIZE; } @@ -249,11 +252,7 @@ impl> UnpackedChunks { if let Some(trailer) = self.trailer() { let chunk_len = trailer.len(); - write_map( - trailer, - &mut output[local_idx..local_idx + chunk_len], - &mut f, - ); + f(trailer, local_idx..local_idx + chunk_len); local_idx += chunk_len; } @@ -270,16 +269,11 @@ impl> UnpackedChunks { return start_idx; } - let first_chunk_is_sliced = self.first_chunk_is_sliced(); - let last_chunk_is_sliced = self.last_chunk_is_sliced(); - let full_chunks_range = - (first_chunk_is_sliced as usize)..(self.num_chunks - last_chunk_is_sliced as usize); - let mut local_idx = start_idx; let packed_slice: &[T::Physical] = buffer_as_slice(&self.packed); let elems_per_chunk = self.elems_per_chunk(); - for i in full_chunks_range { + for i in self.full_chunks_range() { let chunk = &packed_slice[i * elems_per_chunk..][..elems_per_chunk]; unsafe { @@ -293,6 +287,11 @@ impl> UnpackedChunks { local_idx } + fn full_chunks_range(&self) -> Range { + (self.first_chunk_is_sliced() as usize) + ..(self.num_chunks - self.last_chunk_is_sliced() as usize) + } + /// Access last chunk of the array if the last chunk has fewer than 1024 due to slicing pub fn trailer(&mut self) -> Option<&mut [T]> { (self.last_chunk_is_sliced() && self.num_chunks > 1).then(|| { diff --git a/encodings/fastlanes/src/bitpacking/compute/between.rs b/encodings/fastlanes/src/bitpacking/compute/between.rs new file mode 100644 index 00000000000..0611d111125 --- /dev/null +++ b/encodings/fastlanes/src/bitpacking/compute/between.rs @@ -0,0 +1,248 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Block-streaming between kernel for [`BitPackedArray`] against constant bounds. +//! +//! Reuses the same single-block scratch buffer as the compare kernel and folds a +//! `lower op_l v op_u upper` predicate per element, so the full primitive never +//! materialises. + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::dtype::NativePType; +use vortex_array::dtype::Nullability; +use vortex_array::match_each_integer_ptype; +use vortex_array::scalar_fn::fns::between::BetweenKernel; +use vortex_array::scalar_fn::fns::between::BetweenOptions; +use vortex_array::scalar_fn::fns::between::StrictComparison; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; + +use crate::BitPacked; +use crate::bitpacking::compute::stream_predicate::stream_predicate; + +impl BetweenKernel for BitPacked { + fn between( + array: ArrayView<'_, Self>, + lower: &ArrayRef, + upper: &ArrayRef, + options: &BetweenOptions, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + // Only accelerate constant-bounds between; vary-by-row bounds fall through to the + // default `compare + and` pipeline. + let (Some(lower_const), Some(upper_const)) = (lower.as_constant(), upper.as_constant()) + else { + return Ok(None); + }; + let (Some(lower_prim), Some(upper_prim)) = ( + lower_const.as_primitive_opt(), + upper_const.as_primitive_opt(), + ) else { + return Ok(None); + }; + + let nullability = + array.dtype().nullability() | lower.dtype().nullability() | upper.dtype().nullability(); + let arr_ptype = array.dtype().as_ptype(); + if lower_prim.ptype() != arr_ptype || upper_prim.ptype() != arr_ptype { + return Ok(None); + } + + let result = match_each_integer_ptype!(arr_ptype, |T| { + let lo: T = lower_prim + .typed_value::() + .vortex_expect("between precondition strips null lower"); + let up: T = upper_prim + .typed_value::() + .vortex_expect("between precondition strips null upper"); + between_constant_typed::(array, lo, up, options, nullability, ctx)? + }); + Ok(Some(result)) + } +} + +fn between_constant_typed( + array: ArrayView<'_, BitPacked>, + lower: T, + upper: T, + options: &BetweenOptions, + nullability: Nullability, + ctx: &mut ExecutionCtx, +) -> VortexResult +where + T: NativePType + Copy + crate::unpack_iter::BitPacked, +{ + // Branch on strictness once at the top so each call into `between_impl` monomorphises + // a single tight predicate — same shape as `Primitive::between` in `vortex-array`. + match (options.lower_strict, options.upper_strict) { + (StrictComparison::Strict, StrictComparison::Strict) => between_impl( + array, + lower, + NativePType::is_lt, + upper, + NativePType::is_lt, + nullability, + ctx, + ), + (StrictComparison::Strict, StrictComparison::NonStrict) => between_impl( + array, + lower, + NativePType::is_lt, + upper, + NativePType::is_le, + nullability, + ctx, + ), + (StrictComparison::NonStrict, StrictComparison::Strict) => between_impl( + array, + lower, + NativePType::is_le, + upper, + NativePType::is_lt, + nullability, + ctx, + ), + (StrictComparison::NonStrict, StrictComparison::NonStrict) => between_impl( + array, + lower, + NativePType::is_le, + upper, + NativePType::is_le, + nullability, + ctx, + ), + } +} + +fn between_impl( + array: ArrayView<'_, BitPacked>, + lower: T, + lower_fn: Lo, + upper: T, + upper_fn: Up, + nullability: Nullability, + ctx: &mut ExecutionCtx, +) -> VortexResult +where + T: NativePType + Copy + crate::unpack_iter::BitPacked, + Lo: Fn(T, T) -> bool, + Up: Fn(T, T) -> bool, +{ + stream_predicate::( + array, + nullability, + |v| lower_fn(lower, v) & upper_fn(v, upper), + ctx, + ) +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::arrays::ConstantArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::assert_arrays_eq; + use vortex_array::builtins::ArrayBuiltins; + use vortex_array::scalar_fn::fns::between::BetweenOptions; + use vortex_array::scalar_fn::fns::between::StrictComparison; + use vortex_array::validity::Validity; + use vortex_buffer::BufferMut; + use vortex_error::VortexResult; + + use crate::BitPackedArrayExt; + use crate::BitPackedData; + + fn opts(lower: StrictComparison, upper: StrictComparison) -> BetweenOptions { + BetweenOptions { + lower_strict: lower, + upper_strict: upper, + } + } + + #[rstest] + #[case(StrictComparison::NonStrict, StrictComparison::NonStrict)] + #[case(StrictComparison::Strict, StrictComparison::NonStrict)] + #[case(StrictComparison::NonStrict, StrictComparison::Strict)] + #[case(StrictComparison::Strict, StrictComparison::Strict)] + fn multi_chunk_against_primitive_baseline( + #[case] lower_strict: StrictComparison, + #[case] upper_strict: StrictComparison, + ) -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values: BufferMut = (0..3000u32).map(|i| i % 257).collect(); + let prim = PrimitiveArray::new(values.freeze(), Validity::NonNullable); + let packed = BitPackedData::encode(&prim.clone().into_array(), 9, &mut ctx)?; + + let lower = ConstantArray::new(40u32, prim.len()).into_array(); + let upper = ConstantArray::new(200u32, prim.len()).into_array(); + let options = opts(lower_strict, upper_strict); + + let expected = prim + .into_array() + .between(lower.clone(), upper.clone(), options.clone())? + .execute::(&mut ctx)?; + let actual = packed + .into_array() + .between(lower, upper, options)? + .execute::(&mut ctx)?; + + assert_arrays_eq!(actual, expected); + Ok(()) + } + + #[test] + fn signed_with_patches_against_primitive_baseline() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values: Vec = (0..1500) + .map(|i| if i % 73 == 0 { 100_000 + i } else { i % 100 }) + .collect(); + let prim = PrimitiveArray::from_iter(values); + let packed = BitPackedData::encode(&prim.clone().into_array(), 7, &mut ctx)?; + assert!(packed.patches().is_some(), "test setup expects patches"); + + let lower = ConstantArray::new(20i32, prim.len()).into_array(); + let upper = ConstantArray::new(80i32, prim.len()).into_array(); + let options = opts(StrictComparison::NonStrict, StrictComparison::NonStrict); + + let expected = prim + .into_array() + .between(lower.clone(), upper.clone(), options.clone())? + .execute::(&mut ctx)?; + let actual = packed + .into_array() + .between(lower, upper, options)? + .execute::(&mut ctx)?; + + assert_arrays_eq!(actual, expected); + Ok(()) + } + + #[test] + fn nullable_propagates_validity() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let prim = + PrimitiveArray::from_option_iter([Some(1u32), None, Some(3), Some(4), None, Some(6)]); + let packed = BitPackedData::encode(&prim.clone().into_array(), 3, &mut ctx)?; + + let lower = ConstantArray::new(2u32, packed.len()).into_array(); + let upper = ConstantArray::new(5u32, packed.len()).into_array(); + let options = opts(StrictComparison::NonStrict, StrictComparison::NonStrict); + + let actual = packed + .into_array() + .between(lower.clone(), upper.clone(), options.clone())? + .execute::(&mut ctx)?; + let expected = prim + .into_array() + .between(lower, upper, options)? + .execute::(&mut ctx)?; + assert_arrays_eq!(actual, expected); + Ok(()) + } +} diff --git a/encodings/fastlanes/src/bitpacking/compute/compare.rs b/encodings/fastlanes/src/bitpacking/compute/compare.rs new file mode 100644 index 00000000000..f5c5c81c5cb --- /dev/null +++ b/encodings/fastlanes/src/bitpacking/compute/compare.rs @@ -0,0 +1,212 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Block-streaming compare kernel for [`BitPackedArray`] against a constant. +//! +//! Avoids materialising the full primitive: the array is walked one 1024-element FastLanes +//! block at a time through a reusable scratch buffer, and a per-element bool is folded into +//! a [`BitBuffer`]. Patches are re-applied at the end by overwriting bits at the patched +//! indices with `predicate(patch_value)`. + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::dtype::NativePType; +use vortex_array::dtype::Nullability; +use vortex_array::match_each_integer_ptype; +use vortex_array::scalar_fn::fns::binary::CompareKernel; +use vortex_array::scalar_fn::fns::operators::CompareOperator; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; + +use crate::BitPacked; +use crate::bitpacking::compute::stream_predicate::stream_predicate; + +impl CompareKernel for BitPacked { + fn compare( + lhs: ArrayView<'_, Self>, + rhs: &ArrayRef, + operator: CompareOperator, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + // Only accelerate compare-against-constant. + let Some(constant) = rhs.as_constant() else { + return Ok(None); + }; + let Some(constant_prim) = constant.as_primitive_opt() else { + return Ok(None); + }; + + // Adaptor strips null-constant RHS, and the binary scalar-fn coerce_args step has + // already promoted both sides to a common ptype. + let nullability = lhs.dtype().nullability() | rhs.dtype().nullability(); + let lhs_ptype = lhs.dtype().as_ptype(); + if constant_prim.ptype() != lhs_ptype { + return Ok(None); + } + + let result = match_each_integer_ptype!(lhs_ptype, |T| { + let rhs: T = constant_prim + .typed_value::() + .vortex_expect("compare adaptor strips null constants"); + compare_constant_typed::(lhs, rhs, operator, nullability, ctx)? + }); + Ok(Some(result)) + } +} + +fn compare_constant_typed( + lhs: ArrayView<'_, BitPacked>, + rhs: T, + operator: CompareOperator, + nullability: Nullability, + ctx: &mut ExecutionCtx, +) -> VortexResult +where + T: NativePType + Copy + crate::unpack_iter::BitPacked, +{ + // `NativePType::is_eq` / `is_lt` etc. provide total comparison (matching the primitive + // between kernel's dispatch shape). `NotEq` has no direct method, so use `!is_eq`. + match operator { + CompareOperator::Eq => stream_predicate::(lhs, nullability, |v| v.is_eq(rhs), ctx), + CompareOperator::NotEq => { + stream_predicate::(lhs, nullability, |v| !v.is_eq(rhs), ctx) + } + CompareOperator::Lt => stream_predicate::(lhs, nullability, |v| v.is_lt(rhs), ctx), + CompareOperator::Lte => stream_predicate::(lhs, nullability, |v| v.is_le(rhs), ctx), + CompareOperator::Gt => stream_predicate::(lhs, nullability, |v| v.is_gt(rhs), ctx), + CompareOperator::Gte => stream_predicate::(lhs, nullability, |v| v.is_ge(rhs), ctx), + } +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::BoolArray; + use vortex_array::arrays::ConstantArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::assert_arrays_eq; + use vortex_array::builtins::ArrayBuiltins; + use vortex_array::scalar_fn::fns::binary::CompareKernel; + use vortex_array::scalar_fn::fns::operators::CompareOperator; + use vortex_array::scalar_fn::fns::operators::Operator; + use vortex_error::VortexResult; + + use crate::BitPacked; + use crate::BitPackedArrayExt; + use crate::BitPackedData; + + /// All six operators on a small in-range input. + #[rstest] + #[case(Operator::Eq, vec![false, false, false, true, false, false, true])] + #[case(Operator::NotEq, vec![true, true, true, false, true, true, false])] + #[case(Operator::Lt, vec![true, true, true, false, false, false, false])] + #[case(Operator::Lte, vec![true, true, true, true, false, false, true])] + #[case(Operator::Gt, vec![false, false, false, false, true, true, false])] + #[case(Operator::Gte, vec![false, false, false, true, true, true, true])] + fn small(#[case] op: Operator, #[case] expected: Vec) { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values = PrimitiveArray::from_iter([0u32, 1, 2, 3, 4, 5, 3]); + let packed = BitPackedData::encode(&values.into_array(), 3, &mut ctx).unwrap(); + let rhs = ConstantArray::new(3u32, packed.len()).into_array(); + let result = packed + .into_array() + .binary(rhs, op) + .unwrap() + .execute::(&mut ctx) + .unwrap(); + assert_arrays_eq!(result, BoolArray::from_iter(expected)); + } + + /// Sweep every native int type across several bit-widths. 2048 elements spans two + /// FastLanes blocks, exercising the per-type monomorphised inner loop. The kernel is + /// invoked *directly* and asserted `Some`, proving the streaming path engages (rather + /// than silently falling back to the Arrow compare), and its output is checked against + /// the Primitive fallback. + macro_rules! sweep { + ($name:ident, $T:ty, $($bw:expr),+) => { + #[test] + fn $name() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + for bw in [$($bw),+] { + let cap: u128 = 1u128 << bw; + let values: Vec<$T> = (0..2048u128).map(|i| (i % cap) as $T).collect(); + let prim = PrimitiveArray::from_iter(values); + let packed = BitPackedData::encode(&prim.clone().into_array(), bw, &mut ctx)?; + let rhs_val = (cap.min(2048) / 2) as $T; + let rhs = ConstantArray::new(rhs_val, prim.len()).into_array(); + for op in [CompareOperator::Eq, CompareOperator::Lt, CompareOperator::Gte] { + let got = ::compare( + packed.as_view(), &rhs, op, &mut ctx, + )? + .expect("streaming compare kernel must engage") + .execute::(&mut ctx)?; + let want = prim + .clone() + .into_array() + .binary(rhs.clone(), Operator::from(op))? + .execute::(&mut ctx)?; + assert_arrays_eq!(got, want); + } + } + Ok(()) + } + }; + } + + sweep!(sweep_u8, u8, 1, 4, 7); + sweep!(sweep_u16, u16, 1, 8, 15); + sweep!(sweep_u32, u32, 1, 16, 31); + sweep!(sweep_u64, u64, 1, 32, 63); + sweep!(sweep_i8, i8, 1, 4, 7); + sweep!(sweep_i16, i16, 1, 8, 15); + sweep!(sweep_i32, i32, 1, 16, 31); + sweep!(sweep_i64, i64, 1, 32, 63); + + /// Inline-patch path: encode signed i32 values that exceed the bit-width range so they + /// end up in `Patches`. The streaming kernel must splice the patches in before the + /// predicate runs. + #[test] + fn signed_with_patches_matches_primitive() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values: Vec = (0..1500) + .map(|i| if i % 73 == 0 { 100_000 + i } else { i % 100 }) + .collect(); + let prim = PrimitiveArray::from_iter(values); + let packed = BitPackedData::encode(&prim.clone().into_array(), 7, &mut ctx)?; + assert!(packed.patches().is_some(), "test setup expects patches"); + let rhs = ConstantArray::new(50i32, prim.len()).into_array(); + let expected = prim + .into_array() + .binary(rhs.clone(), Operator::Eq)? + .execute::(&mut ctx)?; + let actual = packed + .into_array() + .binary(rhs, Operator::Eq)? + .execute::(&mut ctx)?; + assert_arrays_eq!(actual, expected); + Ok(()) + } + + /// Nullable input — the result must carry the array's validity. + #[test] + fn nullable_propagates_validity() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let prim = PrimitiveArray::from_option_iter([Some(1u32), None, Some(3), Some(4), None]); + let packed = BitPackedData::encode(&prim.clone().into_array(), 3, &mut ctx)?; + let rhs = ConstantArray::new(3u32, packed.len()).into_array(); + let actual = packed + .into_array() + .binary(rhs.clone(), Operator::Eq)? + .execute::(&mut ctx)?; + let expected = prim + .into_array() + .binary(rhs, Operator::Eq)? + .execute::(&mut ctx)?; + assert_arrays_eq!(actual, expected); + Ok(()) + } +} diff --git a/encodings/fastlanes/src/bitpacking/compute/mod.rs b/encodings/fastlanes/src/bitpacking/compute/mod.rs index 2501d952356..518f8319eb1 100644 --- a/encodings/fastlanes/src/bitpacking/compute/mod.rs +++ b/encodings/fastlanes/src/bitpacking/compute/mod.rs @@ -1,10 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +mod between; mod cast; +mod compare; mod filter; pub(crate) mod is_constant; mod slice; +mod stream_predicate; mod take; // TODO(connor): This is duplicated in `encodings/fastlanes/src/bitpacking/kernels/mod.rs`. diff --git a/encodings/fastlanes/src/bitpacking/compute/stream_predicate.rs b/encodings/fastlanes/src/bitpacking/compute/stream_predicate.rs new file mode 100644 index 00000000000..4ed9de913ec --- /dev/null +++ b/encodings/fastlanes/src/bitpacking/compute/stream_predicate.rs @@ -0,0 +1,105 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Streaming, cache-reusable predicate evaluation over a [`BitPackedArray`]. +//! +//! Walks the encoded array one 1024-element FastLanes block at a time through a single +//! reusable scratch buffer, splices any [`crate::patches::Patches`] into the unpacked block +//! in place via a sorted-index cursor, then folds a `Fn(T) -> bool` predicate over the +//! block. The fold matches the canonical [`vortex_buffer::BitBuffer::collect_bool`] shape +//! (pack 64 bools into a `u64` in a tight auto-vectorisable inner loop) and writes the +//! resulting words straight into the output bit buffer, so the materialised primitive +//! never appears anywhere. + +use num_traits::AsPrimitive; +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::BoolArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::dtype::NativePType; +use vortex_array::dtype::Nullability; +use vortex_array::match_each_unsigned_integer_ptype; +use vortex_buffer::BitBufferMut; +use vortex_buffer::BufferMut; +use vortex_buffer::pack_bools_into_words; +use vortex_error::VortexResult; + +use crate::BitPacked; +use crate::BitPackedArrayExt; +use crate::unpack_iter::BitPacked as BitPackedIter; + +/// Stream `predicate` over the unpacked values of a [`BitPackedArray`], one FastLanes +/// block at a time, producing a [`BoolArray`]. +pub(super) fn stream_predicate( + array: ArrayView<'_, BitPacked>, + nullability: Nullability, + predicate: P, + ctx: &mut ExecutionCtx, +) -> VortexResult +where + T: BitPackedIter + NativePType + Copy, + P: Fn(T) -> bool, +{ + let len = array.len(); + let mut words: BufferMut = BufferMut::zeroed(len.div_ceil(u64::BITS as usize)); + + if len > 0 { + let mut chunks = array.unpacked_chunks::()?; + let words = words.as_mut_slice(); + + if let Some(p) = array.patches() { + let p_idx_arr = p.indices().clone().execute::(ctx)?; + let p_val_arr = p.values().clone().execute::(ctx)?; + let p_off = p.offset(); + match_each_unsigned_integer_ptype!(p_idx_arr.ptype(), |I| { + let p_idx = p_idx_arr.as_slice::(); + let p_val = p_val_arr.as_slice::(); + let mut p_cur: usize = 0; + chunks.for_each_unpacked_chunk(|block, range| { + p_cur = splice_patches::(block, range.start, p_cur, p_idx, p_val, p_off); + pack_bools_into_words(words, range.start, block.len(), |i| predicate(block[i])); + }); + }); + } else { + chunks.for_each_unpacked_chunk(|block, range| { + pack_bools_into_words(words, range.start, block.len(), |i| predicate(block[i])); + }); + } + } + + let bits = BitBufferMut::from_buffer(words.into_byte_buffer(), 0, len); + let validity = array.validity()?.union_nullability(nullability); + Ok(BoolArray::new(bits.freeze(), validity).into_array()) +} + +/// Overwrite the unpacked block in place with any patches falling in +/// `[chunk_start, chunk_start + block.len())`, starting from `cursor` and returning the +/// advanced cursor. Sorted indices mean the cursor only moves forward across the walk. +#[inline] +fn splice_patches( + block: &mut [T], + chunk_start: usize, + mut cursor: usize, + indices: &[I], + values: &[T], + patch_offset: usize, +) -> usize +where + T: Copy, + I: AsPrimitive, +{ + let end = chunk_start + block.len(); + while cursor < indices.len() { + let global: usize = indices[cursor].as_(); + let local = global - patch_offset; + if local >= end { + break; + } + debug_assert!(local >= chunk_start); + block[local - chunk_start] = values[cursor]; + cursor += 1; + } + cursor +} diff --git a/encodings/fastlanes/src/bitpacking/vtable/kernels.rs b/encodings/fastlanes/src/bitpacking/vtable/kernels.rs index cb020dc2ce9..87332f736a7 100644 --- a/encodings/fastlanes/src/bitpacking/vtable/kernels.rs +++ b/encodings/fastlanes/src/bitpacking/vtable/kernels.rs @@ -5,12 +5,16 @@ use vortex_array::arrays::dict::TakeExecuteAdaptor; use vortex_array::arrays::filter::FilterExecuteAdaptor; use vortex_array::arrays::slice::SliceExecuteAdaptor; use vortex_array::kernel::ParentKernelSet; +use vortex_array::scalar_fn::fns::between::BetweenExecuteAdaptor; +use vortex_array::scalar_fn::fns::binary::CompareExecuteAdaptor; use vortex_array::scalar_fn::fns::cast::CastExecuteAdaptor; use crate::BitPacked; pub(crate) const PARENT_KERNELS: ParentKernelSet = ParentKernelSet::new(&[ + ParentKernelSet::lift(&BetweenExecuteAdaptor(BitPacked)), ParentKernelSet::lift(&CastExecuteAdaptor(BitPacked)), + ParentKernelSet::lift(&CompareExecuteAdaptor(BitPacked)), ParentKernelSet::lift(&FilterExecuteAdaptor(BitPacked)), ParentKernelSet::lift(&SliceExecuteAdaptor(BitPacked)), ParentKernelSet::lift(&TakeExecuteAdaptor(BitPacked)), diff --git a/vortex-buffer/public-api.lock b/vortex-buffer/public-api.lock index 44104a50daf..512f8be6fb1 100644 --- a/vortex-buffer/public-api.lock +++ b/vortex-buffer/public-api.lock @@ -1020,12 +1020,20 @@ pub fn B::copy_to_aligned(&mut self, usize, vortex_buffer::Alignment) -> vortex_ pub fn B::copy_to_const_aligned(&mut self, usize) -> vortex_buffer::ConstByteBuffer +pub fn vortex_buffer::collect_bool_word(usize, F) -> u64 where F: core::ops::function::FnMut(usize) -> bool + +pub fn vortex_buffer::collect_bool_words(&mut [u64], usize, F) where F: core::ops::function::FnMut(usize) -> bool + pub fn vortex_buffer::get_bit(&[u8], usize) -> bool pub unsafe fn vortex_buffer::get_bit_unchecked(*const u8, usize) -> bool +pub fn vortex_buffer::pack_bools_into_words(&mut [u64], usize, usize, F) where F: core::ops::function::FnMut(usize) -> bool + pub unsafe fn vortex_buffer::set_bit_unchecked(*mut u8, usize) +pub fn vortex_buffer::splice_word_at_bit(&mut [u64], usize, u64) + pub unsafe fn vortex_buffer::unset_bit_unchecked(*mut u8, usize) pub type vortex_buffer::ByteBuffer = vortex_buffer::Buffer diff --git a/vortex-buffer/src/bit/buf.rs b/vortex-buffer/src/bit/buf.rs index 9b98d1cd5d6..83b28e24b4a 100644 --- a/vortex-buffer/src/bit/buf.rs +++ b/vortex-buffer/src/bit/buf.rs @@ -21,6 +21,7 @@ use crate::bit::BitIndexIterator; use crate::bit::BitIterator; use crate::bit::BitSliceIterator; use crate::bit::UnalignedBitChunk; +use crate::bit::collect_bool_word; use crate::bit::count_ones::count_ones; use crate::bit::get_bit_unchecked; use crate::bit::ops::bitwise_binary_op; @@ -180,12 +181,11 @@ impl BitBuffer { let chunks = self.chunks(); for (chunk_idx, src_chunk) in chunks.iter().enumerate() { - let mut packed = 0u64; - for bit_idx in 0..64 { + let packed = collect_bool_word(64, |bit_idx| { let i = bit_idx + chunk_idx * 64; let bit_value = (src_chunk >> bit_idx) & 1 == 1; - packed |= (f(i, bit_value) as u64) << bit_idx; - } + f(i, bit_value) + }); // SAFETY: Already allocated sufficient capacity unsafe { buffer.push_unchecked(packed) } @@ -193,12 +193,11 @@ impl BitBuffer { if remainder != 0 { let src_chunk = chunks.remainder_bits(); - let mut packed = 0u64; - for bit_idx in 0..remainder { + let packed = collect_bool_word(remainder, |bit_idx| { let i = bit_idx + chunks_count * 64; let bit_value = (src_chunk >> bit_idx) & 1 == 1; - packed |= (f(i, bit_value) as u64) << bit_idx; - } + f(i, bit_value) + }); // SAFETY: Already allocated sufficient capacity unsafe { buffer.push_unchecked(packed) } diff --git a/vortex-buffer/src/bit/buf_mut.rs b/vortex-buffer/src/bit/buf_mut.rs index d1c96f069e8..a366eaae9a8 100644 --- a/vortex-buffer/src/bit/buf_mut.rs +++ b/vortex-buffer/src/bit/buf_mut.rs @@ -8,6 +8,7 @@ use bitvec::view::BitView; use crate::BitBuffer; use crate::BufferMut; use crate::ByteBufferMut; +use crate::bit::collect_bool_words; use crate::bit::get_bit_unchecked; use crate::bit::ops; use crate::bit::set_bit_unchecked; @@ -184,37 +185,20 @@ impl BitBufferMut { /// Invokes `f` with indexes `0..len` collecting the boolean results into a new `BitBufferMut` #[inline] - pub fn collect_bool bool>(len: usize, mut f: F) -> Self { - let mut buffer = BufferMut::with_capacity(len.div_ceil(64) * 8); - - let chunks = len / 64; - let remainder = len % 64; - for chunk in 0..chunks { - let mut packed = 0; - for bit_idx in 0..64 { - let i = bit_idx + chunk * 64; - packed |= (f(i) as u64) << bit_idx; - } - - // SAFETY: Already allocated sufficient capacity - unsafe { buffer.push_unchecked(packed) } - } - - if remainder != 0 { - let mut packed = 0; - for bit_idx in 0..remainder { - let i = bit_idx + chunks * 64; - packed |= (f(i) as u64) << bit_idx; - } - - // SAFETY: Already allocated sufficient capacity - unsafe { buffer.push_unchecked(packed) } - } - - buffer.truncate(len.div_ceil(8)); + pub fn collect_bool bool>(len: usize, f: F) -> Self { + let num_words = len.div_ceil(64); + let mut buffer: BufferMut = BufferMut::with_capacity(num_words); + // SAFETY: `collect_bool_words` writes every word in `0..num_words` below + // before any read; `u64` has no invalid bit patterns and the assignments + // inside `collect_bool_words` are pure writes. + unsafe { buffer.set_len(num_words) }; + collect_bool_words(buffer.as_mut_slice(), len, f); + + let mut bytes = buffer.into_byte_buffer(); + bytes.truncate(len.div_ceil(8)); Self { - buffer: buffer.into_byte_buffer(), + buffer: bytes, offset: 0, len, } diff --git a/vortex-buffer/src/bit/mod.rs b/vortex-buffer/src/bit/mod.rs index 37930d788b7..a4fe1345e60 100644 --- a/vortex-buffer/src/bit/mod.rs +++ b/vortex-buffer/src/bit/mod.rs @@ -25,6 +25,111 @@ pub use arrow_buffer::bit_iterator::BitSliceIterator; pub use buf::*; pub use buf_mut::*; +/// Packs up to 64 boolean values into a little-endian `u64` word. +#[inline] +pub fn collect_bool_word(len: usize, mut f: F) -> u64 +where + F: FnMut(usize) -> bool, +{ + assert!(len <= 64, "cannot pack {len} bits into a u64 word"); + + let mut packed = 0; + for bit_idx in 0..len { + packed |= (f(bit_idx) as u64) << bit_idx; + } + packed +} + +/// Pack `len` boolean values returned by `f` into the prefix of `words`, LSB-first, +/// 64 bits per `u64`. `words` must have capacity for at least `len.div_ceil(64)` entries. +/// +/// Writes via `=` (not `|=`), so the destination need not be zero-initialised. +#[inline] +pub fn collect_bool_words(words: &mut [u64], len: usize, mut f: F) +where + F: FnMut(usize) -> bool, +{ + let num_words = len.div_ceil(64); + assert!( + words.len() >= num_words, + "words slice has {} entries, need at least {num_words}", + words.len(), + ); + + let full = len / 64; + let remainder = len % 64; + + for word_idx in 0..full { + let offset = word_idx * 64; + words[word_idx] = collect_bool_word(64, |bit_idx| f(offset + bit_idx)); + } + + if remainder != 0 { + let offset = full * 64; + words[full] = collect_bool_word(remainder, |bit_idx| f(offset + bit_idx)); + } +} + +/// Splice a packed word `w` (whose bits above the highest valid bit are zero) into +/// `words` at the given bit position. +/// +/// The destination word at `bit_offset / 64` is OR'd, preserving any bits below +/// `bit_offset % 64`. When `w` has high bits that spill into the next word, those +/// bits are *assigned* (not OR'd) — so callers must ensure that next slot is zero +/// (e.g. via `BufferMut::zeroed`). +/// +/// `words.len()` need only cover the slots `w` actually writes to: skipping the +/// spillover when its bits are all zero means a tail that fits entirely in the +/// leading word never touches `words[dest_word + 1]`. +#[inline] +pub fn splice_word_at_bit(words: &mut [u64], bit_offset: usize, word: u64) { + let dest_word = bit_offset / 64; + let bit_in_word = bit_offset % 64; + words[dest_word] |= word << bit_in_word; + if bit_in_word != 0 { + let high = word >> (64 - bit_in_word); + if high != 0 { + words[dest_word + 1] = high; + } + } +} + +/// Pack `len` boolean values returned by `f` into `words` starting at bit position +/// `bit_offset`, LSB-first. +/// +/// Composes [`collect_bool_word`] (pack up to 64 bools into a u64) with +/// [`splice_word_at_bit`] (merge the packed word into the destination via shift-OR). +/// +/// `words` must have at least `(bit_offset + len).div_ceil(64)` entries; see +/// [`splice_word_at_bit`] for zero-init requirements on words above the cursor. +#[inline] +pub fn pack_bools_into_words(words: &mut [u64], bit_offset: usize, len: usize, mut f: F) +where + F: FnMut(usize) -> bool, +{ + if len == 0 { + return; + } + let num_words = (bit_offset + len).div_ceil(64); + assert!( + words.len() >= num_words, + "words slice has {} entries, need at least {num_words}", + words.len(), + ); + + let mut done = 0; + while len - done >= 64 { + let word = collect_bool_word(64, |bit| f(done + bit)); + splice_word_at_bit(words, bit_offset + done, word); + done += 64; + } + let tail = len - done; + if tail > 0 { + let word = collect_bool_word(tail, |bit| f(done + bit)); + splice_word_at_bit(words, bit_offset + done, word); + } +} + /// Get the bit value at `index` out of `buf`. /// /// # Panics @@ -64,3 +169,65 @@ pub unsafe fn set_bit_unchecked(buf: *mut u8, index: usize) { pub unsafe fn unset_bit_unchecked(buf: *mut u8, index: usize) { unsafe { *buf.add(index / 8) &= !(1 << (index % 8)) }; } + +#[cfg(test)] +mod tests { + use super::collect_bool_word; + use super::pack_bools_into_words; + + #[test] + fn collect_bool_word_packs_lsb_first() { + let word = collect_bool_word(5, |idx| idx.is_multiple_of(2)); + assert_eq!(word, 0b10101); + } + + #[test] + fn collect_bool_word_empty() { + assert_eq!(collect_bool_word(0, |_| true), 0); + } + + #[test] + #[should_panic(expected = "cannot pack 65 bits into a u64 word")] + fn collect_bool_word_rejects_too_many_bits() { + let _ = collect_bool_word(65, |_| true); + } + + fn pack(bit_offset: usize, len: usize, f: impl Fn(usize) -> bool) -> Vec { + let num_words = (bit_offset + len).div_ceil(64); + let mut words = vec![0u64; num_words]; + pack_bools_into_words(&mut words, bit_offset, len, &f); + (0..bit_offset + len) + .map(|i| (words[i / 64] >> (i % 64)) & 1 == 1) + .collect() + } + + #[test] + fn pack_bools_aligned_multi_word_with_tail() { + let bits = pack(0, 130, |i| i.is_multiple_of(3)); + for i in 0..130 { + assert_eq!(bits[i], i.is_multiple_of(3), "bit {i}"); + } + } + + #[test] + fn pack_bools_unaligned_crossing_words() { + let bits = pack(40, 200, |i| i.is_multiple_of(7)); + assert!(bits[..40].iter().all(|&b| !b)); + for i in 0..200 { + assert_eq!(bits[40 + i], i.is_multiple_of(7), "bit {}", 40 + i); + } + } + + #[test] + fn pack_bools_preserves_low_bits_of_leading_word() { + let mut words = vec![0u64; 2]; + words[0] = 0b11111; + pack_bools_into_words(&mut words, 5, 70, |_| true); + for i in 0..5 { + assert_eq!((words[0] >> i) & 1, 1, "preserved bit {i}"); + } + for i in 5..75 { + assert_eq!((words[i / 64] >> (i % 64)) & 1, 1, "extended bit {i}"); + } + } +}