Skip to content

[SQL] Update grammar/TableChange to allow CLUSTER BY expressions#55327

Open
itsbilal wants to merge 1 commit into
apache:masterfrom
itsbilal:sql-add-cluster-by-expressions
Open

[SQL] Update grammar/TableChange to allow CLUSTER BY expressions#55327
itsbilal wants to merge 1 commit into
apache:masterfrom
itsbilal:sql-add-cluster-by-expressions

Conversation

@itsbilal
Copy link
Copy Markdown

What changes were proposed in this pull request?

This change updates the parser grammar and the TableChange API to allow representing tables that have CLUSTER BY clauses on expressions (eg. CLUSTER BY (variant_get(col, ...)) instead of just CLUSTER BY (col).

The goal is to make this TableChange backward-compatible with connectors that only support clustering by raw columns, while allowing connectors to opt-into supporting some transforms if they are present.

Why are the changes needed?

To support the use-case of being able to do eg. ALTER TABLE ... CLUSTER BY (upper(col))

Does this PR introduce any user-facing change?

Yes, this is a user-facing change - it adds new functionality but does not change any existing functionality.

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

Used claude code with opus 4.6 to assist in this change, but most of it was manually written.

This change updates the parser grammar and the TableChange API to allow
representing tables that have CLUSTER BY clauses on expressions (eg.
`CLUSTER BY (variant_get(col, ...))` instead of just CLUSTER BY `(col)`.

The goal is to make this TableChange backward-compatible with
connectors that only support clustering by raw columns, while allowing
connectors to opt-into supporting some transforms if they are present.
@itsbilal itsbilal force-pushed the sql-add-cluster-by-expressions branch from d98079c to ba9ee66 Compare May 12, 2026 00:12
Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR adds CLUSTER BY (expression) support but invents a parallel mechanism instead of reusing or following the existing v2 transform infrastructure that already handles this shape for PARTITIONED BY. The result has three structural problems that I think warrant a redesign before this goes further — the rest of the findings below all fall out of these. I've put them as questions so we can discuss the direction; happy to be wrong on any of them.

1. Why not follow the PARTITIONED BY transform pattern?

PARTITIONED BY (bucket(4, c1, c2)) and PARTITIONED BY (years(ts)) already work and have done for years. Everything you need to support CLUSTER BY (upper(c)) is already in the codebase under that path:

PARTITIONED BY (today) This PR
Grammar applyTransform: identifier '(' transformArgument,... ')' (g4:1251) — only name(col_or_lit,...) general expression (g4:549) — accepts 1+1, case … end, …, validated post-parse with IllegalStateException
Args transformArgument: qualifiedName | constant → V2 FieldReference/LiteralValue directly (AstBuilder.scala:5130) Catalyst UnresolvedFunction/UnresolvedAttribute/Literal then manual conversion in ClusterBySpec.fromExpressions
Generic-call Transform ApplyTransform (case class, proper equals/hashCode) new ClusteringColumnTransform (final class, hand-rolled asymmetric equals at expressions.scala:242) plus new ClusterByColumnTransform descriptor whose argumentIndex is dead state (fromExpressions forces col to position 0)
Column position / cardinality bucket(4, c1, c2) — column at any position, multiple columns OK column must be arguments().head and there must be exactly one (interface.scala:392-407); restrictions undocumented
v2 wire shape one Transform per element in table.partitioning: Array[Transform] ClusterByTransform.columnNames + a side-channel transforms: Seq[ClusterByColumnTransform] field; plus TableChange.clusterBy grows a parallel Optional<Transform>[] parameter
DESCRIBE display full transform.describe() only column names — transform invisible

The natural shape is cluster_by(c1, upper(c2), bucket(4, c3)) — one Transform, with the per-column wrappers as arguments (like SortedBucketTransform carries multiple typed args today). Then describe(), toString, arguments() all do the right thing for free; DESCRIBE TABLE and SHOW CREATE TABLE round-trip; connectors see one consistent Transform shape; and the new ClusterByColumnTransform/ClusteringColumnTransform classes can be deleted in favor of ApplyTransform. If a hard "exactly one column per arg" constraint is wanted, that can be a require on the new ClusterByTransform.arguments, not a separate type system.

2. The v2 API is now receiving SQL-syntax strings.

Three spots:

  • ClusteringColumnTransform.name = QuotingUtils.quoted(u.nameParts.toArray) (interface.scala:339 and :380) — for multi-part function names, the v2 Transform.name() is now a backtick-quoted SQL identifier string (`db`.`myfunc`). Every existing transform's name is an unqualified plain string ("bucket", "years", ApplyTransform.name = applyCtx.identifier.getText at AstBuilder.scala:5091 — the applyTransform grammar only allows a single identifier). Connectors comparing transform.name() to a plain function name will silently miss namespaced cases here.
  • FieldReference(QuotingUtils.quoted(a.nameParts.toArray)) (interface.scala:341-342 and :382-383) — has structured Seq[String] parts in hand, stringifies with backtick quoting, then FieldReference(String) re-parses via CatalystSqlParser.parseMultipartIdentifier. Should just be FieldReference(a.nameParts) (the Seq[String] apply that exists already).
  • The Optional<Transform>[] parameter on TableChange.clusterBy is fine as a Java type, but it carries Transforms whose name/args are themselves stringy per the points above.

3. Catalog property storage round-trips through the SQL parser.

ClusterBySpec.toColumnNames (interface.scala:289-307) writes a transform as a SQL function-call string and embeds it in the existing Seq[Seq[String]] JSON shape. fromColumnEntries (interface.scala:331-368) calls CatalystSqlParser.parseExpression on each single-element entry to recover the transform.

Two concrete consequences:

  • Format ambiguity: a column whose name happens to parse as a function call — e.g. backtick-quoted CLUSTER BY (`lower(col)`) — is stored as [["lower(col)"]] (indistinguishable from a transform entry) and read back as a ClusteringColumnTransform. Existing tables with such names silently regress.
  • Parser coupling for on-disk catalog data: round-trip stability depends on Literal(value, dataType).sql ↔ the SQL parser. TimestampType.sql emits TIMESTAMP 'literal', which the parser turns into a Cast of a string, not a Literal — the ClusterBySpecSuite round-trip tests only pass because they pin the session timezone. A future Literal type or any SQL-syntax change could break stored metadata.

Use a structured format: e.g. each entry is {"col": [...]} for plain columns or {"col": [...], "transform": {"name": ..., "args": [...]}} for transforms, all encoded with the same Jackson mapper already in use. No re-parsing, no ambiguity, no parser coupling.

Other findings (mostly fall out of the above)

  • ClusterByTransform.unapply (expressions.scala:215) still returns only Option[Seq[NamedReference]] — every case ClusterByTransform(columnNames) extractor silently drops the new state, e.g. InMemoryBaseTable.scala:297 and TransformExtractorSuite.scala:225/:233. Goes away once transforms is folded into arguments.
  • TableChange.clusterBy(NamedReference[]) is replaced rather than overloaded — source-incompatible for external connectors that construct this change. PR description says the change is backward-compatible; please add the single-arg overload.
  • normalizeClusterBySpec (interface.scala:469) normalizes columnNames but not the FieldReference inside each transform's arguments. After normalization the two can disagree in case/path.
  • CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE (error-conditions.json:728 + interface.scala:392-407) is thrown for three different conditions (no col, >1 col, col not at position 0) with a single message that mentions only two. Position-0 constraint is undocumented.
  • CatalogV2Util.clusterBySpecFromChange doesn't assert transforms.length == clusteringColumns.length — a misaligned connector input silently produces a corrupted spec.
  • fromExpressions accepts any UnresolvedFunction, silently dropping isDistinct/window markers — CLUSTER BY (count(DISTINCT x)) parses cleanly.
  • No end-to-end DDL test. validateClusterBy(...expectedTransforms) is implemented in all four v1/v2 Create/Alter base suites but never called from any test. The only coverage of the new path is ClusterBySpecSuite exercising helpers directly. grep -rE "CLUSTER BY \([a-z_]+\(" sql/ returned no matches.
  • PR title is missing [SPARK-xxxx]. Please file a JIRA and update the title to [SPARK-xxxxx][SQL] ....

Comment on lines +549 to +552
expressionOrMultipartIdentifier
: expression
| multipartIdentifier
;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider reusing the existing transform rule from line 1251 (used by PARTITIONED BY) instead of inventing expressionOrMultipartIdentifier. The general expression rule accepts arbitrary inputs (e.g. 1+1, CASE … END) that then fail post-parse with a generic IllegalStateException from fromExpressions; the dedicated applyTransform: identifier '(' transformArgument,... ')' rule rejects those at parse time with a targeted error. See the body summary, pillar 1.

Comment on lines +171 to +175
case class ClusterByColumnTransform(
columnIndex: Int,
argumentIndex: Int,
function: String,
arguments: Seq[LiteralValue[_]])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both ClusterByColumnTransform (this abstract descriptor) and ClusteringColumnTransform (the concrete Transform added further below) can be removed if you reuse ApplyTransform and store per-column transforms as the arguments of ClusterByTransform itself — cluster_by(c1, upper(c2), bucket(4, c3)). That gives correct describe() / toString / arguments() for free, fixes the DESCRIBE / SHOW CREATE TABLE round-trip, and matches PARTITIONED BY's wire shape. The argumentIndex field here is also dead state today because fromExpressions forces the column reference to position 0. See body summary, pillar 1.

Comment on lines +338 to +346
val transform: Transform = new ClusteringColumnTransform(
QuotingUtils.quoted(u.nameParts.toArray),
u.children.map {
case a: UnresolvedAttribute =>
FieldReference(QuotingUtils.quoted(a.nameParts.toArray))
case l: Literal => LiteralValue(l.value, l.dataType)
case other => throw new IllegalStateException(
s"Unexpected argument type in CLUSTER BY expression: ${other.getClass}")
}.toArray)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two SQL-string round-trips into the v2 API on these lines (and the parallel block at :379-387):

  1. name = QuotingUtils.quoted(u.nameParts.toArray) puts a backtick-quoted SQL identifier string into Transform.name() for multi-part function names. Every existing v2 transform's name is an unqualified plain string ("bucket", "years", ApplyTransform.name = applyCtx.identifier.getText at AstBuilder.scala:5091). Connectors comparing transform.name() against a plain function name will silently miss namespaced cases.
  2. FieldReference(QuotingUtils.quoted(a.nameParts.toArray)) has structured Seq[String] parts in hand, stringifies them with backtick quoting, then FieldReference(String) re-parses via CatalystSqlParser.parseMultipartIdentifier. Should be FieldReference(a.nameParts) (the Seq[String] apply that exists already).

See body summary, pillar 2.

Comment on lines +289 to +307
def toColumnNames: String = {
val entries: Seq[Seq[String]] = if (clusteringColumnTransforms.isEmpty) {
columnNames.map(_.fieldNames().toSeq)
} else {
columnNames.zip(clusteringColumnTransforms).map {
case (colName, None) => colName.fieldNames().toSeq
case (colName, Some(transform)) =>
val args = transform.arguments().map {
case n: NamedReference => n.fieldNames().map(QuotingUtils.quoteIfNeeded).mkString(".")
case LiteralValue(value, dataType) =>
Literal(value, dataType).sql
case other => throw new IllegalStateException(
s"Unexpected argument type in CLUSTER BY expression: ${other.getClass}")
}
Seq(s"${QuotingUtils.quoteIfNeeded(transform.name())}(${args.mkString(",")})")
}
}
ClusterBySpec.mapper.writeValueAsString(entries)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stringifies a v2 Transform into a SQL function-call expression and embeds it in the existing Seq[Seq[String]] JSON shape; fromColumnEntries at :331 then calls CatalystSqlParser.parseExpression on each single-element entry to recover the transform. Two concrete consequences:

  • A column whose name parses as a function call — e.g. backtick-quoted CLUSTER BY (`lower(col)`) — is stored as [["lower(col)"]] (indistinguishable from a transform entry) and read back as a ClusteringColumnTransform. Silent regression on existing tables with such names.
  • Round-trip stability depends on Literal(...).sql ↔ the SQL parser. TimestampType.sql emits TIMESTAMP 'literal', which the parser turns into a Cast of a string, not a Literal — the ClusterBySpecSuite timestamp tests pass only because they pin the session timezone.

Please use a structured format (e.g. {"col": [...], "transform": {"name": ..., "args": [...]}}) using the same Jackson mapper. See body summary, pillar 3.

errorClass = "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE",
messageParameters = Map("expressionType" -> transformName))
}
if (!transform.arguments().head.isInstanceOf[FieldReference]) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why must the column reference be at arguments().head? PARTITIONED BY (bucket(4, c1, c2)) already allows columns at any position; the equivalent restriction here is undocumented and the error message at :403-406 doesn't mention it. If the constraint is intentional, please document it and surface it in the error; if not, drop this check.

resolver)

ClusterBySpec(normalizedColumns)
ClusterBySpec(normalizedColumns, clusterBySpec.clusteringColumnTransforms)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clusterBySpec.clusteringColumnTransforms is passed through unchanged — the FieldReference inside each transform's arguments is not normalized against the schema, so after normalization the column case/path in transforms can disagree with the case/path in normalizedColumns.

},
"CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE": {
"message" : [
"CLUSTER BY expression <expressionType> has either no column reference, or a column reference in an unsupported argument position."
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error is thrown for three distinct conditions in ClusterBySpec.fromExpressions (interface.scala:392-407): (a) no column reference, (b) more than one column reference, and (c) the column is not at argument position 0. The message describes only (a) and (c) — a CLUSTER BY (concat(c1, c2)) failure surfaces as if it were about argument position. Either split into separate error classes or rewrite the message to cover all three, and document the position-0 constraint somewhere user-facing.

Comment on lines +282 to 286
static TableChange clusterBy(
NamedReference[] clusteringColumns,
Optional<Transform>[] transforms) {
return new ClusterBy(clusteringColumns, transforms);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This replaces the previous clusterBy(NamedReference[]) factory with a new two-arg one — source-incompatible for any external connector that constructs TableChange.ClusterBy today. The PR description says the change is backward-compatible for connectors, but that's only true for receivers of the TableChange. Please add a backward-compatible single-arg overload (delegating with an empty Optional<Transform>[]).

Also, the Javadoc above only documents @param clusteringColumns; please add @param transforms.

Comment on lines +47 to +56
/**
* Validates clustering columns and their associated transforms.
* @param expectedTransforms per-column transforms, where None means a plain column reference
* and Some(transform) means an expression-based clustering column.
* Must have the same length as clusteringColumns.
*/
def validateClusterBy(
tableName: String,
clusteringColumns: Seq[String],
expectedTransforms: Seq[Option[Transform]]): Unit
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new abstract validateClusterBy(... expectedTransforms) is implemented in all four v1/v2 Create/Alter suites but never called from any test — no test issues CREATE TABLE … CLUSTER BY (func(col)) or the ALTER equivalent and verifies the transform survives the parser → spec → catalog → read-back round-trip. The only coverage of the new feature is ClusterBySpecSuite exercising helpers directly. Please add at least one happy-path test per v1/v2 catalog (and the corresponding ones in CreateTableClusterBySuiteBase at :47-56).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants