[SQL] Update grammar/TableChange to allow CLUSTER BY expressions#55327
[SQL] Update grammar/TableChange to allow CLUSTER BY expressions#55327itsbilal wants to merge 1 commit into
Conversation
This change updates the parser grammar and the TableChange API to allow representing tables that have CLUSTER BY clauses on expressions (eg. `CLUSTER BY (variant_get(col, ...))` instead of just CLUSTER BY `(col)`. The goal is to make this TableChange backward-compatible with connectors that only support clustering by raw columns, while allowing connectors to opt-into supporting some transforms if they are present.
d98079c to
ba9ee66
Compare
cloud-fan
left a comment
There was a problem hiding this comment.
This PR adds CLUSTER BY (expression) support but invents a parallel mechanism instead of reusing or following the existing v2 transform infrastructure that already handles this shape for PARTITIONED BY. The result has three structural problems that I think warrant a redesign before this goes further — the rest of the findings below all fall out of these. I've put them as questions so we can discuss the direction; happy to be wrong on any of them.
1. Why not follow the PARTITIONED BY transform pattern?
PARTITIONED BY (bucket(4, c1, c2)) and PARTITIONED BY (years(ts)) already work and have done for years. Everything you need to support CLUSTER BY (upper(c)) is already in the codebase under that path:
PARTITIONED BY (today) |
This PR | |
|---|---|---|
| Grammar | applyTransform: identifier '(' transformArgument,... ')' (g4:1251) — only name(col_or_lit,...) |
general expression (g4:549) — accepts 1+1, case … end, …, validated post-parse with IllegalStateException |
| Args | transformArgument: qualifiedName | constant → V2 FieldReference/LiteralValue directly (AstBuilder.scala:5130) |
Catalyst UnresolvedFunction/UnresolvedAttribute/Literal then manual conversion in ClusterBySpec.fromExpressions |
Generic-call Transform |
ApplyTransform (case class, proper equals/hashCode) |
new ClusteringColumnTransform (final class, hand-rolled asymmetric equals at expressions.scala:242) plus new ClusterByColumnTransform descriptor whose argumentIndex is dead state (fromExpressions forces col to position 0) |
| Column position / cardinality | bucket(4, c1, c2) — column at any position, multiple columns OK |
column must be arguments().head and there must be exactly one (interface.scala:392-407); restrictions undocumented |
| v2 wire shape | one Transform per element in table.partitioning: Array[Transform] |
ClusterByTransform.columnNames + a side-channel transforms: Seq[ClusterByColumnTransform] field; plus TableChange.clusterBy grows a parallel Optional<Transform>[] parameter |
| DESCRIBE display | full transform.describe() |
only column names — transform invisible |
The natural shape is cluster_by(c1, upper(c2), bucket(4, c3)) — one Transform, with the per-column wrappers as arguments (like SortedBucketTransform carries multiple typed args today). Then describe(), toString, arguments() all do the right thing for free; DESCRIBE TABLE and SHOW CREATE TABLE round-trip; connectors see one consistent Transform shape; and the new ClusterByColumnTransform/ClusteringColumnTransform classes can be deleted in favor of ApplyTransform. If a hard "exactly one column per arg" constraint is wanted, that can be a require on the new ClusterByTransform.arguments, not a separate type system.
2. The v2 API is now receiving SQL-syntax strings.
Three spots:
ClusteringColumnTransform.name = QuotingUtils.quoted(u.nameParts.toArray)(interface.scala:339and:380) — for multi-part function names, the v2Transform.name()is now a backtick-quoted SQL identifier string (`db`.`myfunc`). Every existing transform'snameis an unqualified plain string ("bucket","years",ApplyTransform.name = applyCtx.identifier.getTextatAstBuilder.scala:5091— theapplyTransformgrammar only allows a singleidentifier). Connectors comparingtransform.name()to a plain function name will silently miss namespaced cases here.FieldReference(QuotingUtils.quoted(a.nameParts.toArray))(interface.scala:341-342and:382-383) — has structuredSeq[String]parts in hand, stringifies with backtick quoting, thenFieldReference(String)re-parses viaCatalystSqlParser.parseMultipartIdentifier. Should just beFieldReference(a.nameParts)(theSeq[String]apply that exists already).- The
Optional<Transform>[]parameter onTableChange.clusterByis fine as a Java type, but it carries Transforms whose name/args are themselves stringy per the points above.
3. Catalog property storage round-trips through the SQL parser.
ClusterBySpec.toColumnNames (interface.scala:289-307) writes a transform as a SQL function-call string and embeds it in the existing Seq[Seq[String]] JSON shape. fromColumnEntries (interface.scala:331-368) calls CatalystSqlParser.parseExpression on each single-element entry to recover the transform.
Two concrete consequences:
- Format ambiguity: a column whose name happens to parse as a function call — e.g. backtick-quoted
CLUSTER BY (`lower(col)`)— is stored as[["lower(col)"]](indistinguishable from a transform entry) and read back as aClusteringColumnTransform. Existing tables with such names silently regress. - Parser coupling for on-disk catalog data: round-trip stability depends on
Literal(value, dataType).sql↔ the SQL parser.TimestampType.sqlemitsTIMESTAMP 'literal', which the parser turns into aCastof a string, not aLiteral— theClusterBySpecSuiteround-trip tests only pass because they pin the session timezone. A future Literal type or any SQL-syntax change could break stored metadata.
Use a structured format: e.g. each entry is {"col": [...]} for plain columns or {"col": [...], "transform": {"name": ..., "args": [...]}} for transforms, all encoded with the same Jackson mapper already in use. No re-parsing, no ambiguity, no parser coupling.
Other findings (mostly fall out of the above)
ClusterByTransform.unapply(expressions.scala:215) still returns onlyOption[Seq[NamedReference]]— everycase ClusterByTransform(columnNames)extractor silently drops the new state, e.g.InMemoryBaseTable.scala:297andTransformExtractorSuite.scala:225/:233. Goes away oncetransformsis folded intoarguments.TableChange.clusterBy(NamedReference[])is replaced rather than overloaded — source-incompatible for external connectors that construct this change. PR description says the change is backward-compatible; please add the single-arg overload.normalizeClusterBySpec(interface.scala:469) normalizescolumnNamesbut not theFieldReferenceinside each transform's arguments. After normalization the two can disagree in case/path.CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE(error-conditions.json:728+interface.scala:392-407) is thrown for three different conditions (no col, >1 col, col not at position 0) with a single message that mentions only two. Position-0 constraint is undocumented.CatalogV2Util.clusterBySpecFromChangedoesn't asserttransforms.length == clusteringColumns.length— a misaligned connector input silently produces a corrupted spec.fromExpressionsaccepts anyUnresolvedFunction, silently droppingisDistinct/window markers —CLUSTER BY (count(DISTINCT x))parses cleanly.- No end-to-end DDL test.
validateClusterBy(...expectedTransforms)is implemented in all four v1/v2 Create/Alter base suites but never called from any test. The only coverage of the new path isClusterBySpecSuiteexercising helpers directly.grep -rE "CLUSTER BY \([a-z_]+\(" sql/returned no matches. - PR title is missing
[SPARK-xxxx]. Please file a JIRA and update the title to[SPARK-xxxxx][SQL] ....
| expressionOrMultipartIdentifier | ||
| : expression | ||
| | multipartIdentifier | ||
| ; |
There was a problem hiding this comment.
Consider reusing the existing transform rule from line 1251 (used by PARTITIONED BY) instead of inventing expressionOrMultipartIdentifier. The general expression rule accepts arbitrary inputs (e.g. 1+1, CASE … END) that then fail post-parse with a generic IllegalStateException from fromExpressions; the dedicated applyTransform: identifier '(' transformArgument,... ')' rule rejects those at parse time with a targeted error. See the body summary, pillar 1.
| case class ClusterByColumnTransform( | ||
| columnIndex: Int, | ||
| argumentIndex: Int, | ||
| function: String, | ||
| arguments: Seq[LiteralValue[_]]) |
There was a problem hiding this comment.
Both ClusterByColumnTransform (this abstract descriptor) and ClusteringColumnTransform (the concrete Transform added further below) can be removed if you reuse ApplyTransform and store per-column transforms as the arguments of ClusterByTransform itself — cluster_by(c1, upper(c2), bucket(4, c3)). That gives correct describe() / toString / arguments() for free, fixes the DESCRIBE / SHOW CREATE TABLE round-trip, and matches PARTITIONED BY's wire shape. The argumentIndex field here is also dead state today because fromExpressions forces the column reference to position 0. See body summary, pillar 1.
| val transform: Transform = new ClusteringColumnTransform( | ||
| QuotingUtils.quoted(u.nameParts.toArray), | ||
| u.children.map { | ||
| case a: UnresolvedAttribute => | ||
| FieldReference(QuotingUtils.quoted(a.nameParts.toArray)) | ||
| case l: Literal => LiteralValue(l.value, l.dataType) | ||
| case other => throw new IllegalStateException( | ||
| s"Unexpected argument type in CLUSTER BY expression: ${other.getClass}") | ||
| }.toArray) |
There was a problem hiding this comment.
Two SQL-string round-trips into the v2 API on these lines (and the parallel block at :379-387):
name = QuotingUtils.quoted(u.nameParts.toArray)puts a backtick-quoted SQL identifier string intoTransform.name()for multi-part function names. Every existing v2 transform'snameis an unqualified plain string ("bucket","years",ApplyTransform.name = applyCtx.identifier.getTextatAstBuilder.scala:5091). Connectors comparingtransform.name()against a plain function name will silently miss namespaced cases.FieldReference(QuotingUtils.quoted(a.nameParts.toArray))has structuredSeq[String]parts in hand, stringifies them with backtick quoting, thenFieldReference(String)re-parses viaCatalystSqlParser.parseMultipartIdentifier. Should beFieldReference(a.nameParts)(theSeq[String]apply that exists already).
See body summary, pillar 2.
| def toColumnNames: String = { | ||
| val entries: Seq[Seq[String]] = if (clusteringColumnTransforms.isEmpty) { | ||
| columnNames.map(_.fieldNames().toSeq) | ||
| } else { | ||
| columnNames.zip(clusteringColumnTransforms).map { | ||
| case (colName, None) => colName.fieldNames().toSeq | ||
| case (colName, Some(transform)) => | ||
| val args = transform.arguments().map { | ||
| case n: NamedReference => n.fieldNames().map(QuotingUtils.quoteIfNeeded).mkString(".") | ||
| case LiteralValue(value, dataType) => | ||
| Literal(value, dataType).sql | ||
| case other => throw new IllegalStateException( | ||
| s"Unexpected argument type in CLUSTER BY expression: ${other.getClass}") | ||
| } | ||
| Seq(s"${QuotingUtils.quoteIfNeeded(transform.name())}(${args.mkString(",")})") | ||
| } | ||
| } | ||
| ClusterBySpec.mapper.writeValueAsString(entries) | ||
| } |
There was a problem hiding this comment.
This stringifies a v2 Transform into a SQL function-call expression and embeds it in the existing Seq[Seq[String]] JSON shape; fromColumnEntries at :331 then calls CatalystSqlParser.parseExpression on each single-element entry to recover the transform. Two concrete consequences:
- A column whose name parses as a function call — e.g. backtick-quoted
CLUSTER BY (`lower(col)`)— is stored as[["lower(col)"]](indistinguishable from a transform entry) and read back as aClusteringColumnTransform. Silent regression on existing tables with such names. - Round-trip stability depends on
Literal(...).sql↔ the SQL parser.TimestampType.sqlemitsTIMESTAMP 'literal', which the parser turns into aCastof a string, not aLiteral— theClusterBySpecSuitetimestamp tests pass only because they pin the session timezone.
Please use a structured format (e.g. {"col": [...], "transform": {"name": ..., "args": [...]}}) using the same Jackson mapper. See body summary, pillar 3.
| errorClass = "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE", | ||
| messageParameters = Map("expressionType" -> transformName)) | ||
| } | ||
| if (!transform.arguments().head.isInstanceOf[FieldReference]) { |
There was a problem hiding this comment.
Why must the column reference be at arguments().head? PARTITIONED BY (bucket(4, c1, c2)) already allows columns at any position; the equivalent restriction here is undocumented and the error message at :403-406 doesn't mention it. If the constraint is intentional, please document it and surface it in the error; if not, drop this check.
| resolver) | ||
|
|
||
| ClusterBySpec(normalizedColumns) | ||
| ClusterBySpec(normalizedColumns, clusterBySpec.clusteringColumnTransforms) |
There was a problem hiding this comment.
clusterBySpec.clusteringColumnTransforms is passed through unchanged — the FieldReference inside each transform's arguments is not normalized against the schema, so after normalization the column case/path in transforms can disagree with the case/path in normalizedColumns.
| }, | ||
| "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE": { | ||
| "message" : [ | ||
| "CLUSTER BY expression <expressionType> has either no column reference, or a column reference in an unsupported argument position." |
There was a problem hiding this comment.
This error is thrown for three distinct conditions in ClusterBySpec.fromExpressions (interface.scala:392-407): (a) no column reference, (b) more than one column reference, and (c) the column is not at argument position 0. The message describes only (a) and (c) — a CLUSTER BY (concat(c1, c2)) failure surfaces as if it were about argument position. Either split into separate error classes or rewrite the message to cover all three, and document the position-0 constraint somewhere user-facing.
| static TableChange clusterBy( | ||
| NamedReference[] clusteringColumns, | ||
| Optional<Transform>[] transforms) { | ||
| return new ClusterBy(clusteringColumns, transforms); | ||
| } |
There was a problem hiding this comment.
This replaces the previous clusterBy(NamedReference[]) factory with a new two-arg one — source-incompatible for any external connector that constructs TableChange.ClusterBy today. The PR description says the change is backward-compatible for connectors, but that's only true for receivers of the TableChange. Please add a backward-compatible single-arg overload (delegating with an empty Optional<Transform>[]).
Also, the Javadoc above only documents @param clusteringColumns; please add @param transforms.
| /** | ||
| * Validates clustering columns and their associated transforms. | ||
| * @param expectedTransforms per-column transforms, where None means a plain column reference | ||
| * and Some(transform) means an expression-based clustering column. | ||
| * Must have the same length as clusteringColumns. | ||
| */ | ||
| def validateClusterBy( | ||
| tableName: String, | ||
| clusteringColumns: Seq[String], | ||
| expectedTransforms: Seq[Option[Transform]]): Unit |
There was a problem hiding this comment.
This new abstract validateClusterBy(... expectedTransforms) is implemented in all four v1/v2 Create/Alter suites but never called from any test — no test issues CREATE TABLE … CLUSTER BY (func(col)) or the ALTER equivalent and verifies the transform survives the parser → spec → catalog → read-back round-trip. The only coverage of the new feature is ClusterBySpecSuite exercising helpers directly. Please add at least one happy-path test per v1/v2 catalog (and the corresponding ones in CreateTableClusterBySuiteBase at :47-56).
What changes were proposed in this pull request?
This change updates the parser grammar and the TableChange API to allow representing tables that have CLUSTER BY clauses on expressions (eg.
CLUSTER BY (variant_get(col, ...))instead of just CLUSTER BY(col).The goal is to make this TableChange backward-compatible with connectors that only support clustering by raw columns, while allowing connectors to opt-into supporting some transforms if they are present.
Why are the changes needed?
To support the use-case of being able to do eg.
ALTER TABLE ... CLUSTER BY (upper(col))Does this PR introduce any user-facing change?
Yes, this is a user-facing change - it adds new functionality but does not change any existing functionality.
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
Used claude code with opus 4.6 to assist in this change, but most of it was manually written.