perf(parquet): revise filter pushdown configuration #4722
Merged
Which issue does this PR close?
Closes #.
Rationale for this change
While investigating TPC-DS performance regressions on DF 54.0, I noticed that the filters Spark's optimizer pushes to a Comet native scan never reach DataFusion's `ParquetSource` under the default config, even though the filters appear in EXPLAIN and the Spark UI (note that they match the associated `CometFilter` expressions).
In the Spark UI, the `CometFilter` output is far smaller than the scan output, so the filter is doing work that should have happened at scan time.
That's a lot of I/O that likely didn't need to happen.
A note on terminology: Spark and most engines use "filter pushdown" to mean evaluating filters at the scan against Parquet metadata (row group statistics, page index, bloom filters) so whole row groups or pages can be skipped without decoding. DataFusion's `pushdown_filters` config means something narrower: on top of metadata-based pruning, the reader evaluates the predicate on filter columns first, builds a selection mask, and lazily materializes the remaining projected columns only for surviving rows. Format-level pruning runs whenever a predicate is attached to the source; `pushdown_filters` only controls the row-level evaluation and late-materialization layer.
`CometNativeScan` gated serialization of `scan.supportedDataFilters` on `spark.comet.parquet.respectFilterPushdown` (added in #1936), which defaulted to `false`. With the default, the protobuf crossed JNI with an empty `data_filters` list, the native side built `ParquetSource` with no predicate, and as a result:
- Format-level pruning (row group statistics, page index, bloom filters) had no predicate to work with.
- `RowFilter` evaluation had nothing to evaluate.
- `CometFilter` above the scan did the full row-level reduction on the decoded Arrow batches.
The hardcoded `table_parquet_options.global.pushdown_filters = true` in `parquet_exec.rs` was therefore dead in the default config: enabling row-level evaluation has no effect when no predicate is attached. The `PushedFilters: [...]` and `DataFilters: [...]` entries in the Spark plan come from `CometScanExec`'s fields, which are populated by Spark's optimizer and reflect what Spark planned, not what crossed JNI.
This PR makes format-level pruning (row group statistics, page index, bloom filters) work by default whenever Spark's `spark.sql.parquet.filterPushdown` is on. Row-level `RowFilter` evaluation is the only piece that remains opt-in, gated by the new `spark.comet.parquet.rowFilterPushdown.enabled` flag (default `false`).
What changes are included in this PR?
- `CometNativeScan.scala`: serialize `scan.supportedDataFilters` whenever Spark's `spark.sql.parquet.filterPushdown` is enabled. The Comet-specific second gate is gone; when Spark's optimizer didn't push filters, `supportedDataFilters` is empty and the loop is a no-op.
- `parquet_exec.rs`: drop the dead hardcoded `pushdown_filters = true` and `reorder_filters = true`. Both now fall back to DataFusion's defaults.
- `CometConf.scala`: remove `COMET_RESPECT_PARQUET_FILTER_PUSHDOWN`. Add `COMET_PARQUET_ROW_FILTER_PUSHDOWN_ENABLED` (`spark.comet.parquet.rowFilterPushdown.enabled`), default `false`. This single flag controls row-level `RowFilter` evaluation and filter reordering.
- `jni_api.rs` / `spark_config.rs`: translate the new Comet flag to the equivalent DataFusion session options when set.
- `CometExecIterator.scala`: always emit the resolved value of the new Comet flag into the JNI config map. `cometSqlConfs` only carries values from `SQLConf.getAllConfs`, which excludes defaults, so Comet configs that need to reach native must be written here explicitly.
- `CometTestBase.scala`: drop the line that set the removed config.
- `dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.2}.diff`: regenerated against their tags. The `SharedSparkSession.scala` patch now sets the new flag to `true`, preserving pre-PR behavior for the Spark SQL test suite.
Result by config
(Table with columns `filterPushdown` and `rowFilterPushdown.enabled`.)
The first row is the change in default behavior. Format-level pruning was previously unreachable in the default config and is now active. Row-level evaluation remains opt-in.
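As a rough illustration of how the two flags combine, here is a minimal sketch that encodes the rules stated in this description. The names `ScanConfig`, `ScanBehavior`, and `resolve_behavior` are hypothetical and do not exist in Comet or DataFusion. The sketch assumes only what is described above: a predicate reaches the native scan when Spark's `spark.sql.parquet.filterPushdown` is on and Spark actually pushed filters, format-level pruning needs an attached predicate, and row-level evaluation additionally needs `spark.comet.parquet.rowFilterPushdown.enabled`.

```rust
/// Hypothetical model of the two settings discussed in this PR.
/// These types are for illustration only; they are not Comet or DataFusion APIs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScanConfig {
    /// Mirrors `spark.sql.parquet.filterPushdown`.
    pub spark_filter_pushdown: bool,
    /// Mirrors `spark.comet.parquet.rowFilterPushdown.enabled` (default false).
    pub comet_row_filter_pushdown: bool,
}

/// What the native Parquet scan ends up doing for a given configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScanBehavior {
    /// Whether a non-empty predicate is attached to the source.
    pub predicate_attached: bool,
    /// Row group statistics, page index, and bloom filter pruning.
    pub format_level_pruning: bool,
    /// Row-level evaluation with late materialization.
    pub row_level_evaluation: bool,
}

/// Derives the scan behavior from the config.
/// `has_pushed_filters` stands in for "Spark's optimizer pushed at least one
/// supported data filter" (a non-empty `supportedDataFilters`).
pub fn resolve_behavior(cfg: ScanConfig, has_pushed_filters: bool) -> ScanBehavior {
    // Filters are serialized only when Spark's own pushdown setting is on.
    let predicate_attached = cfg.spark_filter_pushdown && has_pushed_filters;
    ScanBehavior {
        predicate_attached,
        // Format-level pruning runs whenever a predicate is attached.
        format_level_pruning: predicate_attached,
        // Row-level evaluation is opt-in and is a no-op without a predicate.
        row_level_evaluation: predicate_attached && cfg.comet_row_filter_pushdown,
    }
}
```

Under this model, the default config (`spark_filter_pushdown = true`, `comet_row_filter_pushdown = false`) gives format-level pruning without row-level evaluation. Turning on the Comet flag while Spark's pushdown is off changes nothing, because no predicate is attached.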
How are these changes tested?
Existing tests. Also ran TPC-DS SF 1000: