perf(parquet): revise filter pushdown configuration#4722

Merged
mbutrovich merged 9 commits into
apache:mainfrom
mbutrovich:pushdown_filters
Jun 25, 2026

@mbutrovich mbutrovich commented Jun 24, 2026

Which issue does this PR close?

Closes #.

Rationale for this change

While investigating TPC-DS performance regressions on DF 54.0, I noticed that filters Spark's optimizer pushes to a Comet native scan never reach DataFusion's ParquetSource under the default config, even though the filters appear in EXPLAIN/Spark UI (note that they match the associated CometFilter expressions):

(1) CometNativeScan parquet spark_catalog.default.store_sales
Output [5]: [ss_quantity#569, ss_wholesale_cost#570, ss_list_price#571, ss_coupon_amt#578, ss_sold_date_sk#582]
Batched: true
Location: InMemoryFileIndex [s3a://.../tpcds-parquet/.../store_sales]
PushedFilters: [IsNotNull(ss_quantity), GreaterThanOrEqual(ss_quantity,0), LessThanOrEqual(ss_quantity,5), Or(Or(And(GreaterThanOrEqual(ss_list_price,8.00),LessThanOrEqual(ss_list_price,18.00)),And(GreaterThanOrEqual(ss_coupon_amt,459.00),LessThanOrEqual(ss_coupon_amt,1459.00))),And(GreaterThanOrEqual(ss_wholesale_cost,57.00),LessThanOrEqual(ss_wholesale_cost,77.00)))]
ReadSchema: struct<ss_quantity:int,ss_wholesale_cost:decimal(7,2),ss_list_price:decimal(7,2),ss_coupon_amt:decimal(7,2)>

(2) CometFilter
Input [5]: [ss_quantity#569, ss_wholesale_cost#570, ss_list_price#571, ss_coupon_amt#578, ss_sold_date_sk#582]
Condition : (((isnotnull(ss_quantity#569) AND (ss_quantity#569 >= 0)) AND (ss_quantity#569 <= 5)) AND ((((ss_list_price#571 >= 8.00) AND (ss_list_price#571 <= 18.00)) OR ((ss_coupon_amt#578 >= 459.00) AND (ss_coupon_amt#578 <= 1459.00))) OR ((ss_wholesale_cost#570 >= 57.00) AND (ss_wholesale_cost#570 <= 77.00))))

In the Spark UI, the CometFilter output is far smaller than the scan output, which means CometFilter is doing row reduction that should have happened at scan time:

[Image: Screenshot 2026-06-24 at 2 54 23 PM]

That's a lot of I/O that likely didn't need to happen.

A note on terminology: Spark and most engines use "filter pushdown" to mean evaluating filters at the scan against Parquet metadata (row group statistics, page index, bloom filters) so whole row groups or pages can be skipped without decoding. DataFusion's pushdown_filters config means something narrower: in addition to metadata-based pruning, the reader evaluates the predicate on filter columns first, builds a selection mask, and lazily materializes the remaining projected columns only for surviving rows. Format-level pruning runs whenever a predicate is attached to the source; pushdown_filters only controls the row-level evaluation and late-materialization layer.
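The two layers described above can be illustrated with a toy model. This is a minimal sketch in plain Python, not DataFusion or Comet code: `RowGroup`, `prune_by_stats`, and `row_filter` are hypothetical names, and a real Parquet reader works on encoded pages and also consults the page index and bloom filters.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    # Toy stand-in for a Parquet row group: one filter column, one payload column.
    quantity: list
    payload: list

    @property
    def min_max(self):
        # Stand-in for the min/max statistics stored in Parquet metadata.
        return min(self.quantity), max(self.quantity)


def prune_by_stats(groups, lo, hi):
    """Format-level pruning: skip row groups whose [min, max] cannot overlap [lo, hi]."""
    kept = []
    for group in groups:
        gmin, gmax = group.min_max
        if gmax >= lo and gmin <= hi:
            kept.append(group)
    return kept


def row_filter(group, lo, hi):
    """Row-level evaluation: build a selection mask from the filter column,
    then materialize the payload column only for surviving rows."""
    mask = [lo <= q <= hi for q in group.quantity]
    return [p for p, keep in zip(group.payload, mask) if keep]


groups = [
    RowGroup(quantity=[0, 3, 9], payload=["a", "b", "c"]),
    RowGroup(quantity=[50, 60, 70], payload=["d", "e", "f"]),
]

# Predicate: 0 <= quantity <= 5, as in the ss_quantity example above.
survivors = prune_by_stats(groups, 0, 5)
rows = [r for g in survivors for r in row_filter(g, 0, 5)]
```

In this model the second row group is skipped from statistics alone, without reading any of its values. The first row group survives pruning because its range overlaps the predicate, and only the row-level step removes the non-matching row inside it.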

CometNativeScan gated serialization of scan.supportedDataFilters on spark.comet.parquet.respectFilterPushdown (added in #1936), defaulting to false. With the default, the protobuf crossed JNI with an empty data_filters list, the native side built ParquetSource with no predicate, and as a result:

  • Row-group statistics filtering did not run.
  • Parquet page index filtering did not run.
  • Bloom filter pruning did not run.
  • Row-level RowFilter evaluation had nothing to evaluate.
  • CometFilter above the scan did the full row-level reduction on the decoded Arrow batches.

The hardcoded table_parquet_options.global.pushdown_filters = true in parquet_exec.rs was therefore dead in the default config: enabling row-level eval has no effect when no predicate is attached. The PushedFilters: [...] and DataFilters: [...] in the Spark plan come from CometScanExec's fields populated by Spark's optimizer and reflect what Spark planned, not what crossed JNI.

This PR makes the first three (format-level pruning) work by default whenever Spark's spark.sql.parquet.filterPushdown is on. Row-level RowFilter evaluation is the only piece that remains opt-in, gated by the new spark.comet.parquet.rowFilterPushdown.enabled flag (default false).
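The change in gating can be sketched as two small functions. This is an illustrative Python model under stated assumptions, not the Scala code in CometNativeScan.scala: the function names and the list-of-strings representation of filters are hypothetical.

```python
def data_filters_before(supported_filters, spark_filter_pushdown=True,
                        respect_filter_pushdown=False):
    # Pre-PR model: a Comet-specific second gate (default false) sat on top of
    # Spark's own setting, so the default config serialized an empty list.
    if spark_filter_pushdown and respect_filter_pushdown:
        return list(supported_filters)
    return []


def data_filters_after(supported_filters, spark_filter_pushdown=True):
    # Post-PR model: only Spark's spark.sql.parquet.filterPushdown gates
    # serialization. If Spark pushed nothing, the input is already empty.
    if spark_filter_pushdown:
        return list(supported_filters)
    return []


pushed = ["IsNotNull(ss_quantity)", "GreaterThanOrEqual(ss_quantity,0)"]
default_before = data_filters_before(pushed)
default_after = data_filters_after(pushed)
```

With both functions called at their defaults, the pre-PR model yields an empty list (no predicate on the native scan) while the post-PR model passes the filters through.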

What changes are included in this PR?

  • CometNativeScan.scala: serialize scan.supportedDataFilters whenever Spark's spark.sql.parquet.filterPushdown is enabled. The Comet-specific second gate is gone; when Spark's optimizer didn't push filters, supportedDataFilters is empty and the loop is a no-op.
  • parquet_exec.rs: drop the dead hardcoded pushdown_filters = true and reorder_filters = true. Both now default to DataFusion's defaults.
  • CometConf.scala: remove COMET_RESPECT_PARQUET_FILTER_PUSHDOWN. Add COMET_PARQUET_ROW_FILTER_PUSHDOWN_ENABLED (spark.comet.parquet.rowFilterPushdown.enabled), default false. This single flag controls row-level RowFilter evaluation and reorder.
  • jni_api.rs / spark_config.rs: translate the new Comet flag to the equivalent DataFusion session options when set.
  • CometExecIterator.scala: always emit the resolved value of the new Comet flag into the JNI config map. cometSqlConfs only carries values from SQLConf.getAllConfs, which excludes defaults — so Comet configs that need to reach native must be written here explicitly.
  • CometTestBase.scala: drop the line that set the removed config.
  • dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.2}.diff: regenerated against their tags. The SharedSparkSession.scala patch now sets the new flag to true, preserving pre-PR behavior for the Spark SQL test suite.
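The CometExecIterator.scala point about defaults can be shown with a small model. This is a hedged Python sketch, not the actual implementation: `build_native_config` is a hypothetical helper, the `spark.comet.` prefix filter is an assumption about how Comet configs are selected, and a plain dict stands in for the explicitly-set values that `SQLConf.getAllConfs` returns.

```python
ROW_FILTER_KEY = "spark.comet.parquet.rowFilterPushdown.enabled"
ROW_FILTER_DEFAULT = "false"  # default stated in this PR


def build_native_config(explicit_confs):
    """Build the config map sent across JNI (illustrative model).

    explicit_confs holds only values the user set explicitly, so a config
    left at its default is absent from it.
    """
    # Assumption: Comet configs are picked out by their key prefix.
    native = {k: v for k, v in explicit_confs.items()
              if k.startswith("spark.comet.")}
    # Always write the resolved value so the native side never has to guess
    # the default.
    native[ROW_FILTER_KEY] = explicit_confs.get(ROW_FILTER_KEY, ROW_FILTER_DEFAULT)
    return native


unset = build_native_config({"spark.sql.parquet.filterPushdown": "true"})
enabled = build_native_config({ROW_FILTER_KEY: "true"})
```

Without the explicit write, the `unset` case would produce a map with no entry for the flag, and the native side would need its own copy of the default.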

Result by config

| Spark filterPushdown | Comet rowFilterPushdown.enabled | Predicate on scan | Format-level pruning | Row-level RowFilter eval |
|---|---|---|---|---|
| true (default) | false (default) | yes | yes | no |
| true | true | yes | yes | yes |
| false | any | no | no | no |

The first row is the change in default behavior. Format-level pruning was previously unreachable in the default config and is now active. Row-level evaluation remains opt-in.
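The table can be restated as a small function. This is a Python sketch of the post-PR behavior as the table describes it; `scan_behavior` is a hypothetical name, and it assumes Spark's optimizer actually pushed at least one supported filter (with none pushed, there is no predicate regardless of flags).

```python
def scan_behavior(spark_filter_pushdown, comet_row_filter_pushdown):
    # A predicate reaches the native scan only when Spark's
    # spark.sql.parquet.filterPushdown is on.
    predicate_on_scan = spark_filter_pushdown
    return {
        "predicate_on_scan": predicate_on_scan,
        # Format-level pruning runs whenever a predicate is attached.
        "format_level_pruning": predicate_on_scan,
        # Row-level RowFilter evaluation additionally needs the Comet flag
        # spark.comet.parquet.rowFilterPushdown.enabled.
        "row_level_eval": predicate_on_scan and comet_row_filter_pushdown,
    }


default_config = scan_behavior(True, False)
opt_in_config = scan_behavior(True, True)
spark_off = scan_behavior(False, True)
```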

How are these changes tested?

Existing tests. Also ran TPC-DS SF 1000:

[Images: tpcds_queries_compare, tpcds_allqueries]

@mbutrovich mbutrovich marked this pull request as draft June 24, 2026 20:55
@mbutrovich mbutrovich changed the title from "perf: serialize data filters into CometNativeScan, remove COMET_RESPECT_PARQUET_FILTER_PUSHDOWN" to "perf(parquet): serialize data filters; remove respectFilterPushdown" Jun 24, 2026
@mbutrovich mbutrovich changed the title from "perf(parquet): serialize data filters; remove respectFilterPushdown" to "perf: serialize data filters into CometNativeScan, remove COMET_RESPECT_PARQUET_FILTER_PUSHDOWN" Jun 24, 2026
@mbutrovich mbutrovich marked this pull request as ready for review June 24, 2026 22:28
@mbutrovich mbutrovich changed the title from "perf: serialize data filters into CometNativeScan, remove COMET_RESPECT_PARQUET_FILTER_PUSHDOWN" to "perf(parquet): revise filter pushdown configuration" Jun 25, 2026

@andygrove andygrove left a comment

LGTM. Thanks @mbutrovich

@mbutrovich mbutrovich merged commit e5821f8 into apache:main Jun 25, 2026
71 of 72 checks passed
@mbutrovich mbutrovich deleted the pushdown_filters branch June 25, 2026 15:23