[SPARK-57471][SQL] Populate input/output size metrics for JDBC reads and writes#56550
Open
yadavay-amzn wants to merge 1 commit into
Open
[SPARK-57471][SQL] Populate input/output size metrics for JDBC reads and writes#56550yadavay-amzn wants to merge 1 commit into
yadavay-amzn wants to merge 1 commit into
Conversation
…and writes Co-authored-by: Craiu Constantin-Tiberiu <craiu.constantin.tiberiu@gmail.com>
0b6c225 to
eb1c756
Compare
Comment on lines
+2507
to
+2520
| def collectBytesRead(table: String): Long = { | ||
| val taskMetrics = new ArrayBuffer[Long]() | ||
| sparkContext.listenerBus.waitUntilEmpty() | ||
| val listener = new SparkListener() { | ||
| override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { | ||
| taskMetrics += taskEnd.taskMetrics.inputMetrics.bytesRead | ||
| } | ||
| } | ||
| sparkContext.addSparkListener(listener) | ||
| spark.read.jdbc(urlWithUserAndPass, table, props).collect() | ||
| sparkContext.listenerBus.waitUntilEmpty() | ||
| sparkContext.removeSparkListener(listener) | ||
| taskMetrics.sum | ||
| } |
There was a problem hiding this comment.
Could this be extractd into a helper?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Populate the input/output size (bytes) task metrics for the JDBC datasource (both reads and writes). Previously only the record counts were populated and the byte-size metrics always reported 0, because JDBC does no Hadoop filesystem I/O (so the default
bytesReadcallback returns 0) and the write path never setbytesWritten.Since there is no filesystem to measure, the size is estimated on the Spark side from each rows schema and values in
JdbcUtils:inputMetrics.incRecordsRead(1), callinputMetrics.incBytesRead(...)with the estimated row size (theInternalRowand schema are already in scope).savePartition): accumulate the estimated size per row and calloutMetrics.setBytesWritten(...)alongside the existingsetRecordsWrittenon both completion paths.The estimate uses the actual byte length for variable-length values (
UTF8String.numBytes()for strings on the read path;Array[Byte].lengthfor binary) anddataType.defaultSizefor fixed-width types; null fields contribute 0. On the write path the string size uses the character length to avoid per-row allocation in the hot write loop (exact for ASCII; a reasonable estimate otherwise). Both the v1 and v2 JDBC paths converge inJdbcUtils, so a single change covers both.Why are the changes needed?
JDBC read/write tasks reported
bytesRead/bytesWrittenas 0, so users and tooling had no size signal for JDBC I/O (only row counts). A Spark-side estimate gives a useful, non-zero approximation.Does this PR introduce any user-facing change?
Yes. The input/output size metrics for JDBC reads and writes are now populated with an estimated byte size instead of always 0. These are estimates of the Spark-side row size, not exact wire bytes.
How was this patch tested?
New tests in
JDBCSuite(read) andJDBCWriteSuite(write), using the existingSparkListener.onTaskEndmetrics pattern:Was this patch authored or co-authored using generative AI tooling?
Yes.