diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index d9c45a08a3..7cf29bd763 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -98,7 +98,8 @@ datafusion-functions-nested = { version = "53.1.0" } backtrace = ["datafusion/backtrace"] default = ["hdfs-opendal"] hdfs = ["datafusion-comet-objectstore-hdfs"] -hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"] +hdfs-opendal = ["dep:opendal", "dep:object_store_opendal", "dep:hdfs-sys", "opendal/services-hdfs"] +s3-opendal = ["dep:opendal", "dep:object_store_opendal", "opendal/services-s3"] jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] # exclude optional packages from cargo machete verifications diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index d68252bd9b..04d8fe29f2 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -32,8 +32,10 @@ mod iceberg_scan; mod parquet_writer; pub use parquet_writer::ParquetWriterExec; mod csv_scan; +mod partition_writer; pub mod projection; mod scan; mod shuffle_scan; + pub use csv_scan::init_csv_datasource_exec; pub use shuffle_scan::ShuffleScanExec; diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 8ba79098d4..9eeade077e 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,27 +17,15 @@ //! Parquet writer operator for writing RecordBatches to Parquet files -use std::{ - any::Any, - collections::HashMap, - fmt, - fmt::{Debug, Formatter}, - fs::File, - sync::Arc, -}; - -#[cfg(feature = "hdfs-opendal")] -use opendal::Operator; -#[cfg(feature = "hdfs-opendal")] -use std::io::Cursor; - +use crate::execution::operators::partition_writer::PartitionedWriter; use crate::execution::shuffle::CompressionCodec; -use crate::parquet::parquet_support::is_hdfs_scheme; #[cfg(feature = "hdfs-opendal")] -use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs}; -use arrow::datatypes::{Schema, SchemaRef}; +use crate::parquet::parquet_support::create_hdfs_operator; +use crate::parquet::parquet_support::{is_hdfs_scheme, prepare_object_store_with_configs}; +use arrow::datatypes::{FieldRef, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::{ error::{DataFusionError, Result}, execution::context::TaskContext, @@ -51,142 +39,351 @@ use datafusion::{ }, }; use futures::TryStreamExt; +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] +use opendal::Operator; +use parquet::errors::ParquetError; use parquet::{ arrow::ArrowWriter, basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; +use std::collections::HashSet; +use std::fs::create_dir_all; +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] +use std::io::Cursor; +use std::path::Path; +use std::{ + any::Any, + collections::HashMap, + fmt, + fmt::{Debug, Formatter}, + fs::File, + sync::Arc, +}; use url::Url; -/// Enum representing different types of Arrow writers based on storage backend -enum ParquetWriter { - /// Writer for local file system - LocalFile(ArrowWriter), - /// Writer for HDFS or other remote storage (writes to in-memory buffer) - /// Contains the arrow writer, HDFS operator, and destination path - /// an Arrow writer writes to in-memory buffer the data converted to Parquet format - /// The opendal::Writer is created lazily on first write - #[cfg(feature = "hdfs-opendal")] - Remote( - ArrowWriter>>, - Option, - Operator, - String, - ), +// A trait abstracting over different Parquet write targets (local filesystem, HDFS, S3, etc.) +#[async_trait] +pub(crate) trait ParquetWriter: Send { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError>; + async fn close(self: Box) -> Result<(), ParquetError>; } -impl ParquetWriter { - /// Write a RecordBatch to the underlying writer - async fn write( - &mut self, - batch: &RecordBatch, - ) -> std::result::Result<(), parquet::errors::ParquetError> { - match self { - ParquetWriter::LocalFile(writer) => writer.write(batch), - #[cfg(feature = "hdfs-opendal")] - ParquetWriter::Remote( - arrow_parquet_buffer_writer, - hdfs_writer_opt, - op, - output_path, - ) => { - // Write batch to in-memory buffer - arrow_parquet_buffer_writer.write(batch)?; - - // Flush and get the current buffer content - arrow_parquet_buffer_writer.flush()?; - let cursor = arrow_parquet_buffer_writer.inner_mut(); - let current_data = cursor.get_ref().clone(); - - // Create HDFS writer lazily on first write - if hdfs_writer_opt.is_none() { - let writer = op.writer(output_path.as_str()).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to create HDFS writer for '{}': {}", output_path, e) - .into(), - ) - })?; - *hdfs_writer_opt = Some(writer); - } - - // Write the accumulated data to HDFS - if let Some(hdfs_writer) = hdfs_writer_opt { - hdfs_writer.write(current_data).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!( - "Failed to write batch to HDFS file '{}': {}", - output_path, e - ) - .into(), - ) - })?; - } - - // Clear the buffer after upload - cursor.get_mut().clear(); - cursor.set_position(0); - - Ok(()) - } +// A `ParquetWriter` implementation that writes directly to a local file. +struct LocalFileWriter { + writer: ArrowWriter, +} + +impl LocalFileWriter { + fn try_new(path: &str, schema: SchemaRef, props: WriterProperties) -> Result { + let local_path = path + .strip_prefix("file://") + .or_else(|| path.strip_prefix("file:")) + .unwrap_or(path); + + let output_dir = Path::new(local_path).parent().ok_or_else(|| { + DataFusionError::Execution(format!( + "Failed to extract parent directory from path '{local_path}'" + )) + })?; + + create_dir_all(output_dir).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output directory '{}': {e}", + output_dir.display() + )) + })?; + + let file = File::create(local_path).map_err(|e| { + DataFusionError::Execution(format!("Failed to create output file '{local_path}': {e}")) + })?; + + let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!("Failed to create local writer: {e}")) + })?; + + Ok(Self { writer }) + } +} + +#[async_trait] +impl ParquetWriter for LocalFileWriter { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { + self.writer.write(batch) + } + + async fn close(mut self: Box) -> Result<(), ParquetError> { + self.writer.close()?; + Ok(()) + } +} + +// A `ParquetWriter` implementation that streams Parquet data to remote object storage +// (HDFS, S3, etc.) via the OpenDAL abstraction layer. +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] +struct OpendalWriter { + arrow_writer: ArrowWriter>>, + opendal_writer: Option, + operator: Operator, + path: String, +} + +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] +impl OpendalWriter { + fn try_new( + operator: Operator, + path: String, + schema: SchemaRef, + props: WriterProperties, + ) -> Result { + let cursor = Cursor::new(Vec::new()); + let arrow_writer = ArrowWriter::try_new(cursor, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!("Failed to create OpenDAL arrow writer: {e}")) + })?; + Ok(Self { + arrow_writer, + opendal_writer: None, + operator, + path, + }) + } +} + +#[async_trait] +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] +impl ParquetWriter for OpendalWriter { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { + self.arrow_writer.write(batch)?; + self.arrow_writer.flush()?; + + let cursor = self.arrow_writer.inner_mut(); + let data = cursor.get_ref().clone(); + + if data.is_empty() { + return Ok(()); + } + + if self.opendal_writer.is_none() { + let writer = self.operator.writer(&self.path).await.map_err(|e| { + ParquetError::External( + format!("Failed to create OpenDAL writer for '{}': {e}", self.path).into(), + ) + })?; + self.opendal_writer = Some(writer); + } + + if let Some(w) = &mut self.opendal_writer { + w.write(data).await.map_err(|e| { + ParquetError::External(format!("Failed to write to '{}': {e}", self.path).into()) + })?; } + + let cursor = self.arrow_writer.inner_mut(); + cursor.get_mut().clear(); + cursor.set_position(0); + + Ok(()) } - /// Close the writer and finalize the file - async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> { - match self { - ParquetWriter::LocalFile(writer) => { - writer.close()?; - Ok(()) + async fn close(mut self: Box) -> Result<(), ParquetError> { + let cursor = self.arrow_writer.into_inner()?; + let final_data = cursor.into_inner(); + + if self.opendal_writer.is_none() && !final_data.is_empty() { + let writer = self.operator.writer(&self.path).await.map_err(|e| { + ParquetError::External( + format!("Failed to create OpenDAL writer for '{}': {e}", self.path).into(), + ) + })?; + self.opendal_writer = Some(writer); + } + + if !final_data.is_empty() { + if let Some(mut writer) = self.opendal_writer { + writer.write(final_data).await.map_err(|e| { + ParquetError::External( + format!("Failed to write final data to file '{}': {}", self.path, e).into(), + ) + })?; + + writer.close().await.map_err(|e| { + ParquetError::External( + format!("Failed to close writer for '{}': {}", self.path, e).into(), + ) + })?; } + } + + Ok(()) + } +} + +// A factory that inspects the destination URL and produces the appropriate +// `ParquetWriter` implementation for the target storage backend. +pub(crate) struct StorageWriterFactory; + +impl StorageWriterFactory { + // Selects and constructs a `ParquetWriter` based on the URL scheme of `output_path`. + // Supported backends: + // - **HDFS** – detected via `is_hdfs_scheme`; backed by `OpendalWriter`. + // - **S3A** – detected by scheme; backed by `OpendalWriter`. + // - **Local filesystem** – `file://`, `file:`, or a bare path; backed by `LocalFileWriter`. + pub(crate) fn create( + output_path: &str, + schema: SchemaRef, + props: WriterProperties, + runtime_env: Arc, + object_store_options: &HashMap, + ) -> Result> { + let (_, object_store_path) = prepare_object_store_with_configs( + runtime_env, + output_path.to_string(), + object_store_options, + ) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to prepare object store for '{output_path}': {e}" + )) + })?; + + let url = Url::parse(output_path).map_err(|e| { + DataFusionError::Execution(format!("Failed to parse URL '{output_path}': {e}")) + })?; + + if is_hdfs_scheme(&url, object_store_options) { #[cfg(feature = "hdfs-opendal")] - ParquetWriter::Remote( - arrow_parquet_buffer_writer, - mut hdfs_writer_opt, - op, - output_path, - ) => { - // Close the arrow writer to finalize parquet format - let cursor = arrow_parquet_buffer_writer.into_inner()?; - let final_data = cursor.into_inner(); - - // Create HDFS writer if not already created - if hdfs_writer_opt.is_none() && !final_data.is_empty() { - let writer = op.writer(output_path.as_str()).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to create HDFS writer for '{}': {}", output_path, e) - .into(), - ) - })?; - hdfs_writer_opt = Some(writer); - } - - // Write any remaining data - if !final_data.is_empty() { - if let Some(mut hdfs_writer) = hdfs_writer_opt { - hdfs_writer.write(final_data).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!( - "Failed to write final data to HDFS file '{}': {}", - output_path, e - ) - .into(), - ) - })?; - - // Close the HDFS writer - hdfs_writer.close().await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to close HDFS writer for '{}': {}", output_path, e) - .into(), - ) - })?; - } - } - - Ok(()) + { + Self::create_hdfs_parquet_writer( + schema, + props, + &object_store_path.to_string(), + &url, + ) + } + #[cfg(not(feature = "hdfs-opendal"))] + { + Err(DataFusionError::Execution( + "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(), + )) + } + } else if Self::is_s3_scheme(&url) { + #[cfg(feature = "s3-opendal")] + { + Self::create_s3_parquet_writer( + schema, + props, + &object_store_path.to_string(), + &url, + object_store_options, + ) + } + #[cfg(not(feature = "s3-opendal"))] + { + Err(DataFusionError::Execution( + "S3 support is not enabled. Rebuild with the 's3-opendal' feature.".into(), + )) } + } else if Self::is_local_path(output_path) { + Ok(Box::new(LocalFileWriter::try_new( + output_path, + schema, + props, + )?)) + } else { + Err(DataFusionError::Execution(format!( + "Unsupported storage scheme in path: {output_path}" + ))) } } + + #[cfg(feature = "hdfs-opendal")] + fn create_hdfs_parquet_writer( + schema: SchemaRef, + props: WriterProperties, + object_store_path: &String, + url: &Url, + ) -> Result> { + let url_str = url.as_str(); + let op = create_hdfs_operator(url).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create HDFS operator for '{url_str}': {e}" + )) + })?; + Ok(Box::new(OpendalWriter::try_new( + op, + object_store_path.into(), + schema, + props, + )?)) + } + + #[cfg(feature = "s3-opendal")] + fn create_s3_parquet_writer( + schema: SchemaRef, + props: WriterProperties, + object_store_path: &String, + url: &Url, + object_store_options: &HashMap, + ) -> Result> { + let url_str = url.as_str(); + let access_key = object_store_options + .get("fs.s3a.access.key") + .ok_or_else(|| { + DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.access.key".to_string(), + ) + })?; + let secret_key = object_store_options + .get("fs.s3a.secret.key") + .ok_or_else(|| { + DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.secret.key".to_string(), + ) + })?; + let endpoint = object_store_options.get("fs.s3a.endpoint").ok_or_else(|| { + DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.endpoint".to_string(), + ) + })?; + let region = object_store_options + .get("fs.s3a.endpoint.region") + .map(|s| s.as_str()) + .unwrap_or("us-east-1"); + let bucket_name = url.host_str().ok_or_else(|| { + DataFusionError::Execution(format!("Missing bucket name in S3 URL: {}", url_str)) + })?; + let builder = opendal::services::S3::default() + .endpoint(endpoint) + .secret_access_key(secret_key) + .access_key_id(access_key) + .region(region) + .bucket(bucket_name); + let op = Operator::new(builder) + .map_err(|error| object_store::Error::Generic { + store: "s3-opendal", + source: error.into(), + }) + .map(|op| op.finish()) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create S3 operator for '{url_str}': {e}" + )) + })?; + Ok(Box::new(OpendalWriter::try_new( + op, + object_store_path.into(), + schema, + props, + )?)) + } + + fn is_local_path(path: &str) -> bool { + path.starts_with("file://") || path.starts_with("file:") || !path.contains("://") + } + + fn is_s3_scheme(url: &Url) -> bool { + url.scheme() == "s3a" + } } /// Parquet writer operator that writes input batches to a Parquet file @@ -214,6 +411,7 @@ pub struct ParquetWriterExec { metrics: ExecutionPlanMetricsSet, /// Cache for plan properties cache: Arc, + partition_columns: Vec, } impl ParquetWriterExec { @@ -229,6 +427,7 @@ impl ParquetWriterExec { partition_id: i32, column_names: Vec, object_store_options: HashMap, + partition_columns: Vec, ) -> Result { // Preserve the input's partitioning so each partition writes its own file let input_partitioning = input.output_partitioning().clone(); @@ -252,6 +451,7 @@ impl ParquetWriterExec { object_store_options, metrics: ExecutionPlanMetricsSet::new(), cache, + partition_columns, }) } @@ -263,128 +463,6 @@ impl ParquetWriterExec { CompressionCodec::Snappy => Ok(Compression::SNAPPY), } } - - /// Create an Arrow writer based on the storage scheme - /// - /// # Arguments - /// * `output_file_path` - The full path to the output file - /// * `schema` - The Arrow schema for the Parquet file - /// * `props` - Writer properties including compression - /// * `runtime_env` - Runtime environment for object store registration - /// * `object_store_options` - Configuration options for object store - /// - /// # Returns - /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme - /// * `Err(DataFusionError)` - If writer creation fails - fn create_arrow_writer( - output_file_path: &str, - schema: SchemaRef, - props: WriterProperties, - _runtime_env: Arc, - object_store_options: &HashMap, - ) -> Result { - // Parse URL and match on storage scheme directly - let url = Url::parse(output_file_path).map_err(|e| { - DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e)) - })?; - - if is_hdfs_scheme(&url, object_store_options) { - #[cfg(feature = "hdfs-opendal")] - { - // Use prepare_object_store_with_configs to create and register the object store - let (_object_store_url, object_store_path) = prepare_object_store_with_configs( - _runtime_env, - output_file_path.to_string(), - object_store_options, - ) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to prepare object store for '{}': {}", - output_file_path, e - )) - })?; - - // For remote storage (HDFS, S3), write to an in-memory buffer - let buffer = Vec::new(); - let cursor = Cursor::new(buffer); - let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) - .map_err(|e| { - DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e)) - })?; - - // Create HDFS operator with configuration options using the helper function - let op = create_hdfs_operator(&url).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create HDFS operator for '{}': {}", - output_file_path, e - )) - })?; - - // HDFS writer will be created lazily on first write - // Use the path from prepare_object_store_with_configs - Ok(ParquetWriter::Remote( - arrow_parquet_buffer_writer, - None, - op, - object_store_path.to_string(), - )) - } - #[cfg(not(feature = "hdfs-opendal"))] - { - Err(DataFusionError::Execution( - "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(), - )) - } - } else if output_file_path.starts_with("file://") - || output_file_path.starts_with("file:") - || !output_file_path.contains("://") - { - // Local file system - { - // For a local file system, write directly to file - // Strip file:// or file: prefix if present - let local_path = output_file_path - .strip_prefix("file://") - .or_else(|| output_file_path.strip_prefix("file:")) - .unwrap_or(output_file_path); - - // Extract the parent directory from the file path - let output_dir = std::path::Path::new(local_path).parent().ok_or_else(|| { - DataFusionError::Execution(format!( - "Failed to extract parent directory from path '{}'", - local_path - )) - })?; - - // Create the parent directory if it doesn't exist - std::fs::create_dir_all(output_dir).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output directory '{}': {}", - output_dir.display(), - e - )) - })?; - - let file = File::create(local_path).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output file '{}': {}", - local_path, e - )) - })?; - - let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { - DataFusionError::Execution(format!("Failed to create local file writer: {}", e)) - })?; - Ok(ParquetWriter::LocalFile(writer)) - } - } else { - // Unsupported storage scheme - Err(DataFusionError::Execution(format!( - "Unsupported storage scheme in path: {}", - output_file_path - ))) - } - } } impl DisplayAs for ParquetWriterExec { @@ -443,6 +521,7 @@ impl ExecutionPlan for ParquetWriterExec { self.partition_id, self.column_names.clone(), self.object_store_options.clone(), + self.partition_columns.clone(), )?)), _ => Err(DataFusionError::Internal( "ParquetWriterExec requires exactly one child".to_string(), @@ -470,7 +549,13 @@ impl ExecutionPlan for ParquetWriterExec { let compression = self.compression_to_parquet()?; let column_names = self.column_names.clone(); - assert_eq!(input_schema.fields().len(), column_names.len()); + if input_schema.fields().len() != column_names.len() { + return Err(DataFusionError::Internal(format!( + "ParquetWriterExec: column_names length ({}) does not match input schema fields ({})", + column_names.len(), + input_schema.fields().len() + ))); + } // Replace the generic column names (col_0, col_1, etc.) with the actual names let fields: Vec<_> = input_schema @@ -479,31 +564,43 @@ impl ExecutionPlan for ParquetWriterExec { .enumerate() .map(|(i, field)| Arc::new(field.as_ref().clone().with_name(&column_names[i]))) .collect(); - let output_schema = Arc::new(arrow::datatypes::Schema::new(fields)); - - // Generate part file name for this partition - // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename - let part_file = if let Some(attempt_id) = task_attempt_id { - format!( - "{}/part-{:05}-{:05}.parquet", - work_dir, self.partition_id, attempt_id - ) - } else { - format!("{}/part-{:05}.parquet", work_dir, self.partition_id) - }; - // Configure writer properties - let props = WriterProperties::builder() - .set_compression(compression) - .build(); + let output_schema = Arc::new(Schema::new(fields)); let object_store_options = self.object_store_options.clone(); - let mut writer = Self::create_arrow_writer( - &part_file, - Arc::clone(&output_schema), - props, + + // Resolve partition column indices, preserving the declared order. + let mut partition_col_indices = Vec::with_capacity(self.partition_columns.len()); + for name in &self.partition_columns { + let idx = column_names.iter().position(|c| c == name).ok_or_else(|| { + DataFusionError::Execution(format!( + "Partition column '{name}' not found among output columns {column_names:?}" + )) + })?; + partition_col_indices.push(idx); + } + let part_set: HashSet = partition_col_indices.iter().copied().collect(); + let data_col_indices: Vec = (0..output_schema.fields().len()) + .filter(|i| !part_set.contains(i)) + .collect(); + let data_fields: Vec = data_col_indices + .iter() + .map(|&i| Arc::clone(&output_schema.fields()[i])) + .collect(); + + let data_schema = Arc::new(Schema::new(data_fields)); + + let mut writer = PartitionedWriter::new( + work_dir, + self.partition_id, + task_attempt_id, + Arc::clone(&data_schema), + partition_col_indices, + self.partition_columns.clone(), + data_col_indices, + compression, runtime_env, - &object_store_options, + object_store_options, )?; // Clone schema for use in async closure @@ -533,33 +630,36 @@ impl ExecutionPlan for ParquetWriterExec { batch }; - writer.write(&renamed_batch).await.map_err(|e| { + writer.write_batch(&renamed_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch: {}", e)) })?; } - writer.close().await.map_err(|e| { + let written_paths = writer.close().await.map_err(|e| { DataFusionError::Execution(format!("Failed to close writer: {}", e)) })?; // Get file size - strip file:// prefix if present for local filesystem access - let local_path = part_file - .strip_prefix("file://") - .or_else(|| part_file.strip_prefix("file:")) - .unwrap_or(&part_file); - let file_size = std::fs::metadata(local_path) - .map(|m| m.len() as i64) - .unwrap_or(0); + let mut total_bytes = 0i64; + for path in &written_paths { + let local = strip_file_scheme(path); + total_bytes += std::fs::metadata(local) + .map(|m| m.len() as i64) + .unwrap_or(0); + } // Update metrics with write statistics - files_written.add(1); - bytes_written.add(file_size as usize); + files_written.add(written_paths.len()); + bytes_written.add(total_bytes as usize); rows_written.add(total_rows as usize); // Log metadata for debugging eprintln!( - "Wrote Parquet file: path={}, size={}, rows={}", - part_file, file_size, total_rows + "ParquetWriterExec wrote {} file(s), {} bytes, {} rows; paths={:?}", + written_paths.len(), + total_bytes, + total_rows, + written_paths ); // Return empty stream to indicate completion @@ -574,6 +674,12 @@ impl ExecutionPlan for ParquetWriterExec { } } +fn strip_file_scheme(path: &str) -> &str { + path.strip_prefix("file://") + .or_else(|| path.strip_prefix("file:")) + .unwrap_or(path) +} + #[cfg(test)] mod tests { use super::*; @@ -755,7 +861,7 @@ mod tests { let full_output_path = format!("hdfs://namenode:9000{}", output_path); let session_ctx = datafusion::prelude::SessionContext::new(); let runtime_env = session_ctx.runtime_env(); - let mut writer = ParquetWriterExec::create_arrow_writer( + let mut writer = StorageWriterFactory::create( &full_output_path, create_test_record_batch(1)?.schema(), props, @@ -767,7 +873,7 @@ mod tests { for i in 1..=5 { let record_batch = create_test_record_batch(i)?; - writer.write(&record_batch).await.map_err(|e| { + writer.write_batch(&record_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) })?; @@ -828,6 +934,7 @@ mod tests { 0, // partition_id column_names, HashMap::new(), // object_store_options + Vec::new(), )?; // Create a session context and execute the plan @@ -849,4 +956,66 @@ mod tests { Ok(()) } + + #[tokio::test] + #[cfg(feature = "s3-opendal")] + #[ignore = "This test requires a running S3 cluster"] + async fn test_write_to_s3() -> Result<()> { + // Configure output path + let output_path = "s3a://comet/test_parquet_write"; + + // Configure writer properties + let props = WriterProperties::builder() + .set_compression(Compression::UNCOMPRESSED) + .build(); + + let session_ctx = datafusion::prelude::SessionContext::new(); + let runtime_env = session_ctx.runtime_env(); + + let mut object_store_options: HashMap = HashMap::new(); + object_store_options.insert("fs.s3a.access.key".to_string(), "admin".to_string()); + object_store_options.insert( + "fs.s3a.secret.key".to_string(), + "adminsecretkey".to_string(), + ); + object_store_options.insert( + "fs.s3a.endpoint".to_string(), + "http://localhost:9000".to_string(), + ); + + let mut writer = StorageWriterFactory::create( + &output_path, + create_test_record_batch(1)?.schema(), + props, + runtime_env, + &object_store_options, + )?; + + // Write 5 batches in a loop + for i in 1..=5 { + let record_batch = create_test_record_batch(i)?; + + writer.write_batch(&record_batch).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) + })?; + + println!( + "Successfully wrote batch {} (1000 rows) using ParquetWriter", + i + ); + } + + // Close the writer + writer + .close() + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {}", e)))?; + + println!( + "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to S3 at {}", + output_path + ); + + Ok(()) + } } diff --git a/native/core/src/execution/operators/partition_writer.rs b/native/core/src/execution/operators/partition_writer.rs new file mode 100644 index 0000000000..1c5904f8ff --- /dev/null +++ b/native/core/src/execution/operators/partition_writer.rs @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::execution::operators::parquet_writer::{ParquetWriter, StorageWriterFactory}; +use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array}; +use arrow::datatypes::{DataType, SchemaRef}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::runtime_env::RuntimeEnv; +use futures::future::try_join_all; +use parquet::basic::Compression; +use parquet::file::properties::WriterProperties; +use std::collections::HashMap; +use std::sync::Arc; + +/// Placeholder directory name used for `null`/empty partition values. +/// Mirrors Spark's `ExternalCatalogUtils.DEFAULT_PARTITION_NAME`. +const DEFAULT_PARTITION_NAME: &str = "__HIVE_DEFAULT_PARTITION__"; + +pub(crate) struct PartitionedWriter { + /// Open writers keyed by their partition sub-directory (e.g. `a=1/b=2`). + writers: HashMap>, + work_dir: String, + partition_id: i32, + task_attempt_id: Option, + /// Schema of the data actually written to the files (partition columns removed). + data_schema: SchemaRef, + /// Indices (into the renamed output schema) of the partition columns. + partition_col_indices: Vec, + /// Names of the partition columns, in declared order. + partition_col_names: Vec, + /// Indices (into the renamed output schema) of the data columns. + data_col_indices: Vec, + compression: Compression, + runtime_env: Arc, + object_store_options: HashMap, +} + +// Characters that must be escaped, in addition to all control chars (< 0x20). +const NEEDS_ESCAPE: &[char] = &[ + '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u{007F}', '{', '[', ']', '^', +]; + +/// Escape a string so it is safe to use as a single path component of a +/// Hive-style partition directory (`col=value`). +/// +/// This mirrors Spark/Hive `ExternalCatalogUtils.escapePathName`: control +/// characters and a fixed set of special characters are percent-encoded as +/// `%XX` (upper-case hex). The result must match Spark exactly so that the +/// directory layout produced natively is readable by Spark and any catalog. +fn escape_path_name(value: &str) -> String { + let mut out = String::with_capacity(value.len()); + for c in value.chars() { + if (c as u32) < 0x20 || NEEDS_ESCAPE.contains(&c) { + // Percent-encode the byte value (chars here are always < 0x80). + out.push_str(&format!("%{:02X}", c as u32)); + } else { + out.push(c); + } + } + out +} + +impl PartitionedWriter { + #[allow(clippy::too_many_arguments)] + pub(super) fn new( + work_dir: String, + partition_id: i32, + task_attempt_id: Option, + data_schema: SchemaRef, + partition_col_indices: Vec, + partition_col_names: Vec, + data_col_indices: Vec, + compression: Compression, + runtime_env: Arc, + object_store_options: HashMap, + ) -> Result { + // `new` only assembles the struct; it performs no IO. All writer + // creation (including the non-partitioned single-file case) happens + // lazily in the async path so that the storage factory's internal + // `block_on` always runs on a blocking thread and never on a tokio + // worker. See `writer_for` / `ensure_default_writer`. + Ok(Self { + writers: HashMap::new(), + work_dir, + partition_id, + task_attempt_id, + data_schema, + partition_col_indices, + partition_col_names, + data_col_indices, + compression, + runtime_env, + object_store_options, + }) + } + + /// Build the absolute part-file path for a given partition sub-directory. + fn build_path(&self, subdir: &str) -> String { + let dir = if subdir.is_empty() { + self.work_dir.clone() + } else { + format!("{}/{}", self.work_dir, subdir) + }; + match self.task_attempt_id { + Some(attempt_id) => format!( + "{}/part-{:05}-{:05}.parquet", + dir, self.partition_id, attempt_id + ), + None => format!("{}/part-{:05}.parquet", dir, self.partition_id), + } + } + + /// Synchronously construct a writer from owned arguments. + /// + /// The storage factory may call `block_on` internally (e.g. for S3), so this + /// function MUST NOT run on a tokio worker thread. It is only ever invoked + /// from inside a `spawn_blocking` closure (see `writer_for`). + fn build_writer( + path: String, + schema: SchemaRef, + compression: Compression, + runtime_env: Arc, + object_store_options: HashMap, + ) -> Result> { + let props = WriterProperties::builder() + .set_compression(compression) + .build(); + StorageWriterFactory::create(&path, schema, props, runtime_env, &object_store_options) + } + + /// Return the writer for `subdir`, creating it on first use. + /// + /// All values needed by the factory are cloned into owned locals *before* + /// the `.await`, so no borrow of `self` is held across the suspension point. + /// This keeps the future `Send` (requiring only `PartitionedWriter: Send`) + /// instead of forcing `Sync`/`dyn ParquetWriter: Sync`. + async fn writer_for(&mut self, subdir: &str) -> Result<&mut Box> { + if !self.writers.contains_key(subdir) { + // Take everything we need by value before the await. + let path = self.build_path(subdir); + let schema = Arc::clone(&self.data_schema); + let compression = self.compression; + let runtime_env = Arc::clone(&self.runtime_env); + let object_store_options = self.object_store_options.clone(); + + // The factory may call `block_on` internally (S3), so run it on a + // blocking thread to avoid "Cannot start a runtime from within a + // runtime" on the tokio worker. + let writer = tokio::task::spawn_blocking(move || { + Self::build_writer(path, schema, compression, runtime_env, object_store_options) + }) + .await + .map_err(|e| { + DataFusionError::Execution(format!("Writer creation task failed to join: {e}")) + })??; + + self.writers.insert(subdir.to_string(), writer); + } + Ok(self.writers.get_mut(subdir).unwrap()) + } + + /// Compute, for each row, the partition sub-directory string `col=val/...`. + /// Partition columns are cast to UTF-8 to obtain their string form; `null` + /// (or empty) values map to `__HIVE_DEFAULT_PARTITION__`. + fn partition_subdirs(&self, batch: &RecordBatch) -> Result> { + let num_rows = batch.num_rows(); + if self.partition_col_indices.is_empty() { + return Ok(vec![String::new(); num_rows]); + } + + // Cast each partition column to Utf8 once for the whole batch. + let mut casted: Vec = Vec::with_capacity(self.partition_col_indices.len()); + for &col_idx in &self.partition_col_indices { + let arr = arrow::compute::cast(batch.column(col_idx).as_ref(), &DataType::Utf8) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to cast partition column to string: {e}" + )) + })?; + casted.push(arr); + } + let casted: Vec<&StringArray> = casted + .iter() + .map(|a| { + a.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Internal( + "Partition column did not cast to StringArray".to_string(), + ) + }) + }) + .collect::>()?; + + let mut subdirs = Vec::with_capacity(num_rows); + for row in 0..num_rows { + let mut subdir = String::new(); + for (k, name) in self.partition_col_names.iter().enumerate() { + if k > 0 { + subdir.push('/'); + } + subdir.push_str(&escape_path_name(name)); + subdir.push('='); + let arr = casted[k]; + if arr.is_null(row) || arr.value(row).is_empty() { + subdir.push_str(DEFAULT_PARTITION_NAME); + } else { + subdir.push_str(&escape_path_name(arr.value(row))); + } + } + subdirs.push(subdir); + } + Ok(subdirs) + } + + /// Split `batch` by partition key and append each group to its writer. + pub(crate) async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + let subdirs = self.partition_subdirs(batch)?; + + // Group row indices by their partition sub-directory. + let mut groups: HashMap> = HashMap::new(); + for (row, subdir) in subdirs.into_iter().enumerate() { + groups.entry(subdir).or_default().push(row as u32); + } + + for (subdir, rows) in groups { + // Project to data columns only (partition columns live in the path). + let indices = UInt32Array::from(rows); + let mut columns: Vec = Vec::with_capacity(self.data_col_indices.len()); + for &col_idx in &self.data_col_indices { + let taken = arrow::compute::take(batch.column(col_idx).as_ref(), &indices, None) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to take partition rows: {e}")) + })?; + columns.push(taken); + } + let sub_batch = + RecordBatch::try_new(Arc::clone(&self.data_schema), columns).map_err(|e| { + DataFusionError::Execution(format!("Failed to build partition sub-batch: {e}")) + })?; + + let writer = self.writer_for(&subdir).await?; + writer + .write_batch(&sub_batch) + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to write batch: {e}")))?; + } + Ok(()) + } + + /// Close every open writer concurrently, returning the absolute paths of + /// all part-files that were actually created (with their partition + /// sub-directories). An empty input yields an empty Vec — no phantom files. + pub(crate) async fn close(self) -> Result> { + // Compute paths *before* consuming `self.writers` (build_path borrows &self). + let paths: Vec = self.writers.keys().map(|s| self.build_path(s)).collect(); + + let closes = self.writers.into_values().map(|w| w.close()); + try_join_all(closes) + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {e}")))?; + + Ok(paths) + } +} diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index c07a92d700..09cc744c8a 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1682,6 +1682,7 @@ impl PhysicalPlanner { self.partition, writer.column_names.clone(), object_store_options, + writer.partition_columns.clone(), )?); Ok(( diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index 19a2d774a0..64e5c7998c 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -149,6 +149,7 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_isFeatureEnabled( "jemalloc" => cfg!(feature = "jemalloc"), "hdfs" => cfg!(feature = "hdfs"), "hdfs-opendal" => cfg!(feature = "hdfs-opendal"), + "s3-opendal" => cfg!(feature = "s3-opendal"), _ => false, // Unknown features return false }; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 15265f1d86..0e19ff4a26 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -355,6 +355,7 @@ message ParquetWriter { // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in // the map. map object_store_options = 8; + repeated string partition_columns = 9; } enum AggregateMode { diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 4ae73565c6..62b8363120 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -46,6 +46,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec private val supportedCompressionCodes = Set("none", "snappy", "lz4", "zstd") + private val supportedFilesystemProtocols = Set("file", "hdfs", "s3a") + override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED) @@ -58,17 +60,16 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec case cmd: InsertIntoHadoopFsRelationCommand => cmd.fileFormat match { case _: ParquetFileFormat => - if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString - .startsWith("hdfs:")) { - return Unsupported(Some("Supported output filesystems: local, HDFS")) + if (!isSupportedFilesystemProtocol(cmd.outputPath.toString)) { + return Unsupported(Some("Supported output filesystems: local, HDFS, S3")) } if (cmd.bucketSpec.isDefined) { return Unsupported(Some("Bucketed writes are not supported")) } - if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) { - return Unsupported(Some("Partitioned writes are not supported")) + if (cmd.staticPartitions.nonEmpty) { + return Unsupported(Some("Static partitions writes are not supported")) } val codec = parseCompressionCodec(cmd) @@ -144,6 +145,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec writerOpBuilder.putObjectStoreOptions(key, value) } + writerOpBuilder.addAllPartitionColumns(cmd.partitionColumns.map(_.name).asJava) + val writerOp = writerOpBuilder.build() val writerOperator = Operator @@ -203,6 +206,11 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId) } + private def isSupportedFilesystemProtocol(outputPath: String) = { + supportedFilesystemProtocols + .exists(protocol => outputPath.startsWith(s"${protocol}:")) + } + private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand) = { cmd.options .getOrElse( diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index f6795b91a3..fe2ad2fc3b 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -23,7 +23,7 @@ import java.io.File import scala.util.Random -import org.apache.spark.sql.{CometTestBase, DataFrame, Row} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode} import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometNativeWriteExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec @@ -35,8 +35,285 @@ import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOpt class CometParquetWriterSuite extends CometTestBase { + private val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" + import testImplicits._ + test("simple partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(2, 'b')") + sql(s"INSERT INTO $table VALUES(3, 'c')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + Seq((1, "a"), (2, "b"), (3, "c")).foreach { case (col1, col2) => + val rows = spark.read + .parquet(s"$outputPath/col1=$col1") + .collect() + assert(rows.length == 1) + assert(rows.head.getAs[String]("col2") == col2) + } + } + } + } + } + + test("default hive partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(null, 'a')") + sql(s"INSERT INTO $table VALUES(null, 'b')") + sql(s"INSERT INTO $table VALUES(null, 'c')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows = spark.read + .parquet(s"$outputPath/col1=$DEFAULT_PARTITION_NAME") + .collect() + assert(rows.length == 3) + assert(rows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", "b", "c").sorted) + } + } + } + } + + test("multiple partition columns parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING, col3 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'x', 'a')") + sql(s"INSERT INTO $table VALUES(1, 'y', 'b')") + sql(s"INSERT INTO $table VALUES(2, 'x', 'c')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1", "col2") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + Seq((1, "x", "a"), (1, "y", "b"), (2, "x", "c")).foreach { case (col1, col2, col3) => + val rows = spark.read + .parquet(s"$outputPath/col1=$col1/col2=$col2") + .collect() + assert(rows.length == 1) + assert(rows.head.getAs[String]("col3") == col3) + } + } + } + } + } + + test("multiple rows per partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(1, 'b')") + sql(s"INSERT INTO $table VALUES(1, 'c')") + sql(s"INSERT INTO $table VALUES(2, 'd')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows = spark.read.parquet(s"$outputPath/col1=1").collect() + assert(rows.length == 3) + assert(rows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", "b", "c")) + val rows2 = spark.read.parquet(s"$outputPath/col1=2").collect() + assert(rows2.length == 1) + assert(rows2.head.getAs[String]("col2") == "d") + } + } + } + } + + test("append mode partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(2, 'b')") + captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + val plan = captureWritePlan( + path => + sql("SELECT 1 AS col1, 'c' AS col2").write + .partitionBy("col1") + .mode(SaveMode.Append) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows1 = spark.read.parquet(s"$outputPath/col1=1").collect() + assert(rows1.length == 2) + assert(rows1.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", "c")) + } + } + } + } + + test("long type partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 BIGINT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(10000000000, 'a')") + sql(s"INSERT INTO $table VALUES(20000000000, 'b')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + Seq((10000000000L, "a"), (20000000000L, "b")).foreach { case (col1, col2) => + val rows = spark.read.parquet(s"$outputPath/col1=$col1").collect() + assert(rows.length == 1) + assert(rows.head.getAs[String]("col2") == col2) + } + } + } + } + } + + test("string partition with special characters parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 STRING, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES('a b', 'x')") + sql(s"INSERT INTO $table VALUES('a/b', 'y')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows = spark.read.parquet(outputPath).collect() + assert(rows.length == 2) + assert( + rows.map(r => (r.getAs[String]("col1"), r.getAs[String]("col2"))).toSeq.sorted == + Seq(("a b", "x"), ("a/b", "y")).sorted) + } + } + } + } + + test("empty dataframe partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table WHERE col1 IS NOT NULL").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows = spark.read.parquet(outputPath).collect() + assert(rows.isEmpty) + } + } + } + } + + test("mixed null and non-null partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(null, 'b')") + sql(s"INSERT INTO $table VALUES(2, 'c')") + sql(s"INSERT INTO $table VALUES(null, 'd')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val nullRows = spark.read + .parquet(s"$outputPath/col1=$DEFAULT_PARTITION_NAME") + .collect() + assert(nullRows.length == 2) + assert(nullRows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("b", "d")) + assert(spark.read.parquet(s"$outputPath/col1=1").collect().length == 1) + assert(spark.read.parquet(s"$outputPath/col1=2").collect().length == 1) + } + } + } + } + test("basic parquet write") { withTempPath { dir => val outputPath = new File(dir, "output.parquet").getAbsolutePath