From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 01/10] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" + val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" + + private def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false + } + } + + override def getSupportLevel(expr: MapFromEntries): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } + Compatible(None) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..01b9744ed6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType +import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("map_from_entries") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (field <- df.schema.fieldNames) { + checkSparkAnswerAndOperator( + spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) + } + } + } + + test("map_from_entries - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), + CometMapFromEntries.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(0, c1))) from $table"), + CometMapFromEntries.valueUnsupportedReason) + } + } + } From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 16 Dec 2025 16:10:43 +0400 Subject: [PATCH 02/10] Revert "impl map_from_entries" This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34. --- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def containsBinary(dataType: DataType): Boolean = { - dataType match { - case BinaryType => true - case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) - case ArrayType(elementType, _) => containsBinary(elementType) - case _ => false - } - } - - override def getSupportLevel(expr: MapFromEntries): SupportLevel = { - if (containsBinary(expr.dataType.keyType)) { - return Incompatible(Some(keyUnsupportedReason)) - } - if (containsBinary(expr.dataType.valueType)) { - return Incompatible(Some(valueUnsupportedReason)) - } - Compatible(None) - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 01b9744ed6..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.BinaryType -import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase { } } - test("map_from_entries") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val schemaGenOptions = - SchemaGenOptions( - generateArray = true, - generateStruct = true, - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) - val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - schemaGenOptions, - dataGenOptions) - } - val df = spark.read.parquet(filename) - df.createOrReplaceTempView("t1") - for (field <- df.schema.fieldNames) { - checkSparkAnswerAndOperator( - spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) - } - } - } - - test("map_from_entries - fallback for binary type") { - val table = "t2" - withTable(table) { - sql( - s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), - CometMapFromEntries.keyUnsupportedReason) - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(0, c1))) from $table"), - CometMapFromEntries.valueUnsupportedReason) - } - } - } From 9da0edb7f4e7801611bd7167b896cb9f0801a59f Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sat, 2 May 2026 21:36:01 +0400 Subject: [PATCH 03/10] WIP --- native/Cargo.lock | 53 +++++++++++++++++++ native/core/Cargo.toml | 2 +- .../src/execution/operators/parquet_writer.rs | 4 +- native/core/src/parquet/parquet_support.rs | 15 ++++++ .../operator/CometDataWritingCommand.scala | 12 +++-- 5 files changed, 81 insertions(+), 5 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index b7416c3bbe..02c12ac3b6 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -4635,6 +4635,7 @@ dependencies = [ "opendal-layer-retry", "opendal-layer-timeout", "opendal-service-hdfs", + "opendal-service-s3", ] [[package]] @@ -4717,6 +4718,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-s3" +version = "0.55.0" +source = "git+https://github.com/apache/opendal?rev=6909efcdfd12b3b2ac3a76f654c35ee576811512#6909efcdfd12b3b2ac3a76f654c35ee576811512" +dependencies = [ + "base64", + "bytes", + "crc32c", + "http 1.4.0", + "log", + "md-5", + "opendal-core", + "quick-xml 0.38.4", + "reqsign-aws-v4", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", + "url", +] + [[package]] name = "openssl-probe" version = "0.2.1" @@ -5558,6 +5579,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "reqsign-aws-v4" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44eaca382e94505a49f1a4849658d153aebf79d9c1a58e5dd3b10361511e9f43" +dependencies = [ + "anyhow", + "bytes", + "form_urlencoded", + "http 1.4.0", + "log", + "percent-encoding", + "quick-xml 0.39.2", + "reqsign-core", + "rust-ini", + "serde", + "serde_json", + "serde_urlencoded", + "sha1", +] + [[package]] name = "reqsign-core" version = "3.0.0" @@ -5580,6 +5622,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "reqsign-file-read-tokio" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d89295b3d17abea31851cc8de55d843d89c52132c864963c38d41920613dc5" +dependencies = [ + "anyhow", + "reqsign-core", + "tokio", +] + [[package]] name = "reqwest" version = "0.12.28" diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index d54a03b7b6..3a26a7b72d 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -72,7 +72,7 @@ datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] } object_store_opendal = { git = "https://github.com/apache/opendal", rev = "6909efcdfd12b3b2ac3a76f654c35ee576811512", package = "object_store_opendal", optional = true} hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]} -opendal = { git = "https://github.com/apache/opendal", rev = "6909efcdfd12b3b2ac3a76f654c35ee576811512", optional = true, features = ["services-hdfs"] } +opendal = { git = "https://github.com/apache/opendal", rev = "6909efcdfd12b3b2ac3a76f654c35ee576811512", optional = true, features = ["services-hdfs", "services-s3"] } iceberg = { workspace = true } iceberg-storage-opendal = { workspace = true } serde_json = "1.0" diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 8ba79098d4..284095e113 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -32,7 +32,7 @@ use opendal::Operator; use std::io::Cursor; use crate::execution::shuffle::CompressionCodec; -use crate::parquet::parquet_support::is_hdfs_scheme; +use crate::parquet::parquet_support::{is_hdfs_scheme, is_s3_scheme}; #[cfg(feature = "hdfs-opendal")] use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs}; use arrow::datatypes::{Schema, SchemaRef}; @@ -335,6 +335,8 @@ impl ParquetWriterExec { "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(), )) } + } else if is_s3_scheme(&url) { + } else if output_file_path.starts_with("file://") || output_file_path.starts_with("file:") || !output_file_path.contains("://") diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 3418a17c43..7ee2c2de33 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -372,6 +372,11 @@ pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap) } } +pub fn is_s3_scheme(url: &Url) -> bool { + let scheme = url.scheme(); + scheme == "s3a" +} + // Creates an HDFS object store from a URL using the native HDFS implementation #[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))] fn create_hdfs_object_store( @@ -404,6 +409,16 @@ pub(crate) fn create_hdfs_operator(url: &Url) -> Result Result { + let builder = opendal::services::S3::default(); + opendal::Operator::new(builder) + .map_err(|error| object_store::Error::Generic { + store: "hdfs-opendal", + source: error.into(), + }) + .map(|op| op.finish()) +} + // Creates an HDFS object store from a URL using OpenDAL #[cfg(feature = "hdfs-opendal")] pub(crate) fn create_hdfs_object_store( diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 69b9bd5f85..b600740cb0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -46,6 +46,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec private val supportedCompressionCodes = Set("none", "snappy", "lz4", "zstd") + private val supportedFilesystemProtocols = Set("file", "hdfs", "s3a") + override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED) @@ -58,9 +60,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec case cmd: InsertIntoHadoopFsRelationCommand => cmd.fileFormat match { case _: ParquetFileFormat => - if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString - .startsWith("hdfs:")) { - return Unsupported(Some("Supported output filesystems: local, HDFS")) + if (!isSupportedFilesystemProtocol(cmd.outputPath.toString)) { + return Unsupported(Some("Supported output filesystems: local, HDFS, S3")) } if (cmd.bucketSpec.isDefined) { @@ -204,6 +205,11 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId) } + private def isSupportedFilesystemProtocol(outputPath: String) = { + supportedFilesystemProtocols + .exists(protocol => outputPath.startsWith(s"${protocol}:")) + } + private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand) = { cmd.options .getOrElse( From f2bce23062d47b56e7c64cf66ab3fbf876a0fdd4 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 5 May 2026 22:01:06 +0400 Subject: [PATCH 04/10] work --- .../src/execution/operators/parquet_writer.rs | 37 ++++++++++++++++++- native/core/src/parquet/parquet_support.rs | 4 +- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 284095e113..fe51dd4a8b 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -34,7 +34,7 @@ use std::io::Cursor; use crate::execution::shuffle::CompressionCodec; use crate::parquet::parquet_support::{is_hdfs_scheme, is_s3_scheme}; #[cfg(feature = "hdfs-opendal")] -use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs}; +use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs, create_s3_operator}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -336,7 +336,42 @@ impl ParquetWriterExec { )) } } else if is_s3_scheme(&url) { + let (_object_store_url, object_store_path) = prepare_object_store_with_configs( + _runtime_env, + output_file_path.to_string(), + object_store_options, + ) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to prepare object store for '{}': {}", + output_file_path, e + )) + })?; + // For remote storage (HDFS, S3), write to an in-memory buffer + let buffer = Vec::new(); + let cursor = Cursor::new(buffer); + let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to create S3 writer: {}", e)) + })?; + + // Create S3 operator with configuration options using the helper function + let op = create_s3_operator(&url).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create S3 operator for '{}': {}", + output_file_path, e + )) + })?; + + // HDFS writer will be created lazily on first write + // Use the path from prepare_object_store_with_configs + Ok(ParquetWriter::Remote( + arrow_parquet_buffer_writer, + None, + op, + object_store_path.to_string(), + )) } else if output_file_path.starts_with("file://") || output_file_path.starts_with("file:") || !output_file_path.contains("://") diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 7ee2c2de33..51a68cf271 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -409,11 +409,11 @@ pub(crate) fn create_hdfs_operator(url: &Url) -> Result Result { +pub(crate) fn create_s3_operator(_url: &Url) -> Result { let builder = opendal::services::S3::default(); opendal::Operator::new(builder) .map_err(|error| object_store::Error::Generic { - store: "hdfs-opendal", + store: "s3-opendal", source: error.into(), }) .map(|op| op.finish()) From 2b2acb1ebfe540292881a60814c18381c03642df Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 31 May 2026 19:29:39 +0400 Subject: [PATCH 05/10] Complete native s3 write draft feature --- .../src/execution/operators/parquet_writer.rs | 607 +++++++++--------- native/core/src/parquet/parquet_support.rs | 15 - 2 files changed, 310 insertions(+), 312 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index fe51dd4a8b..4edd5617d3 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,27 +17,14 @@ //! Parquet writer operator for writing RecordBatches to Parquet files -use std::{ - any::Any, - collections::HashMap, - fmt, - fmt::{Debug, Formatter}, - fs::File, - sync::Arc, -}; - -#[cfg(feature = "hdfs-opendal")] -use opendal::Operator; -#[cfg(feature = "hdfs-opendal")] -use std::io::Cursor; - use crate::execution::shuffle::CompressionCodec; -use crate::parquet::parquet_support::{is_hdfs_scheme, is_s3_scheme}; +use crate::parquet::parquet_support::is_hdfs_scheme; #[cfg(feature = "hdfs-opendal")] -use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs, create_s3_operator}; +use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::{ error::{DataFusionError, Result}, execution::context::TaskContext, @@ -51,141 +38,325 @@ use datafusion::{ }, }; use futures::TryStreamExt; +#[cfg(feature = "hdfs-opendal")] +use opendal::Operator; +use parquet::errors::ParquetError; use parquet::{ arrow::ArrowWriter, basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; +use std::fs::create_dir_all; +#[cfg(feature = "hdfs-opendal")] +use std::io::Cursor; +use std::path::Path; +use std::{ + any::Any, + collections::HashMap, + fmt, + fmt::{Debug, Formatter}, + fs::File, + sync::Arc, +}; use url::Url; -/// Enum representing different types of Arrow writers based on storage backend -enum ParquetWriter { - /// Writer for local file system - LocalFile(ArrowWriter), - /// Writer for HDFS or other remote storage (writes to in-memory buffer) - /// Contains the arrow writer, HDFS operator, and destination path - /// an Arrow writer writes to in-memory buffer the data converted to Parquet format - /// The opendal::Writer is created lazily on first write - #[cfg(feature = "hdfs-opendal")] - Remote( - ArrowWriter>>, - Option, - Operator, - String, - ), +// A trait abstracting over different Parquet write targets (local filesystem, HDFS, S3, etc.) +#[async_trait] +trait ParquetWriter: Send { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError>; + async fn close(self: Box) -> Result<(), ParquetError>; } -impl ParquetWriter { - /// Write a RecordBatch to the underlying writer - async fn write( - &mut self, - batch: &RecordBatch, - ) -> std::result::Result<(), parquet::errors::ParquetError> { - match self { - ParquetWriter::LocalFile(writer) => writer.write(batch), - #[cfg(feature = "hdfs-opendal")] - ParquetWriter::Remote( - arrow_parquet_buffer_writer, - hdfs_writer_opt, - op, - output_path, - ) => { - // Write batch to in-memory buffer - arrow_parquet_buffer_writer.write(batch)?; - - // Flush and get the current buffer content - arrow_parquet_buffer_writer.flush()?; - let cursor = arrow_parquet_buffer_writer.inner_mut(); - let current_data = cursor.get_ref().clone(); - - // Create HDFS writer lazily on first write - if hdfs_writer_opt.is_none() { - let writer = op.writer(output_path.as_str()).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to create HDFS writer for '{}': {}", output_path, e) - .into(), - ) - })?; - *hdfs_writer_opt = Some(writer); - } - - // Write the accumulated data to HDFS - if let Some(hdfs_writer) = hdfs_writer_opt { - hdfs_writer.write(current_data).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!( - "Failed to write batch to HDFS file '{}': {}", - output_path, e - ) - .into(), - ) - })?; - } +// A `ParquetWriter` implementation that writes directly to a local file. +struct LocalFileWriter { + writer: ArrowWriter, +} - // Clear the buffer after upload - cursor.get_mut().clear(); - cursor.set_position(0); +impl LocalFileWriter { + fn try_new(path: &str, schema: SchemaRef, props: WriterProperties) -> Result { + let local_path = path + .strip_prefix("file://") + .or_else(|| path.strip_prefix("file:")) + .unwrap_or(path); - Ok(()) - } + let output_dir = Path::new(local_path).parent().ok_or_else(|| { + DataFusionError::Execution(format!( + "Failed to extract parent directory from path '{local_path}'" + )) + })?; + + create_dir_all(output_dir).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create output directory '{}': {e}", + output_dir.display() + )) + })?; + + let file = File::create(local_path).map_err(|e| { + DataFusionError::Execution(format!("Failed to create output file '{local_path}': {e}")) + })?; + + let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!("Failed to create local writer: {e}")) + })?; + + Ok(Self { writer }) + } +} + +#[async_trait] +impl ParquetWriter for LocalFileWriter { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { + self.writer.write(batch) + } + + async fn close(mut self: Box) -> Result<(), ParquetError> { + self.writer.close()?; + Ok(()) + } +} + +// A `ParquetWriter` implementation that streams Parquet data to remote object storage +// (HDFS, S3, etc.) via the OpenDAL abstraction layer. +struct OpendalWriter { + arrow_writer: ArrowWriter>>, + opendal_writer: Option, + operator: Operator, + path: String, +} + +impl OpendalWriter { + fn try_new( + operator: Operator, + path: String, + schema: SchemaRef, + props: WriterProperties, + ) -> Result { + let cursor = Cursor::new(Vec::new()); + let arrow_writer = ArrowWriter::try_new(cursor, schema, Some(props)).map_err(|e| { + DataFusionError::Execution(format!("Failed to create OpenDAL arrow writer: {e}")) + })?; + Ok(Self { + arrow_writer, + opendal_writer: None, + operator, + path, + }) + } +} + +#[async_trait] +impl ParquetWriter for OpendalWriter { + async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { + self.arrow_writer.write(batch)?; + self.arrow_writer.flush()?; + + let cursor = self.arrow_writer.inner_mut(); + let data = cursor.get_ref().clone(); + + if data.is_empty() { + return Ok(()); + } + + if self.opendal_writer.is_none() { + let writer = self.operator.writer(&self.path).await.map_err(|e| { + ParquetError::External( + format!("Failed to create OpenDAL writer for '{}': {e}", self.path).into(), + ) + })?; + self.opendal_writer = Some(writer); + } + + if let Some(w) = &mut self.opendal_writer { + w.write(data).await.map_err(|e| { + ParquetError::External(format!("Failed to write to '{}': {e}", self.path).into()) + })?; } + + let cursor = self.arrow_writer.inner_mut(); + cursor.get_mut().clear(); + cursor.set_position(0); + + Ok(()) } - /// Close the writer and finalize the file - async fn close(self) -> std::result::Result<(), parquet::errors::ParquetError> { - match self { - ParquetWriter::LocalFile(writer) => { - writer.close()?; - Ok(()) - } - #[cfg(feature = "hdfs-opendal")] - ParquetWriter::Remote( - arrow_parquet_buffer_writer, - mut hdfs_writer_opt, - op, - output_path, - ) => { - // Close the arrow writer to finalize parquet format - let cursor = arrow_parquet_buffer_writer.into_inner()?; - let final_data = cursor.into_inner(); - - // Create HDFS writer if not already created - if hdfs_writer_opt.is_none() && !final_data.is_empty() { - let writer = op.writer(output_path.as_str()).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to create HDFS writer for '{}': {}", output_path, e) - .into(), + async fn close(mut self: Box) -> Result<(), ParquetError> { + let cursor = self.arrow_writer.into_inner()?; + let final_data = cursor.into_inner(); + + if self.opendal_writer.is_none() && !final_data.is_empty() { + let writer = self.operator.writer(&self.path).await.map_err(|e| { + ParquetError::External( + format!("Failed to create OpenDAL writer for '{}': {e}", self.path).into(), + ) + })?; + self.opendal_writer = Some(writer); + } + + if !final_data.is_empty() { + if let Some(mut writer) = self.opendal_writer { + writer.write(final_data).await.map_err(|e| { + ParquetError::External( + format!( + "Failed to write final data to HDFS file '{}': {}", + self.path, e ) - })?; - hdfs_writer_opt = Some(writer); - } - - // Write any remaining data - if !final_data.is_empty() { - if let Some(mut hdfs_writer) = hdfs_writer_opt { - hdfs_writer.write(final_data).await.map_err(|e| { - parquet::errors::ParquetError::External( - format!( - "Failed to write final data to HDFS file '{}': {}", - output_path, e - ) - .into(), - ) - })?; - - // Close the HDFS writer - hdfs_writer.close().await.map_err(|e| { - parquet::errors::ParquetError::External( - format!("Failed to close HDFS writer for '{}': {}", output_path, e) - .into(), - ) - })?; - } - } - - Ok(()) + .into(), + ) + })?; + + writer.close().await.map_err(|e| { + ParquetError::External( + format!("Failed to close HDFS writer for '{}': {}", self.path, e).into(), + ) + })?; } } + + Ok(()) + } +} + +// A factory that inspects the destination URL and produces the appropriate +// `ParquetWriter` implementation for the target storage backend. +struct StorageWriterFactory; + +impl StorageWriterFactory { + // Selects and constructs a `ParquetWriter` based on the URL scheme of `output_path`. + // Supported backends: + // - **HDFS** – detected via `is_hdfs_scheme`; backed by `OpendalWriter`. + // - **S3 / S3A** – detected by scheme; backed by `OpendalWriter`. + // - **Local filesystem** – `file://`, `file:`, or a bare path; backed by `LocalFileWriter`. + fn create( + output_path: &str, + schema: SchemaRef, + props: WriterProperties, + runtime_env: Arc, + object_store_options: &HashMap, + ) -> Result> { + let (_, object_store_path) = prepare_object_store_with_configs( + runtime_env, + output_path.to_string(), + &HashMap::new(), + ) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to prepare object store for '{output_path}': {e}" + )) + })?; + + let url = Url::parse(output_path).map_err(|e| { + DataFusionError::Execution(format!("Failed to parse URL '{output_path}': {e}")) + })?; + + if is_hdfs_scheme(&url, object_store_options) { + Self::create_hdfs_parquet_writer(schema, props, &object_store_path.to_string(), &url) + } else if Self::is_s3_scheme(&url) { + Self::create_s3_parquet_writer( + schema, + props, + &object_store_path.to_string(), + &url, + object_store_options, + ) + } else if Self::is_local_path(output_path) { + Ok(Box::new(LocalFileWriter::try_new( + output_path, + schema, + props, + )?)) + } else { + Err(DataFusionError::Execution(format!( + "Unsupported storage scheme in path: {output_path}" + ))) + } + } + + fn create_hdfs_parquet_writer( + schema: SchemaRef, + props: WriterProperties, + object_store_path: &String, + url: &Url, + ) -> Result> { + let url_str = url.as_str(); + let op = create_hdfs_operator(url).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create HDFS operator for '{url_str}': {e}" + )) + })?; + Ok(Box::new(OpendalWriter::try_new( + op, + object_store_path.into(), + schema, + props, + )?)) + } + + fn create_s3_parquet_writer( + schema: SchemaRef, + props: WriterProperties, + object_store_path: &String, + url: &Url, + object_store_options: &HashMap, + ) -> Result> { + let url_str = url.as_str(); + let access_key = object_store_options.get("fs.s3a.access.key"); + if access_key.is_none() { + return Err(DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.access.key".to_string(), + )); + } + let secret_key = object_store_options.get("fs.s3a.secret.key"); + if secret_key.is_none() { + return Err(DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.secret.key".to_string(), + )); + } + let endpoint = object_store_options.get("fs.s3a.endpoint"); + if endpoint.is_none() { + return Err(DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.endpoint".to_string(), + )); + } + let region = object_store_options.get("fs.s3a.endpoint.region"); + if region.is_none() { + //todo try extract region from fs.s3a.endpoint + return Err(DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.endpoint.region".to_string(), + )); + } + let bucket_name = url.host_str().unwrap(); + let builder = opendal::services::S3::default() + .endpoint(endpoint.unwrap()) + .secret_access_key(secret_key.unwrap()) + .access_key_id(access_key.unwrap()) + .region(region.unwrap()) + .bucket(bucket_name); + let op = Operator::new(builder) + .map_err(|error| object_store::Error::Generic { + store: "s3-opendal", + source: error.into(), + }) + .map(|op| op.finish()) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to create S3 operator for '{url_str}': {e}" + )) + })?; + Ok(Box::new(OpendalWriter::try_new( + op, + object_store_path.into(), + schema, + props, + )?)) + } + + fn is_local_path(path: &str) -> bool { + path.starts_with("file://") || path.starts_with("file:") || !path.contains("://") + } + + fn is_s3_scheme(url: &Url) -> bool { + url.scheme() == "s3a" || url.scheme() == "s3" } } @@ -263,165 +434,6 @@ impl ParquetWriterExec { CompressionCodec::Snappy => Ok(Compression::SNAPPY), } } - - /// Create an Arrow writer based on the storage scheme - /// - /// # Arguments - /// * `output_file_path` - The full path to the output file - /// * `schema` - The Arrow schema for the Parquet file - /// * `props` - Writer properties including compression - /// * `runtime_env` - Runtime environment for object store registration - /// * `object_store_options` - Configuration options for object store - /// - /// # Returns - /// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme - /// * `Err(DataFusionError)` - If writer creation fails - fn create_arrow_writer( - output_file_path: &str, - schema: SchemaRef, - props: WriterProperties, - _runtime_env: Arc, - object_store_options: &HashMap, - ) -> Result { - // Parse URL and match on storage scheme directly - let url = Url::parse(output_file_path).map_err(|e| { - DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e)) - })?; - - if is_hdfs_scheme(&url, object_store_options) { - #[cfg(feature = "hdfs-opendal")] - { - // Use prepare_object_store_with_configs to create and register the object store - let (_object_store_url, object_store_path) = prepare_object_store_with_configs( - _runtime_env, - output_file_path.to_string(), - object_store_options, - ) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to prepare object store for '{}': {}", - output_file_path, e - )) - })?; - - // For remote storage (HDFS, S3), write to an in-memory buffer - let buffer = Vec::new(); - let cursor = Cursor::new(buffer); - let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) - .map_err(|e| { - DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e)) - })?; - - // Create HDFS operator with configuration options using the helper function - let op = create_hdfs_operator(&url).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create HDFS operator for '{}': {}", - output_file_path, e - )) - })?; - - // HDFS writer will be created lazily on first write - // Use the path from prepare_object_store_with_configs - Ok(ParquetWriter::Remote( - arrow_parquet_buffer_writer, - None, - op, - object_store_path.to_string(), - )) - } - #[cfg(not(feature = "hdfs-opendal"))] - { - Err(DataFusionError::Execution( - "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(), - )) - } - } else if is_s3_scheme(&url) { - let (_object_store_url, object_store_path) = prepare_object_store_with_configs( - _runtime_env, - output_file_path.to_string(), - object_store_options, - ) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to prepare object store for '{}': {}", - output_file_path, e - )) - })?; - - // For remote storage (HDFS, S3), write to an in-memory buffer - let buffer = Vec::new(); - let cursor = Cursor::new(buffer); - let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props)) - .map_err(|e| { - DataFusionError::Execution(format!("Failed to create S3 writer: {}", e)) - })?; - - // Create S3 operator with configuration options using the helper function - let op = create_s3_operator(&url).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create S3 operator for '{}': {}", - output_file_path, e - )) - })?; - - // HDFS writer will be created lazily on first write - // Use the path from prepare_object_store_with_configs - Ok(ParquetWriter::Remote( - arrow_parquet_buffer_writer, - None, - op, - object_store_path.to_string(), - )) - } else if output_file_path.starts_with("file://") - || output_file_path.starts_with("file:") - || !output_file_path.contains("://") - { - // Local file system - { - // For a local file system, write directly to file - // Strip file:// or file: prefix if present - let local_path = output_file_path - .strip_prefix("file://") - .or_else(|| output_file_path.strip_prefix("file:")) - .unwrap_or(output_file_path); - - // Extract the parent directory from the file path - let output_dir = std::path::Path::new(local_path).parent().ok_or_else(|| { - DataFusionError::Execution(format!( - "Failed to extract parent directory from path '{}'", - local_path - )) - })?; - - // Create the parent directory if it doesn't exist - std::fs::create_dir_all(output_dir).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output directory '{}': {}", - output_dir.display(), - e - )) - })?; - - let file = File::create(local_path).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create output file '{}': {}", - local_path, e - )) - })?; - - let writer = ArrowWriter::try_new(file, schema, Some(props)).map_err(|e| { - DataFusionError::Execution(format!("Failed to create local file writer: {}", e)) - })?; - Ok(ParquetWriter::LocalFile(writer)) - } - } else { - // Unsupported storage scheme - Err(DataFusionError::Execution(format!( - "Unsupported storage scheme in path: {}", - output_file_path - ))) - } - } } impl DisplayAs for ParquetWriterExec { @@ -535,7 +547,8 @@ impl ExecutionPlan for ParquetWriterExec { .build(); let object_store_options = self.object_store_options.clone(); - let mut writer = Self::create_arrow_writer( + + let mut writer = StorageWriterFactory::create( &part_file, Arc::clone(&output_schema), props, @@ -570,7 +583,7 @@ impl ExecutionPlan for ParquetWriterExec { batch }; - writer.write(&renamed_batch).await.map_err(|e| { + writer.write_batch(&renamed_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch: {}", e)) })?; } @@ -792,7 +805,7 @@ mod tests { let full_output_path = format!("hdfs://namenode:9000{}", output_path); let session_ctx = datafusion::prelude::SessionContext::new(); let runtime_env = session_ctx.runtime_env(); - let mut writer = ParquetWriterExec::create_arrow_writer( + let mut writer = StorageWriterFactory::create( &full_output_path, create_test_record_batch(1)?.schema(), props, @@ -804,7 +817,7 @@ mod tests { for i in 1..=5 { let record_batch = create_test_record_batch(i)?; - writer.write(&record_batch).await.map_err(|e| { + writer.write_batch(&record_batch).await.map_err(|e| { DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) })?; diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 9b0fb40f60..06abe5b539 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -427,11 +427,6 @@ pub fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap) } } -pub fn is_s3_scheme(url: &Url) -> bool { - let scheme = url.scheme(); - scheme == "s3a" -} - // Creates an HDFS object store from a URL using the native HDFS implementation #[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))] fn create_hdfs_object_store( @@ -464,16 +459,6 @@ pub(crate) fn create_hdfs_operator(url: &Url) -> Result Result { - let builder = opendal::services::S3::default(); - opendal::Operator::new(builder) - .map_err(|error| object_store::Error::Generic { - store: "s3-opendal", - source: error.into(), - }) - .map(|op| op.finish()) -} - // Creates an HDFS object store from a URL using OpenDAL #[cfg(feature = "hdfs-opendal")] pub(crate) fn create_hdfs_object_store( From 981a1f5848640883ac8497b39b7a2bd2f9ac8423 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 7 Jun 2026 16:49:32 +0400 Subject: [PATCH 06/10] refactoring --- .../src/execution/operators/parquet_writer.rs | 117 +++++++++++++----- 1 file changed, 87 insertions(+), 30 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 4edd5617d3..35ccfe1d9e 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -292,6 +292,8 @@ impl StorageWriterFactory { )?)) } + const DEFAULT_REGION: &str = "us-east-1"; + fn create_s3_parquet_writer( schema: SchemaRef, props: WriterProperties, @@ -300,37 +302,37 @@ impl StorageWriterFactory { object_store_options: &HashMap, ) -> Result> { let url_str = url.as_str(); - let access_key = object_store_options.get("fs.s3a.access.key"); - if access_key.is_none() { - return Err(DataFusionError::Execution( - "Missing required S3 access key: fs.s3a.access.key".to_string(), - )); - } - let secret_key = object_store_options.get("fs.s3a.secret.key"); - if secret_key.is_none() { - return Err(DataFusionError::Execution( - "Missing required S3 access key: fs.s3a.secret.key".to_string(), - )); - } - let endpoint = object_store_options.get("fs.s3a.endpoint"); - if endpoint.is_none() { - return Err(DataFusionError::Execution( + let access_key = object_store_options + .get("fs.s3a.access.key") + .ok_or_else(|| { + DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.access.key".to_string(), + ) + })?; + let secret_key = object_store_options + .get("fs.s3a.secret.key") + .ok_or_else(|| { + DataFusionError::Execution( + "Missing required S3 access key: fs.s3a.secret.key".to_string(), + ) + })?; + let endpoint = object_store_options.get("fs.s3a.endpoint").ok_or_else(|| { + DataFusionError::Execution( "Missing required S3 access key: fs.s3a.endpoint".to_string(), - )); - } - let region = object_store_options.get("fs.s3a.endpoint.region"); - if region.is_none() { - //todo try extract region from fs.s3a.endpoint - return Err(DataFusionError::Execution( - "Missing required S3 access key: fs.s3a.endpoint.region".to_string(), - )); - } - let bucket_name = url.host_str().unwrap(); + ) + })?; + let region = object_store_options + .get("fs.s3a.endpoint.region") + .map(|s| s.as_str()) + .unwrap_or(Self::DEFAULT_REGION); + let bucket_name = url.host_str().ok_or_else(|| { + DataFusionError::Execution(format!("Missing bucket name in S3 URL: {}", url_str)) + })?; let builder = opendal::services::S3::default() - .endpoint(endpoint.unwrap()) - .secret_access_key(secret_key.unwrap()) - .access_key_id(access_key.unwrap()) - .region(region.unwrap()) + .endpoint(endpoint) + .secret_access_key(secret_key) + .access_key_id(access_key) + .region(region) .bucket(bucket_name); let op = Operator::new(builder) .map_err(|error| object_store::Error::Generic { @@ -356,7 +358,7 @@ impl StorageWriterFactory { } fn is_s3_scheme(url: &Url) -> bool { - url.scheme() == "s3a" || url.scheme() == "s3" + url.scheme() == "s3a" } } @@ -899,4 +901,59 @@ mod tests { Ok(()) } + + #[tokio::test] + #[ignore = "This test requires a running S3 cluster"] + async fn test_write_to_s3() -> Result<()> { + // Configure output path + let output_path = "s3a://comet/test_parquet_write"; + + // Configure writer properties + let props = WriterProperties::builder() + .set_compression(Compression::UNCOMPRESSED) + .build(); + + let session_ctx = datafusion::prelude::SessionContext::new(); + let runtime_env = session_ctx.runtime_env(); + + let mut object_store_options: HashMap = HashMap::new(); + object_store_options.insert("fs.s3a.access.key".to_string(), "admin".to_string()); + object_store_options.insert("fs.s3a.secret.key".to_string(), "adminsecretkey".to_string()); + object_store_options.insert("fs.s3a.endpoint".to_string(), "http://localhost:9000".to_string()); + + let mut writer = StorageWriterFactory::create( + &output_path, + create_test_record_batch(1)?.schema(), + props, + runtime_env, + &object_store_options, + )?; + + // Write 5 batches in a loop + for i in 1..=5 { + let record_batch = create_test_record_batch(i)?; + + writer.write_batch(&record_batch).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch {}: {}", i, e)) + })?; + + println!( + "Successfully wrote batch {} (1000 rows) using ParquetWriter", + i + ); + } + + // Close the writer + writer + .close() + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {}", e)))?; + + println!( + "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to HDFS at {}", + output_path + ); + + Ok(()) + } } From b487f91441094847108ddd8b8aa24b443c5078bb Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 7 Jun 2026 16:50:40 +0400 Subject: [PATCH 07/10] fmt --- native/core/src/execution/operators/parquet_writer.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 35ccfe1d9e..8da2b3d6f3 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -918,8 +918,14 @@ mod tests { let mut object_store_options: HashMap = HashMap::new(); object_store_options.insert("fs.s3a.access.key".to_string(), "admin".to_string()); - object_store_options.insert("fs.s3a.secret.key".to_string(), "adminsecretkey".to_string()); - object_store_options.insert("fs.s3a.endpoint".to_string(), "http://localhost:9000".to_string()); + object_store_options.insert( + "fs.s3a.secret.key".to_string(), + "adminsecretkey".to_string(), + ); + object_store_options.insert( + "fs.s3a.endpoint".to_string(), + "http://localhost:9000".to_string(), + ); let mut writer = StorageWriterFactory::create( &output_path, From 0d074e3fff065230b519e08334d2b0bcfabac0c3 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 7 Jun 2026 18:02:48 +0400 Subject: [PATCH 08/10] refactoring --- native/core/Cargo.toml | 5 +- .../src/execution/operators/parquet_writer.rs | 71 ++++++++++++------- native/core/src/lib.rs | 1 + 3 files changed, 51 insertions(+), 26 deletions(-) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 0d3b084ba3..9d4a3b2524 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -72,7 +72,7 @@ datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"] } object_store_opendal = { version = "0.56.0", optional = true } hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]} -opendal = { version = "0.56.0", optional = true, features = ["services-hdfs"] } +opendal = { version = "0.56.0", optional = true, features = ["services-hdfs", "services-s3"] } iceberg = { workspace = true } iceberg-storage-opendal = { workspace = true } reqsign-core = { workspace = true } @@ -98,7 +98,8 @@ datafusion-functions-nested = { version = "53.1.0" } backtrace = ["datafusion/backtrace"] default = ["hdfs-opendal"] hdfs = ["datafusion-comet-objectstore-hdfs"] -hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"] +hdfs-opendal = ["dep:opendal", "dep:object_store_opendal", "dep:hdfs-sys", "opendal/services-hdfs"] +s3-opendal = ["dep:opendal", "dep:object_store_opendal", "opendal/services-s3"] jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] # exclude optional packages from cargo machete verifications diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 8da2b3d6f3..8adf1db937 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -18,9 +18,9 @@ //! Parquet writer operator for writing RecordBatches to Parquet files use crate::execution::shuffle::CompressionCodec; -use crate::parquet::parquet_support::is_hdfs_scheme; #[cfg(feature = "hdfs-opendal")] -use crate::parquet::parquet_support::{create_hdfs_operator, prepare_object_store_with_configs}; +use crate::parquet::parquet_support::create_hdfs_operator; +use crate::parquet::parquet_support::{is_hdfs_scheme, prepare_object_store_with_configs}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -38,7 +38,7 @@ use datafusion::{ }, }; use futures::TryStreamExt; -#[cfg(feature = "hdfs-opendal")] +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] use opendal::Operator; use parquet::errors::ParquetError; use parquet::{ @@ -47,7 +47,7 @@ use parquet::{ file::properties::WriterProperties, }; use std::fs::create_dir_all; -#[cfg(feature = "hdfs-opendal")] +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] use std::io::Cursor; use std::path::Path; use std::{ @@ -118,6 +118,7 @@ impl ParquetWriter for LocalFileWriter { // A `ParquetWriter` implementation that streams Parquet data to remote object storage // (HDFS, S3, etc.) via the OpenDAL abstraction layer. +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] struct OpendalWriter { arrow_writer: ArrowWriter>>, opendal_writer: Option, @@ -125,6 +126,7 @@ struct OpendalWriter { path: String, } +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] impl OpendalWriter { fn try_new( operator: Operator, @@ -146,6 +148,7 @@ impl OpendalWriter { } #[async_trait] +#[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] impl ParquetWriter for OpendalWriter { async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { self.arrow_writer.write(batch)?; @@ -197,17 +200,13 @@ impl ParquetWriter for OpendalWriter { if let Some(mut writer) = self.opendal_writer { writer.write(final_data).await.map_err(|e| { ParquetError::External( - format!( - "Failed to write final data to HDFS file '{}': {}", - self.path, e - ) - .into(), + format!("Failed to write final data to file '{}': {}", self.path, e).into(), ) })?; writer.close().await.map_err(|e| { ParquetError::External( - format!("Failed to close HDFS writer for '{}': {}", self.path, e).into(), + format!("Failed to close writer for '{}': {}", self.path, e).into(), ) })?; } @@ -225,7 +224,7 @@ impl StorageWriterFactory { // Selects and constructs a `ParquetWriter` based on the URL scheme of `output_path`. // Supported backends: // - **HDFS** – detected via `is_hdfs_scheme`; backed by `OpendalWriter`. - // - **S3 / S3A** – detected by scheme; backed by `OpendalWriter`. + // - **S3A** – detected by scheme; backed by `OpendalWriter`. // - **Local filesystem** – `file://`, `file:`, or a bare path; backed by `LocalFileWriter`. fn create( output_path: &str, @@ -237,7 +236,7 @@ impl StorageWriterFactory { let (_, object_store_path) = prepare_object_store_with_configs( runtime_env, output_path.to_string(), - &HashMap::new(), + object_store_options, ) .map_err(|e| { DataFusionError::Execution(format!( @@ -250,15 +249,38 @@ impl StorageWriterFactory { })?; if is_hdfs_scheme(&url, object_store_options) { - Self::create_hdfs_parquet_writer(schema, props, &object_store_path.to_string(), &url) + #[cfg(feature = "hdfs-opendal")] + { + Self::create_hdfs_parquet_writer( + schema, + props, + &object_store_path.to_string(), + &url, + ) + } + #[cfg(not(feature = "hdfs-opendal"))] + { + Err(DataFusionError::Execution( + "HDFS support is not enabled. Rebuild with the 'hdfs-opendal' feature.".into(), + )) + } } else if Self::is_s3_scheme(&url) { - Self::create_s3_parquet_writer( - schema, - props, - &object_store_path.to_string(), - &url, - object_store_options, - ) + #[cfg(feature = "s3-opendal")] + { + Self::create_s3_parquet_writer( + schema, + props, + &object_store_path.to_string(), + &url, + object_store_options, + ) + } + #[cfg(not(feature = "s3-opendal"))] + { + Err(DataFusionError::Execution( + "S3 support is not enabled. Rebuild with the 's3-opendal' feature.".into(), + )) + } } else if Self::is_local_path(output_path) { Ok(Box::new(LocalFileWriter::try_new( output_path, @@ -272,6 +294,7 @@ impl StorageWriterFactory { } } + #[cfg(feature = "hdfs-opendal")] fn create_hdfs_parquet_writer( schema: SchemaRef, props: WriterProperties, @@ -292,8 +315,7 @@ impl StorageWriterFactory { )?)) } - const DEFAULT_REGION: &str = "us-east-1"; - + #[cfg(feature = "s3-opendal")] fn create_s3_parquet_writer( schema: SchemaRef, props: WriterProperties, @@ -324,7 +346,7 @@ impl StorageWriterFactory { let region = object_store_options .get("fs.s3a.endpoint.region") .map(|s| s.as_str()) - .unwrap_or(Self::DEFAULT_REGION); + .unwrap_or("us-east-1"); let bucket_name = url.host_str().ok_or_else(|| { DataFusionError::Execution(format!("Missing bucket name in S3 URL: {}", url_str)) })?; @@ -903,6 +925,7 @@ mod tests { } #[tokio::test] + #[cfg(feature = "s3-opendal")] #[ignore = "This test requires a running S3 cluster"] async fn test_write_to_s3() -> Result<()> { // Configure output path @@ -956,7 +979,7 @@ mod tests { .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {}", e)))?; println!( - "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to HDFS at {}", + "Successfully completed ParquetWriter streaming write of 5 batches (5000 total rows) to S3 at {}", output_path ); diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index 19a2d774a0..64e5c7998c 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -149,6 +149,7 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_isFeatureEnabled( "jemalloc" => cfg!(feature = "jemalloc"), "hdfs" => cfg!(feature = "hdfs"), "hdfs-opendal" => cfg!(feature = "hdfs-opendal"), + "s3-opendal" => cfg!(feature = "s3-opendal"), _ => false, // Unknown features return false }; From f66cbd43d14dc3bddfed4c8c38f197fca34dda33 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Wed, 17 Jun 2026 21:31:23 +0400 Subject: [PATCH 09/10] Add partition writing --- native/core/src/execution/operators/mod.rs | 2 + .../src/execution/operators/parquet_writer.rs | 109 ++++--- .../execution/operators/partition_writer.rs | 280 ++++++++++++++++++ native/core/src/execution/planner.rs | 1 + native/proto/src/proto/operator.proto | 1 + .../operator/CometDataWritingCommand.scala | 6 +- .../CometParquetPartitionWriteSuite.scala | 86 ++++++ .../parquet/CometParquetWriterSuite.scala | 66 ++++- 8 files changed, 510 insertions(+), 41 deletions(-) create mode 100644 native/core/src/execution/operators/partition_writer.rs create mode 100644 spark/src/test/scala/org/apache/comet/parquet/CometParquetPartitionWriteSuite.scala diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index d68252bd9b..04d8fe29f2 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -32,8 +32,10 @@ mod iceberg_scan; mod parquet_writer; pub use parquet_writer::ParquetWriterExec; mod csv_scan; +mod partition_writer; pub mod projection; mod scan; mod shuffle_scan; + pub use csv_scan::init_csv_datasource_exec; pub use shuffle_scan::ShuffleScanExec; diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 8adf1db937..9eeade077e 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,11 +17,12 @@ //! Parquet writer operator for writing RecordBatches to Parquet files +use crate::execution::operators::partition_writer::PartitionedWriter; use crate::execution::shuffle::CompressionCodec; #[cfg(feature = "hdfs-opendal")] use crate::parquet::parquet_support::create_hdfs_operator; use crate::parquet::parquet_support::{is_hdfs_scheme, prepare_object_store_with_configs}; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{FieldRef, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::execution::runtime_env::RuntimeEnv; @@ -46,6 +47,7 @@ use parquet::{ basic::{Compression, ZstdLevel}, file::properties::WriterProperties, }; +use std::collections::HashSet; use std::fs::create_dir_all; #[cfg(any(feature = "hdfs-opendal", feature = "s3-opendal"))] use std::io::Cursor; @@ -62,7 +64,7 @@ use url::Url; // A trait abstracting over different Parquet write targets (local filesystem, HDFS, S3, etc.) #[async_trait] -trait ParquetWriter: Send { +pub(crate) trait ParquetWriter: Send { async fn write_batch(&mut self, batch: &RecordBatch) -> Result<(), ParquetError>; async fn close(self: Box) -> Result<(), ParquetError>; } @@ -218,7 +220,7 @@ impl ParquetWriter for OpendalWriter { // A factory that inspects the destination URL and produces the appropriate // `ParquetWriter` implementation for the target storage backend. -struct StorageWriterFactory; +pub(crate) struct StorageWriterFactory; impl StorageWriterFactory { // Selects and constructs a `ParquetWriter` based on the URL scheme of `output_path`. @@ -226,7 +228,7 @@ impl StorageWriterFactory { // - **HDFS** – detected via `is_hdfs_scheme`; backed by `OpendalWriter`. // - **S3A** – detected by scheme; backed by `OpendalWriter`. // - **Local filesystem** – `file://`, `file:`, or a bare path; backed by `LocalFileWriter`. - fn create( + pub(crate) fn create( output_path: &str, schema: SchemaRef, props: WriterProperties, @@ -409,6 +411,7 @@ pub struct ParquetWriterExec { metrics: ExecutionPlanMetricsSet, /// Cache for plan properties cache: Arc, + partition_columns: Vec, } impl ParquetWriterExec { @@ -424,6 +427,7 @@ impl ParquetWriterExec { partition_id: i32, column_names: Vec, object_store_options: HashMap, + partition_columns: Vec, ) -> Result { // Preserve the input's partitioning so each partition writes its own file let input_partitioning = input.output_partitioning().clone(); @@ -447,6 +451,7 @@ impl ParquetWriterExec { object_store_options, metrics: ExecutionPlanMetricsSet::new(), cache, + partition_columns, }) } @@ -516,6 +521,7 @@ impl ExecutionPlan for ParquetWriterExec { self.partition_id, self.column_names.clone(), self.object_store_options.clone(), + self.partition_columns.clone(), )?)), _ => Err(DataFusionError::Internal( "ParquetWriterExec requires exactly one child".to_string(), @@ -543,7 +549,13 @@ impl ExecutionPlan for ParquetWriterExec { let compression = self.compression_to_parquet()?; let column_names = self.column_names.clone(); - assert_eq!(input_schema.fields().len(), column_names.len()); + if input_schema.fields().len() != column_names.len() { + return Err(DataFusionError::Internal(format!( + "ParquetWriterExec: column_names length ({}) does not match input schema fields ({})", + column_names.len(), + input_schema.fields().len() + ))); + } // Replace the generic column names (col_0, col_1, etc.) with the actual names let fields: Vec<_> = input_schema @@ -552,32 +564,43 @@ impl ExecutionPlan for ParquetWriterExec { .enumerate() .map(|(i, field)| Arc::new(field.as_ref().clone().with_name(&column_names[i]))) .collect(); - let output_schema = Arc::new(arrow::datatypes::Schema::new(fields)); - - // Generate part file name for this partition - // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename - let part_file = if let Some(attempt_id) = task_attempt_id { - format!( - "{}/part-{:05}-{:05}.parquet", - work_dir, self.partition_id, attempt_id - ) - } else { - format!("{}/part-{:05}.parquet", work_dir, self.partition_id) - }; - // Configure writer properties - let props = WriterProperties::builder() - .set_compression(compression) - .build(); + let output_schema = Arc::new(Schema::new(fields)); let object_store_options = self.object_store_options.clone(); - let mut writer = StorageWriterFactory::create( - &part_file, - Arc::clone(&output_schema), - props, + // Resolve partition column indices, preserving the declared order. + let mut partition_col_indices = Vec::with_capacity(self.partition_columns.len()); + for name in &self.partition_columns { + let idx = column_names.iter().position(|c| c == name).ok_or_else(|| { + DataFusionError::Execution(format!( + "Partition column '{name}' not found among output columns {column_names:?}" + )) + })?; + partition_col_indices.push(idx); + } + let part_set: HashSet = partition_col_indices.iter().copied().collect(); + let data_col_indices: Vec = (0..output_schema.fields().len()) + .filter(|i| !part_set.contains(i)) + .collect(); + let data_fields: Vec = data_col_indices + .iter() + .map(|&i| Arc::clone(&output_schema.fields()[i])) + .collect(); + + let data_schema = Arc::new(Schema::new(data_fields)); + + let mut writer = PartitionedWriter::new( + work_dir, + self.partition_id, + task_attempt_id, + Arc::clone(&data_schema), + partition_col_indices, + self.partition_columns.clone(), + data_col_indices, + compression, runtime_env, - &object_store_options, + object_store_options, )?; // Clone schema for use in async closure @@ -612,28 +635,31 @@ impl ExecutionPlan for ParquetWriterExec { })?; } - writer.close().await.map_err(|e| { + let written_paths = writer.close().await.map_err(|e| { DataFusionError::Execution(format!("Failed to close writer: {}", e)) })?; // Get file size - strip file:// prefix if present for local filesystem access - let local_path = part_file - .strip_prefix("file://") - .or_else(|| part_file.strip_prefix("file:")) - .unwrap_or(&part_file); - let file_size = std::fs::metadata(local_path) - .map(|m| m.len() as i64) - .unwrap_or(0); + let mut total_bytes = 0i64; + for path in &written_paths { + let local = strip_file_scheme(path); + total_bytes += std::fs::metadata(local) + .map(|m| m.len() as i64) + .unwrap_or(0); + } // Update metrics with write statistics - files_written.add(1); - bytes_written.add(file_size as usize); + files_written.add(written_paths.len()); + bytes_written.add(total_bytes as usize); rows_written.add(total_rows as usize); // Log metadata for debugging eprintln!( - "Wrote Parquet file: path={}, size={}, rows={}", - part_file, file_size, total_rows + "ParquetWriterExec wrote {} file(s), {} bytes, {} rows; paths={:?}", + written_paths.len(), + total_bytes, + total_rows, + written_paths ); // Return empty stream to indicate completion @@ -648,6 +674,12 @@ impl ExecutionPlan for ParquetWriterExec { } } +fn strip_file_scheme(path: &str) -> &str { + path.strip_prefix("file://") + .or_else(|| path.strip_prefix("file:")) + .unwrap_or(path) +} + #[cfg(test)] mod tests { use super::*; @@ -902,6 +934,7 @@ mod tests { 0, // partition_id column_names, HashMap::new(), // object_store_options + Vec::new(), )?; // Create a session context and execute the plan diff --git a/native/core/src/execution/operators/partition_writer.rs b/native/core/src/execution/operators/partition_writer.rs new file mode 100644 index 0000000000..1c5904f8ff --- /dev/null +++ b/native/core/src/execution/operators/partition_writer.rs @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::execution::operators::parquet_writer::{ParquetWriter, StorageWriterFactory}; +use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt32Array}; +use arrow::datatypes::{DataType, SchemaRef}; +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::runtime_env::RuntimeEnv; +use futures::future::try_join_all; +use parquet::basic::Compression; +use parquet::file::properties::WriterProperties; +use std::collections::HashMap; +use std::sync::Arc; + +/// Placeholder directory name used for `null`/empty partition values. +/// Mirrors Spark's `ExternalCatalogUtils.DEFAULT_PARTITION_NAME`. +const DEFAULT_PARTITION_NAME: &str = "__HIVE_DEFAULT_PARTITION__"; + +pub(crate) struct PartitionedWriter { + /// Open writers keyed by their partition sub-directory (e.g. `a=1/b=2`). + writers: HashMap>, + work_dir: String, + partition_id: i32, + task_attempt_id: Option, + /// Schema of the data actually written to the files (partition columns removed). + data_schema: SchemaRef, + /// Indices (into the renamed output schema) of the partition columns. + partition_col_indices: Vec, + /// Names of the partition columns, in declared order. + partition_col_names: Vec, + /// Indices (into the renamed output schema) of the data columns. + data_col_indices: Vec, + compression: Compression, + runtime_env: Arc, + object_store_options: HashMap, +} + +// Characters that must be escaped, in addition to all control chars (< 0x20). +const NEEDS_ESCAPE: &[char] = &[ + '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u{007F}', '{', '[', ']', '^', +]; + +/// Escape a string so it is safe to use as a single path component of a +/// Hive-style partition directory (`col=value`). +/// +/// This mirrors Spark/Hive `ExternalCatalogUtils.escapePathName`: control +/// characters and a fixed set of special characters are percent-encoded as +/// `%XX` (upper-case hex). The result must match Spark exactly so that the +/// directory layout produced natively is readable by Spark and any catalog. +fn escape_path_name(value: &str) -> String { + let mut out = String::with_capacity(value.len()); + for c in value.chars() { + if (c as u32) < 0x20 || NEEDS_ESCAPE.contains(&c) { + // Percent-encode the byte value (chars here are always < 0x80). + out.push_str(&format!("%{:02X}", c as u32)); + } else { + out.push(c); + } + } + out +} + +impl PartitionedWriter { + #[allow(clippy::too_many_arguments)] + pub(super) fn new( + work_dir: String, + partition_id: i32, + task_attempt_id: Option, + data_schema: SchemaRef, + partition_col_indices: Vec, + partition_col_names: Vec, + data_col_indices: Vec, + compression: Compression, + runtime_env: Arc, + object_store_options: HashMap, + ) -> Result { + // `new` only assembles the struct; it performs no IO. All writer + // creation (including the non-partitioned single-file case) happens + // lazily in the async path so that the storage factory's internal + // `block_on` always runs on a blocking thread and never on a tokio + // worker. See `writer_for` / `ensure_default_writer`. + Ok(Self { + writers: HashMap::new(), + work_dir, + partition_id, + task_attempt_id, + data_schema, + partition_col_indices, + partition_col_names, + data_col_indices, + compression, + runtime_env, + object_store_options, + }) + } + + /// Build the absolute part-file path for a given partition sub-directory. + fn build_path(&self, subdir: &str) -> String { + let dir = if subdir.is_empty() { + self.work_dir.clone() + } else { + format!("{}/{}", self.work_dir, subdir) + }; + match self.task_attempt_id { + Some(attempt_id) => format!( + "{}/part-{:05}-{:05}.parquet", + dir, self.partition_id, attempt_id + ), + None => format!("{}/part-{:05}.parquet", dir, self.partition_id), + } + } + + /// Synchronously construct a writer from owned arguments. + /// + /// The storage factory may call `block_on` internally (e.g. for S3), so this + /// function MUST NOT run on a tokio worker thread. It is only ever invoked + /// from inside a `spawn_blocking` closure (see `writer_for`). + fn build_writer( + path: String, + schema: SchemaRef, + compression: Compression, + runtime_env: Arc, + object_store_options: HashMap, + ) -> Result> { + let props = WriterProperties::builder() + .set_compression(compression) + .build(); + StorageWriterFactory::create(&path, schema, props, runtime_env, &object_store_options) + } + + /// Return the writer for `subdir`, creating it on first use. + /// + /// All values needed by the factory are cloned into owned locals *before* + /// the `.await`, so no borrow of `self` is held across the suspension point. + /// This keeps the future `Send` (requiring only `PartitionedWriter: Send`) + /// instead of forcing `Sync`/`dyn ParquetWriter: Sync`. + async fn writer_for(&mut self, subdir: &str) -> Result<&mut Box> { + if !self.writers.contains_key(subdir) { + // Take everything we need by value before the await. + let path = self.build_path(subdir); + let schema = Arc::clone(&self.data_schema); + let compression = self.compression; + let runtime_env = Arc::clone(&self.runtime_env); + let object_store_options = self.object_store_options.clone(); + + // The factory may call `block_on` internally (S3), so run it on a + // blocking thread to avoid "Cannot start a runtime from within a + // runtime" on the tokio worker. + let writer = tokio::task::spawn_blocking(move || { + Self::build_writer(path, schema, compression, runtime_env, object_store_options) + }) + .await + .map_err(|e| { + DataFusionError::Execution(format!("Writer creation task failed to join: {e}")) + })??; + + self.writers.insert(subdir.to_string(), writer); + } + Ok(self.writers.get_mut(subdir).unwrap()) + } + + /// Compute, for each row, the partition sub-directory string `col=val/...`. + /// Partition columns are cast to UTF-8 to obtain their string form; `null` + /// (or empty) values map to `__HIVE_DEFAULT_PARTITION__`. + fn partition_subdirs(&self, batch: &RecordBatch) -> Result> { + let num_rows = batch.num_rows(); + if self.partition_col_indices.is_empty() { + return Ok(vec![String::new(); num_rows]); + } + + // Cast each partition column to Utf8 once for the whole batch. + let mut casted: Vec = Vec::with_capacity(self.partition_col_indices.len()); + for &col_idx in &self.partition_col_indices { + let arr = arrow::compute::cast(batch.column(col_idx).as_ref(), &DataType::Utf8) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to cast partition column to string: {e}" + )) + })?; + casted.push(arr); + } + let casted: Vec<&StringArray> = casted + .iter() + .map(|a| { + a.as_any().downcast_ref::().ok_or_else(|| { + DataFusionError::Internal( + "Partition column did not cast to StringArray".to_string(), + ) + }) + }) + .collect::>()?; + + let mut subdirs = Vec::with_capacity(num_rows); + for row in 0..num_rows { + let mut subdir = String::new(); + for (k, name) in self.partition_col_names.iter().enumerate() { + if k > 0 { + subdir.push('/'); + } + subdir.push_str(&escape_path_name(name)); + subdir.push('='); + let arr = casted[k]; + if arr.is_null(row) || arr.value(row).is_empty() { + subdir.push_str(DEFAULT_PARTITION_NAME); + } else { + subdir.push_str(&escape_path_name(arr.value(row))); + } + } + subdirs.push(subdir); + } + Ok(subdirs) + } + + /// Split `batch` by partition key and append each group to its writer. + pub(crate) async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + let subdirs = self.partition_subdirs(batch)?; + + // Group row indices by their partition sub-directory. + let mut groups: HashMap> = HashMap::new(); + for (row, subdir) in subdirs.into_iter().enumerate() { + groups.entry(subdir).or_default().push(row as u32); + } + + for (subdir, rows) in groups { + // Project to data columns only (partition columns live in the path). + let indices = UInt32Array::from(rows); + let mut columns: Vec = Vec::with_capacity(self.data_col_indices.len()); + for &col_idx in &self.data_col_indices { + let taken = arrow::compute::take(batch.column(col_idx).as_ref(), &indices, None) + .map_err(|e| { + DataFusionError::Execution(format!("Failed to take partition rows: {e}")) + })?; + columns.push(taken); + } + let sub_batch = + RecordBatch::try_new(Arc::clone(&self.data_schema), columns).map_err(|e| { + DataFusionError::Execution(format!("Failed to build partition sub-batch: {e}")) + })?; + + let writer = self.writer_for(&subdir).await?; + writer + .write_batch(&sub_batch) + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to write batch: {e}")))?; + } + Ok(()) + } + + /// Close every open writer concurrently, returning the absolute paths of + /// all part-files that were actually created (with their partition + /// sub-directories). An empty input yields an empty Vec — no phantom files. + pub(crate) async fn close(self) -> Result> { + // Compute paths *before* consuming `self.writers` (build_path borrows &self). + let paths: Vec = self.writers.keys().map(|s| self.build_path(s)).collect(); + + let closes = self.writers.into_values().map(|w| w.close()); + try_join_all(closes) + .await + .map_err(|e| DataFusionError::Execution(format!("Failed to close writer: {e}")))?; + + Ok(paths) + } +} diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index c07a92d700..09cc744c8a 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1682,6 +1682,7 @@ impl PhysicalPlanner { self.partition, writer.column_names.clone(), object_store_options, + writer.partition_columns.clone(), )?); Ok(( diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 15265f1d86..0e19ff4a26 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -355,6 +355,7 @@ message ParquetWriter { // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in // the map. map object_store_options = 8; + repeated string partition_columns = 9; } enum AggregateMode { diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index fde6f37196..62b8363120 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -68,8 +68,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec return Unsupported(Some("Bucketed writes are not supported")) } - if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) { - return Unsupported(Some("Partitioned writes are not supported")) + if (cmd.staticPartitions.nonEmpty) { + return Unsupported(Some("Static partitions writes are not supported")) } val codec = parseCompressionCodec(cmd) @@ -145,6 +145,8 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec writerOpBuilder.putObjectStoreOptions(key, value) } + writerOpBuilder.addAllPartitionColumns(cmd.partitionColumns.map(_.name).asJava) + val writerOp = writerOpBuilder.build() val writerOperator = Operator diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetPartitionWriteSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetPartitionWriteSuite.scala new file mode 100644 index 0000000000..fdabb96096 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetPartitionWriteSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet + +import java.io.File + +import org.apache.spark.sql.{CometTestBase, SaveMode} + +import org.apache.comet.CometConf + +class CometParquetPartitionWriteSuite extends CometTestBase { + + private val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" + + test("simple partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(2, 'b')") + sql(s"INSERT INTO $table VALUES(3, 'c')") + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(outputPath) + Seq((1, "a"), (2, "b"), (3, "c")).foreach { case (col1, col2) => + val row = spark.read + .parquet(s"$outputPath/col1=$col1") + .collect() + .headOption + assert(row.isDefined) + assert(row.get.getAs[String]("col2") == col2) + } + } + } + } + } + + test("default hive partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(null, 'a')") + sql(s"INSERT INTO $table VALUES(null, 'b')") + sql(s"INSERT INTO $table VALUES(null, 'c')") + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(outputPath) + val rows = spark.read + .parquet(s"$outputPath/col1=$DEFAULT_PARTITION_NAME") + .collect() + assert(rows.length == 3) + assert(rows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", "b", "c").sorted) + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index f6795b91a3..f9588b9e8c 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -23,7 +23,7 @@ import java.io.File import scala.util.Random -import org.apache.spark.sql.{CometTestBase, DataFrame, Row} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode} import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometNativeWriteExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec @@ -35,8 +35,72 @@ import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOpt class CometParquetWriterSuite extends CometTestBase { + private val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" + import testImplicits._ + test("simple partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(2, 'b')") + sql(s"INSERT INTO $table VALUES(3, 'c')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + Seq((1, "a"), (2, "b"), (3, "c")).foreach { case (col1, col2) => + val rows = spark.read + .parquet(s"$outputPath/col1=$col1") + .collect() + assert(rows.length == 1) + assert(rows.head.getAs[String]("col2") == col2) + } + } + } + } + } + + test("default hive partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(null, 'a')") + sql(s"INSERT INTO $table VALUES(null, 'b')") + sql(s"INSERT INTO $table VALUES(null, 'c')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows = spark.read + .parquet(s"$outputPath/col1=$DEFAULT_PARTITION_NAME") + .collect() + assert(rows.length == 3) + assert(rows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", "b", "c").sorted) + } + } + } + } + test("basic parquet write") { withTempPath { dir => val outputPath = new File(dir, "output.parquet").getAbsolutePath From 1d7d729923917dce1bfa649d4d5930105306263d Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Thu, 18 Jun 2026 14:36:50 +0400 Subject: [PATCH 10/10] more tests --- .../CometParquetPartitionWriteSuite.scala | 86 ------- .../parquet/CometParquetWriterSuite.scala | 213 ++++++++++++++++++ 2 files changed, 213 insertions(+), 86 deletions(-) delete mode 100644 spark/src/test/scala/org/apache/comet/parquet/CometParquetPartitionWriteSuite.scala diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetPartitionWriteSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetPartitionWriteSuite.scala deleted file mode 100644 index fdabb96096..0000000000 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetPartitionWriteSuite.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.parquet - -import java.io.File - -import org.apache.spark.sql.{CometTestBase, SaveMode} - -import org.apache.comet.CometConf - -class CometParquetPartitionWriteSuite extends CometTestBase { - - private val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" - - test("simple partition parquet write") { - withSQLConf( - CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", - CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { - withTempPath { dir => - val table = "test_write_partitions" - val outputPath = new File(dir, "output.parquet").getAbsolutePath - withTable(table) { - sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") - sql(s"INSERT INTO $table VALUES(1, 'a')") - sql(s"INSERT INTO $table VALUES(2, 'b')") - sql(s"INSERT INTO $table VALUES(3, 'c')") - sql(s"SELECT * FROM $table").write - .partitionBy("col1") - .mode(SaveMode.Overwrite) - .parquet(outputPath) - Seq((1, "a"), (2, "b"), (3, "c")).foreach { case (col1, col2) => - val row = spark.read - .parquet(s"$outputPath/col1=$col1") - .collect() - .headOption - assert(row.isDefined) - assert(row.get.getAs[String]("col2") == col2) - } - } - } - } - } - - test("default hive partition parquet write") { - withSQLConf( - CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", - CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { - withTempPath { dir => - val table = "test_write_partitions" - val outputPath = new File(dir, "output.parquet").getAbsolutePath - withTable(table) { - sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") - sql(s"INSERT INTO $table VALUES(null, 'a')") - sql(s"INSERT INTO $table VALUES(null, 'b')") - sql(s"INSERT INTO $table VALUES(null, 'c')") - sql(s"SELECT * FROM $table").write - .partitionBy("col1") - .mode(SaveMode.Overwrite) - .parquet(outputPath) - val rows = spark.read - .parquet(s"$outputPath/col1=$DEFAULT_PARTITION_NAME") - .collect() - assert(rows.length == 3) - assert(rows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", "b", "c").sorted) - } - } - } - } -} diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index f9588b9e8c..fe2ad2fc3b 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -101,6 +101,219 @@ class CometParquetWriterSuite extends CometTestBase { } } + test("multiple partition columns parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING, col3 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'x', 'a')") + sql(s"INSERT INTO $table VALUES(1, 'y', 'b')") + sql(s"INSERT INTO $table VALUES(2, 'x', 'c')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1", "col2") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + Seq((1, "x", "a"), (1, "y", "b"), (2, "x", "c")).foreach { case (col1, col2, col3) => + val rows = spark.read + .parquet(s"$outputPath/col1=$col1/col2=$col2") + .collect() + assert(rows.length == 1) + assert(rows.head.getAs[String]("col3") == col3) + } + } + } + } + } + + test("multiple rows per partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(1, 'b')") + sql(s"INSERT INTO $table VALUES(1, 'c')") + sql(s"INSERT INTO $table VALUES(2, 'd')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows = spark.read.parquet(s"$outputPath/col1=1").collect() + assert(rows.length == 3) + assert(rows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", "b", "c")) + val rows2 = spark.read.parquet(s"$outputPath/col1=2").collect() + assert(rows2.length == 1) + assert(rows2.head.getAs[String]("col2") == "d") + } + } + } + } + + test("append mode partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(2, 'b')") + captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + val plan = captureWritePlan( + path => + sql("SELECT 1 AS col1, 'c' AS col2").write + .partitionBy("col1") + .mode(SaveMode.Append) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows1 = spark.read.parquet(s"$outputPath/col1=1").collect() + assert(rows1.length == 2) + assert(rows1.map(_.getAs[String]("col2")).toSeq.sorted == Seq("a", "c")) + } + } + } + } + + test("long type partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 BIGINT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(10000000000, 'a')") + sql(s"INSERT INTO $table VALUES(20000000000, 'b')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + Seq((10000000000L, "a"), (20000000000L, "b")).foreach { case (col1, col2) => + val rows = spark.read.parquet(s"$outputPath/col1=$col1").collect() + assert(rows.length == 1) + assert(rows.head.getAs[String]("col2") == col2) + } + } + } + } + } + + test("string partition with special characters parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 STRING, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES('a b', 'x')") + sql(s"INSERT INTO $table VALUES('a/b', 'y')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows = spark.read.parquet(outputPath).collect() + assert(rows.length == 2) + assert( + rows.map(r => (r.getAs[String]("col1"), r.getAs[String]("col2"))).toSeq.sorted == + Seq(("a b", "x"), ("a/b", "y")).sorted) + } + } + } + } + + test("empty dataframe partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table WHERE col1 IS NOT NULL").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val rows = spark.read.parquet(outputPath).collect() + assert(rows.isEmpty) + } + } + } + } + + test("mixed null and non-null partition parquet write") { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT.key -> "true") { + withTempPath { dir => + val table = "test_write_partitions" + val outputPath = new File(dir, "output.parquet").getAbsolutePath + withTable(table) { + sql(s"CREATE TABLE $table(col1 INT, col2 STRING) USING parquet") + sql(s"INSERT INTO $table VALUES(1, 'a')") + sql(s"INSERT INTO $table VALUES(null, 'b')") + sql(s"INSERT INTO $table VALUES(2, 'c')") + sql(s"INSERT INTO $table VALUES(null, 'd')") + val plan = captureWritePlan( + path => + sql(s"SELECT * FROM $table").write + .partitionBy("col1") + .mode(SaveMode.Overwrite) + .parquet(path), + outputPath) + assertHasCometNativeWriteExec(plan) + val nullRows = spark.read + .parquet(s"$outputPath/col1=$DEFAULT_PARTITION_NAME") + .collect() + assert(nullRows.length == 2) + assert(nullRows.map(_.getAs[String]("col2")).toSeq.sorted == Seq("b", "d")) + assert(spark.read.parquet(s"$outputPath/col1=1").collect().length == 1) + assert(spark.read.parquet(s"$outputPath/col1=2").collect().length == 1) + } + } + } + } + test("basic parquet write") { withTempPath { dir => val outputPath = new File(dir, "output.parquet").getAbsolutePath