Skip to content

[SPARK-57478][SQL] Read text files from tar archives#56527

Open
akshatshenoi-db wants to merge 5 commits into
apache:masterfrom
akshatshenoi-db:archive-text
Open

[SPARK-57478][SQL] Read text files from tar archives#56527
akshatshenoi-db wants to merge 5 commits into
apache:masterfrom
akshatshenoi-db:archive-text

Conversation

@akshatshenoi-db

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

SPARK-57135 added reading CSV files packed in tar archives (.tar/.tar.gz/.tgz) and SPARK-57321 added schema inference for them; SPARK-57419 extended both to JSON. All are gated by spark.sql.files.archive.reader.enabled. This extends archive reading to the text data source.

When the flag is enabled, the V1 text data source reads a tar archive as if it were a directory of its entries: each entry is streamed through ArchiveReader (never unpacked to disk) and read exactly like a standalone text file -- one row per line, or a single row holding the whole entry when wholeText is set (TextFileFormat.readArchive). The whole archive is one non-splittable unit (isSplitable returns false for an archive path), and a corrupt/missing archive is skipped as a unit under ignoreCorruptFiles/ignoreMissingFiles.

Text has a fixed value STRING schema, so there is no schema inference. Archive scanning is wired into the V1 file source only; the DSv2 reader is left untouched.

Why are the changes needed?

To let text ingestion read tar archives without unpacking them to disk, matching the CSV and JSON behavior already in Spark.

Does this PR introduce any user-facing change?

Yes. With spark.sql.files.archive.reader.enabled=true (default false), the text data source can read .tar/.tar.gz/.tgz files.

How was this patch tested?

New TextTarArchiveReadSuite: reads of multi-entry archives across all three extensions, parity with a directory read of the same files, wholeText and a custom line separator, empty and corrupt archives, the single-partition guarantee, and an archive mixed with loose files in the same directory.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

@akshatshenoi-db akshatshenoi-db left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 blocking, 1 non-blocking, 0 nits.
Clean, minimal port of the archive-read pattern to text; readArchive faithfully mirrors readToUnsafeMem. One non-blocking cross-path consistency note on the (intentionally untouched) V2 text source.

Design / architecture (1)

  • TextFileFormat / v2 text source: with archiveFormatEnabled=true, a .tar read through the V1 source works correctly (this PR), but the V2 text source has no archive awareness — TextPartitionReaderFactory reads via HadoopFileLinesReader, so it scans the raw tar bytes as text lines and silently returns garbage rows. This is consistent with how V2 behaves for the sibling formats today rather than a new divergence: JSON has no archive support on master yet, and V2 CSV only fails loudly (UNABLE_TO_INFER_SCHEMA) while inferring — given an explicit .schema(...) the guard in CSVTable.inferSchema is bypassed (FileTable.dataSchema prefers the user schema) and V2 CSV silently mis-reads too. Text has no inference step at all, so there is nowhere to hang that guard. Not a regression (text never supported archives on any path before this PR) and the V2 source is intentionally out of scope here, so non-blocking — but worth a follow-up to give the V2 path a uniform archive guard across formats.

Verification

Traced readArchive against the per-file readToUnsafeMem: identical value-column rows — an empty required schema yields the shared emptyUnsafeRow, otherwise unsafeRowWriter.reset()/write(0, bytes, 0, length)/getRow() with length-aware Text handling (getBytes+getLength, not bytes.length); wholeText reads the whole entry as one row. The single reused UnsafeRowWriter/Text buffer is safe across entries because ArchiveReader advances to the next entry only after the current entry's iterator is exhausted (parse-before-advance), so no live row aliases a buffer about to be overwritten. isSplitable=false makes the whole archive one split; empty and corrupt (ignoreCorruptFiles) archives are covered by the suite.

PR description suggestions

  • Note the V2 limitation: with the flag enabled, the DSv2 text path silently reads archive bytes as text (it has no archive guard). This matches the other formats' V2 behavior today — JSON has no archive support yet, and V2 CSV's loud UNABLE_TO_INFER_SCHEMA only applies when inferring, not with an explicit schema.

@cloud-fan cloud-fan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 blocking, 2 non-blocking, 0 nits.
Clean, faithful port of the archive-read pattern to the V1 text source; readArchive mirrors readToUnsafeMem and the CSV analogue, and the new suite is thorough.

Design / architecture (1)

  • V2 text source has no archive awareness: with the flag on and text forced through DSv2 (non-default), an archive is silently read as raw tar bytes (garbage rows), where v2 CSV/JSON raise UNABLE_TO_INFER_SCHEMA. Not a regression and V2 is scoped out, so non-blocking, but worth a follow-up ticket. This concurs with the existing review on this PR; not re-flagging.

Suggestions (1)

  • TextTarArchiveReadSuite.scala:173: ignoreMissingFiles is named in the design description but only ignoreCorruptFiles is tested — see inline.

Verification

Traced readArchive against the per-file readToUnsafeMem: identical value-column rows — an empty required schema yields the shared emptyUnsafeRow, otherwise length-aware writes into the single value column (getBytes + getLength, not bytes.length); wholeText reads the whole entry as one row. The single reused UnsafeRowWriter/Text buffer is safe across entries because ArchiveReader advances to the next entry only after the current one's iterator is exhausted, so no returned row aliases a buffer about to be overwritten. isSplitable=false makes the archive one split. BOM handling matches the non-archive path in the default config: the archive line path strips the UTF-8 BOM via ArchiveReader.lineIterator, as does the non-archive path via HadoopLineRecordReader (default on).

PR description suggestions

  • Document the V2 limitation: with the flag enabled, the DSv2 text path silently reads archive bytes as text rather than raising UNABLE_TO_INFER_SCHEMA like CSV/JSON, because text has no schema-inference guard.

}

Seq(true, false).foreach { ignoreCorrupt =>
test(s"ignoreCorruptFiles=$ignoreCorrupt controls whether a corrupt archive is skipped") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: the design description names ignoreMissingFiles alongside ignoreCorruptFiles, but only the corrupt case is covered here. Consider a parameterized ignoreMissingFiles case mirroring this test (a missing archive path skipped vs. erroring). Low-value since the behavior is inherited from FileScanRDD and isn't archive-specific, but it's the one behavior named in the description that has no test.

@cloud-fan cloud-fan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 addressed, 1 remaining, 2 new. Prior ignoreMissingFiles suggestion now covered (TextTarArchiveReadSuite.scala:188-205); the V2 archive-awareness limitation remains (accepted/scoped out, not re-flagging).

0 blocking, 2 non-blocking, 0 nits.
Faithful port of the CSV archive-read pattern to the V1 text source; readArchive mirrors readToUnsafeMem. Two doc-accuracy notes below, both because JSON archive support isn't actually present in the tree yet.

Suggestions (2)

  • SQLConf.scala:2726-2729: the spark.sql.files.archive.reader.enabled doc still says archives are "streamed through the CSV parser" and "Only the CSV data source supports reading archives." This PR adds text support, so that's now inaccurate — please generalize the wording (e.g. "the CSV and text data sources"). The config file isn't in this PR's diff, hence a general comment rather than inline.
  • PR description: "SPARK-57419 extended both to JSON" — there is no JSON archive support on master (ArchiveReader.isArchivePath / archiveFormatEnabled are referenced only by the CSV and text sources, and the config doc still says only CSV). If that's in-flight/unmerged work, worth clarifying so the baseline reads accurately.

Verification

Traced readArchive against the per-file readToUnsafeMem: identical value-column rows — an empty required schema yields the shared emptyUnsafeRow, otherwise length-aware writes into the single value column (getBytes+getLength, not bytes.length); wholeText reads the whole entry as one row. The reused per-entry UnsafeRowWriter is safe across entries because ArchiveReader opens the next entry only after the current one's iterator is exhausted (parse-before-advance), so no returned row aliases a buffer about to be overwritten. isSplitable=false makes the archive one split. wholeText BOM handling matches the non-archive path (both read raw bytes via WholeTextFileRecordReader / readAllBytes, neither strips a BOM); the line path strips the BOM via ArchiveReader.lineIterator, matching the default HadoopLineRecordReader.

PR description suggestions

  • The references to JSON archive support ("SPARK-57419 extended both to JSON") don't match the current tree — either that work isn't merged yet or the reference is stale. Worth correcting so the baseline reads accurately.

Extend the text data source to read .tar/.tar.gz/.tgz archives when
spark.sql.files.archive.reader.enabled is set. Each entry is streamed through
the text reader (one row per line, or a whole-entry row under wholeText),
never unpacked to disk; the whole archive is one non-splittable unit. Text has
a fixed value STRING schema, so there is no inference. V1-only; DSv2 untouched.
Mirror the ignoreCorruptFiles parameterized test: a missing archive path is
skipped under ignoreMissingFiles and errors otherwise. Covers the one behavior
named in the PR description that previously had no test.
Move emptyUnsafeRow / unsafeRowWriter into the per-entry readEntries callback so each
archive entry gets its own row writer -- exactly as readToUnsafeMem builds one per file --
instead of sharing a single writer across all entries.
…text

The spark.sql.files.archive.reader.enabled doc named only the CSV data source.
JSON (SPARK-57419) and now text support reading archives too, so generalize the
wording instead of enumerating one source.

@cloud-fan cloud-fan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 addressed, 1 remaining, 1 new. Prior SQLConf doc and PR-description JSON findings now resolved — JSON archive support landed in the rebased tree, so the CSV/JSON/text wording is accurate. The DSv2 archive-awareness limitation remains (accepted/scoped-out, not re-flagging).

0 blocking, 1 non-blocking, 0 nits.
Clean, faithful port of the archive-read pattern to the V1 text source; readArchive mirrors readToUnsafeMem and the CSV/JSON analogues, and the new suite is thorough.

Suggestions (1)

  • TextTarArchiveReadSuite.scala:54: the tar-writing helpers duplicate TarArchiveReadBase — see inline.

Verification

Traced readArchive against readToUnsafeMem: identical value-column rows — empty required schema → shared emptyUnsafeRow, otherwise length-aware writes into the single value column (getBytes + getLength, not bytes.length); wholeText → one row per entry. The per-entry UnsafeRowWriter is safe across entries because ArchiveReader opens the next entry only after the current one's iterator is exhausted (parse-before-advance look-ahead), so no returned row aliases a buffer about to be overwritten. isSplitable=false makes the archive one split. BOM/lineSep handling and the isSplitable/buildReader archive dispatch match CSVFileFormat/JsonFileFormat; ignoreCorruptFiles/ignoreMissingFiles are handled by FileScanRDD (not readArchive) and both are tested.

private def textBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

/** Writes `entries` (name -> bytes) into the archive at `dest`; compression follows the ext. */
private def writeArchive(dest: File, entries: Seq[(String, Array[Byte])]): Unit = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional / non-blocking: writeArchive, archiveExtensions (line 50), and the inlined corrupt-archive write (line 176) are byte-identical to TarArchiveReadBase.writeArchive / archiveExtensions / writeCorruptArchive. The CSV/JSON tar suites get these for free by mixing in TarArchiveReadBase, but text can't — TarArchiveReadBase extends ArchiveReadSuiteBase, whose two-column / schema-inference tests don't fit text's single value column (your comment above correctly explains why). Consider extracting just the tar-writing helpers into a small standalone trait that both TarArchiveReadBase and this suite mix in, so the container logic lives in one place and can't silently drift from the CSV/JSON suites if tar writing is ever changed. Low priority.

…d trait

writeArchive/archiveExtensions/writeCorruptArchive were duplicated between
TarArchiveReadBase and the standalone TextTarArchiveReadSuite (text can't mix in
TarArchiveReadBase, whose two-column tests don't fit text's single value column).
Move them to a new TarArchiveTestUtils trait that both mix in, so the tar
container logic lives in one place.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants