Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Avg, Count, CountStar, Max, Min, Sum}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownJoin, SupportsPushDownVariantExtractions, V1Scan, VariantExtraction}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, VariantInRelation, VariantMetadata}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownJoin, SupportsPushDownRequiredColumns, SupportsPushDownVariantExtractions, V1Scan, VariantExtraction}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, RequestedVariantField, VariantInRelation, VariantMetadata}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.VariantExtractionImpl
import org.apache.spark.sql.sources
Expand Down Expand Up @@ -358,11 +358,30 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
case agg: Aggregate => rewriteAggregate(agg)
}

def pushDownVariants(plan: LogicalPlan): LogicalPlan = plan.transformDown {
case p@PhysicalOperation(projectList, filters, sHolder @ ScanBuilderHolder(_, _,
builder: SupportsPushDownVariantExtractions))
if conf.getConf(org.apache.spark.sql.internal.SQLConf.PUSH_VARIANT_INTO_SCAN) =>
pushVariantExtractions(p, projectList, filters, sHolder, builder)
// Two-visit protocol explanation:
// `transformDown` is pre-order and always recurses into children, because
// `pushVariantExtractions` returns `originalPlan` unchanged (same object reference).
// This means a tree like Project(pl, ScanBuilderHolder) gets visited twice:
// (1) Outer visit: PhysicalOperation collapses Project->ScanBuilderHolder and
// yields the real (projectList, filters, sHolder). This is the authoritative
// visit where column pruning decisions are made.
// (2) Inner leaf visit: PhysicalOperation on a bare ScanBuilderHolder LeafNode
// yields (sHolder.output, Nil, sHolder) -- projectList = full schema output.
// Without a guard, this would add fullVariant for *every* variant column.
//
// The `pushedVariants.isEmpty` guard prevents the inner visit from re-running once
// the outer visit has completed (successfully or not). An empty-mapping sentinel
// `Some(new VariantInRelation())` is written by the outer visit when there is
// nothing to push; `buildScanWithPushedVariants` requires `mapping.nonEmpty` to
// fire, so the sentinel correctly bypasses it while still suppressing the inner visit.
def pushDownVariants(plan: LogicalPlan): LogicalPlan = {
plan.transformDown {
case p@PhysicalOperation(projectList, filters, sHolder @ ScanBuilderHolder(_, _,
builder: SupportsPushDownVariantExtractions))
if conf.getConf(org.apache.spark.sql.internal.SQLConf.PUSH_VARIANT_INTO_SCAN) &&
sHolder.pushedVariants.isEmpty =>
pushVariantExtractions(p, projectList, filters, sHolder, builder)
}
}

/**
Expand Down Expand Up @@ -418,14 +437,61 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
}
if (variants.mapping.isEmpty) return originalPlan
if (variants.mapping.isEmpty) {
// No variant columns in the schema. Mark as attempted so the guard in
// pushDownVariants prevents a spurious second visit on the leaf node.
sHolder.pushedVariants = Some(new VariantInRelation())
return originalPlan
}

// Collect requested fields from project list and filters
projectList.foreach(variants.collectRequestedFields)
filters.foreach(variants.collectRequestedFields)

// If no variant columns remain after collection, return original plan
if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
// Variant extraction pushdown is for extractions, not for whole-variant reads. A bare variant
// reference (e.g. `SELECT v`, or a column lifted into the local Project to feed a `variant_get`
// that sits above a Join/Sort/Aggregate barrier this local rewrite cannot see) makes
// `collectRequestedFields` record `RequestedVariantField.fullVariant` -- path `$`, target
// VariantType -- meaning "produce the entire value." That is not a real extraction:
// - No I/O benefit: the whole variant is read regardless, so shredding it to a single
// full-variant slot never reads fewer bytes than leaving the column raw.
// - It is the case readers mishandle: a lone full-variant slot collapses to a boolean
// placeholder (see ParquetScan.rewriteVariantPushdownSchema), and above a barrier the
// re-exposed `GetStructField(v_new, i) AS v#orig` alias can be dropped by
// RemoveRedundantAliases, yielding wrong results or an invalid plan.
// So when a variant is read ONLY as a whole -- its sole requested field is fullVariant -- leave
// it raw and let normal column pruning read it. Scoped to the sole-field case on purpose:
// - It is exactly the broken shape (a lone full-variant slot is what collapses / re-exposes).
// - When fullVariant coexists with real extractions on the same variant (e.g.
// `SELECT v, variant_get(v, '$.a')`), the shredded struct has >= 2 slots, so the reader
// does not collapse it; keeping it shredded preserves the typed-path pushdown.
// This also keeps the rule correct for free as barrier-aware pushdown lands later: once a
// `variant_get` above a Join/Sort/Aggregate becomes visible to the rewrite it is collected as a
// real typed path (not fullVariant), so it is no longer sole-fullVariant and the column shreds
// automatically -- this guard simply steps aside. Runs before the IsNull/IsNotNull injection
// below so presence-only references, which legitimately shred to a tiny placeholder, are
// unaffected.
variants.mapping.values.foreach { pathToFields =>
pathToFields.filterInPlace { case (_, fields) =>
!(fields.size == 1 && fields.contains(RequestedVariantField.fullVariant))
}
}
variants.mapping.filterInPlace { case (_, pathToFields) => pathToFields.nonEmpty }

// If a variant column is referenced only via IsNull/IsNotNull (e.g. WHERE isnotnull(v)),
// collectRequestedFields adds nothing to its field map (the IsNull/IsNotNull branch is a
// no-op). Inject a fullVariant entry for each such column so rewriteType generates the
// placeholder struct and the extraction is pushed on this (outer) visit. Without this,
// the inner leaf visit would fire with sHolder.output as its projectList, causing all
// variant columns -- including unreferenced siblings -- to receive fullVariant treatment.
val referencedAttrs = AttributeSet((projectList ++ filters).flatMap(_.references))
for (a <- sHolder.relation.output) {
if (variants.mapping.contains(a.exprId) &&
variants.mapping(a.exprId).values.forall(_.isEmpty) &&
referencedAttrs.contains(a)) {
variants.collectRequestedFields(a)
}
}

// Build individual VariantExtraction for each field access
// Track which extraction corresponds to which (attr, field, ordinal)
Expand Down Expand Up @@ -475,8 +541,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
}
}

// Call the API to push down variant extractions
if (extractionInfo.isEmpty) return originalPlan
// No extraction was requested for any variant column. Set the sentinel to suppress
// the inner leaf visit and avoid a spurious empty push.
if (extractionInfo.isEmpty) {
sHolder.pushedVariants = Some(new VariantInRelation())
return originalPlan
}

// Companion extractions can only be honored by readers that support cast-error deferral. If
// none were generated, the pushdown carries only non-strict accesses (`try_variant_get`, plain
Expand All @@ -488,7 +558,10 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {

// Filter to only the accepted extractions
val acceptedExtractions = extractionInfo.zip(pushedResults).filter(_._2).map(_._1)
if (acceptedExtractions.isEmpty) return originalPlan
if (acceptedExtractions.isEmpty) {
sHolder.pushedVariants = Some(new VariantInRelation())
return originalPlan
}

// Group accepted extractions by attribute to rebuild the struct schemas
val extractionsByAttr = acceptedExtractions.groupBy(_._2)
Expand All @@ -513,6 +586,31 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
sHolder.pushedVariantAttributeMap = attributeMap
sHolder.output = newOutput

// Commit the required top-level columns to the builder now, while projectList and
// filters are in scope. This mirrors how rewriteAggregate sets holder.output to only
// the pushed-down aggregate columns before build() -- both patterns ensure that
// buildScanWithPushed* can call build() and zip holder.output with readSchema() directly.
//
// projectList/filters reference original ExprIds (this function returns originalPlan
// unchanged). holder.relation.output carries those original ExprIds, so AttributeSet
// matching works directly against it.
//
// attributeMap maps old ExprId -> new AttributeReference (with a fresh ExprId for rewritten
// variant columns). Invert it to map new ExprId -> old ExprId so we can filter holder.output
// (which carries new ExprIds) against requiredColumns (which carries old ExprIds).
sHolder.builder match {
case r: SupportsPushDownRequiredColumns =>
val requiredColumns = AttributeSet((projectList ++ filters).flatMap(_.references))
val neededRelOutput = sHolder.relation.output.filter(requiredColumns.contains)
val newToOldExprId = attributeMap.map { case (oldId, newAttr) => newAttr.exprId -> oldId }
val oldExprIdToRelAttr = sHolder.relation.output.map(a => a.exprId -> a).toMap
sHolder.output = sHolder.output.filter { a =>
oldExprIdToRelAttr.get(newToOldExprId(a.exprId)).exists(requiredColumns.contains)
}
r.pruneColumns(neededRelOutput.toStructType)
case _ => // builder does not support column pruning; holder.output stays full-schema
}

// Return the original plan unchanged - transformation happens in buildScanWithPushedVariants
originalPlan
}
Expand Down Expand Up @@ -794,13 +892,17 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {

def buildScanWithPushedVariants(plan: LogicalPlan): LogicalPlan = plan.transform {
case p@PhysicalOperation(projectList, filters, holder: ScanBuilderHolder)
if holder.pushedVariants.isDefined =>
if holder.pushedVariants.exists(_.mapping.nonEmpty) =>
val variants = holder.pushedVariants.get
val attributeMap = holder.pushedVariantAttributeMap

// Build the scan
val scan = holder.builder.build()
val realOutput = toAttributes(scan.readSchema())
assert(realOutput.length == holder.output.length,
s"The data source returns ${realOutput.length} columns but the plan expected " +
s"${holder.output.length}: scan=[${realOutput.map(_.name).mkString(",")}], " +
s"plan=[${holder.output.map(_.name).mkString(",")}]")
val wrappedScan = getWrappedScan(scan, holder)
// Note: holder.pushedFilterExpressions is not propagated here because the output schema
// changes with variant extraction. When validConstraints is wired up, this needs revisiting.
Expand Down
Loading