Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ private[kafka010] class KafkaMicroBatchStream(
}
}
} else {
Some(latestPartitionOffsets)
Comment thread
yadavay-amzn marked this conversation as resolved.
Option(latestPartitionOffsets)
Comment thread
yadavay-amzn marked this conversation as resolved.
}

KafkaMicroBatchStream.metrics(latestConsumedOffset, reCalculatedLatestPartitionOffsets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,32 @@ abstract class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBa
// test null latestAvailablePartitionOffsets
assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), None).isEmpty)
}

test("SPARK-57438: metrics should not NPE when latestPartitionOffsets is null") {
// Construct a KafkaMicroBatchStream instance without calling latestOffset(),
// so latestPartitionOffsets remains null (its default uninitialized value).
// Calling metrics() on this instance exercises the real non-RTM code path.
val topic = newTopic()
val tp = new TopicPartition(topic, 0)

SparkSession.setActiveSession(spark)
withTempDir { dir =>
val provider = new KafkaSourceProvider()
val options = Map(
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
)
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
val table = provider.getTable(dsOptions)
val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath)
.asInstanceOf[KafkaMicroBatchStream]

// latestPartitionOffsets is still null - metrics() must not NPE
val offset = KafkaSourceOffset(Map(tp -> 0L))
val result = stream.metrics(Optional.of(offset))
assert(result.isEmpty)
}
}
}

class KafkaMicroBatchV1SourceWithAdminSuite extends KafkaMicroBatchV1SourceSuite {
Expand Down