Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ jobs:
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.CometIcebergWriteActionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
org.apache.comet.csv.CometCsvNativeReadSuite
org.apache.comet.CometFuzzTestSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ jobs:
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
org.apache.comet.CometIcebergRewriteActionSuite
org.apache.comet.CometIcebergWriteActionSuite
org.apache.comet.iceberg.IcebergReflectionSuite
org.apache.comet.csv.CometCsvNativeReadSuite
org.apache.comet.CometFuzzTestSuite
Expand Down
107 changes: 107 additions & 0 deletions docs/source/user-guide/latest/iceberg-writes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Accelerating Apache Iceberg V2 Writes using Comet

## Overview

Iceberg-java writes a V2 table through a single Spark operator that combines data-file writing,
metadata writing, committing, and validation, and it consumes one row at a time. Comet's query
results are columnar (Arrow), so feeding them to that operator means transposing every batch
back into rows — only for Parquet's own encoder to re-assemble the columns on the way to disk.

Comet's Iceberg write support addresses this in two independent layers:

1. **Split-operator plan.** Comet splits the data-file-writing step out into its own Spark
operator so it can accelerate just that part of the process — the scans, projects, sorts, and
exchanges that feed the write become visible to AQE and Comet's columnar rules. Metadata
writing, committing, and validation stay in the JVM exactly as iceberg-java does them.
2. **Native Parquet write.** With the split in place, Comet may write the Iceberg data files
using [iceberg-rust](https://github.com/apache/iceberg-rust) instead of iceberg-java, under
the circumstances described below. This keeps the data columnar end-to-end and avoids the row
transpose.

Both layers are off by default, and both fall back to the stock iceberg-java writer whenever
they can't be applied safely.

## Configuration

Standard Comet + Iceberg setup (see [`iceberg.md`](iceberg.md)) plus two write-side toggles:

```
# Standard Comet / Iceberg wiring
spark.plugins=org.apache.spark.CometPlugin
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.<name>=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.<name>.type=hadoop # or hive / glue / rest / ...
spark.sql.catalog.<name>.warehouse=...

# Comet write toggles (both off by default; enable independently)
spark.comet.write.iceberg.splitOperator.enabled=true # layer 1: split-operator plan
spark.comet.write.iceberg.nativeAcceleration=true # layer 2: native Parquet write
```

The native write (layer 2) only engages when the split plan (layer 1) is also enabled. With
`splitOperator.enabled=false`, every write goes straight through iceberg-java.

## Spark version compatibility

The split-operator plan and the native write are supported on Spark 3.4, 3.5, and 4.0, with
identical coverage on all three: `INSERT INTO`, `INSERT OVERWRITE` (static and dynamic), and
copy-on-write `DELETE` / `UPDATE` / `MERGE`.

Merge-on-read writes (Iceberg `WriteDelta`, used when a table is configured for merge-on-read)
are **not** intercepted — their per-row delta writer can't be made native yet — so they always
run on iceberg-java.

## When Comet falls back to the iceberg-java writer

If the split-operator plan is disabled (`splitOperator.enabled=false`), nothing is intercepted
and every write runs on iceberg-java.

When the split plan is enabled but the native write can't reproduce iceberg-java's output, only
the file write falls back — the split plan still accelerates the data sub-query. Each native-write
gate below is checked at planning time, so Comet writes natively only when it can produce the same
`DataFile`s iceberg-java would. The **Source** column attributes each gap to the responsible
layer: **iceberg-rust** (the high-level Iceberg writer) or **parquet-rs** (the Rust Parquet
encoder it drives).

| Table / write property | Falls back when… | Source |
| ---------------------------------------------------------- | -------------------------------------------------------------------------------- | ------------------------- |
| Resolved write format | not `parquet` (per-write `write-format` option or `write.format.default`) | iceberg-rust |
| `write.object-storage.enabled` | `true` — no hashed-prefix object-storage location layout | iceberg-rust |
| `write.location-provider.impl` | set — a custom Java `LocationProvider` can't be loaded | iceberg-rust |
| `format-version` | `>= 3` — row lineage / deletion vectors / variant types aren't emitted | iceberg-rust |
| `encryption.*` | any key set — Parquet modular encryption is unsupported | iceberg-rust / parquet-rs |
| `write.metadata.metrics.default` | `none`, or mentions `counts` | iceberg-rust |
| `write.metadata.metrics.column.<col>` | any column set to `none` or `counts` | iceberg-rust |
| `write.metadata.metrics.max-inferred-column-defaults` | projected column count exceeds it while no explicit metrics default is set | iceberg-rust |
| `write.parquet.bloom-filter-max-bytes` | set — there is no global bloom-filter byte cap to honour | parquet-rs |
| `write.parquet.bloom-filter-enabled.column.<col>` | any set to `true` — bloom-filter sizing can't be capped to match | iceberg-rust |
| `write.parquet.row-group-check-min-record-count` / `-max-` | set to a non-default value — row-group cadence is byte-driven only | parquet-rs |
| `write.parquet.stats-enabled.column.<col>` | set (Iceberg 1.10.0+) — no matching per-column stats toggle | parquet-rs |
| `parquet.enable.dictionary` | set — an explicit dictionary override isn't translated | parquet-rs |
| `io-impl` (catalog / table) | set — native storage is chosen by URI scheme, not a Java `FileIO` | iceberg-rust |
| Data-location URI scheme | not one of `file`, `memory`, `s3`, `s3a`, `gs`, `oss` | iceberg-rust |
| Copy-on-write `MERGE` | always — the per-row merge dispatch (`MergeRowsExec`) has no native operator yet | Comet |

Comet also falls back whenever it can't reflect the Iceberg internals needed to build an
identical write — for example an unrecognised `BatchWrite` class, a column type it can't
serialise, a `ReplaceData` projection it can't map, or a metadata-table scan. These are
conversion-time guards (source: Comet) rather than table-property gates.
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ to read more.
:hidden:

Iceberg Guide <iceberg>
Iceberg Writes <iceberg-writes>
S3 Credential Providers <s3-credential-providers>
Kubernetes Guide <kubernetes>

Expand Down
10 changes: 10 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.write.iceberg.splitOperator.enabled")
.category(CATEGORY_TESTING)
.doc(
"Whether to rewrite Iceberg V2 writes from Spark's combined V2 write/commit operator " +
"into Comet's two-operator shape: a file writer exec (inside AQE) and a committer " +
"(outside AQE).")
.booleanConf
.createWithDefault(false)

val COMET_ICEBERG_DATA_FILE_CONCURRENCY_LIMIT: ConfigEntry[Int] =
conf("spark.comet.scan.icebergNative.dataFileConcurrencyLimit")
.category(CATEGORY_SCAN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf._
import org.apache.comet.iceberg.IcebergWriteStrategy
import org.apache.comet.rules.{CometExecRule, CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, CometScanRule, CometSpark34AqeDppFallbackRule, EliminateRedundantTransitions}
import org.apache.comet.shims.ShimCometSparkSessionExtensions

Expand Down Expand Up @@ -97,6 +98,7 @@ class CometSparkSessionExtensions
extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
injectQueryStageOptimizerRuleShim(extensions, CometPlanAdaptiveDynamicPruningFilters)
injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery)
extensions.injectPlannerStrategy { session => IcebergWriteStrategy(session) }
}

case class CometScanColumnar(session: SparkSession) extends ColumnarRule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ object IcebergReflection extends Logging {
val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate"
val SPARK_BATCH_QUERY_SCAN = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
val SPARK_STAGED_SCAN = "org.apache.iceberg.spark.source.SparkStagedScan"
val SPARK_WRITE = "org.apache.iceberg.spark.source.SparkWrite"

// Iceberg 1.5.2 uses its own `ReplaceIcebergData` due to lack of `ReplaceData` in Spark 3.4.
val REPLACE_ICEBERG_DATA = "org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData"
}

/**
Expand Down Expand Up @@ -94,6 +98,45 @@ object IcebergReflection extends Logging {
val UNKNOWN = "unknown"
}

/** Loads a class, returning `None` when it's absent (e.g. Iceberg not on the classpath). */
private def tryLoadClass(name: String): Option[Class[_]] =
try Some(loadClass(name))
catch { case _: ClassNotFoundException => None }

private lazy val sparkWriteClassOpt: Option[Class[_]] = tryLoadClass(ClassNames.SPARK_WRITE)

/** Whether `write` is an Iceberg `SparkWrite` (false if Iceberg isn't on the classpath). */
def isIcebergSparkWrite(write: Any): Boolean =
sparkWriteClassOpt.exists(_.isInstance(write))

def isReplaceIcebergData(plan: Any): Boolean =
plan != null && plan.getClass.getName == ClassNames.REPLACE_ICEBERG_DATA

private def reflectField(plan: Any, fieldName: String): Option[AnyRef] =
try {
val field = plan.getClass.getDeclaredField(fieldName)
field.setAccessible(true)
Option(field.get(plan))
} catch {
case e: Exception =>
logError(
s"Iceberg reflection failure: $fieldName on ${plan.getClass.getName}: ${e.getMessage}")
None
}

def extractReplaceIcebergDataFields(plan: Any): Option[(AnyRef, AnyRef, AnyRef, AnyRef)] = {
if (!isReplaceIcebergData(plan)) return None
for {
table <- reflectField(plan, "table")
query <- reflectField(plan, "query")
originalTable <- reflectField(plan, "originalTable")
write <- reflectField(
plan,
"write"
) // Option[Write]; field can be Some(null) so kept AnyRef
} yield (table, query, originalTable, write)
}

/**
* Loads a class using the thread context classloader first, then falls back to the system
* classloader.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.iceberg

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.connector.write.BatchWrite

/** Logical anchor for the writer. See `IcebergWriteStrategy` for the rationale. */
case class IcebergWriteLogical(
child: LogicalPlan,
// Driver-side only: AQE re-planning is driver-local and write commands aren't cached.
@transient batchWrite: BatchWrite,
replaceDataDispatch: Option[ReplaceDataDispatchInfo] = None)
extends UnaryNode {

override def output: Seq[Attribute] = Nil

override protected def withNewChildInternal(newChild: LogicalPlan): IcebergWriteLogical =
copy(child = newChild)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.iceberg

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceData}
import org.apache.spark.sql.comet.{IcebergCommitExec, IcebergWriteExec}
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

import org.apache.comet.CometConf

/**
* Spark Strategy that intercepts Iceberg V2 copy-on-write logical writes and emits Comet's
* two-operator physical tree.
*/
case class IcebergWriteStrategy(session: SparkSession) extends SparkStrategy {

override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
if (!CometConf.COMET_ICEBERG_WRITE_SPLIT_OPERATOR_ENABLED.get(session.sessionState.conf)) {
return Nil
}

plan match {
case ad: AppendData =>
matchedSparkWrite(ad.table, ad.write, ad.query, replaceDataDispatch = None).toList
case obe: OverwriteByExpression =>
matchedSparkWrite(obe.table, obe.write, obe.query, replaceDataDispatch = None).toList
case opd: OverwritePartitionsDynamic =>
matchedSparkWrite(opd.table, opd.write, opd.query, replaceDataDispatch = None).toList
case rd: ReplaceData =>
matchedSparkWrite(
rd.originalTable,
rd.write,
rd.query,
replaceDataDispatch = IcebergReplaceDataShim.extractProjections(rd)).toList
case plan if IcebergReflection.isReplaceIcebergData(plan) =>
IcebergReflection
.extractReplaceIcebergDataFields(plan)
.flatMap { case (_, query, originalTable, write) =>
matchedSparkWrite(
originalTable.asInstanceOf[org.apache.spark.sql.catalyst.analysis.NamedRelation],
write.asInstanceOf[Option[Write]],
query.asInstanceOf[LogicalPlan],
replaceDataDispatch = None)
}
.toList
// Hit by AQE.
case IcebergWriteLogical(child, batchWrite, replaceDataDispatch) =>
Seq(IcebergWriteExec(batchWrite, planLater(child), replaceDataDispatch))
case _ => Nil
}
}

private def matchedSparkWrite(
table: org.apache.spark.sql.catalyst.analysis.NamedRelation,
write: Option[Write],
query: LogicalPlan,
replaceDataDispatch: Option[ReplaceDataDispatchInfo]): Option[SparkPlan] = {
table match {
case rel: DataSourceV2Relation =>
write.flatMap { w =>
if (IcebergReflection.isIcebergSparkWrite(w)) {
Some(buildTwoOp(w, rel, query, replaceDataDispatch))
} else {
None
}
}
case _ => None
}
}

/**
* Builds the two-op tree. The committer and writer share one `BatchWrite` (also reused across
* AQE re-plans): `toBatch()` returns a fresh instance per call, but the committer's commit-time
* validation must see the same instance the writer wrote through, hence we store it. The
* writer's child is wrapped in [[IcebergWriteLogical]] so AQE re-emits only the data-writing
* operator on each re-plan as opposed to multiple new commit operators.
*/
private def buildTwoOp(
write: Write,
rel: DataSourceV2Relation,
query: LogicalPlan,
replaceDataDispatch: Option[ReplaceDataDispatchInfo]): SparkPlan = {
val batchWrite = write.toBatch
// To mirror Spark ReplaceData semantics we invalidate our cache of the state of
// `originalTable`.
val refresh: () => Unit = () => IcebergRefreshCacheShim.recacheByPlan(rel)
IcebergCommitExec(
batchWrite,
refresh,
// `replaceDataDispatch` may project the data into the format the writer expects.
planLater(IcebergWriteLogical(query, batchWrite, replaceDataDispatch)))
}
}
Loading
Loading