[SPARK-56125][SQL] Refactor MERGE INTO schema evolution to use assignment-based approach #55302
szehon-ho wants to merge 3 commits into
e15aa18 to b62338c
This is from this suggestion: #55124 (comment) from @aokolnychyi
FYI @aokolnychyi, applied offline review comments.
    filterSupportedChanges(targetTable, candidateChanges)
  }

  private[sql] def throwIncompatibleSchemasError(
I reconsidered this suggestion. Let's remove this helper and instead add this one:
// a new helper method
private[catalyst] def computeSchemaChanges(
    targetType: StructType,
    sourceType: StructType,
    isByName: Boolean): Seq[TableChange] = {
  computeSchemaChanges(
    targetType,
    sourceType,
    fieldPath = Nil,
    isByName,
    throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(targetType, sourceType))
}
Then use it as the following:
def computeSupportedSchemaChanges(
    targetTable: LogicalPlan,
    sourceSchema: StructType,
    isByName: Boolean): Seq[TableChange] = {
  val candidateChanges = computeSchemaChanges(
    targetTable.schema,
    sourceSchema,
    isByName)
  filterSupportedChanges(targetTable, candidateChanges)
}
Keep throwIncompatibleSchemasError helper in MergeIntoTable but call QueryExecutionErrors.
    }
  }

  /**
Why remove the comments and docs?
      merge: MergeIntoTable): Seq[Assignment] = {
    val actions = merge.matchedActions ++ merge.notMatchedActions
-   val assignments = actions.collect {
+   val assignments = actions.flatMap {
I think not; that was changing the schemaEvolutionReady gate (prior to this logic).
This logic (schemaEvolutionTriggeringAssignments) decides which schema to modify.
    case UnresolvedAttribute(nameParts) => nameParts
    case a: AttributeReference => Seq(a.name)
-   case Alias(child, _) => extractFieldPath(child, allowUnresolved)
+   case Alias(child, _) => extractFieldPath(child)
This got dropped in the master branch?
    }
  }

  // Guard that assignments are either resolved or candidates for evolution before
Same question here. Seems like this is accidental?
…ment-based approach
…volution
- Replace Array[TableChange] with Seq[TableChange] throughout ResolveSchemaEvolution
- Replace onError: () => Nothing with throwError: => Nothing (by-name parameter)
- Extract throwIncompatibleSchemasError helper in ResolveSchemaEvolution (private[sql])
- Add a.key.resolved guard in type-mismatch case of computePendingSchemaChanges
- Add default value for e parameter in failedToMergeIncompatibleSchemasError
- Split combined test assertion into two separate checks for order independence

- Replace throwIncompatibleSchemasError helper in ResolveSchemaEvolution with a new computeSchemaChanges(StructType, StructType, Boolean) overload
- MergeIntoTable.throwIncompatibleSchemasError calls QueryExecutionErrors directly
- Restore scaladocs and inline comments removed in first commit
- Restore allowUnresolved parameter on extractFieldPath, aligning with master
- Restore comment on schemaEvolutionReady explaining assignment resolution guard
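
The switch from onError: () => Nothing to a by-name throwError: => Nothing mentioned in the commit notes can be illustrated with a small standalone sketch. Everything here is hypothetical (a toy schema modeled as Map[String, String], the names ByNameErrorSketch, compare, and compareTopLevel); it is not the Spark code, only one way the pattern might look. The point is that the recursive comparison no longer needs the original schemas as parameters, because the caller captures them inside the error expression.

```scala
object ByNameErrorSketch {
  // Hypothetical comparison over a toy schema (column name -> type name).
  // `throwError` is a by-name parameter: the expression passed by the caller
  // is evaluated only when this method actually references it.
  def compare(
      target: Map[String, String],
      source: Map[String, String],
      throwError: => Nothing): Seq[String] = {
    source.toSeq.sortBy(_._1).flatMap { case (name, sourceType) =>
      target.get(name) match {
        case None => Seq(s"add $name: $sourceType") // new column
        case Some(t) if t == sourceType => Nil      // nothing to change
        case Some(_) => throwError                  // incompatible types
      }
    }
  }

  // Entry point (assumed shape): the original schemas are captured once in
  // the error expression instead of being threaded through the comparison.
  def compareTopLevel(
      target: Map[String, String],
      source: Map[String, String]): Seq[String] = {
    compare(
      target,
      source,
      throw new IllegalArgumentException(
        s"Incompatible schemas: target=$target, source=$source"))
  }
}
```

With a by-name parameter the call site can pass a bare throw expression, which is the shape used in the reviewer's suggested computeSchemaChanges overload, rather than wrapping it in a () => ... lambda.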
338b25f to af3866b
What changes were proposed in this pull request?
Refactors MERGE INTO schema evolution to compute pending schema changes directly from assignments instead of building an intermediate pruned source schema.
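
As a rough illustration of the assignment-based idea, the sketch below derives changes directly from assignments rather than from a pruned source schema. It is a simplified model, not the PR's implementation: Change, AddColumn, SimpleAssignment, and AssignmentBasedEvolution are hypothetical stand-ins for Spark's TableChange and Assignment, types are plain strings, and only the "column missing from target" case is handled.

```scala
// Hypothetical, simplified stand-ins; these are not Spark's classes.
sealed trait Change
final case class AddColumn(path: Seq[String], dataType: String) extends Change

// An assignment target (key path) together with the type of the assigned value.
final case class SimpleAssignment(keyPath: Seq[String], valueType: String)

object AssignmentBasedEvolution {
  // Walk the assignments one by one: a key path that does not exist in the
  // target yields one AddColumn change. No intermediate schema is built.
  def pendingChanges(
      targetColumns: Set[Seq[String]],
      assignments: Seq[SimpleAssignment]): Seq[Change] = {
    assignments
      .filterNot(a => targetColumns.contains(a.keyPath))
      .map(a => AddColumn(a.keyPath, a.valueType))
      .distinct // the same column may be assigned in several MERGE actions
  }
}
```

In this toy model, assignments to existing columns produce no change, and repeated assignments to the same new column collapse into a single change.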
Before:
pendingSchemaChanges called sourceSchemaForSchemaEvolution to build a pruned version of the source schema containing only columns referenced in assignments, then compared this pruned schema against the full target schema via computeSupportedSchemaChanges.
After:
pendingSchemaChanges iterates over individual assignments that are schema evolution candidates and computes changes per assignment, for example an addColumn change.
Additionally:
- Extracts filterSupportedChanges from computeSupportedSchemaChanges in ResolveSchemaEvolution so MERGE can reuse the filtering logic independently.
- Makes computeSchemaChanges private[catalyst] so it can be called from MergeIntoTable.
- Replaces the originalTarget/originalSource parameters threaded through the recursive schema comparison with an onError callback, since these schemas were only used to construct the error message.
- Removes sourceSchemaForSchemaEvolution and its helpers (isEqual, isPrefix, isSameColumnAssignment).
Why are the changes needed?
The previous approach was indirect: it first constructed a pruned source schema, then compared it against the target schema to find differences. The new approach is more direct and easier to follow; it examines each assignment individually to determine what schema changes are needed. This also simplifies ResolveSchemaEvolution by removing the originalTarget/originalSource parameters that were only used for error reporting.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests cover MERGE INTO schema evolution behavior. This is a refactor with no behavioral change.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4)