diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 422232f546..2c382468ba 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -287,6 +287,7 @@ jobs: org.apache.comet.exec.CometNativeReaderSuite org.apache.comet.CometIcebergNativeSuite org.apache.comet.CometIcebergRewriteActionSuite + org.apache.comet.CometIcebergWriteActionSuite org.apache.comet.iceberg.IcebergReflectionSuite org.apache.comet.csv.CometCsvNativeReadSuite org.apache.comet.CometFuzzTestSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index d0a03eeb75..0a4120de1e 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -129,6 +129,7 @@ jobs: org.apache.comet.exec.CometNativeReaderSuite org.apache.comet.CometIcebergNativeSuite org.apache.comet.CometIcebergRewriteActionSuite + org.apache.comet.CometIcebergWriteActionSuite org.apache.comet.iceberg.IcebergReflectionSuite org.apache.comet.csv.CometCsvNativeReadSuite org.apache.comet.CometFuzzTestSuite diff --git a/docs/source/user-guide/latest/iceberg-writes.md b/docs/source/user-guide/latest/iceberg-writes.md new file mode 100644 index 0000000000..5918b21bc0 --- /dev/null +++ b/docs/source/user-guide/latest/iceberg-writes.md @@ -0,0 +1,107 @@ + + +# Accelerating Apache Iceberg V2 Writes using Comet + +## Overview + +Iceberg-java writes a V2 table through a single Spark operator that combines data-file writing, +metadata writing, committing, and validation, and it consumes one row at a time. Comet's query +results are columnar (Arrow), so feeding them to that operator means transposing every batch +back into rows — only for Parquet's own encoder to re-assemble the columns on the way to disk. + +Comet's Iceberg write support addresses this in two independent layers: + +1. **Split-operator plan.** Comet splits the data-file-writing step out into its own Spark + operator so it can accelerate just that part of the process — the scans, projects, sorts, and + exchanges that feed the write become visible to AQE and Comet's columnar rules. Metadata + writing, committing, and validation stay in the JVM exactly as iceberg-java does them. +2. **Native Parquet write.** With the split in place, Comet may write the Iceberg data files + using [iceberg-rust](https://github.com/apache/iceberg-rust) instead of iceberg-java, under + the circumstances described below. This keeps the data columnar end-to-end and avoids the row + transpose. + +Both layers are off by default, and both fall back to the stock iceberg-java writer whenever +they can't be applied safely. + +## Configuration + +Standard Comet + Iceberg setup (see [`iceberg.md`](iceberg.md)) plus two write-side toggles: + +``` +# Standard Comet / Iceberg wiring +spark.plugins=org.apache.spark.CometPlugin +spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions +spark.sql.catalog.=org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog..type=hadoop # or hive / glue / rest / ... +spark.sql.catalog..warehouse=... + +# Comet write toggles (both off by default; enable independently) +spark.comet.write.iceberg.splitOperator.enabled=true # layer 1: split-operator plan +spark.comet.write.iceberg.nativeAcceleration=true # layer 2: native Parquet write +``` + +The native write (layer 2) only engages when the split plan (layer 1) is also enabled. With +`splitOperator.enabled=false`, every write goes straight through iceberg-java. + +## Spark version compatibility + +The split-operator plan and the native write are supported on Spark 3.4, 3.5, and 4.0, with +identical coverage on all three: `INSERT INTO`, `INSERT OVERWRITE` (static and dynamic), and +copy-on-write `DELETE` / `UPDATE` / `MERGE`. + +Merge-on-read writes (Iceberg `WriteDelta`, used when a table is configured for merge-on-read) +are **not** intercepted — their per-row delta writer can't be made native yet — so they always +run on iceberg-java. + +## When Comet falls back to the iceberg-java writer + +If the split-operator plan is disabled (`splitOperator.enabled=false`), nothing is intercepted +and every write runs on iceberg-java. + +When the split plan is enabled but the native write can't reproduce iceberg-java's output, only +the file write falls back — the split plan still accelerates the data sub-query. Each native-write +gate below is checked at planning time, so Comet writes natively only when it can produce the same +`DataFile`s iceberg-java would. The **Source** column attributes each gap to the responsible +layer: **iceberg-rust** (the high-level Iceberg writer) or **parquet-rs** (the Rust Parquet +encoder it drives). + +| Table / write property | Falls back when… | Source | +| ---------------------------------------------------------- | -------------------------------------------------------------------------------- | ------------------------- | +| Resolved write format | not `parquet` (per-write `write-format` option or `write.format.default`) | iceberg-rust | +| `write.object-storage.enabled` | `true` — no hashed-prefix object-storage location layout | iceberg-rust | +| `write.location-provider.impl` | set — a custom Java `LocationProvider` can't be loaded | iceberg-rust | +| `format-version` | `>= 3` — row lineage / deletion vectors / variant types aren't emitted | iceberg-rust | +| `encryption.*` | any key set — Parquet modular encryption is unsupported | iceberg-rust / parquet-rs | +| `write.metadata.metrics.default` | `none`, or mentions `counts` | iceberg-rust | +| `write.metadata.metrics.column.` | any column set to `none` or `counts` | iceberg-rust | +| `write.metadata.metrics.max-inferred-column-defaults` | projected column count exceeds it while no explicit metrics default is set | iceberg-rust | +| `write.parquet.bloom-filter-max-bytes` | set — there is no global bloom-filter byte cap to honour | parquet-rs | +| `write.parquet.bloom-filter-enabled.column.` | any set to `true` — bloom-filter sizing can't be capped to match | iceberg-rust | +| `write.parquet.row-group-check-min-record-count` / `-max-` | set to a non-default value — row-group cadence is byte-driven only | parquet-rs | +| `write.parquet.stats-enabled.column.` | set (Iceberg 1.10.0+) — no matching per-column stats toggle | parquet-rs | +| `parquet.enable.dictionary` | set — an explicit dictionary override isn't translated | parquet-rs | +| `io-impl` (catalog / table) | set — native storage is chosen by URI scheme, not a Java `FileIO` | iceberg-rust | +| Data-location URI scheme | not one of `file`, `memory`, `s3`, `s3a`, `gs`, `oss` | iceberg-rust | +| Copy-on-write `MERGE` | always — the per-row merge dispatch (`MergeRowsExec`) has no native operator yet | Comet | + +Comet also falls back whenever it can't reflect the Iceberg internals needed to build an +identical write — for example an unrecognised `BatchWrite` class, a column type it can't +serialise, a `ReplaceData` projection it can't map, or a metadata-table scan. These are +conversion-time guards (source: Comet) rather than table-property gates. diff --git a/docs/source/user-guide/latest/index.rst b/docs/source/user-guide/latest/index.rst index 00f770c27e..5c8995224a 100644 --- a/docs/source/user-guide/latest/index.rst +++ b/docs/source/user-guide/latest/index.rst @@ -79,6 +79,7 @@ to read more. :hidden: Iceberg Guide + Iceberg Writes S3 Credential Providers Kubernetes Guide diff --git a/spark/src/main/scala/org/apache/comet/CometConf.scala b/spark/src/main/scala/org/apache/comet/CometConf.scala index 78ea0f0168..c4465fa51e 100644 --- a/spark/src/main/scala/org/apache/comet/CometConf.scala +++ b/spark/src/main/scala/org/apache/comet/CometConf.scala @@ -121,6 +121,16 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(true) + val COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.write.iceberg.splitOperator.enabled") + .category(CATEGORY_TESTING) + .doc( + "Whether to rewrite Iceberg V2 writes from Spark's combined V2 write/commit operator " + + "into Comet's two-operator shape: a file writer exec (inside AQE) and a committer " + + "(outside AQE).") + .booleanConf + .createWithDefault(false) + val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] = conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit") .category(CATEGORY_SCAN) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 6c4a92f312..317c1cb386 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf._ +import org.apache.comet.iceberg.IcebergWriteStrategy import org.apache.comet.rules.{CometExecRule, CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, CometScanRule, CometSpark34AqeDppFallbackRule, EliminateRedundantTransitions} import org.apache.comet.shims.ShimCometSparkSessionExtensions @@ -97,6 +98,7 @@ class CometSparkSessionExtensions extensions.injectQueryStagePrepRule { session => CometExecRule(session) } injectQueryStageOptimizerRuleShim(extensions, CometPlanAdaptiveDynamicPruningFilters) injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery) + extensions.injectPlannerStrategy { session => IcebergWriteStrategy(session) } } case class CometScanColumnar(session: SparkSession) extends ColumnarRule { diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala index 2a39369c76..0be2c031ea 100644 --- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala @@ -51,6 +51,10 @@ object IcebergReflection extends Logging { val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate" val SPARK_BATCH_QUERY_SCAN = "org.apache.iceberg.spark.source.SparkBatchQueryScan" val SPARK_STAGED_SCAN = "org.apache.iceberg.spark.source.SparkStagedScan" + val SPARK_WRITE = "org.apache.iceberg.spark.source.SparkWrite" + + // Iceberg 1.5.2 uses its own `ReplaceIcebergData` due to lack of `ReplaceData` in Spark 3.4. + val REPLACE_ICEBERG_DATA = "org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData" } /** @@ -94,6 +98,45 @@ object IcebergReflection extends Logging { val UNKNOWN = "unknown" } + /** Loads a class, returning `None` when it's absent (e.g. Iceberg not on the classpath). */ + private def tryLoadClass(name: String): Option[Class[_]] = + try Some(loadClass(name)) + catch { case _: ClassNotFoundException => None } + + private lazy val sparkWriteClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.SPARK_WRITE) + + /** Whether `write` is an Iceberg `SparkWrite` (false if Iceberg isn't on the classpath). */ + def isIcebergSparkWrite(write: Any): Boolean = + sparkWriteClassOpt.exists(_.isInstance(write)) + + def isReplaceIcebergData(plan: Any): Boolean = + plan != null && plan.getClass.getName == ClassNames.REPLACE_ICEBERG_DATA + + private def reflectField(plan: Any, fieldName: String): Option[AnyRef] = + try { + val field = plan.getClass.getDeclaredField(fieldName) + field.setAccessible(true) + Option(field.get(plan)) + } catch { + case e: Exception => + logError( + s"Iceberg reflection failure: $fieldName on ${plan.getClass.getName}: ${e.getMessage}") + None + } + + def extractReplaceIcebergDataFields(plan: Any): Option[(AnyRef, AnyRef, AnyRef, AnyRef)] = { + if (!isReplaceIcebergData(plan)) return None + for { + table <- reflectField(plan, "table") + query <- reflectField(plan, "query") + originalTable <- reflectField(plan, "originalTable") + write <- reflectField( + plan, + "write" + ) // Option[Write]; field can be Some(null) so kept AnyRef + } yield (table, query, originalTable, write) + } + /** * Loads a class using the thread context classloader first, then falls back to the system * classloader. diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergWriteLogical.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergWriteLogical.scala new file mode 100644 index 0000000000..4c70592f26 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergWriteLogical.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} +import org.apache.spark.sql.connector.write.BatchWrite + +/** Logical anchor for the writer. See `IcebergWriteStrategy` for the rationale. */ +case class IcebergWriteLogical( + child: LogicalPlan, + // Driver-side only: AQE re-planning is driver-local and write commands aren't cached. + @transient batchWrite: BatchWrite, + replaceDataDispatch: Option[ReplaceDataDispatchInfo] = None) + extends UnaryNode { + + override def output: Seq[Attribute] = Nil + + override protected def withNewChildInternal(newChild: LogicalPlan): IcebergWriteLogical = + copy(child = newChild) +} diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergWriteStrategy.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergWriteStrategy.scala new file mode 100644 index 0000000000..e4ff1419c8 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergWriteStrategy.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceData} +import org.apache.spark.sql.comet.{IcebergCommitExec, IcebergWriteExec} +import org.apache.spark.sql.connector.write.Write +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +import org.apache.comet.CometConf + +/** + * Spark Strategy that intercepts Iceberg V2 copy-on-write logical writes and emits Comet's + * two-operator physical tree. + */ +case class IcebergWriteStrategy(session: SparkSession) extends SparkStrategy { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + if (!CometConf.COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED.get(session.sessionState.conf)) { + return Nil + } + + plan match { + case ad: AppendData => + matchedSparkWrite(ad.table, ad.write, ad.query, replaceDataDispatch = None).toList + case obe: OverwriteByExpression => + matchedSparkWrite(obe.table, obe.write, obe.query, replaceDataDispatch = None).toList + case opd: OverwritePartitionsDynamic => + matchedSparkWrite(opd.table, opd.write, opd.query, replaceDataDispatch = None).toList + case rd: ReplaceData => + matchedSparkWrite( + rd.originalTable, + rd.write, + rd.query, + replaceDataDispatch = IcebergReplaceDataShim.extractProjections(rd)).toList + case plan if IcebergReflection.isReplaceIcebergData(plan) => + IcebergReflection + .extractReplaceIcebergDataFields(plan) + .flatMap { case (_, query, originalTable, write) => + matchedSparkWrite( + originalTable.asInstanceOf[org.apache.spark.sql.catalyst.analysis.NamedRelation], + write.asInstanceOf[Option[Write]], + query.asInstanceOf[LogicalPlan], + replaceDataDispatch = None) + } + .toList + // Hit by AQE. + case IcebergWriteLogical(child, batchWrite, replaceDataDispatch) => + Seq(IcebergWriteExec(batchWrite, planLater(child), replaceDataDispatch)) + case _ => Nil + } + } + + private def matchedSparkWrite( + table: org.apache.spark.sql.catalyst.analysis.NamedRelation, + write: Option[Write], + query: LogicalPlan, + replaceDataDispatch: Option[ReplaceDataDispatchInfo]): Option[SparkPlan] = { + table match { + case rel: DataSourceV2Relation => + write.flatMap { w => + if (IcebergReflection.isIcebergSparkWrite(w)) { + Some(buildTwoOp(w, rel, query, replaceDataDispatch)) + } else { + None + } + } + case _ => None + } + } + + /** + * Builds the two-op tree. The committer and writer share one `BatchWrite` (also reused across + * AQE re-plans): `toBatch()` returns a fresh instance per call, but the committer's commit-time + * validation must see the same instance the writer wrote through, hence we store it. The + * writer's child is wrapped in [[IcebergWriteLogical]] so AQE re-emits only the data-writing + * operator on each re-plan as opposed to multiple new commit operators. + */ + private def buildTwoOp( + write: Write, + rel: DataSourceV2Relation, + query: LogicalPlan, + replaceDataDispatch: Option[ReplaceDataDispatchInfo]): SparkPlan = { + val batchWrite = write.toBatch + // To mirror Spark ReplaceData semantics we invalidate our cache of the state of + // `originalTable`. + val refresh: () => Unit = () => IcebergRefreshCacheShim.recacheByPlan(rel) + IcebergCommitExec( + batchWrite, + refresh, + // `replaceDataDispatch` may project the data into the format the writer expects. + planLater(IcebergWriteLogical(query, batchWrite, replaceDataDispatch))) + } +} diff --git a/spark/src/main/scala/org/apache/comet/iceberg/ReplaceDataDispatchInfo.scala b/spark/src/main/scala/org/apache/comet/iceberg/ReplaceDataDispatchInfo.scala new file mode 100644 index 0000000000..8f9274a9e2 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/iceberg/ReplaceDataDispatchInfo.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg + +import org.apache.spark.sql.catalyst.ProjectingInternalRow + +/** + * Mirror of Spark 4.x's `ReplaceDataProjections`, defined here so we can compile against Spark + * 3.4 / 3.5 source trees (where the class doesn't exist). Populated by `IcebergReplaceDataShim` + * on 4.x and left `None` on 3.x; consumed by `IcebergWriteExec`. + */ +case class ReplaceDataDispatchInfo( + rowProjection: ProjectingInternalRow, + metadataProjection: Option[ProjectingInternalRow]) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/IcebergCommitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/IcebergCommitExec.scala new file mode 100644 index 0000000000..3df001a909 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/IcebergCommitExec.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.write.{BatchWrite, WriterCommitMessage} +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.datasources.v2.V2CommandExec +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +/** + * Driver-side committer for Comet's split-operator Iceberg V2 write. + */ +case class IcebergCommitExec( + // Neither of these fields are serialized, this is all run on the driver. + @transient batchWrite: BatchWrite, + @transient refreshCache: () => Unit, + child: SparkPlan) + extends V2CommandExec + with UnaryExecNode + with Logging { + + override def output: Seq[Attribute] = Nil + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numCommittedMessages" -> SQLMetrics + .createMetric(sparkContext, "number of task commit messages")) + + override protected def run(): Seq[InternalRow] = { + val messages: Array[WriterCommitMessage] = child.executeCollect().map { row => + IcebergWriteExec.deserializeMessage(row.getBinary(0)) + } + longMetric("numCommittedMessages").add(messages.length) + + try { + messages.foreach(batchWrite.onDataWriterCommit) + batchWrite.commit(messages) + logInfo(s"Iceberg commit succeeded with ${messages.length} task message(s)") + } catch { + case cause: Throwable => + logError(s"Iceberg commit failed; aborting ${messages.length} task message(s)", cause) + try batchWrite.abort(messages) + catch { + case abortFailure: Throwable => + cause.addSuppressed(abortFailure) + } + throw cause + } + + refreshCache() + Nil + } + + override protected def withNewChildInternal(newChild: SparkPlan): IcebergCommitExec = + copy(child = newChild) + + override def nodeName: String = "IcebergCommit" +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/IcebergWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/IcebergWriteExec.scala new file mode 100644 index 0000000000..f04c7b4cb6 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/IcebergWriteExec.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} + +import org.apache.spark.TaskContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, PhysicalWriteInfoImpl, WriterCommitMessage} +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.types.{BinaryType, StructField, StructType} +import org.apache.spark.util.Utils + +import org.apache.comet.iceberg.ReplaceDataDispatchInfo + +/** + * Executor-side file writer for Comet's split-operator Iceberg V2 write. + */ +case class IcebergWriteExec( + // `batchWrite` only stored driver side, only the writer factory is shipped to executors. + @transient batchWrite: BatchWrite, + child: SparkPlan, + replaceDataDispatch: Option[ReplaceDataDispatchInfo] = None) + extends UnaryExecNode { + + override def output: Seq[Attribute] = Seq( + AttributeReference(IcebergWriteExec.CommitMessageColumn, BinaryType, nullable = false)()) + + // Spark already adds a distribution for the V2 write; adding another here is redundant. + override def requiredChildDistribution: Seq[Distribution] = Seq(UnspecifiedDistribution) + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + override protected def doExecute(): RDD[InternalRow] = { + val rdd = child.execute() + val factory = batchWrite.createBatchWriterFactory(PhysicalWriteInfoImpl(rdd.getNumPartitions)) + require( + !batchWrite.useCommitCoordinator(), + "Comet's Iceberg write path does not currently support BatchWrite implementations that " + + "require Spark's commit coordinator; received: " + batchWrite.getClass.getName) + + val rowsMetric = longMetric("numOutputRows") + val schemaTypes = output.map(_.dataType).toArray + val capturedReplaceDataDispatch = replaceDataDispatch + rdd.mapPartitionsInternal { iter => + val partId = TaskContext.getPartitionId() + val taskId = TaskContext.get().taskAttemptId() + val writer = factory.createWriter(partId, taskId) + val projection = UnsafeProjection.create(schemaTypes) + IcebergWriteExec.runWriter( + writer, + iter, + rowsMetric, + projection, + capturedReplaceDataDispatch) + } + } + + override protected def withNewChildInternal(newChild: SparkPlan): IcebergWriteExec = + copy(child = newChild) + + override def nodeName: String = "IcebergWrite" +} + +object IcebergWriteExec { + + val CommitMessageColumn: String = "iceberg_commit_message" + + val OutputSchema: StructType = StructType( + Seq(StructField(CommitMessageColumn, BinaryType, nullable = false))) + + /** Writes data files and returns the serialised Iceberg commit message. */ + def runWriter( + writer: DataWriter[InternalRow], + iter: Iterator[InternalRow], + rowsMetric: SQLMetric, + projection: UnsafeProjection, + replaceDataDispatch: Option[ReplaceDataDispatchInfo]): Iterator[InternalRow] = { + val message = Utils.tryWithSafeFinallyAndFailureCallbacks(block = { + if (replaceDataDispatch.isDefined) { + runReplaceDataWriter(writer, iter, replaceDataDispatch.get, rowsMetric) + } else { + while (iter.hasNext) { + writer.write(iter.next()) + rowsMetric.add(1L) + } + } + writer.commit() + })( + catchBlock = { + writer.abort() + }, + finallyBlock = { + writer.close() + }) + + Iterator.single(projection(InternalRow(serializeMessage(message))).copy()) + } + + // Mirrors Spark RowDeltaUtils, which is private and changes location across versions. + private val WRITE_OPERATION = 5 + private val WRITE_WITH_METADATA_OPERATION = 6 + + // Spark has different `DataWriter#write` methods across versions. + @transient private lazy val dataWriterWriteWithMetadataMethod + : Option[java.lang.reflect.Method] = + try Some(classOf[DataWriter[_]].getMethod("write", classOf[Object], classOf[Object])) + catch { case _: NoSuchMethodException => None } + + def serializeMessage(message: WriterCommitMessage): Array[Byte] = { + val bos = new ByteArrayOutputStream() + val oos = new ObjectOutputStream(bos) + try oos.writeObject(message) + finally oos.close() + bos.toByteArray + } + + def deserializeMessage(bytes: Array[Byte]): WriterCommitMessage = { + val bis = new ByteArrayInputStream(bytes) + val ois = new ObjectInputStream(bis) + try ois.readObject().asInstanceOf[WriterCommitMessage] + finally ois.close() + } + + private def runReplaceDataWriter( + writer: DataWriter[InternalRow], + iter: Iterator[InternalRow], + dispatch: ReplaceDataDispatchInfo, + rowsMetric: SQLMetric): Unit = { + val rowProjection = dispatch.rowProjection + val metadataProjection = dispatch.metadataProjection.orNull + while (iter.hasNext) { + val row = iter.next() + rowsMetric.add(1L) + row.getInt(0) match { + case WRITE_OPERATION => + rowProjection.project(row) + writer.write(rowProjection) + case WRITE_WITH_METADATA_OPERATION => + rowProjection.project(row) + if (metadataProjection != null) metadataProjection.project(row) + val writeWithMetadata = dataWriterWriteWithMetadataMethod.getOrElse( + throw new UnsupportedOperationException( + "DataWriter.write(metadata, row) is not available in this Spark version but the " + + s"analyzer emitted operation code $WRITE_WITH_METADATA_OPERATION")) + writeWithMetadata.invoke(writer, metadataProjection, rowProjection) + case other => + throw new IllegalArgumentException( + s"Unexpected ReplaceData operation code $other; supported: " + + s"$WRITE_OPERATION (WRITE), $WRITE_WITH_METADATA_OPERATION (WRITE_WITH_METADATA)") + } + } + } +} diff --git a/spark/src/main/spark-3.x/org/apache/comet/iceberg/IcebergRefreshCacheShim.scala b/spark/src/main/spark-3.x/org/apache/comet/iceberg/IcebergRefreshCacheShim.scala new file mode 100644 index 0000000000..4a0d97dc8e --- /dev/null +++ b/spark/src/main/spark-3.x/org/apache/comet/iceberg/IcebergRefreshCacheShim.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +private[iceberg] object IcebergRefreshCacheShim { + def recacheByPlan(plan: LogicalPlan): Unit = { + val session = SparkSession.active + session.sharedState.cacheManager.recacheByPlan(session, plan) + } +} diff --git a/spark/src/main/spark-3.x/org/apache/comet/iceberg/IcebergReplaceDataShim.scala b/spark/src/main/spark-3.x/org/apache/comet/iceberg/IcebergReplaceDataShim.scala new file mode 100644 index 0000000000..5168e35012 --- /dev/null +++ b/spark/src/main/spark-3.x/org/apache/comet/iceberg/IcebergReplaceDataShim.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg + +import org.apache.spark.sql.catalyst.plans.logical.ReplaceData + +private[iceberg] object IcebergReplaceDataShim { + def extractProjections(rd: ReplaceData): Option[ReplaceDataDispatchInfo] = None +} diff --git a/spark/src/main/spark-4.x/org/apache/comet/iceberg/IcebergRefreshCacheShim.scala b/spark/src/main/spark-4.x/org/apache/comet/iceberg/IcebergRefreshCacheShim.scala new file mode 100644 index 0000000000..cf68aa645e --- /dev/null +++ b/spark/src/main/spark-4.x/org/apache/comet/iceberg/IcebergRefreshCacheShim.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.classic.SparkSession + +/** Spark 4.x imports the classic `SparkSession`; the api-module one has no `sharedState`. */ +private[iceberg] object IcebergRefreshCacheShim { + def recacheByPlan(plan: LogicalPlan): Unit = { + val session = SparkSession.active + session.sharedState.cacheManager.recacheByPlan(session, plan) + } +} diff --git a/spark/src/main/spark-4.x/org/apache/comet/iceberg/IcebergReplaceDataShim.scala b/spark/src/main/spark-4.x/org/apache/comet/iceberg/IcebergReplaceDataShim.scala new file mode 100644 index 0000000000..c860580005 --- /dev/null +++ b/spark/src/main/spark-4.x/org/apache/comet/iceberg/IcebergReplaceDataShim.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.iceberg + +import org.apache.spark.sql.catalyst.plans.logical.ReplaceData + +/** + * Spark 4.x: `ReplaceData` carries `projections: ReplaceDataProjections` -- the rewritten row + * stream is prefixed with an operation column (5=WRITE, 6=WRITE_WITH_METADATA), and we have to + * apply `dataProj` / `metadataProj` before handing rows to the underlying `DataWriter`. + */ +private[iceberg] object IcebergReplaceDataShim { + def extractProjections(rd: ReplaceData): Option[ReplaceDataDispatchInfo] = { + val p = rd.projections + Some(ReplaceDataDispatchInfo(p.rowProjection, p.metadataProjection)) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergWriteActionSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergWriteActionSuite.scala new file mode 100644 index 0000000000..0fae8f3138 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometIcebergWriteActionSuite.scala @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import java.io.File + +import scala.collection.mutable + +import org.apache.spark.{CometListenerBusUtils, SparkConf} +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.Row +import org.apache.spark.sql.comet.{IcebergCommitExec, IcebergWriteExec} +import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.util.QueryExecutionListener + +private case class WriteSnapshot(snapshotDelta: Long, plans: Seq[SparkPlan]) + +class CometIcebergWriteActionSuite + extends CometTestBase + with AdaptiveSparkPlanHelper + with CometIcebergTestBase { + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set(CometConf.COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED.key, "true") + .set( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + } + + test("AppendData unpartitioned INSERT INTO routes through two-op") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable(warehouseDir, "append_unpart", partitionSpec = "") + val snapshot = captureWrite("append_unpart") { + spark.sql( + "INSERT INTO cat.db.append_unpart VALUES " + + "(1, 'us-east', 10.5), (2, 'us-west', 20.3), (3, 'eu', 30.7)") + } + assertExactlyOneCommit(snapshot) + assertRows("append_unpart", expectedIds = Seq(1, 2, 3)) + } + } + + test("AppendData partitioned INSERT INTO routes through two-op") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable(warehouseDir, "append_part", partitionSpec = "PARTITIONED BY (region)") + val snapshot = captureWrite("append_part") { + spark.sql( + "INSERT INTO cat.db.append_part VALUES " + + "(1, 'us-east', 10.5), (2, 'us-east', 20.3), (3, 'eu', 30.7)") + } + assertExactlyOneCommit(snapshot) + assertRows("append_part", expectedIds = Seq(1, 2, 3)) + } + } + + test("AppendData INSERT FROM SELECT survives the intervening exchange/sort") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable(warehouseDir, "src", partitionSpec = "") + createTable(warehouseDir, "append_from_select", partitionSpec = "PARTITIONED BY (region)") + spark.sql( + "INSERT INTO cat.db.src VALUES " + + "(1, 'us-east', 10.5), (2, 'us-west', 20.3), (3, 'eu', 30.7)") + + val snapshot = captureWrite("append_from_select") { + spark.sql( + "INSERT INTO cat.db.append_from_select " + + "SELECT id, region, amount FROM cat.db.src ORDER BY id") + } + assertExactlyOneCommit(snapshot) + assertRows("append_from_select", expectedIds = Seq(1, 2, 3)) + } + } + + test("AppendData on an empty source still emits a single commit") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable(warehouseDir, "empty_target", partitionSpec = "") + val snapshot = captureWrite("empty_target") { + spark.sql( + "INSERT INTO cat.db.empty_target SELECT id, region, amount " + + "FROM (SELECT 1 AS id, 'r' AS region, 1.0 AS amount) WHERE id < 0") + } + assertExactlyOneCommit(snapshot) + assertRows("empty_target", expectedIds = Seq.empty) + } + } + + test("OverwriteByExpression replaces existing rows via two-op") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable(warehouseDir, "overwrite_static", partitionSpec = "") + spark.sql( + "INSERT INTO cat.db.overwrite_static VALUES " + + "(1, 'old', 1.0), (2, 'old', 2.0), (3, 'old', 3.0)") + + val snapshot = captureWrite("overwrite_static") { + withSQLConf("spark.sql.sources.partitionOverwriteMode" -> "STATIC") { + spark.sql( + "INSERT OVERWRITE cat.db.overwrite_static VALUES " + + "(10, 'new', 100.0), (11, 'new', 110.0)") + } + } + assertExactlyOneCommit(snapshot) + assertRows("overwrite_static", expectedIds = Seq(10, 11)) + } + } + + test("OverwritePartitionsDynamic replaces only touched partitions") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable(warehouseDir, "overwrite_dynamic", partitionSpec = "PARTITIONED BY (region)") + spark.sql( + "INSERT INTO cat.db.overwrite_dynamic VALUES " + + "(1, 'us-east', 1.0), (2, 'us-west', 2.0), (3, 'eu', 3.0)") + + val snapshot = captureWrite("overwrite_dynamic") { + withSQLConf("spark.sql.sources.partitionOverwriteMode" -> "DYNAMIC") { + spark.sql("INSERT OVERWRITE cat.db.overwrite_dynamic VALUES (10, 'us-east', 100.0)") + } + } + assertExactlyOneCommit(snapshot) + val ids = spark + .sql("SELECT id FROM cat.db.overwrite_dynamic ORDER BY id") + .collect() + .map(_.getInt(0)) + .toSeq + assert(ids == Seq(2, 3, 10), s"expected (2,3,10), got $ids") + } + } + + test("ReplaceData (CoW DELETE) on a row predicate goes through two-op") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable( + warehouseDir, + "cow_delete", + partitionSpec = "", + properties = Some("'write.delete.mode'='copy-on-write'")) + withSQLConf(CometConf.COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED.key -> "false") { + coalesceInsert( + "cow_delete", + Seq((1, "us-east", 10.0), (2, "us-west", 20.0), (3, "eu", 30.0), (4, "us-east", 40.0))) + } + + val snapshot = captureWrite("cow_delete") { + spark.sql("DELETE FROM cat.db.cow_delete WHERE id = 2") + } + assertExactlyOneCommit(snapshot) + assertRows("cow_delete", expectedIds = Seq(1, 3, 4)) + } + } + + test("ReplaceData (CoW UPDATE) routes through two-op") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable( + warehouseDir, + "cow_update", + partitionSpec = "", + properties = Some("'write.update.mode'='copy-on-write'")) + coalesceInsert( + "cow_update", + Seq((1, "us-east", 10.0), (2, "us-west", 20.0), (3, "eu", 30.0))) + + val snapshot = captureWrite("cow_update") { + spark.sql("UPDATE cat.db.cow_update SET amount = amount * 2 WHERE id = 2") + } + assertExactlyOneCommit(snapshot) + val r = spark + .sql("SELECT id, amount FROM cat.db.cow_update WHERE id = 2") + .collect() + assert(r.length == 1 && r(0).getDouble(1) == 40.0, s"got ${r.toSeq}") + } + } + + test("ReplaceData (CoW MERGE) with matched and unmatched legs routes through two-op") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable( + warehouseDir, + "cow_merge", + partitionSpec = "", + properties = Some("'write.merge.mode'='copy-on-write'")) + coalesceInsert("cow_merge", Seq((1, "us-east", 10.0), (2, "us-west", 20.0))) + + val snapshot = captureWrite("cow_merge") { + spark.sql(""" + |MERGE INTO cat.db.cow_merge t + |USING (SELECT 2 AS id, 'us-west' AS region, 200.0 AS amount UNION ALL + | SELECT 3 AS id, 'eu' AS region, 30.0 AS amount) s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.amount = s.amount + |WHEN NOT MATCHED THEN INSERT (id, region, amount) VALUES (s.id, s.region, s.amount) + |""".stripMargin) + } + assertExactlyOneCommit(snapshot) + assertRows("cow_merge", expectedIds = Seq(1, 2, 3)) + } + } + + test("sanity check: Spark's default DELETE path works against a Hadoop catalog") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + withSQLConf(CometConf.COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED.key -> "false") { + createTable( + warehouseDir, + "spark_cow_delete", + partitionSpec = "", + properties = Some("'write.delete.mode'='copy-on-write'")) + coalesceInsert( + "spark_cow_delete", + Seq((1, "us-east", 10.0), (2, "us-west", 20.0), (3, "eu", 30.0), (4, "us-east", 40.0))) + spark.sql("DELETE FROM cat.db.spark_cow_delete WHERE id = 2") + assertRows("spark_cow_delete", expectedIds = Seq(1, 3, 4)) + } + } + } + + test("disabled config falls through to Spark's V2ExistingTableWriteExec") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable(warehouseDir, "disabled_conf", partitionSpec = "") + + val snapshot = captureWrite("disabled_conf") { + withSQLConf(CometConf.COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED.key -> "false") { + spark.sql("INSERT INTO cat.db.disabled_conf VALUES (1, 'us-east', 10.5)") + } + } + val (commits, writes) = collectIcebergWriteOps(snapshot.plans) + assert(commits.isEmpty, s"unexpected IcebergCommitExec: $commits") + assert(writes.isEmpty, s"unexpected IcebergWriteExec: $writes") + assertRows("disabled_conf", expectedIds = Seq(1)) + } + } + + test("Comet-written rows round-trip through Spark's reader unchanged") { + assume(icebergAvailable, "Iceberg not available in classpath") + withIcebergCatalog { warehouseDir => + createTable(warehouseDir, "parity_comet", partitionSpec = "PARTITIONED BY (region)") + createTable(warehouseDir, "parity_spark", partitionSpec = "PARTITIONED BY (region)") + + spark.sql( + "INSERT INTO cat.db.parity_comet VALUES " + + "(1, 'us', 1.5), (2, 'eu', 2.5), (3, 'ap', 3.5), (4, 'us', 4.5)") + + withSQLConf(CometConf.COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED.key -> "false") { + spark.sql( + "INSERT INTO cat.db.parity_spark VALUES " + + "(1, 'us', 1.5), (2, 'eu', 2.5), (3, 'ap', 3.5), (4, 'us', 4.5)") + } + + val cometRows: Array[Row] = spark + .sql("SELECT id, region, amount FROM cat.db.parity_comet ORDER BY id") + .collect() + val sparkRows: Array[Row] = spark + .sql("SELECT id, region, amount FROM cat.db.parity_spark ORDER BY id") + .collect() + assert(cometRows.toSeq == sparkRows.toSeq, s"$cometRows vs $sparkRows") + } + } + + private val catalog = "cat" + private val ns = "db" + + private def withIcebergCatalog(f: File => Unit): Unit = withTempIcebergDir { warehouseDir => + withSQLConf( + s"spark.sql.catalog.$catalog" -> "org.apache.iceberg.spark.SparkCatalog", + s"spark.sql.catalog.$catalog.type" -> "hadoop", + s"spark.sql.catalog.$catalog.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + f(warehouseDir) + } + } + + private def createTable( + warehouseDir: File, + tableName: String, + partitionSpec: String, + properties: Option[String] = None): Unit = { + val props = properties.map(s => s" TBLPROPERTIES ($s)").getOrElse("") + spark.sql(s""" + CREATE TABLE $catalog.$ns.$tableName ( + id INT, + region STRING, + amount DOUBLE + ) USING iceberg + $partitionSpec + $props + """) + } + + private def coalesceInsert(tableName: String, rows: Seq[(Int, String, Double)]): Unit = { + val session = spark + import session.implicits._ + rows + .toDF("id", "region", "amount") + .coalesce(1) + .writeTo(s"$catalog.$ns.$tableName") + .append() + } + + private def captureWrite(tableName: String)(action: => Unit): WriteSnapshot = { + val before = countSnapshots(tableName) + val captured = mutable.Buffer.empty[SparkPlan] + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + captured += qe.executedPlan + } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = + () + } + spark.listenerManager.register(listener) + try { + action + try CometListenerBusUtils.waitUntilEmpty(spark.sparkContext) + catch { case _: java.util.concurrent.TimeoutException => () } + } finally { + spark.listenerManager.unregister(listener) + } + val after = countSnapshots(tableName) + WriteSnapshot(after - before, captured.toSeq) + } + + private def countSnapshots(tableName: String): Long = + try { + spark + .sql(s"SELECT count(*) FROM $catalog.$ns.$tableName.snapshots") + .collect() + .head + .getLong(0) + } catch { + case _: Throwable => 0L + } + + private def collectIcebergWriteOps( + plans: Seq[SparkPlan]): (Seq[IcebergCommitExec], Seq[IcebergWriteExec]) = { + val commits = plans.flatMap { plan => + collectWithSubqueries(plan) { case c: IcebergCommitExec => c } + } + val writes = plans.flatMap { plan => + collectWithSubqueries(plan) { case w: IcebergWriteExec => w } + } + (commits, writes) + } + + private def assertExactlyOneCommit(snapshot: WriteSnapshot): Unit = { + assert( + snapshot.snapshotDelta == 1L, + s"expected exactly 1 new Iceberg snapshot, got ${snapshot.snapshotDelta}. Plans:\n" + + snapshot.plans.mkString("\n--\n")) + val (commits, writes) = collectIcebergWriteOps(snapshot.plans) + assert( + commits.nonEmpty, + s"expected >= 1 IcebergCommitExec in captured plans, got ${commits.size}. Plans:\n" + + snapshot.plans.mkString("\n--\n")) + assert( + writes.nonEmpty, + s"expected >= 1 IcebergWriteExec in captured plans, got ${writes.size}. Plans:\n" + + snapshot.plans.mkString("\n--\n")) + } + + private def assertRows(tableName: String, expectedIds: Seq[Int]): Unit = { + val ids = spark + .sql(s"SELECT id FROM $catalog.$ns.$tableName ORDER BY id") + .collect() + .map(_.getInt(0)) + .toSeq + assert(ids == expectedIds, s"expected $expectedIds, got $ids") + } + +}