Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,57 @@ public OptionalLong asOptional() {
}
}

/**
* Sum of the on-storage byte sizes of all files included in this data source along with the precision of that
* estimate. Mirrors the Rust {@code Option<Precision<u64>>} returned by {@code DataSource::byte_size}:
* {@link ByteSize.Unknown} when no estimate is available (for example when the filesystem listing did not return
* sizes), {@link ByteSize.Estimate} for an inexact hint (some files contribute extrapolated sizes), and
* {@link ByteSize.Exact} when every file has a known size.
*/
public ByteSize byteSize() {
long[] out = new long[2];
NativeDataSource.byteSize(pointer, out);
return switch ((int) out[1]) {
case 1 -> new ByteSize.Estimate(out[0]);
case 2 -> new ByteSize.Exact(out[0]);
default -> ByteSize.Unknown.INSTANCE;
};
}

/** Precision-aware byte size. See {@link #byteSize()}. */
public sealed interface ByteSize {
/** Returns the byte size as a long, or {@code OptionalLong.empty()} when unknown. */
OptionalLong asOptional();

/** Byte size is not known. */
final class Unknown implements ByteSize {
public static final Unknown INSTANCE = new Unknown();

private Unknown() {}

@Override
public OptionalLong asOptional() {
return OptionalLong.empty();
}
}

/** Estimated byte size; the actual value may differ. */
record Estimate(long value) implements ByteSize {
@Override
public OptionalLong asOptional() {
return OptionalLong.of(value);
}
}

/** Exact byte size. */
record Exact(long value) implements ByteSize {
@Override
public OptionalLong asOptional() {
return OptionalLong.of(value);
}
}
}

/** Submit a scan. */
public Scan scan(ScanOptions options) {
Objects.requireNonNull(options, "options");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ private NativeDataSource() {}
* {@code 1=estimate}, {@code 2=exact}.
*/
public static native void rowCount(long pointer, long[] out);

/**
* Populate {@code out} with {@code [bytes, precision]}, the sum of on-storage file sizes for the data source.
* Precision is one of {@code 0=unknown}, {@code 1=estimate}, {@code 2=exact}.
*/
public static native void byteSize(long pointer, long[] out);
}
119 changes: 117 additions & 2 deletions java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,46 @@

package dev.vortex.spark.read;

import dev.vortex.api.DataSource;
import dev.vortex.api.Session;
import dev.vortex.jni.NativeFiles;
import dev.vortex.spark.VortexSparkSession;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
import org.apache.spark.sql.connector.catalog.Column;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;

/** Spark V2 {@link Scan} over a table of Vortex files. */
public final class VortexScan implements Scan {
/**
* Spark V2 {@link Scan} over a table of Vortex files.
*
* <p>Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a
* Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by
* {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor
* and scaling by the pushed read schema's default size relative to the full table schema's default size. When the
* listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is
* applied.
*/
public final class VortexScan implements Scan, SupportsReportStatistics {

private final List<String> paths;
private final List<Column> tableColumns;
private final List<Column> readColumns;
private final Map<String, String> formatOptions;

private volatile Statistics cachedStatistics;

/**
* Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing
* immutable collections; the constructor does not copy.
Expand All @@ -26,7 +51,24 @@ public final class VortexScan implements Scan {
* @param readColumns the list of columns to read from the files
*/
public VortexScan(List<String> paths, List<Column> readColumns, Map<String, String> formatOptions) {
this(paths, readColumns, readColumns, formatOptions);
}

/**
* Creates a new VortexScan for the specified file paths, table columns, and requested read columns. The caller is
* responsible for passing immutable collections; the constructor does not copy.
*
* @param paths the list of Vortex file paths to scan
* @param tableColumns the full table columns before projection pushdown
* @param readColumns the columns Spark requested after projection pushdown
*/
public VortexScan(
List<String> paths,
List<Column> tableColumns,
List<Column> readColumns,
Map<String, String> formatOptions) {
this.paths = paths;
this.tableColumns = tableColumns;
this.readColumns = readColumns;
this.formatOptions = formatOptions;
}
Expand Down Expand Up @@ -72,4 +114,77 @@ public Batch toBatch() {
public ColumnarSupportMode columnarSupportMode() {
return ColumnarSupportMode.SUPPORTED;
}

/**
* Returns statistics for this scan.
*
* <p>Opens the Vortex {@link DataSource} on first invocation and caches the result. The row count is taken from the
* data source (sum of file-footer row counts; extrapolated from the first opened file when other files are
* deferred). {@link Statistics#sizeInBytes()} is derived from the per-file sizes reported by the filesystem
* listing, then adjusted by Spark's compression factor and the ratio between the pushed read schema and the full
* table schema. When a listing did not return a size for some file the file-byte total is extrapolated. When no
* file size is known at all the value is left empty so Spark falls back to its default heuristic.
*
* @return statistics with row-count and Spark scan-size estimates
*/
@Override
public Statistics estimateStatistics() {
Statistics local = cachedStatistics;
if (local != null) {
return local;
}
synchronized (this) {
if (cachedStatistics == null) {
cachedStatistics = computeStatistics();
}
return cachedStatistics;
}
}

private Statistics computeStatistics() {
Session session = VortexSparkSession.get(formatOptions);
// Expand directory paths to concrete files the way VortexBatchExec does, so we use the
// same per-path resolution end-to-end.
List<String> resolvedPaths = paths.stream()
.flatMap(path -> path.endsWith(".vortex")
? Stream.of(path)
: NativeFiles.listFiles(session, path, formatOptions).stream())
.collect(Collectors.toList());

if (resolvedPaths.isEmpty()) {
return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty());
}

DataSource source = DataSource.open(session, resolvedPaths, formatOptions);
return new VortexStatistics(
source.rowCount().asOptional(),
scaleSizeInBytes(source.byteSize().asOptional()));
}

private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) {
if (fileBytes.isEmpty()) {
return OptionalLong.empty();
}

StructType tableSchema = CatalogV2Util.v2ColumnsToStructType(tableColumns.toArray(new Column[0]));
StructType readSchema = readSchema();
int tableDefaultSize = tableSchema.defaultSize();
if (tableDefaultSize <= 0) {
return fileBytes;
}

double scaled = SQLConf.get().fileCompressionFactor()
* fileBytes.getAsLong()
/ tableDefaultSize
* readSchema.defaultSize();
return OptionalLong.of((long) scaled);
}

private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics {

@Override
public Map<NamedReference, ColumnStatistics> columnStats() {
return new HashMap<>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
/** Spark V2 {@link ScanBuilder} for table scans over Vortex files. */
public final class VortexScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
private final ImmutableList.Builder<String> paths;
private final List<Column> columns;
private final List<Column> tableColumns;
private final List<Column> readColumns;
private final Map<String, String> formatOptions;

/** Creates a new VortexScanBuilder with empty paths and columns. */
public VortexScanBuilder(Map<String, String> formatOptions) {
this.paths = ImmutableList.builder();
this.columns = new ArrayList<>();
this.tableColumns = new ArrayList<>();
this.readColumns = new ArrayList<>();
this.formatOptions = Map.copyOf(formatOptions);
}

Expand All @@ -47,7 +49,8 @@ public VortexScanBuilder addPath(String path) {
* @return this builder for method chaining
*/
public VortexScanBuilder addColumn(Column column) {
this.columns.add(column);
this.tableColumns.add(column);
this.readColumns.add(column);
return this;
}

Expand All @@ -70,7 +73,7 @@ public VortexScanBuilder addAllPaths(Iterable<String> paths) {
*/
public VortexScanBuilder addAllColumns(Iterable<Column> columns) {
for (Column column : columns) {
this.columns.add(column);
addColumn(column);
}
return this;
}
Expand All @@ -89,7 +92,7 @@ public Scan build() {
// Allow empty columns for operations like count() that don't need actual column data
// If no columns are specified, we'll read the minimal schema needed

return new VortexScan(paths, List.copyOf(this.columns), this.formatOptions);
return new VortexScan(paths, List.copyOf(this.tableColumns), List.copyOf(this.readColumns), this.formatOptions);
}

/**
Expand All @@ -103,9 +106,9 @@ public Scan build() {
@Override
public void pruneColumns(StructType requiredSchema) {
// TODO(aduffy): support deeply nested schema prunes
columns.clear();
readColumns.clear();
for (StructField field : requiredSchema.fields()) {
columns.add(Column.create(field.name(), field.dataType()));
readColumns.add(Column.create(field.name(), field.dataType()));
}
}
}
Loading
Loading