diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 0af728c1958d4..4d135508de640 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -157,9 +157,9 @@ abstract class FileTable( * @return */ protected def mergedOptions(options: CaseInsensitiveStringMap): CaseInsensitiveStringMap = { - val finalOptions = this.options.asCaseSensitiveMap().asScala ++ - options.asCaseSensitiveMap().asScala - new CaseInsensitiveStringMap(finalOptions.asJava) + val tableOnly = this.options.asCaseSensitiveMap().asScala + .filter { case (key, _) => !options.containsKey(key) } + new CaseInsensitiveStringMap((tableOnly ++ options.asCaseSensitiveMap().asScala).asJava) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala index 986276d1e8e3e..0a41d181b76dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala @@ -156,4 +156,44 @@ class FileTableSuite extends SharedSparkSession { } } } + + allFileBasedDataSources.foreach { format => + test("SPARK-49519, SPARK-50287: relation options override table options case-insensitively " + + s"when merging table and relation options - $format") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") { + val userSpecifiedSchema = StructType(Seq(StructField("c1", StringType))) + + DataSource.lookupDataSourceV2(format, spark.sessionState.conf) match { + case Some(provider) => + val dsOptions = new CaseInsensitiveStringMap(Map("lineSep" -> "table_val").asJava) + val table = provider.getTable( + userSpecifiedSchema, + Array.empty, + dsOptions.asCaseSensitiveMap()).asInstanceOf[FileTable] + val relationOptions = new CaseInsensitiveStringMap( + Map("linesep" -> "relation_val").asJava) + + val mergedReadOptions = table.newScanBuilder(relationOptions) match { + case csv: CSVScanBuilder => csv.options + case json: JsonScanBuilder => json.options + case orc: OrcScanBuilder => orc.options + case parquet: ParquetScanBuilder => parquet.options + case text: TextScanBuilder => text.options + } + assert(mergedReadOptions.size === 1) + assert(mergedReadOptions.get("lineSep") === "relation_val") + assert(mergedReadOptions.get("linesep") === "relation_val") + + val writeInfo = LogicalWriteInfoImpl("query-id", userSpecifiedSchema, relationOptions) + val mergedWriteOptions = table.newWriteBuilder(writeInfo).build() + .asInstanceOf[FileWrite].options + assert(mergedWriteOptions.size === 1) + assert(mergedWriteOptions.get("lineSep") === "relation_val") + assert(mergedWriteOptions.get("linesep") === "relation_val") + case _ => + throw new IllegalArgumentException(s"Failed to get table provider for $format") + } + } + } + } }