Skip to content

[SPARK-56125][SQL] Refactor MERGE INTO schema evolution to use assignment-based approach#55302

Closed
szehon-ho wants to merge 3 commits into
apache:masterfrom
szehon-ho:refactor-merge-schema-evolution
Closed

[SPARK-56125][SQL] Refactor MERGE INTO schema evolution to use assignment-based approach#55302
szehon-ho wants to merge 3 commits into
apache:masterfrom
szehon-ho:refactor-merge-schema-evolution

Conversation

@szehon-ho
Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

Refactors MERGE INTO schema evolution to compute pending schema changes directly from assignments instead of building an intermediate pruned source schema.

Before: pendingSchemaChanges called sourceSchemaForSchemaEvolution to build a pruned version of the source schema containing only columns referenced in assignments, then compared this pruned schema against the full target schema via computeSupportedSchemaChanges.

After: pendingSchemaChanges iterates over individual assignments that are schema evolution candidates and computes changes per-assignment:

  • If the assignment key is unresolved (column missing from target), emit an addColumn change.
  • If the key and value types differ, recursively compute schema changes on the types (handles nested struct additions and type widening).
  • If types match, no change is needed.

Additionally:

  • Extracted filterSupportedChanges from computeSupportedSchemaChanges in ResolveSchemaEvolution so MERGE can reuse the filtering logic independently.
  • Made computeSchemaChanges private[catalyst] so it can be called from MergeIntoTable.
  • Replaced originalTarget/originalSource parameters threaded through the recursive schema comparison with an onError callback, since these schemas were only used to construct the error message.
  • Removed sourceSchemaForSchemaEvolution and its helpers (isEqual, isPrefix, isSameColumnAssignment).

Why are the changes needed?

The previous approach was indirect: it first constructed a pruned source schema, then compared it against the target schema to find differences. The new approach is more direct and easier to follow - it examines each assignment individually to determine what schema changes are needed. This also simplifies ResolveSchemaEvolution by removing the originalTarget/originalSource parameters that were only used for error reporting.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests cover MERGE INTO schema evolution behavior. This is a refactor with no behavioral change.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor (Claude Opus 4)

@szehon-ho
Copy link
Copy Markdown
Member Author

szehon-ho commented Apr 11, 2026

this is from this suggestion : #55124 (comment) from @aokolnychyi

@szehon-ho
Copy link
Copy Markdown
Member Author

FYI @aokolnychyi , applied offline review comments

filterSupportedChanges(targetTable, candidateChanges)
}

private[sql] def throwIncompatibleSchemasError(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reconsider this suggestion. Let's remove this helper and instead add this one:

// a new helper method
private[catalyst] def computeSchemaChanges(
    targetType: StructType,
    sourceType: StructType,
    isByName: Boolean): Seq[TableChange] = {
  computeSchemaChanges(
    targetType,
    sourceType,
    fieldPath = Nil,
    isByName,
    throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(targetType, sourceType))
}

Then use it as the following:

def computeSupportedSchemaChanges(
    targetTable: LogicalPlan,
    sourceSchema: StructType,
    isByName: Boolean): Seq[TableChange] = {
  val candidateChanges = computeSchemaChanges(
    targetTable.schema,
    sourceSchema,
    isByName)
  filterSupportedChanges(targetTable, candidateChanges)
}

Copy link
Copy Markdown
Contributor

@aokolnychyi aokolnychyi Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep throwIncompatibleSchemasError helper in MergeIntoTable but call QueryExecutionErrors.

}
}

/**
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the comments and docs?

merge: MergeIntoTable): Seq[Assignment] = {
val actions = merge.matchedActions ++ merge.notMatchedActions
val assignments = actions.collect {
val assignments = actions.flatMap {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this impacted by fix in #55340?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think not, that was changing the schemaEvolutionReady gate (prior to this logic)

This logic (schemaEvolutionTriggeringAssignments) is to decide what schema to modify

case UnresolvedAttribute(nameParts) => nameParts
case a: AttributeReference => Seq(a.name)
case Alias(child, _) => extractFieldPath(child, allowUnresolved)
case Alias(child, _) => extractFieldPath(child)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This got dropped in the master branch?

}
}

// Guard that assignments are either resolved or candidates for evolution before
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here. Seems like this is accidental?

…volution

- Replace Array[TableChange] with Seq[TableChange] throughout ResolveSchemaEvolution
- Replace onError: () => Nothing with throwError: => Nothing (by-name parameter)
- Extract throwIncompatibleSchemasError helper in ResolveSchemaEvolution (private[sql])
- Add a.key.resolved guard in type-mismatch case of computePendingSchemaChanges
- Add default value for e parameter in failedToMergeIncompatibleSchemasError
- Split combined test assertion into two separate checks for order independence
- Replace throwIncompatibleSchemasError helper in ResolveSchemaEvolution with
  a new computeSchemaChanges(StructType, StructType, Boolean) overload
- MergeIntoTable.throwIncompatibleSchemasError calls QueryExecutionErrors directly
- Restore scaladocs and inline comments removed in first commit
- Restore allowUnresolved parameter on extractFieldPath, aligning with master
- Restore comment on schemaEvolutionReady explaining assignment resolution guard
@szehon-ho szehon-ho force-pushed the refactor-merge-schema-evolution branch from 338b25f to af3866b Compare April 21, 2026 09:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants