[SPARK-57471][SQL] Populate input/output size metrics for JDBC reads and writes#56550

Open
yadavay-amzn wants to merge 1 commit into
apache:masterfrom
yadavay-amzn:fix/SPARK-57471-jdbc-size-metrics
@yadavay-amzn yadavay-amzn commented Jun 16, 2026

Contributor

What changes were proposed in this pull request?

Populate the input/output size (bytes) task metrics for the JDBC datasource (both reads and writes). Previously only the record counts were populated and the byte-size metrics always reported 0, because JDBC does no Hadoop filesystem I/O (so the default bytesRead callback returns 0) and the write path never set bytesWritten.

Since there is no filesystem to measure, the size is estimated on the Spark side from each row's schema and values in JdbcUtils:

  • Read path: after inputMetrics.incRecordsRead(1), call inputMetrics.incBytesRead(...) with the estimated row size (the InternalRow and schema are already in scope).
  • Write path (savePartition): accumulate the estimated size per row and call outMetrics.setBytesWritten(...) alongside the existing setRecordsWritten on both completion paths.
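
As a rough illustration of the write-path accumulation described above, here is a minimal sketch. It does not use Spark's classes: `OutMetricsStub`, `savePartitionSketch`, `estimateSize`, and `writeRow` are hypothetical stand-ins, and the `finally` block is a simplification of "both completion paths", not the actual structure of `savePartition`.

```scala
// Hypothetical stand-in for Spark's OutputMetrics, for illustration only.
final class OutMetricsStub {
  var recordsWritten: Long = 0L
  var bytesWritten: Long = 0L
  def setRecordsWritten(n: Long): Unit = { recordsWritten = n }
  def setBytesWritten(n: Long): Unit = { bytesWritten = n }
}

object WritePathSketch {
  // Writes every row, keeping a running row count and estimated byte total,
  // then publishes both totals whether the loop finishes or throws.
  def savePartitionSketch[T](
      rows: Iterator[T],
      estimateSize: T => Long, // assumed per-row size estimator
      writeRow: T => Unit,     // assumed stand-in for the JDBC insert
      metrics: OutMetricsStub): Unit = {
    var rowCount = 0L
    var totalBytes = 0L
    try {
      while (rows.hasNext) {
        val row = rows.next()
        writeRow(row)
        // Count a row only after it has been handed to the writer.
        rowCount += 1
        totalBytes += estimateSize(row)
      }
    } finally {
      // Simplification: one finally block covers success and failure.
      metrics.setRecordsWritten(rowCount)
      metrics.setBytesWritten(totalBytes)
    }
  }
}
```

The point of the sketch is only that the byte total is tracked next to the row count and reported wherever the row count is reported.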

The estimate uses the actual byte length for variable-length values (UTF8String.numBytes() for strings on the read path; Array[Byte].length for binary) and dataType.defaultSize for fixed-width types; null fields contribute 0. On the write path the string size uses the character length to avoid per-row allocation in the hot write loop (exact for ASCII; a reasonable estimate otherwise). Both the v1 and v2 JDBC paths converge in JdbcUtils, so a single change covers both.
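
A minimal, self-contained sketch of the estimation rule described above (not the code in this PR). The `FieldType` objects are hypothetical stand-ins for Spark's `DataType`, with `defaultSize` values chosen to mirror Spark's defaults, and a row is modelled as a plain `Seq[Any]` rather than an `InternalRow`.

```scala
// Stand-in types for illustration only; these are not Spark's DataType classes.
sealed trait FieldType { def defaultSize: Int }
case object IntField extends FieldType { val defaultSize = 4 }
case object LongField extends FieldType { val defaultSize = 8 }
case object DoubleField extends FieldType { val defaultSize = 8 }
case object StringField extends FieldType { val defaultSize = 20 }
case object BinaryField extends FieldType { val defaultSize = 100 }

object RowSizeEstimate {
  import java.nio.charset.StandardCharsets.UTF_8

  // Nulls contribute 0, variable-length values contribute their actual
  // byte length, and everything else falls back to the type's default size.
  def estimateFieldSize(value: Any, fieldType: FieldType): Long = value match {
    case null => 0L
    case s: String => s.getBytes(UTF_8).length.toLong
    case b: Array[Byte] => b.length.toLong
    case _ => fieldType.defaultSize.toLong
  }

  // Sums the per-field estimates for one row.
  def estimateRowSize(row: Seq[Any], schema: Seq[FieldType]): Long = {
    require(row.length == schema.length, "row and schema must have the same arity")
    var total = 0L
    var i = 0
    while (i < row.length) {
      total += estimateFieldSize(row(i), schema(i))
      i += 1
    }
    total
  }
}
```

Under these assumptions, a row `(1, "abc", null)` with schema `(IntField, StringField, BinaryField)` is estimated at 4 + 3 + 0 = 7 bytes. Note that `getBytes` allocates; the write path in this PR uses the character length instead to avoid that cost.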

Why are the changes needed?

JDBC read/write tasks reported bytesRead/bytesWritten as 0, so users and tooling had no size signal for JDBC I/O (only row counts). A Spark-side estimate gives a useful, non-zero approximation.

Does this PR introduce any user-facing change?

Yes. The input/output size metrics for JDBC reads and writes are now populated with an estimated byte size instead of always 0. These are estimates of the Spark-side row size, not exact wire bytes.

How was this patch tested?

New tests in JDBCSuite (read) and JDBCWriteSuite (write), using the existing SparkListener.onTaskEnd metrics pattern:

  • bytes are 0 on master and populated with the fix (TDD),
  • bytes scale with string width and with binary length (proving variable-length estimation, not a constant),
  • null values do not error and yield fewer bytes than the populated equivalent.

Was this patch authored or co-authored using generative AI tooling?

Yes.

…and writes

Co-authored-by: Craiu Constantin-Tiberiu <craiu.constantin.tiberiu@gmail.com>
@yadavay-amzn yadavay-amzn force-pushed the fix/SPARK-57471-jdbc-size-metrics branch from 0b6c225 to eb1c756 Compare June 16, 2026 22:07
Comment on lines +2507 to +2520
def collectBytesRead(table: String): Long = {
  val taskMetrics = new ArrayBuffer[Long]()
  sparkContext.listenerBus.waitUntilEmpty()
  val listener = new SparkListener() {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      taskMetrics += taskEnd.taskMetrics.inputMetrics.bytesRead
    }
  }
  sparkContext.addSparkListener(listener)
  spark.read.jdbc(urlWithUserAndPass, table, props).collect()
  sparkContext.listenerBus.waitUntilEmpty()
  sparkContext.removeSparkListener(listener)
  taskMetrics.sum
}

Could this be extracted into a helper?


2 participants