diff --git a/Cargo.lock b/Cargo.lock index c95d367061d..875abf25e53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9623,6 +9623,7 @@ dependencies = [ "url", "vortex", "vortex-array", + "vortex-parquet-variant", "vortex-runend", "vortex-sequence", "vortex-utils", diff --git a/vortex-duckdb/Cargo.toml b/vortex-duckdb/Cargo.toml index b6bb0109861..6d94ae9e555 100644 --- a/vortex-duckdb/Cargo.toml +++ b/vortex-duckdb/Cargo.toml @@ -39,6 +39,7 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } url = { workspace = true } vortex = { workspace = true, features = ["files", "tokio", "object_store"] } +vortex-parquet-variant = { workspace = true } vortex-utils = { workspace = true, features = ["dashmap"] } [dev-dependencies] diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index 868b417bf6c..4d99873be99 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -23,7 +23,7 @@ const DEFAULT_DUCKDB_VERSION: &str = "1.5.3"; const BUILD_ARTIFACTS: [&str; 3] = ["libduckdb.dylib", "libduckdb.so", "libduckdb_static.a"]; -const SOURCE_FILES: [&str; 17] = [ +const SOURCE_FILES: [&str; 18] = [ "cpp/client_context.cpp", "cpp/config.cpp", "cpp/copy_function.cpp", @@ -39,6 +39,7 @@ const SOURCE_FILES: [&str; 17] = [ "cpp/table_filter.cpp", "cpp/table_function.cpp", "cpp/value.cpp", + "cpp/variant.cpp", "cpp/vector.cpp", "cpp/vector_buffer.cpp", ]; @@ -349,12 +350,22 @@ fn c2rust(crate_dir: &Path, duckdb_include_dir: &Path) { } } -fn cpp(duckdb_include_dir: &Path) { +fn cpp( + duckdb_include_dir: &Path, + duckdb_parquet_include_dir: &Path, + duckdb_parquet_third_party_dir: &Path, + duckdb_thrift_include_dir: &Path, + duckdb_yyjson_include_dir: &Path, +) { cc::Build::new() .std("c++20") .flags(["-Wall", "-Wextra", "-Wpedantic"]) .cpp(true) .include(duckdb_include_dir) + .include(duckdb_parquet_include_dir) + .include(duckdb_parquet_third_party_dir) + .include(duckdb_thrift_include_dir) + .include(duckdb_yyjson_include_dir) .include("cpp/include") .files(SOURCE_FILES) .compile("vortex-duckdb-extras"); @@ -470,6 +481,16 @@ fn main() { let duckdb_include_dir = inner_dir.join("src").join("include"); println!("cargo:rerun-if-changed=cpp/include"); c2rust(&crate_dir, &duckdb_include_dir); - cpp(&duckdb_include_dir); + let duckdb_parquet_include_dir = inner_dir.join("extension").join("parquet").join("include"); + let duckdb_parquet_third_party_dir = inner_dir.join("third_party").join("parquet"); + let duckdb_thrift_include_dir = inner_dir.join("third_party").join("thrift"); + let duckdb_yyjson_include_dir = inner_dir.join("third_party").join("yyjson").join("include"); + cpp( + &duckdb_include_dir, + &duckdb_parquet_include_dir, + &duckdb_parquet_third_party_dir, + &duckdb_thrift_include_dir, + &duckdb_yyjson_include_dir, + ); rust2c(&crate_dir); } diff --git a/vortex-duckdb/cpp/include/duckdb_vx.h b/vortex-duckdb/cpp/include/duckdb_vx.h index dcad0ae1487..b59390fdb9c 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx.h +++ b/vortex-duckdb/cpp/include/duckdb_vx.h @@ -18,5 +18,6 @@ #include "duckdb_vx/table_filter.h" #include "duckdb_vx/table_function.h" #include "duckdb_vx/value.h" +#include "duckdb_vx/variant.h" #include "duckdb_vx/vector.h" #include "duckdb_vx/vector_buffer.h" diff --git a/vortex-duckdb/cpp/include/duckdb_vx/logical_type.h b/vortex-duckdb/cpp/include/duckdb_vx/logical_type.h index 2dcc91350ba..f76ac0eb885 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/logical_type.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/logical_type.h @@ -15,6 +15,7 @@ extern "C" { char *duckdb_vx_logical_type_stringify(duckdb_logical_type ty); duckdb_logical_type duckdb_vx_logical_type_copy(duckdb_logical_type ty); +duckdb_logical_type duckdb_vx_logical_type_variant(void); #ifdef __cplusplus /* End C ABI */ } diff --git a/vortex-duckdb/cpp/include/duckdb_vx/value.h b/vortex-duckdb/cpp/include/duckdb_vx/value.h index 34c38b50c11..adbd7cbcc33 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/value.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/value.h @@ -10,6 +10,10 @@ extern "C" { // Create a null value with a reference to a logical type. duckdb_value duckdb_vx_value_create_null(duckdb_logical_type ty); +// Clone a DuckDB value. +// The returned value must be freed with duckdb_destroy_value. +duckdb_value duckdb_vx_value_clone(duckdb_value value); + #ifdef __cplusplus /* End C ABI */ } #endif diff --git a/vortex-duckdb/cpp/include/duckdb_vx/variant.h b/vortex-duckdb/cpp/include/duckdb_vx/variant.h new file mode 100644 index 00000000000..2b38c0ed276 --- /dev/null +++ b/vortex-duckdb/cpp/include/duckdb_vx/variant.h @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#pragma once + +#include "duckdb_vx/duckdb_diagnostics.h" + +DUCKDB_INCLUDES_BEGIN +#include "duckdb.h" +DUCKDB_INCLUDES_END + +#include "duckdb_vx/error.h" + +#ifdef __cplusplus /* If compiled as C++, use C ABI */ +extern "C" { +#endif + +duckdb_vector duckdb_vx_variant_to_parquet(duckdb_vector variant, idx_t len, duckdb_vx_error *err); + +void duckdb_vx_variant_from_parquet(duckdb_vector metadata, + duckdb_vector value, + duckdb_vector typed_value, + bool has_typed_value, + duckdb_vector out, + idx_t len, + duckdb_vx_error *err); + +#ifdef __cplusplus /* End C ABI */ +} +#endif diff --git a/vortex-duckdb/cpp/logical_type.cpp b/vortex-duckdb/cpp/logical_type.cpp index eb3ac4f3768..9e4beb194b0 100644 --- a/vortex-duckdb/cpp/logical_type.cpp +++ b/vortex-duckdb/cpp/logical_type.cpp @@ -16,6 +16,11 @@ duckdb_logical_type duckdb_vx_logical_type_copy(duckdb_logical_type ty) { return reinterpret_cast(copy.release()); } +duckdb_logical_type duckdb_vx_logical_type_variant(void) { + auto type = duckdb::make_uniq(duckdb::LogicalType::VARIANT()); + return reinterpret_cast(type.release()); +} + char *duckdb_vx_logical_type_stringify(duckdb_logical_type c_type) { auto type = reinterpret_cast(c_type); auto str = type->ToString(); diff --git a/vortex-duckdb/cpp/value.cpp b/vortex-duckdb/cpp/value.cpp index 61e96131aff..0d911740163 100644 --- a/vortex-duckdb/cpp/value.cpp +++ b/vortex-duckdb/cpp/value.cpp @@ -14,3 +14,12 @@ extern "C" duckdb_value duckdb_vx_value_create_null(duckdb_logical_type ty) { auto value = duckdb::make_uniq(*logical_type); return reinterpret_cast(value.release()); } + +extern "C" duckdb_value duckdb_vx_value_clone(duckdb_value value) { + if (!value) { + return nullptr; + } + + const auto ddb_value = reinterpret_cast(value); + return reinterpret_cast(new duckdb::Value(*ddb_value)); +} diff --git a/vortex-duckdb/cpp/variant.cpp b/vortex-duckdb/cpp/variant.cpp new file mode 100644 index 00000000000..a0779086857 --- /dev/null +++ b/vortex-duckdb/cpp/variant.cpp @@ -0,0 +1,111 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "duckdb_vx/variant.h" +#include "duckdb_vx/error.hpp" + +DUCKDB_INCLUDES_BEGIN +#include "duckdb/common/types.hpp" +#include "duckdb/common/types/data_chunk.hpp" +#include "duckdb/common/types/variant_value.hpp" +#include "duckdb/common/types/vector.hpp" +#include "duckdb/execution/expression_executor_state.hpp" +#include "duckdb/function/scalar_function.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" +#include "reader/variant/variant_shredded_conversion.hpp" +#include "writer/variant_column_writer.hpp" +DUCKDB_INCLUDES_END + +#include + +using namespace duckdb; + +namespace { + +duckdb::LogicalType UnshreddedParquetVariantType() { + duckdb::child_list_t children; + children.emplace_back("metadata", duckdb::LogicalType::BLOB); + children.emplace_back("value", duckdb::LogicalType::BLOB); + auto type = duckdb::LogicalType::STRUCT(std::move(children)); + type.SetAlias("PARQUET_VARIANT"); + return type; +} + +duckdb::LogicalType ParquetVariantGroupType(Vector &value, optional_ptr typed_value) { + duckdb::child_list_t children; + children.emplace_back("value", value.GetType()); + if (typed_value) { + children.emplace_back("typed_value", typed_value->GetType()); + } + return duckdb::LogicalType::STRUCT(std::move(children)); +} + +void SetException(duckdb_vx_error *err, const std::exception &e) { + vortex::SetError(err, e.what()); +} + +void SetUnknownException(duckdb_vx_error *err) { + vortex::SetError(err, "unknown DuckDB Variant conversion error"); +} + +} // namespace + +extern "C" duckdb_vector duckdb_vx_variant_to_parquet(duckdb_vector variant, idx_t len, duckdb_vx_error *err) { + try { + auto &variant_vector = *reinterpret_cast(variant); + auto result_type = UnshreddedParquetVariantType(); + auto result = make_uniq(result_type, len); + + DataChunk input; + input.Initialize(Allocator::DefaultAllocator(), {duckdb::LogicalType::VARIANT()}, len); + input.SetCardinality(len); + input.data[0].Reference(variant_vector); + + auto transform = VariantColumnWriter::GetTransformFunction(); + ExpressionExecutorState root; + BoundFunctionExpression expr(result_type, transform, {}, nullptr); + ExpressionState state(expr, root); + transform.function(input, state, *result); + + *err = nullptr; + return reinterpret_cast(result.release()); + } catch (const std::exception &e) { + SetException(err, e); + } catch (...) { + SetUnknownException(err); + } + return nullptr; +} + +extern "C" void duckdb_vx_variant_from_parquet(duckdb_vector metadata, + duckdb_vector value, + duckdb_vector typed_value, + bool has_typed_value, + duckdb_vector out, + idx_t len, + duckdb_vx_error *err) { + try { + auto &metadata_vector = *reinterpret_cast(metadata); + auto &value_vector = *reinterpret_cast(value); + optional_ptr typed_value_vector; + if (has_typed_value) { + typed_value_vector = reinterpret_cast(typed_value); + } + + Vector group(ParquetVariantGroupType(value_vector, typed_value_vector), len); + auto &entries = StructVector::GetEntries(group); + entries[0]->Reference(value_vector); + if (typed_value_vector) { + entries[1]->Reference(*typed_value_vector); + } + + auto values = VariantShreddedConversion::Convert(metadata_vector, group, 0, len, len); + auto &out_vector = *reinterpret_cast(out); + VariantValue::ToVARIANT(values, out_vector); + *err = nullptr; + } catch (const std::exception &e) { + SetException(err, e); + } catch (...) { + SetUnknownException(err); + } +} diff --git a/vortex-duckdb/src/convert/dtype.rs b/vortex-duckdb/src/convert/dtype.rs index f1f8ae0de9b..ddb90c5c2a0 100644 --- a/vortex-duckdb/src/convert/dtype.rs +++ b/vortex-duckdb/src/convert/dtype.rs @@ -27,6 +27,7 @@ //! | `Date` | `DATE` | //! | `Time` | `TIME` | //! | `Timestamp` | `TIMESTAMP` | +//! | `Variant` | `VARIANT` | use std::ffi::CString; use std::sync::Arc; @@ -239,9 +240,7 @@ impl TryFrom<&DType> for LogicalType { return LogicalType::try_from(struct_type); } DType::Union(..) => todo!("TODO(connor)[Union]: unimplemented"), - DType::Variant(_) => { - vortex_bail!("Vortex Variant array aren't supported in DuckDB") - } + DType::Variant(_) => return Ok(LogicalType::variant()), DType::Extension(ext_dtype) => { let Some(temporal) = ext_dtype.metadata_opt::() else { vortex_bail!("Unsupported extension type \"{}\"", ext_dtype.id()); @@ -422,6 +421,16 @@ mod tests { ); } + #[test] + fn test_variant_type() { + let dtype = DType::Variant(Nullability::Nullable); + let logical_type = LogicalType::try_from(&dtype).unwrap(); + assert_eq!( + logical_type.as_type_id(), + cpp::DUCKDB_TYPE::DUCKDB_TYPE_VARIANT + ); + } + #[test] fn test_struct_type() { let field_names = FieldNames::from([FieldName::from("field1"), FieldName::from("field2")]); diff --git a/vortex-duckdb/src/convert/scalar.rs b/vortex-duckdb/src/convert/scalar.rs index 7d0654151b3..dc260f9020a 100644 --- a/vortex-duckdb/src/convert/scalar.rs +++ b/vortex-duckdb/src/convert/scalar.rs @@ -345,6 +345,9 @@ impl<'a> TryFrom<&'a ValueRef> for Scalar { vortex_bail!("List value must be a list or struct dtype") } }, + ExtractedValue::Variant(_) => { + vortex_bail!("DuckDB Variant scalars aren't supported in Vortex") + } } } } diff --git a/vortex-duckdb/src/convert/vector.rs b/vortex-duckdb/src/convert/vector.rs index a9a58c752c4..130f93babd2 100644 --- a/vortex-duckdb/src/convert/vector.rs +++ b/vortex-duckdb/src/convert/vector.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::ptr; use std::sync::Arc; use num_traits::AsPrimitive; @@ -30,6 +31,7 @@ use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::extension::datetime::TimeUnit; use vortex::mask::Mask; +use vortex_parquet_variant::ParquetVariant; use crate::cpp::DUCKDB_TYPE; use crate::cpp::duckdb_date; @@ -44,6 +46,7 @@ use crate::cpp::duckdb_timestamp_ms; use crate::cpp::duckdb_timestamp_ns; use crate::cpp::duckdb_timestamp_s; use crate::duckdb::DataChunkRef; +use crate::duckdb::Vector; use crate::duckdb::VectorRef; use crate::exporter::precision_to_duckdb_storage_size; @@ -110,6 +113,20 @@ fn vector_as_string_blob(vector: &VectorRef, len: usize, dtype: DType) -> ArrayR builder.finish() } +fn duckdb_variant_to_parquet(vector: &VectorRef, len: usize) -> VortexResult { + let mut err = ptr::null_mut(); + let parquet_vector = unsafe { + crate::cpp::duckdb_vx_variant_to_parquet(vector.as_ptr(), len as _, &raw mut err) + }; + if !err.is_null() { + return Err(crate::duckdb::ffi_error(err)); + } + if parquet_vector.is_null() { + vortex_bail!("DuckDB Variant conversion returned a null vector"); + } + Ok(unsafe { Vector::own(parquet_vector) }) +} + /// Converts a valid [`duckdb_list_entry`] to `(offset, size)`, updating tracking state. /// /// Updates `child_min_length` with the maximum end offset seen so far, and `previous_end` with this @@ -255,6 +272,27 @@ pub fn flat_vector_to_vortex(vector: &VectorRef, len: usize) -> VortexResult { + let parquet_vector = duckdb_variant_to_parquet(vector, len)?; + let metadata = vector_as_string_blob( + parquet_vector.struct_vector_get_child(0), + len, + DType::Binary(Nullability::NonNullable), + ); + let value = vector_as_string_blob( + parquet_vector.struct_vector_get_child(1), + len, + DType::Binary(Nullability::Nullable), + ); + + ParquetVariant::try_new( + vector.validity_ref(len).to_validity(), + metadata, + Some(value), + None, + ) + .map(|array| array.into_array()) + } DUCKDB_TYPE::DUCKDB_TYPE_BOOLEAN => { let data = vector.as_slice_with_len::(len); @@ -374,6 +412,7 @@ pub fn data_chunk_to_vortex( #[cfg(test)] mod tests { use std::ffi::CString; + use std::ptr; use vortex::array::LEGACY_SESSION; use vortex::array::VortexSessionExecute; @@ -383,10 +422,14 @@ mod tests { use vortex::array::arrays::struct_::StructArrayExt; use vortex::array::assert_arrays_eq; use vortex::error::VortexExpect; + use vortex::error::VortexResult; use vortex::mask::Mask; + use vortex_parquet_variant::ParquetVariant; + use vortex_parquet_variant::ParquetVariantArrayExt; use super::*; use crate::cpp::DUCKDB_TYPE; + use crate::duckdb::Database; use crate::duckdb::LogicalType; use crate::duckdb::Vector; @@ -411,6 +454,79 @@ mod tests { assert_arrays_eq!(result, expected); } + #[test] + fn test_variant_vector_bridge_round_trip() -> VortexResult<()> { + let db = Database::open_in_memory()?; + let conn = db.connect()?; + let result = conn.query( + " + SELECT v + FROM ( + VALUES + (1::VARIANT), + ('duck'::VARIANT), + ([1, 2]::VARIANT), + ({'name': 'Ada', 'age': 37}::VARIANT), + (NULL::VARIANT) + ) AS t(v) + ", + )?; + let chunk = result.into_iter().next().unwrap(); + let len = chunk.len().as_(); + let vector = chunk.get_vector(0); + vector.flatten(chunk.len()); + + let array = flat_vector_to_vortex(vector, len)?; + let parquet_variant = array.as_opt::().unwrap(); + assert_eq!( + parquet_variant.dtype(), + &DType::Variant(Nullability::Nullable) + ); + assert!(parquet_variant.value_array().is_some()); + assert!(parquet_variant.typed_value_array().is_none()); + assert!(ParquetVariantArrayExt::validity(&parquet_variant).is_null(4)?); + + let parquet_vector = duckdb_variant_to_parquet(vector, len)?; + let logical_type = LogicalType::variant(); + let mut out = Vector::with_capacity(&logical_type, len); + let mut err = ptr::null_mut(); + unsafe { + crate::cpp::duckdb_vx_variant_from_parquet( + parquet_vector.struct_vector_get_child(0).as_ptr(), + parquet_vector.struct_vector_get_child(1).as_ptr(), + ptr::null_mut(), + false, + out.as_ptr(), + len as _, + &raw mut err, + ); + } + if !err.is_null() { + return Err(crate::duckdb::ffi_error(err)); + } + unsafe { + out.set_validity(&vector.validity_ref(len).execute_mask(), 0, len); + } + + assert!(out.row_is_null(4)); + let roundtripped = duckdb_variant_to_parquet(&out, len)?; + assert_eq!( + roundtripped + .struct_vector_get_child(0) + .logical_type() + .as_type_id(), + DUCKDB_TYPE::DUCKDB_TYPE_BLOB + ); + assert_eq!( + roundtripped + .struct_vector_get_child(1) + .logical_type() + .as_type_id(), + DUCKDB_TYPE::DUCKDB_TYPE_BLOB + ); + Ok(()) + } + #[test] fn test_timestamp_vector_conversion() { let mut ctx = LEGACY_SESSION.create_execution_ctx(); diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index fa6b25b5ce7..b04d8b3a6bb 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -8,8 +8,10 @@ use futures::SinkExt; use futures::TryStreamExt; use futures::channel::mpsc; use futures::channel::mpsc::Sender; +use itertools::Itertools; use parking_lot::Mutex; use vortex::array::ArrayRef; +use vortex::array::session::ArraySessionExt; use vortex::array::stream::ArrayStreamAdapter; use vortex::dtype::DType; use vortex::dtype::Nullability::NonNullable; @@ -19,6 +21,7 @@ use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::file::WriteOptionsSessionExt; +use vortex::file::WriteStrategyBuilder; use vortex::file::WriteSummary; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::Task; @@ -135,8 +138,27 @@ impl CopyFunction for VortexCopyFunction { let writer = DuckDbFsWriter::new(ctx, &file_path) .map_err(|e| vortex_err!("Failed to create DuckDB FS writer for {file_path}: {e}"))?; - let write_task = - handle.spawn(async move { SESSION.write_options().write(writer, array_stream).await }); + // let mut allowed_encodings = ALLOWED_ENCODINGS.clone(); + // allowed_encodings.insert(ParquetVariant.id()); + let strategy = WriteStrategyBuilder::default() + .with_allow_encodings( + SESSION + .arrays() + .registry() + .ids() + .unique() + .sorted() + .collect(), + ) + .build(); + + let write_task = handle.spawn(async move { + SESSION + .write_options() + .with_strategy(strategy) + .write(writer, array_stream) + .await + }); let worker_pool = RUNTIME.new_pool(); worker_pool.set_workers_to_available_parallelism(); diff --git a/vortex-duckdb/src/duckdb/logical_type.rs b/vortex-duckdb/src/duckdb/logical_type.rs index 83bea019e32..a1a73674d5f 100644 --- a/vortex-duckdb/src/duckdb/logical_type.rs +++ b/vortex-duckdb/src/duckdb/logical_type.rs @@ -155,6 +155,10 @@ impl LogicalType { Self::new(DUCKDB_TYPE::DUCKDB_TYPE_BOOLEAN) } + pub fn variant() -> Self { + unsafe { Self::own(duckdb_vx_logical_type_variant()) } + } + pub fn float32() -> Self { Self::new(DUCKDB_TYPE::DUCKDB_TYPE_FLOAT) } diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index c42fbdaf1e4..f3553e2909a 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -23,6 +23,7 @@ mod value; mod vector; mod vector_buffer; +use std::ffi::CStr; use std::ffi::c_void; use std::ptr; @@ -46,7 +47,9 @@ pub use table_function::*; pub use value::*; pub use vector::*; pub use vector_buffer::*; +use vortex::error::VortexError; use vortex::error::VortexResult; +use vortex::error::vortex_err; use crate::cpp; @@ -88,6 +91,17 @@ pub(crate) fn try_or( } } +pub(crate) fn ffi_error(err: cpp::duckdb_vx_error) -> VortexError { + if err.is_null() { + return vortex_err!("DuckDB error (unknown)"); + } + let message = unsafe { CStr::from_ptr(cpp::duckdb_vx_error_value(err)) } + .to_string_lossy() + .to_string(); + unsafe { cpp::duckdb_vx_error_free(err) }; + vortex_err!("{message}") +} + /// Creates a function that drops a `Box` when called. extern "C-unwind" fn drop_boxed(ptr: *mut c_void) { // Safety: We assume that the pointer is valid and points to a Box. diff --git a/vortex-duckdb/src/duckdb/value.rs b/vortex-duckdb/src/duckdb/value.rs index 752f1ccca70..26e86bc5a04 100644 --- a/vortex-duckdb/src/duckdb/value.rs +++ b/vortex-duckdb/src/duckdb/value.rs @@ -156,6 +156,9 @@ impl ValueRef { }) .collect::>(), ), + DUCKDB_TYPE::DUCKDB_TYPE_VARIANT => ExtractedValue::Variant(unsafe { + Value::own(cpp::duckdb_vx_value_clone(self.as_ptr())) + }), // ...other types remain unimplemented.. other => vortex_panic!("Unsupported DuckDB value type {other:?}"), } @@ -404,13 +407,41 @@ pub enum ExtractedValue { TimestampS(i64), Decimal(u8, i8, i128), List(Vec), + Variant(Value), } #[cfg(test)] mod tests { + use vortex::error::VortexResult; + use vortex::error::vortex_err; + + use crate::duckdb::Database; + use crate::duckdb::ExtractedValue; use crate::duckdb::i128_from_parts; use crate::duckdb::u128_from_parts; + #[test] + fn extract_variant_value() -> VortexResult<()> { + let db = Database::open_in_memory()?; + let conn = db.connect()?; + let result = conn.query("SELECT 42::VARIANT")?; + let chunk = result + .into_iter() + .next() + .ok_or_else(|| vortex_err!("expected a result chunk"))?; + let value = chunk + .get_vector(0) + .get_value(0, chunk.len()) + .ok_or_else(|| vortex_err!("expected a result value"))?; + + let ExtractedValue::Variant(extracted) = value.extract() else { + return Err(vortex_err!("expected Variant extracted value")); + }; + + assert_eq!(extracted.to_string(), "42"); + Ok(()) + } + #[test] fn test_huge_int_from_parts() { assert_eq!(i128_from_parts(0, 0), 0i128); diff --git a/vortex-duckdb/src/exporter/canonical.rs b/vortex-duckdb/src/exporter/canonical.rs index a46b22df6f9..77762502e04 100644 --- a/vortex-duckdb/src/exporter/canonical.rs +++ b/vortex-duckdb/src/exporter/canonical.rs @@ -4,8 +4,10 @@ use vortex::array::ArrayRef; use vortex::array::Canonical; use vortex::array::ExecutionCtx; use vortex::array::arrays::TemporalArray; +use vortex::array::arrays::variant::VariantArrayExt; use vortex::error::VortexResult; use vortex::error::vortex_bail; +use vortex_parquet_variant::ParquetVariant; use crate::exporter::ColumnExporter; use crate::exporter::ConversionCache; @@ -18,12 +20,14 @@ use crate::exporter::primitive; use crate::exporter::struct_; use crate::exporter::temporal; use crate::exporter::varbinview; +use crate::exporter::variant; pub(crate) fn new_exporter( array: ArrayRef, cache: &ConversionCache, ctx: &mut ExecutionCtx, ) -> VortexResult> { + let encoding_id = array.encoding_id(); match array.execute::(ctx)? { Canonical::Null(_) => Ok(all_invalid::new_exporter()), Canonical::Bool(array) => bool::new_exporter(array, ctx), @@ -39,8 +43,79 @@ pub(crate) fn new_exporter( } vortex_bail!("no non-temporal extension exporter") } - Canonical::Variant(_) => { - vortex_bail!("Variant arrays can't be exported to DuckDB") + Canonical::Variant(array) => { + let core_storage = array.core_storage().clone(); + let Ok(parquet_variant) = core_storage.execute_until::(ctx) else { + vortex_bail!( + "Variant arrays can't be exported to DuckDB from {encoding_id}: core storage is not ParquetVariant" + ); + }; + let Ok(parquet_variant) = parquet_variant.try_downcast::() else { + vortex_bail!( + "Variant arrays can't be exported to DuckDB from {encoding_id}: core storage is not ParquetVariant" + ); + }; + variant::new_exporter(parquet_variant, cache, ctx) } } } + +#[cfg(test)] +mod tests { + use vortex::array::Canonical; + use vortex::array::IntoArray; + use vortex::array::VortexSessionExecute; + use vortex::array::arrays::VarBinViewArray; + use vortex::array::validity::Validity; + use vortex::dtype::DType; + use vortex::dtype::Nullability; + use vortex::error::vortex_err; + use vortex_parquet_variant::ParquetVariant; + + use super::*; + use crate::SESSION; + use crate::duckdb::DataChunk; + use crate::duckdb::LogicalType; + + #[test] + fn exports_canonical_variant_backed_by_parquet_variant() -> VortexResult<()> { + let metadata = VarBinViewArray::from_iter( + [Some(&b"unused"[..]); 3], + DType::Binary(Nullability::NonNullable), + ) + .into_array(); + let value = VarBinViewArray::from_iter( + [Option::<&[u8]>::None; 3], + DType::Binary(Nullability::Nullable), + ) + .into_array(); + let parquet_variant = + ParquetVariant::try_new(Validity::AllInvalid, metadata, Some(value), None)?; + + let mut ctx = SESSION.create_execution_ctx(); + let Canonical::Variant(variant) = parquet_variant + .into_array() + .execute::(&mut ctx)? + else { + return Err(vortex_err!("expected canonical Variant")); + }; + + let mut chunk = DataChunk::new([LogicalType::variant()]); + let cache = ConversionCache::default(); + new_exporter(variant.into_array(), &cache, &mut ctx)?.export( + 0, + 3, + chunk.get_vector_mut(0), + &mut ctx, + )?; + chunk.set_len(3); + + assert_eq!( + format!("{}", String::try_from(&*chunk)?), + r#"Chunk - [1 Columns] +- CONSTANT VARIANT: 3 = [ NULL] +"# + ); + Ok(()) + } +} diff --git a/vortex-duckdb/src/exporter/mod.rs b/vortex-duckdb/src/exporter/mod.rs index a438adc59d8..1688c5f7931 100644 --- a/vortex-duckdb/src/exporter/mod.rs +++ b/vortex-duckdb/src/exporter/mod.rs @@ -18,6 +18,7 @@ mod struct_; mod temporal; mod validity; mod varbinview; +mod variant; mod vector; pub use cache::ConversionCache; @@ -35,6 +36,7 @@ use vortex::encodings::sequence::Sequence; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_bail; +use vortex_parquet_variant::ParquetVariant; use crate::duckdb::DataChunkRef; use crate::duckdb::VectorRef; @@ -204,6 +206,18 @@ fn new_array_exporter_with_flatten( Err(array) => array, }; + let array = match array.try_downcast::() { + Ok(array) => return variant::new_exporter(array, cache, ctx), + Err(array) => array, + }; + + if array.dtype().is_variant() + && let Ok(executed) = array.clone().execute_until::(ctx) + && let Ok(array) = executed.try_downcast::() + { + return variant::new_exporter(array, cache, ctx); + } + canonical::new_exporter(array, cache, ctx) } diff --git a/vortex-duckdb/src/exporter/variant.rs b/vortex-duckdb/src/exporter/variant.rs new file mode 100644 index 00000000000..b45d3ade491 --- /dev/null +++ b/vortex-duckdb/src/exporter/variant.rs @@ -0,0 +1,182 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ptr; + +use vortex::array::ArrayRef; +use vortex::array::ExecutionCtx; +use vortex::array::validity::Validity; +use vortex::error::VortexResult; +use vortex::mask::Mask; +use vortex_parquet_variant::ParquetVariantArray; +use vortex_parquet_variant::ParquetVariantArrayExt; + +use crate::cpp; +use crate::duckdb::LogicalType; +use crate::duckdb::Value; +use crate::duckdb::Vector; +use crate::duckdb::VectorRef; +use crate::exporter::ColumnExporter; +use crate::exporter::ConversionCache; +use crate::exporter::all_invalid; + +struct VariantExporter { + validity: Mask, + metadata: Vector, + value: Vector, + typed_value: Option, +} + +pub(crate) fn new_exporter( + array: ParquetVariantArray, + cache: &ConversionCache, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let len = array.len(); + let validity = ParquetVariantArrayExt::validity(&array); + if matches!(validity, Validity::AllInvalid) { + return Ok(all_invalid::new_exporter()); + } + let validity = validity.to_array(len).execute::(ctx)?; + + let metadata = export_child( + array.metadata_array().clone(), + &LogicalType::blob(), + len, + cache, + ctx, + )?; + let value = match array.value_array() { + Some(value) => export_child(value.clone(), &LogicalType::blob(), len, cache, ctx)?, + None => all_null_blob_vector(len), + }; + let typed_value = array + .typed_value_array() + .map(|typed_value| { + let logical_type = LogicalType::try_from(typed_value.dtype())?; + export_child(typed_value.clone(), &logical_type, len, cache, ctx) + }) + .transpose()?; + + Ok(Box::new(VariantExporter { + validity, + metadata, + value, + typed_value, + })) +} + +fn export_child( + array: ArrayRef, + logical_type: &LogicalType, + len: usize, + cache: &ConversionCache, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let mut vector = Vector::with_capacity(logical_type, len); + super::new_array_exporter_with_flatten(array, cache, ctx, true)?.export( + 0, + len, + &mut vector, + ctx, + )?; + Ok(vector) +} + +fn all_null_blob_vector(len: usize) -> Vector { + let logical_type = LogicalType::blob(); + let mut vector = Vector::with_capacity(&logical_type, len); + vector.reference_value(&Value::null(&logical_type)); + vector +} + +impl ColumnExporter for VariantExporter { + fn export( + &self, + offset: usize, + len: usize, + vector: &mut VectorRef, + _ctx: &mut ExecutionCtx, + ) -> VortexResult<()> { + if len == 0 { + return Ok(()); + } + + let range = offset as u64..(offset + len) as u64; + let metadata = Vector::slice(&self.metadata, range.clone()); + let value = Vector::slice(&self.value, range.clone()); + let typed_value = self + .typed_value + .as_ref() + .map(|typed_value| Vector::slice(typed_value, range)); + + let mut err = ptr::null_mut(); + unsafe { + cpp::duckdb_vx_variant_from_parquet( + metadata.as_ptr(), + value.as_ptr(), + typed_value + .as_ref() + .map_or(ptr::null_mut(), |typed_value| typed_value.as_ptr()), + typed_value.is_some(), + vector.as_ptr(), + len as _, + &raw mut err, + ); + } + if !err.is_null() { + return Err(crate::duckdb::ffi_error(err)); + } + + unsafe { + vector.set_validity(&self.validity, offset, len); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use vortex::array::IntoArray; + use vortex::array::VortexSessionExecute; + use vortex::array::arrays::VarBinViewArray; + use vortex::array::validity::Validity; + use vortex::dtype::DType; + use vortex::dtype::Nullability; + use vortex_parquet_variant::ParquetVariant; + + use super::*; + use crate::SESSION; + use crate::duckdb::DataChunk; + use crate::exporter::ConversionCache; + + #[test] + fn all_invalid_variant() -> VortexResult<()> { + let metadata = VarBinViewArray::from_iter( + [Some(&b"unused"[..]); 3], + DType::Binary(Nullability::NonNullable), + ) + .into_array(); + let value = VarBinViewArray::from_iter( + [Option::<&[u8]>::None; 3], + DType::Binary(Nullability::Nullable), + ) + .into_array(); + let array = ParquetVariant::try_new(Validity::AllInvalid, metadata, Some(value), None)?; + + let mut chunk = DataChunk::new([LogicalType::variant()]); + let mut ctx = SESSION.create_execution_ctx(); + let cache = ConversionCache::default(); + new_exporter(array, &cache, &mut ctx)?.export(0, 3, chunk.get_vector_mut(0), &mut ctx)?; + chunk.set_len(3); + + assert_eq!( + format!("{}", String::try_from(&*chunk)?), + r#"Chunk - [1 Columns] +- CONSTANT VARIANT: 3 = [ NULL] +"# + ); + Ok(()) + } +} diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index 410d241a766..f4f19475532 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -9,12 +9,14 @@ use std::sync::LazyLock; use std::sync::OnceLock; use vortex::VortexSessionDefault; +use vortex::array::session::ArraySessionExt; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::CurrentThreadRuntime; use vortex::io::session::RuntimeSessionExt; use vortex::session::VortexSession; +use vortex_parquet_variant::ParquetVariant; use crate::copy::VortexCopyFunction; use crate::duckdb::Database; @@ -43,8 +45,11 @@ mod e2e_test; // A global runtime for Vortex operations within DuckDB. static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); -static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); +static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::default().with_handle(RUNTIME.handle()); + session.arrays().register(ParquetVariant); + session +}); // Duckdb's logger requires a *Context as first argument which // would be hard to integrate with tracing::. We use logging for diff --git a/vortex-file/src/writer.rs b/vortex-file/src/writer.rs index 25db11e4280..e74d89fe2b9 100644 --- a/vortex-file/src/writer.rs +++ b/vortex-file/src/writer.rs @@ -149,9 +149,17 @@ impl VortexWriteOptions { // serialised array order is deterministic. The serialisation of arrays are done // parallel and with an empty context they can register their encodings to the context // in different order, changing the written bytes from run to run. - let ctx = ArrayContext::new(ALLOWED_ENCODINGS.iter().cloned().sorted().collect()) - // Configure a registry just to ensure only known encodings are interned. - .with_registry(self.session.arrays().registry().clone()); + let ctx = ArrayContext::new( + ALLOWED_ENCODINGS + .iter() + .copied() + .chain(self.session.arrays().registry().ids()) + .unique() + .sorted() + .collect(), + ) + // Configure a registry just to ensure only known encodings are interned. + .with_registry(self.session.arrays().registry().clone()); let dtype = stream.dtype().clone(); let (mut ptr, eof) = SequenceId::root().split(); diff --git a/vortex-sqllogictest/slt/duckdb/variant.slt b/vortex-sqllogictest/slt/duckdb/variant.slt new file mode 100644 index 00000000000..be16f79a407 --- /dev/null +++ b/vortex-sqllogictest/slt/duckdb/variant.slt @@ -0,0 +1,151 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +include ../setup.slt.no + +statement ok +CREATE TABLE variant_events AS +SELECT * FROM (VALUES + (1, 42::VARIANT), + (2, 'hello world'::VARIANT), + (3, [1, 2, 3]::VARIANT), + (4, {'name': 'Alice', 'age': 30}::VARIANT), + (5, {'name': 'Bob', 'nested': {'score': 99}}::VARIANT), + (6, NULL::VARIANT) +) AS t(id, data); + +query IT +SELECT id, variant_typeof(data) FROM variant_events ORDER BY id; +---- +1 INT32 +2 VARCHAR +3 ARRAY(3) +4 OBJECT(name, age) +5 OBJECT(name, nested) +6 VARIANT_NULL + +query I +COPY variant_events TO '$__TEST_DIR__/variant_events.vortex'; +---- +6 + +query IT +SELECT id, variant_typeof(data) +FROM '$__TEST_DIR__/variant_events.vortex' +ORDER BY id; +---- +1 INT32 +2 VARCHAR +3 ARRAY(3) +4 OBJECT(age, name) +5 OBJECT(name, nested) +6 VARIANT_NULL + +query ITII +SELECT + id, + CAST(data.name AS VARCHAR), + CAST(variant_extract(data, 'age') AS INTEGER), + CAST(data.nested.score AS INTEGER) +FROM '$__TEST_DIR__/variant_events.vortex' +WHERE id IN (4, 5) +ORDER BY id; +---- +4 Alice 30 NULL +5 Bob NULL 99 + +query I? +SELECT id, data +FROM '$__TEST_DIR__/variant_events.vortex' +WHERE id <= 4 +ORDER BY id; +---- +1 42 +2 hello world +3 [1, 2, 3] +4 {'age': 30, 'name': Alice} + +query I +COPY variant_events TO '$__TEST_DIR__/variant_events.parquet' (FORMAT parquet); +---- +6 + +query I +COPY (SELECT * FROM read_parquet('$__TEST_DIR__/variant_events.parquet')) +TO '$__TEST_DIR__/variant_events_from_parquet.vortex'; +---- +6 + +query ITTI +SELECT + id, + variant_typeof(data), + CAST(data.name AS VARCHAR), + CAST(data.nested.score AS INTEGER) +FROM read_parquet('$__TEST_DIR__/variant_events.parquet') +WHERE id IN (4, 5) +ORDER BY id; +---- +4 OBJECT(age, name) Alice NULL +5 OBJECT(name, nested) Bob 99 + +query ITTI +SELECT + id, + variant_typeof(data), + CAST(data.name AS VARCHAR), + CAST(data.nested.score AS INTEGER) +FROM '$__TEST_DIR__/variant_events_from_parquet.vortex' +WHERE id IN (4, 5) +ORDER BY id; +---- +4 OBJECT(age, name) Alice NULL +5 OBJECT(name, nested) Bob 99 + +statement ok +CREATE TABLE variant_people AS +SELECT * FROM (VALUES + (1, {'name': 'Alice', 'age': 30}::VARIANT), + (2, {'name': 'Bob', 'age': 40}::VARIANT), + (3, NULL::VARIANT) +) AS t(id, data); + +query I +COPY variant_people TO '$__TEST_DIR__/variant_people_shredded.parquet' ( + FORMAT parquet, + SHREDDING {'data': 'STRUCT(name VARCHAR, age INTEGER)'} +); +---- +3 + +query I +COPY (SELECT * FROM read_parquet('$__TEST_DIR__/variant_people_shredded.parquet')) +TO '$__TEST_DIR__/variant_people_shredded.vortex'; +---- +3 + +query ITTI +SELECT + id, + variant_typeof(data), + CAST(data.name AS VARCHAR), + CAST(data.age AS INTEGER) +FROM read_parquet('$__TEST_DIR__/variant_people_shredded.parquet') +ORDER BY id; +---- +1 OBJECT(age, name) Alice 30 +2 OBJECT(age, name) Bob 40 +3 VARIANT_NULL NULL NULL + +query ITTI +SELECT + id, + variant_typeof(data), + CAST(data.name AS VARCHAR), + CAST(data.age AS INTEGER) +FROM '$__TEST_DIR__/variant_people_shredded.vortex' +ORDER BY id; +---- +1 OBJECT(age, name) Alice 30 +2 OBJECT(age, name) Bob 40 +3 VARIANT_NULL NULL NULL diff --git a/vortex-sqllogictest/src/duckdb.rs b/vortex-sqllogictest/src/duckdb.rs index 182110e1f45..1527f1c24ec 100644 --- a/vortex-sqllogictest/src/duckdb.rs +++ b/vortex-sqllogictest/src/duckdb.rs @@ -254,6 +254,7 @@ impl std::fmt::Display for ValueDisplayAdapter { ExtractedValue::Decimal(_, scale, value) => { write!(f, "{}", decimal_to_str(value, scale)) } + ExtractedValue::Variant(v) => write!(f, "{v}"), // For types not specially handled by cell_to_string (dates, times, timestamps, // blobs, lists), delegate to DuckDB's native string representation. ExtractedValue::Blob(_)