diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 828891f0b4983..17323165b451f 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -357,7 +357,7 @@ private[kafka010] class KafkaMicroBatchStream( } } } else { - Some(latestPartitionOffsets) + Option(latestPartitionOffsets) } KafkaMicroBatchStream.metrics(latestConsumedOffset, reCalculatedLatestPartitionOffsets) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index e6a794c22e1f2..274be623da6fa 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1883,6 +1883,32 @@ abstract class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBa // test null latestAvailablePartitionOffsets assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), None).isEmpty) } + + test("SPARK-57438: metrics should not NPE when latestPartitionOffsets is null") { + // Construct a KafkaMicroBatchStream instance without calling latestOffset(), + // so latestPartitionOffsets remains null (its default uninitialized value). + // Calling metrics() on this instance exercises the real non-RTM code path. + val topic = newTopic() + val tp = new TopicPartition(topic, 0) + + SparkSession.setActiveSession(spark) + withTempDir { dir => + val provider = new KafkaSourceProvider() + val options = Map( + "kafka.bootstrap.servers" -> testUtils.brokerAddress, + "subscribe" -> topic + ) + val dsOptions = new CaseInsensitiveStringMap(options.asJava) + val table = provider.getTable(dsOptions) + val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath) + .asInstanceOf[KafkaMicroBatchStream] + + // latestPartitionOffsets is still null - metrics() must not NPE + val offset = KafkaSourceOffset(Map(tp -> 0L)) + val result = stream.metrics(Optional.of(offset)) + assert(result.isEmpty) + } + } } class KafkaMicroBatchV1SourceWithAdminSuite extends KafkaMicroBatchV1SourceSuite {