diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java b/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java index ab812849c55..53ed7e0f5d9 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java +++ b/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java @@ -128,6 +128,57 @@ public OptionalLong asOptional() { } } + /** + * Sum of the on-storage byte sizes of all files included in this data source along with the precision of that + * estimate. Mirrors the Rust {@code Option<Precision<u64>>} returned by {@code DataSource::byte_size}: + * {@link ByteSize.Unknown} when no estimate is available (for example when the filesystem listing did not return + * sizes), {@link ByteSize.Estimate} for an inexact hint (some files contribute extrapolated sizes), and + * {@link ByteSize.Exact} when every file has a known size. + */ + public ByteSize byteSize() { + long[] out = new long[2]; + NativeDataSource.byteSize(pointer, out); + return switch ((int) out[1]) { + case 1 -> new ByteSize.Estimate(out[0]); + case 2 -> new ByteSize.Exact(out[0]); + default -> ByteSize.Unknown.INSTANCE; + }; + } + + /** Precision-aware byte size. See {@link #byteSize()}. */ + public sealed interface ByteSize { + /** Returns the byte size as a long, or {@code OptionalLong.empty()} when unknown. */ + OptionalLong asOptional(); + + /** Byte size is not known. */ + final class Unknown implements ByteSize { + public static final Unknown INSTANCE = new Unknown(); + + private Unknown() {} + + @Override + public OptionalLong asOptional() { + return OptionalLong.empty(); + } + } + + /** Estimated byte size; the actual value may differ. */ + record Estimate(long value) implements ByteSize { + @Override + public OptionalLong asOptional() { + return OptionalLong.of(value); + } + } + + /** Exact byte size. */ + record Exact(long value) implements ByteSize { + @Override + public OptionalLong asOptional() { + return OptionalLong.of(value); + } + } + } + /** Submit a scan. 
*/ public Scan scan(ScanOptions options) { Objects.requireNonNull(options, "options"); diff --git a/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java b/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java index b7e58d2dc21..cc2aa163cee 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java +++ b/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java @@ -33,4 +33,10 @@ private NativeDataSource() {} * {@code 1=estimate}, {@code 2=exact}. */ public static native void rowCount(long pointer, long[] out); + + /** + * Populate {@code out} with {@code [bytes, precision]}, the sum of on-storage file sizes for the data source. + * Precision is one of {@code 0=unknown}, {@code 1=estimate}, {@code 2=exact}. + */ + public static native void byteSize(long pointer, long[] out); } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java index c6ec03eef80..c8b8184dd2f 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java @@ -3,21 +3,46 @@ package dev.vortex.spark.read; +import dev.vortex.api.DataSource; +import dev.vortex.api.Session; +import dev.vortex.jni.NativeFiles; +import dev.vortex.spark.VortexSparkSession; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.OptionalLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.spark.sql.connector.catalog.CatalogV2Util; import org.apache.spark.sql.connector.catalog.Column; +import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.Statistics; +import org.apache.spark.sql.connector.read.SupportsReportStatistics; +import 
org.apache.spark.sql.connector.read.colstats.ColumnStatistics; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; -/** Spark V2 {@link Scan} over a table of Vortex files. */ -public final class VortexScan implements Scan { +/** + * Spark V2 {@link Scan} over a table of Vortex files. + * + * <p>
Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a + * Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by + * {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor + * and scaling by the pushed read schema's default size relative to the full table schema's default size. When the + * listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is + * applied. + */ +public final class VortexScan implements Scan, SupportsReportStatistics { private final List paths; + private final List tableColumns; private final List readColumns; private final Map formatOptions; + private volatile Statistics cachedStatistics; + /** * Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing * immutable collections; the constructor does not copy. @@ -26,7 +51,24 @@ public final class VortexScan implements Scan { * @param readColumns the list of columns to read from the files */ public VortexScan(List paths, List readColumns, Map formatOptions) { + this(paths, readColumns, readColumns, formatOptions); + } + + /** + * Creates a new VortexScan for the specified file paths, table columns, and requested read columns. The caller is + * responsible for passing immutable collections; the constructor does not copy. 
+ * + * @param paths the list of Vortex file paths to scan + * @param tableColumns the full table columns before projection pushdown + * @param readColumns the columns Spark requested after projection pushdown + */ + public VortexScan( + List paths, + List tableColumns, + List readColumns, + Map formatOptions) { this.paths = paths; + this.tableColumns = tableColumns; this.readColumns = readColumns; this.formatOptions = formatOptions; } @@ -72,4 +114,77 @@ public Batch toBatch() { public ColumnarSupportMode columnarSupportMode() { return ColumnarSupportMode.SUPPORTED; } + + /** + * Returns statistics for this scan. + * + * <p>
Opens the Vortex {@link DataSource} on first invocation and caches the result. The row count is taken from the + * data source (sum of file-footer row counts; extrapolated from the first opened file when other files are + * deferred). {@link Statistics#sizeInBytes()} is derived from the per-file sizes reported by the filesystem + * listing, then adjusted by Spark's compression factor and the ratio between the pushed read schema and the full + * table schema. When a listing did not return a size for some file the file-byte total is extrapolated. When no + * file size is known at all the value is left empty so Spark falls back to its default heuristic. + * + * @return statistics with row-count and Spark scan-size estimates + */ + @Override + public Statistics estimateStatistics() { + Statistics local = cachedStatistics; + if (local != null) { + return local; + } + synchronized (this) { + if (cachedStatistics == null) { + cachedStatistics = computeStatistics(); + } + return cachedStatistics; + } + } + + private Statistics computeStatistics() { + Session session = VortexSparkSession.get(formatOptions); + // Expand directory paths to concrete files the way VortexBatchExec does, so we use the + // same per-path resolution end-to-end. + List resolvedPaths = paths.stream() + .flatMap(path -> path.endsWith(".vortex") + ? 
Stream.of(path) + : NativeFiles.listFiles(session, path, formatOptions).stream()) + .collect(Collectors.toList()); + + if (resolvedPaths.isEmpty()) { + return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty()); + } + + DataSource source = DataSource.open(session, resolvedPaths, formatOptions); + return new VortexStatistics( + source.rowCount().asOptional(), + scaleSizeInBytes(source.byteSize().asOptional())); + } + + private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) { + if (fileBytes.isEmpty()) { + return OptionalLong.empty(); + } + + StructType tableSchema = CatalogV2Util.v2ColumnsToStructType(tableColumns.toArray(new Column[0])); + StructType readSchema = readSchema(); + int tableDefaultSize = tableSchema.defaultSize(); + if (tableDefaultSize <= 0) { + return fileBytes; + } + + double scaled = SQLConf.get().fileCompressionFactor() + * fileBytes.getAsLong() + / tableDefaultSize + * readSchema.defaultSize(); + return OptionalLong.of((long) scaled); + } + + private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics { + + @Override + public Map columnStats() { + return new HashMap<>(); + } + } } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java index a53472bc33b..1710374e008 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java @@ -19,13 +19,15 @@ /** Spark V2 {@link ScanBuilder} for table scans over Vortex files. */ public final class VortexScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns { private final ImmutableList.Builder paths; - private final List columns; + private final List tableColumns; + private final List readColumns; private final Map formatOptions; /** Creates a new VortexScanBuilder with empty paths and columns. 
*/ public VortexScanBuilder(Map formatOptions) { this.paths = ImmutableList.builder(); - this.columns = new ArrayList<>(); + this.tableColumns = new ArrayList<>(); + this.readColumns = new ArrayList<>(); this.formatOptions = Map.copyOf(formatOptions); } @@ -47,7 +49,8 @@ public VortexScanBuilder addPath(String path) { * @return this builder for method chaining */ public VortexScanBuilder addColumn(Column column) { - this.columns.add(column); + this.tableColumns.add(column); + this.readColumns.add(column); return this; } @@ -70,7 +73,7 @@ public VortexScanBuilder addAllPaths(Iterable paths) { */ public VortexScanBuilder addAllColumns(Iterable columns) { for (Column column : columns) { - this.columns.add(column); + addColumn(column); } return this; } @@ -89,7 +92,7 @@ public Scan build() { // Allow empty columns for operations like count() that don't need actual column data // If no columns are specified, we'll read the minimal schema needed - return new VortexScan(paths, List.copyOf(this.columns), this.formatOptions); + return new VortexScan(paths, List.copyOf(this.tableColumns), List.copyOf(this.readColumns), this.formatOptions); } /** @@ -103,9 +106,9 @@ public Scan build() { @Override public void pruneColumns(StructType requiredSchema) { // TODO(aduffy): support deeply nested schema prunes - columns.clear(); + readColumns.clear(); for (StructField field : requiredSchema.fields()) { - columns.add(Column.create(field.name(), field.dataType())); + readColumns.add(Column.create(field.name(), field.dataType())); } } } diff --git a/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java new file mode 100644 index 00000000000..c28ec8b3569 --- /dev/null +++ b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java @@ -0,0 +1,232 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package 
dev.vortex.spark; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import dev.vortex.spark.read.VortexScan; +import dev.vortex.spark.read.VortexScanBuilder; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Column; +import org.apache.spark.sql.connector.read.Statistics; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.io.TempDir; + +/** + * Integration tests for {@link VortexScan#estimateStatistics()}. + * + * <p>
Verifies that the Spark V2 scan surfaces both the row count Vortex stores in each file footer and the sum of the + * on-storage file sizes reported by the filesystem listing. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public final class VortexDataSourceStatsTest { + private static final String FILE_COMPRESSION_FACTOR_KEY = "spark.sql.sources.fileCompressionFactor"; + + private SparkSession spark; + + @TempDir + Path tempDir; + + @BeforeAll + public void setUp() { + spark = SparkSession.builder() + .appName("VortexStatsTest") + .master("local[2]") + .config("spark.driver.host", "127.0.0.1") + .config("spark.sql.shuffle.partitions", "2") + .config("spark.sql.adaptive.enabled", "false") + .config("spark.ui.enabled", "false") + .getOrCreate(); + } + + @AfterAll + public void tearDown() { + if (spark != null) { + spark.stop(); + } + } + + @Test + @DisplayName("VortexScan reports exact row count for single-file scans") + public void testEstimateStatisticsReportsRowCount() throws IOException { + int numRows = 250; + Path outputPath = writeRows(numRows, "single_file"); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue( + stats.numRows().isPresent(), + "VortexScan should report a row count for a Vortex dataset with a populated footer"); + assertEquals(numRows, stats.numRows().getAsLong(), "Row count should match the rows we wrote"); + } + + @Test + @DisplayName("VortexScan reports aggregate row count across multi-file scans") + public void testEstimateStatisticsAcrossMultipleFiles() throws IOException { + int numRows = 400; + Path outputPath = writeRows(numRows, "multi_file", 4); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue(stats.numRows().isPresent(), "Row count should be reported for multi-file Vortex datasets"); + assertEquals(numRows, stats.numRows().getAsLong(), "Row count should sum across all files"); + } + + @Test + 
@DisplayName("VortexScan reports sizeInBytes equal to the sum of on-storage file sizes") + public void testEstimateStatisticsReportsSizeInBytes() throws IOException { + Path outputPath = writeRows(120, "with_size", 3); + + long expectedTotalBytes = totalVortexFileBytes(outputPath); + assertTrue(expectedTotalBytes > 0, "Test setup should produce at least one non-empty .vortex file"); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue( + stats.sizeInBytes().isPresent(), + "VortexScan should surface a sizeInBytes when the filesystem listing reports file sizes"); + assertEquals( + expectedTotalBytes, + stats.sizeInBytes().getAsLong(), + "sizeInBytes should equal the sum of on-storage .vortex file sizes"); + } + + @Test + @DisplayName("VortexScan scales sizeInBytes by the pushed read schema") + public void testEstimateStatisticsScalesSizeInBytesForProjection() throws IOException { + Path outputPath = writeRows(120, "projected_size", 3); + long fileBytes = totalVortexFileBytes(outputPath); + + StructType fullSchema = spark.read() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .load() + .schema(); + StructType idOnlySchema = new StructType(new StructField[] {fullSchema.fields()[0]}); + + String previousCompressionFactor = spark.conf().get(FILE_COMPRESSION_FACTOR_KEY); + spark.conf().set(FILE_COMPRESSION_FACTOR_KEY, "0.5"); + try { + VortexScan scan = buildScan(outputPath, idOnlySchema); + Statistics stats = scan.estimateStatistics(); + + long expectedSize = (long) (0.5 * fileBytes / fullSchema.defaultSize() * idOnlySchema.defaultSize()); + assertTrue(stats.sizeInBytes().isPresent(), "Projected scans should still surface sizeInBytes"); + assertEquals( + expectedSize, + stats.sizeInBytes().getAsLong(), + "sizeInBytes should follow Spark FileScan's compression and schema-width scaling"); + assertTrue( + stats.sizeInBytes().getAsLong() < fileBytes, + "Projected scan stats should be smaller than 
full file bytes"); + } finally { + spark.conf().set(FILE_COMPRESSION_FACTOR_KEY, previousCompressionFactor); + } + } + + @Test + @DisplayName("VortexScan caches statistics across repeated calls") + public void testEstimateStatisticsIsCached() throws IOException { + Path outputPath = writeRows(50, "cached", 1); + + VortexScan scan = buildScan(outputPath); + Statistics first = scan.estimateStatistics(); + Statistics second = scan.estimateStatistics(); + + // Same instance returned -- the second call hits the cached value. + assertEquals(first, second, "estimateStatistics() should return the same Statistics object on repeat calls"); + assertInstanceOf(Statistics.class, first); + } + + private VortexScan buildScan(Path outputPath) { + return buildScan(outputPath, null); + } + + private VortexScan buildScan(Path outputPath, StructType requiredSchema) { + Dataset readDf = spark.read() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .load(); + StructType readSchema = readDf.schema(); + + VortexScanBuilder builder = new VortexScanBuilder(Map.of()); + builder.addPath(outputPath.toUri().toString()); + for (StructField field : readSchema.fields()) { + builder.addColumn(Column.create(field.name(), field.dataType())); + } + if (requiredSchema != null) { + builder.pruneColumns(requiredSchema); + } + return (VortexScan) builder.build(); + } + + private Path writeRows(int numRows, String name) throws IOException { + return writeRows(numRows, name, 1); + } + + private long totalVortexFileBytes(Path outputPath) throws IOException { + try (Stream paths = Files.walk(outputPath)) { + return paths.filter(Files::isRegularFile) + .filter(path -> path.getFileName().toString().endsWith(".vortex")) + .mapToLong(path -> { + try { + return Files.size(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .sum(); + } + } + + private Path writeRows(int numRows, String name, int partitions) throws IOException { + Path outputPath = 
tempDir.resolve(name); + Dataset df = spark.range(0, numRows) + .selectExpr("cast(id as int) as id", "concat('value_', cast(id as string)) as value"); + + df.repartition(partitions) + .write() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .mode(SaveMode.Overwrite) + .save(); + return outputPath; + } + + @AfterEach + public void cleanupTempFiles() throws IOException { + if (tempDir != null && Files.exists(tempDir)) { + try (Stream paths = Files.walk(tempDir)) { + paths.sorted(Comparator.reverseOrder()).forEach(path -> { + try { + Files.deleteIfExists(path); + } catch (IOException e) { + System.err.println("Failed to delete: " + path); + } + }); + } + } + } +} diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index a1fa15a1b40..8b65ed4f17c 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -590,7 +590,7 @@ impl TableFunction for T { if children.len() != 1 { return None; } - let MultiLayoutChild::Opened(reader) = &children[0] else { + let MultiLayoutChild::Opened { reader, .. 
} = &children[0] else { return None; }; let stats_sets = match reader.as_any().downcast_ref::() { diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index bb2a9b00d81..d128999483d 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -12,6 +12,8 @@ pub fn vortex_file::multi::MultiFileDataSource::new(vortex_session::VortexSessio pub fn vortex_file::multi::MultiFileDataSource::with_glob(self, impl core::convert::Into, core::option::Option) -> Self +pub fn vortex_file::multi::MultiFileDataSource::with_listing(self, vortex_io::filesystem::FileListing, vortex_io::filesystem::FileSystemRef) -> Self + pub fn vortex_file::multi::MultiFileDataSource::with_open_options(self, impl core::ops::function::Fn(vortex_file::VortexOpenOptions) -> vortex_file::VortexOpenOptions + core::marker::Send + core::marker::Sync + 'static) -> Self pub mod vortex_file::segments diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs index 78c18ea438e..1e2ba3523c3 100644 --- a/vortex-file/src/multi/mod.rs +++ b/vortex-file/src/multi/mod.rs @@ -60,6 +60,9 @@ pub struct MultiFileDataSource { /// List of (glob, optional filesystem) pairs to resolve. /// When the filesystem is None, a local filesystem will be created in build(). glob_sources: Vec<(String, Option)>, + /// Pre-resolved file listings that skip glob expansion. The caller is responsible for + /// supplying the [`FileListing::size`] when stats reporting matters. + listing_sources: Vec<(FileListing, FileSystemRef)>, open_options_fn: Arc VortexOpenOptions + Send + Sync>, } @@ -69,6 +72,7 @@ impl MultiFileDataSource { Self { session, glob_sources: Vec::new(), + listing_sources: Vec::new(), open_options_fn: Arc::new(|opts| opts), } } @@ -94,6 +98,23 @@ impl MultiFileDataSource { self } + /// Add a pre-resolved file listing. + /// + /// Use this when the caller already knows the exact file path and (optionally) its size, + /// avoiding the glob expansion done by [`Self::with_glob`]. 
Supplying + /// [`FileListing::size`] is required for [`DataSource::byte_size`] to surface a contribution + /// from this file; otherwise the source size remains unknown for this file and the + /// data-source-level total is extrapolated from the files that do report a size. + pub fn with_listing(mut self, listing: FileListing, fs: FileSystemRef) -> Self { + let FileListing { path, size } = listing; + let listing = FileListing { + path: path.trim_start_matches('/').to_string(), + size, + }; + self.listing_sources.push((listing, fs)); + self + } + /// Customize [`VortexOpenOptions`] applied to each file. /// /// Use this to configure segment caches, metrics registries, or other per-file options. @@ -110,8 +131,10 @@ impl MultiFileDataSource { /// Discovers files via glob, opens the first file eagerly to determine the schema, /// and creates lazy factories for the remaining files. pub async fn build(self) -> VortexResult { - if self.glob_sources.is_empty() { - vortex_bail!("MultiFileDataSource requires at least one glob pattern"); + if self.glob_sources.is_empty() && self.listing_sources.is_empty() { + vortex_bail!( + "MultiFileDataSource requires at least one glob pattern or pre-resolved listing" + ); } // Create local filesystem lazily if needed (only if any glob lacks a filesystem). @@ -139,6 +162,7 @@ impl MultiFileDataSource { all_files.push((file, Arc::clone(&fs))); } } + all_files.extend(self.listing_sources); if all_files.is_empty() { let globs: Vec<_> = self.glob_sources.iter().map(|(g, _)| g.as_str()).collect(); @@ -155,6 +179,8 @@ impl MultiFileDataSource { let first_file = open_file(first_fs, first_file_listing, &self.session, open_fn).await?; let first_reader = layout_reader_with_stats(&first_file)?; + let byte_sizes: Vec> = all_files.iter().map(|(file, _)| file.size).collect(); + let factories: Vec> = all_files[1..] 
.iter() .map(|(file, fs)| { @@ -167,7 +193,12 @@ impl MultiFileDataSource { }) .collect(); - let inner = MultiLayoutDataSource::new_with_first(first_reader, factories, &self.session); + let inner = MultiLayoutDataSource::new_with_first( + first_reader, + factories, + byte_sizes, + &self.session, + ); debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource"); diff --git a/vortex-jni/src/data_source.rs b/vortex-jni/src/data_source.rs index d62b583a4a9..f29862b6c48 100644 --- a/vortex-jni/src/data_source.rs +++ b/vortex-jni/src/data_source.rs @@ -16,6 +16,8 @@ use std::path::PathBuf; use std::path::absolute; use std::sync::Arc; +use futures::StreamExt; +use futures::stream; use jni::EnvUnowned; use jni::objects::JClass; use jni::objects::JLongArray; @@ -28,6 +30,7 @@ use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::expr::stats::Precision; use vortex::file::multi::MultiFileDataSource; +use vortex::io::filesystem::FileListing; use vortex::io::filesystem::FileSystemRef; use vortex::io::runtime::BlockingRuntime; use vortex::io::session::RuntimeSessionExt; @@ -41,6 +44,10 @@ use crate::file::extract_properties; use crate::object_store::object_store_fs; use crate::session::session_ref; +/// In-flight size lookups while resolving exact paths to file listings. Balances HEAD +/// throughput on remote stores against connection overhead. +const SIZE_LOOKUP_CONCURRENCY: usize = 16; + /// Wraps an `Arc` behind a single pointer. pub(crate) struct NativeDataSource { inner: DataSourceRef, @@ -103,14 +110,49 @@ pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_open( } } - let mut builder = MultiFileDataSource::new(session.clone()); + // Split inputs into glob patterns (which fs.glob() expands via list(), capturing sizes + // automatically) and exact paths (which are resolved one-by-one with a HEAD-style size + // lookup so the data source can report total bytes for Spark-style stats). 
+ let mut glob_inputs: Vec<(String, FileSystemRef)> = Vec::new(); + let mut exact_inputs: Vec<(String, FileSystemRef)> = Vec::new(); for glob_url in &glob_urls { let base = base_url(glob_url); let fs = fs_cache .get(&base) .cloned() .unwrap_or_else(|| unreachable!("fs cached for every base url")); - builder = builder.with_glob(glob_url.path(), Some(fs)); + let path = glob_url.path().to_string(); + if path.contains(['*', '?', '[']) { + glob_inputs.push((path, fs)); + } else { + exact_inputs.push((path, fs)); + } + } + + let resolved_listings: Vec<(FileListing, FileSystemRef)> = if exact_inputs.is_empty() { + Vec::new() + } else { + RUNTIME.block_on(async { + stream::iter(exact_inputs) + .map(|(path, fs)| async move { + let size = match fs.open_read(&path).await { + Ok(source) => source.size().await.ok(), + Err(_) => None, + }; + (FileListing { path, size }, fs) + }) + .buffer_unordered(SIZE_LOOKUP_CONCURRENCY) + .collect::>() + .await + }) + }; + + let mut builder = MultiFileDataSource::new(session.clone()); + for (glob, fs) in glob_inputs { + builder = builder.with_glob(glob, Some(fs)); + } + for (listing, fs) in resolved_listings { + builder = builder.with_listing(listing, fs); } let inner = RUNTIME @@ -211,6 +253,27 @@ pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_rowCount( }); } +/// Write the byte size into the two-slot jlong pair `out`: +/// `out[0]` receives the size in bytes (0 when unknown), `out[1]` the precision (0=unknown, 1=estimate, 2=exact). 
+#[unsafe(no_mangle)] +pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_byteSize( + mut env: EnvUnowned, + _class: JClass, + pointer: jlong, + out: JLongArray, +) { + try_or_throw(&mut env, |env| { + let ds = unsafe { NativeDataSource::from_ptr(pointer) }; + let (bytes, precision) = match ds.inner.byte_size() { + Some(Precision::Exact(b)) => (b as jlong, 2), + Some(Precision::Inexact(b)) => (b as jlong, 1), + None => (0, 0), + }; + out.set_region(env, 0, &[bytes, precision])?; + Ok(()) + }); +} + #[cfg(test)] mod tests { use super::*; diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index 1c6ae70dd6f..e39fdadce2f 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -990,9 +990,21 @@ pub mod vortex_layout::scan::multi pub enum vortex_layout::scan::multi::MultiLayoutChild -pub vortex_layout::scan::multi::MultiLayoutChild::Deferred(alloc::sync::Arc) +pub vortex_layout::scan::multi::MultiLayoutChild::Deferred -pub vortex_layout::scan::multi::MultiLayoutChild::Opened(vortex_layout::LayoutReaderRef) +pub vortex_layout::scan::multi::MultiLayoutChild::Deferred::byte_size: core::option::Option + +pub vortex_layout::scan::multi::MultiLayoutChild::Deferred::factory: alloc::sync::Arc + +pub vortex_layout::scan::multi::MultiLayoutChild::Opened + +pub vortex_layout::scan::multi::MultiLayoutChild::Opened::byte_size: core::option::Option + +pub vortex_layout::scan::multi::MultiLayoutChild::Opened::reader: vortex_layout::LayoutReaderRef + +impl vortex_layout::scan::multi::MultiLayoutChild + +pub fn vortex_layout::scan::multi::MultiLayoutChild::byte_size(&self) -> core::option::Option pub struct vortex_layout::scan::multi::MultiLayoutDataSource @@ -1000,14 +1012,16 @@ impl vortex_layout::scan::multi::MultiLayoutDataSource pub fn vortex_layout::scan::multi::MultiLayoutDataSource::children(&self) -> &alloc::vec::Vec -pub fn vortex_layout::scan::multi::MultiLayoutDataSource::new_deferred(vortex_array::dtype::DType, 
alloc::vec::Vec>, &vortex_session::VortexSession) -> Self +pub fn vortex_layout::scan::multi::MultiLayoutDataSource::new_deferred(vortex_array::dtype::DType, alloc::vec::Vec>, alloc::vec::Vec>, &vortex_session::VortexSession) -> Self -pub fn vortex_layout::scan::multi::MultiLayoutDataSource::new_with_first(vortex_layout::LayoutReaderRef, alloc::vec::Vec>, &vortex_session::VortexSession) -> Self +pub fn vortex_layout::scan::multi::MultiLayoutDataSource::new_with_first(vortex_layout::LayoutReaderRef, alloc::vec::Vec>, alloc::vec::Vec>, &vortex_session::VortexSession) -> Self pub fn vortex_layout::scan::multi::MultiLayoutDataSource::with_concurrency(self, usize) -> Self impl vortex_scan::DataSource for vortex_layout::scan::multi::MultiLayoutDataSource +pub fn vortex_layout::scan::multi::MultiLayoutDataSource::byte_size(&self) -> core::option::Option> + pub fn vortex_layout::scan::multi::MultiLayoutDataSource::deserialize_partition(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult pub fn vortex_layout::scan::multi::MultiLayoutDataSource::dtype(&self) -> &vortex_array::dtype::DType diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index 38503c77a63..00d2d5af056 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -32,6 +32,7 @@ use async_trait::async_trait; use futures::FutureExt; use futures::StreamExt; use futures::stream; +use itertools::Itertools; use tracing::Instrument; use vortex_array::dtype::DType; use vortex_array::dtype::FieldPath; @@ -87,26 +88,68 @@ pub struct MultiLayoutDataSource { } pub enum MultiLayoutChild { - Opened(LayoutReaderRef), - Deferred(Arc), + Opened { + reader: LayoutReaderRef, + /// On-storage file size in bytes, if known from the listing metadata. + byte_size: Option, + }, + Deferred { + factory: Arc, + /// On-storage file size in bytes, if known from the listing metadata. 
+ byte_size: Option, + }, +} + +impl MultiLayoutChild { + /// On-storage file size in bytes for this child, if known. + pub fn byte_size(&self) -> Option { + match self { + MultiLayoutChild::Opened { byte_size, .. } => *byte_size, + MultiLayoutChild::Deferred { byte_size, .. } => *byte_size, + } + } } impl MultiLayoutDataSource { /// Creates a multi-layout data source with the first reader pre-opened. /// /// The first reader determines the dtype. Remaining readers are opened lazily during - /// scanning via their factories. + /// scanning via their factories. `byte_sizes` carries the on-storage file size in bytes for + /// each child (first followed by remaining); pass `None` for entries where the size is + /// unknown. Must be empty or have length `1 + remaining.len()`. pub fn new_with_first( first: LayoutReaderRef, remaining: Vec>, + byte_sizes: Vec>, session: &VortexSession, ) -> Self { let dtype = first.dtype().clone(); let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY); - let mut children = Vec::with_capacity(1 + remaining.len()); - children.push(MultiLayoutChild::Opened(first)); - children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred)); + let total = 1 + remaining.len(); + let mut sizes = byte_sizes; + if sizes.is_empty() { + sizes = vec![None; total]; + } + debug_assert_eq!( + sizes.len(), + total, + "byte_sizes length must match the number of children" + ); + + let mut children = Vec::with_capacity(total); + let mut sizes_iter = sizes.into_iter(); + let first_size = sizes_iter.next().unwrap_or(None); + children.push(MultiLayoutChild::Opened { + reader: first, + byte_size: first_size, + }); + children.extend( + remaining + .into_iter() + .zip_eq(sizes_iter) + .map(|(factory, byte_size)| MultiLayoutChild::Deferred { factory, byte_size }), + ); Self { dtype, @@ -120,20 +163,34 @@ impl MultiLayoutDataSource { /// /// The dtype must be provided externally since there is no pre-opened reader to infer it /// from. 
This avoids eagerly opening any file when the schema is already known (e.g. from - /// a catalog or a prior scan). + /// a catalog or a prior scan). `byte_sizes` carries the on-storage file size in bytes for + /// each factory; pass `None` for entries where the size is unknown. Must be empty or have + /// the same length as `factories`. pub fn new_deferred( dtype: DType, factories: Vec>, + byte_sizes: Vec>, session: &VortexSession, ) -> Self { let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY); + let mut sizes = byte_sizes; + if sizes.is_empty() { + sizes = vec![None; factories.len()]; + } + debug_assert_eq!( + sizes.len(), + factories.len(), + "byte_sizes length must match the number of factories" + ); + Self { dtype, session: session.clone(), children: factories .into_iter() - .map(MultiLayoutChild::Deferred) + .zip_eq(sizes) + .map(|(factory, byte_size)| MultiLayoutChild::Deferred { factory, byte_size }) .collect(), concurrency, } @@ -166,11 +223,11 @@ impl DataSource for MultiLayoutDataSource { for child in &self.children { match child { - MultiLayoutChild::Opened(reader) => { + MultiLayoutChild::Opened { reader, .. } => { opened_count += 1; sum = sum.saturating_add(reader.row_count()); } - MultiLayoutChild::Deferred(_) => { + MultiLayoutChild::Deferred { .. 
} => { deferred_count += 1; } } @@ -192,6 +249,34 @@ impl DataSource for MultiLayoutDataSource { } } + fn byte_size(&self) -> Option> { + let total_count = self.children.len() as u64; + if total_count == 0 { + return Some(Precision::exact(0u64)); + } + + let mut sum: u64 = 0; + let mut known_count: u64 = 0; + for child in &self.children { + if let Some(size) = child.byte_size() { + sum = sum.saturating_add(size); + known_count += 1; + } + } + + if known_count == 0 { + return None; + } + + if known_count == total_count { + Some(Precision::exact(sum)) + } else { + let avg = sum / known_count; + let extrapolated = avg.saturating_mul(total_count); + Some(Precision::inexact(extrapolated)) + } + } + fn deserialize_partition( &self, _data: &[u8], @@ -206,8 +291,10 @@ impl DataSource for MultiLayoutDataSource { for child in &self.children { match child { - MultiLayoutChild::Opened(reader) => ready.push_back(Arc::clone(reader)), - MultiLayoutChild::Deferred(factory) => deferred.push_back(Arc::clone(factory)), + MultiLayoutChild::Opened { reader, .. } => ready.push_back(Arc::clone(reader)), + MultiLayoutChild::Deferred { factory, .. } => { + deferred.push_back(Arc::clone(factory)) + } } }