diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 05cc17f34e05..14eebf948563 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -867,6 +867,30 @@
Integer |
Level threshold of lookup to generate remote lookup files. Level files below this threshold will not generate remote lookup files. |
+
+ map.shredding.columns |
+ (none) |
+ String |
+ Comma-separated MAP<STRING, T> column paths to write with map shredding. |
+
+
+ map.shredding.maxInferBufferMemory |
+ 32 mb |
+ MemorySize |
+ Maximum memory used to buffer rows while inferring map shredding keys. |
+
+
+ map.shredding.maxInferBufferRow |
+ 10000 |
+ Integer |
+ Maximum number of rows to buffer for map shredding key inference. |
+
+
+ map.shredding.maxKeys |
+ 64 |
+ Integer |
+ Maximum number of hot keys extracted for each map. |
+
manifest.compression |
"zstd" |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 75eb4744dbf7..40f44d605d43 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -436,6 +436,33 @@ public InlineElement getDescription() {
.defaultValue(4096)
.withDescription("Maximum number of rows to buffer for schema inference.");
+ public static final ConfigOption MAP_SHREDDING_COLUMNS =
+ key("map.shredding.columns")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Comma-separated MAP column paths to write with map shredding.");
+
+ public static final ConfigOption MAP_SHREDDING_MAX_KEYS =
+ key("map.shredding.maxKeys")
+ .intType()
+ .defaultValue(64)
+ .withDescription("Maximum number of hot keys extracted for each map.");
+
+ public static final ConfigOption MAP_SHREDDING_MAX_INFER_BUFFER_MEMORY =
+ key("map.shredding.maxInferBufferMemory")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(32))
+ .withDescription(
+ "Maximum memory used to buffer rows while inferring map shredding keys.");
+
+ public static final ConfigOption MAP_SHREDDING_MAX_INFER_BUFFER_ROW =
+ key("map.shredding.maxInferBufferRow")
+ .intType()
+ .defaultValue(10000)
+ .withDescription(
+ "Maximum number of rows to buffer for map shredding key inference.");
+
public static final ConfigOption MANIFEST_FORMAT =
key("manifest.format")
.stringType()
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingKeyExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingKeyExtractor.java
new file mode 100644
index 000000000000..580bd2531a74
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingKeyExtractor.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.InternalRowToSizeVisitor;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/** Extract file-level hot keys for map shredding columns. */
+public class MapShreddingKeyExtractor {
+
+ private final List columns;
+ private final InternalRowSerializer serializer;
+ private final int maxKeys;
+ private final long memoryLimit;
+ private final int rowLimit;
+ private final List bufferedRows;
+ private final Map> keyValueSizes;
+ private final List> rowFieldSizeVisitors;
+
+ private boolean finished;
+ private long currentBufferedSize;
+
+ public MapShreddingKeyExtractor(RowType rowType, Options options) {
+ this.columns =
+ MapShreddingUtils.resolveColumns(
+ rowType,
+ MapShreddingUtils.parseColumnPaths(
+ options.get(CoreOptions.MAP_SHREDDING_COLUMNS)));
+ this.serializer = new InternalRowSerializer(rowType);
+ this.maxKeys = options.get(CoreOptions.MAP_SHREDDING_MAX_KEYS);
+ this.memoryLimit =
+ options.get(CoreOptions.MAP_SHREDDING_MAX_INFER_BUFFER_MEMORY).getBytes();
+ this.rowLimit = options.get(CoreOptions.MAP_SHREDDING_MAX_INFER_BUFFER_ROW);
+ this.bufferedRows = new ArrayList<>();
+ this.keyValueSizes = new LinkedHashMap<>();
+ this.rowFieldSizeVisitors = createRowFieldSizeVisitors(rowType);
+
+ if (columns.isEmpty() || maxKeys <= 0 || memoryLimit <= 0 || rowLimit <= 0) {
+ this.finished = true;
+ } else {
+ columns.forEach(column -> keyValueSizes.put(column.path(), new LinkedHashMap<>()));
+ }
+ }
+
+ public boolean finished() {
+ return finished;
+ }
+
+ public List bufferedRows() {
+ return bufferedRows;
+ }
+
+ public void add(InternalRow row) {
+ if (finished) {
+ throw new IllegalStateException("MapShreddingKeyExtractor is already finished.");
+ }
+
+ InternalRow copied = serializer.copy(row);
+ bufferedRows.add(copied);
+ currentBufferedSize += estimateRowSize(copied);
+
+ for (MapShreddingUtils.ResolvedMapShreddingColumn column : columns) {
+ InternalMap map = MapShreddingUtils.extractMap(copied, column);
+ if (map == null) {
+ continue;
+ }
+
+ InternalArray keyArray = map.keyArray();
+ InternalArray valueArray = map.valueArray();
+ BiFunction valueSizer =
+ column.valueType().accept(new InternalRowToSizeVisitor());
+ Map sizes = keyValueSizes.get(column.path());
+ for (int i = 0; i < map.size(); i++) {
+ if (keyArray.isNullAt(i)) {
+ continue;
+ }
+ BinaryString key = keyArray.getString(i);
+ long valueSize = valueSizer.apply(valueArray, i);
+ sizes.merge(key.toString(), valueSize, Long::sum);
+ }
+ }
+
+ if (bufferedRows.size() >= rowLimit || currentBufferedSize >= memoryLimit) {
+ populate();
+ }
+ }
+
+ public Map> populate() {
+ Map> result = currentDynamicKeys();
+ finished = true;
+ return result;
+ }
+
+ public Map> finish() {
+ return finished ? currentDynamicKeys() : populate();
+ }
+
+ public Map> currentDynamicKeys() {
+ Map> result = new LinkedHashMap<>();
+ keyValueSizes.forEach(
+ (path, sizes) ->
+ result.put(
+ path,
+ sizes.entrySet().stream()
+ .sorted(
+ Map.Entry.comparingByValue()
+ .reversed())
+ .limit(maxKeys)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList())));
+ return result;
+ }
+
+ private long estimateRowSize(InternalRow row) {
+ long size = 0L;
+ for (int i = 0; i < rowFieldSizeVisitors.size(); i++) {
+ size += rowFieldSizeVisitors.get(i).apply(row, i);
+ }
+ return size;
+ }
+
+ private List> createRowFieldSizeVisitors(
+ RowType rowType) {
+ List> visitors = new ArrayList<>();
+ for (DataType fieldType : rowType.getFieldTypes()) {
+ BiFunction visitor =
+ fieldType.accept(new InternalRowToSizeVisitor());
+ visitors.add((row, pos) -> visitor.apply(row, pos));
+ }
+ return visitors;
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingUtils.java
new file mode 100644
index 000000000000..9ae541f1cde5
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingUtils.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utilities shared by map shredding writer and reader. */
+public class MapShreddingUtils {
+
+ public static final String DYNAMIC_VALUE_PREFIX = "value_";
+ public static final String DYNAMIC_COLUMN_NAME = "dynamic_column";
+ public static final String FOOTER_METADATA_PREFIX = "parquet.meta.dynamic.column.map.keys.of.";
+
+ private MapShreddingUtils() {}
+
+ public static boolean isMapShreddingEnabled(Options options) {
+ String configured = options.get(CoreOptions.MAP_SHREDDING_COLUMNS);
+ return configured != null && !configured.trim().isEmpty();
+ }
+
+ public static List parseColumnPaths(@Nullable String configuredColumns) {
+ if (configuredColumns == null || configuredColumns.trim().isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ return Arrays.stream(configuredColumns.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .distinct()
+ .collect(Collectors.toList());
+ }
+
+ public static List resolveColumns(
+ RowType rowType, List paths) {
+ List resolved = new ArrayList<>(paths.size());
+ for (String path : paths) {
+ resolved.add(resolveColumn(rowType, path));
+ }
+ return resolved;
+ }
+
+ public static ResolvedMapShreddingColumn resolveColumn(RowType rowType, String path) {
+ String[] segments = path.split("\\.");
+ if (segments.length == 0) {
+ throw new IllegalArgumentException("Invalid map shredding column path: " + path);
+ }
+
+ RowType current = rowType;
+ int[] positions = new int[segments.length];
+ int[] nestedRowFieldCounts = new int[Math.max(segments.length - 1, 0)];
+ for (int i = 0; i < segments.length; i++) {
+ String segment = segments[i];
+ if (!current.containsField(segment)) {
+ throw new IllegalArgumentException(
+ String.format(
+ Locale.ROOT,
+ "Cannot resolve map shredding column '%s' from row type '%s'.",
+ path,
+ rowType));
+ }
+
+ positions[i] = current.getFieldIndex(segment);
+ DataField field = current.getField(segment);
+ DataType fieldType = field.type();
+ if (i < segments.length - 1) {
+ if (!(fieldType instanceof RowType)) {
+ throw new IllegalArgumentException(
+ String.format(
+ Locale.ROOT,
+ "Map shredding column '%s' expects ROW on intermediate path '%s' but found '%s'.",
+ path,
+ segment,
+ fieldType));
+ }
+ nestedRowFieldCounts[i] = ((RowType) fieldType).getFieldCount();
+ current = (RowType) fieldType;
+ continue;
+ }
+
+ if (!(fieldType instanceof MapType)) {
+ throw new IllegalArgumentException(
+ String.format(
+ Locale.ROOT,
+ "Map shredding column '%s' must be MAP but found '%s'.",
+ path,
+ fieldType));
+ }
+
+ MapType mapType = (MapType) fieldType;
+ if (!isStringType(mapType.getKeyType())) {
+ throw new IllegalArgumentException(
+ String.format(
+ Locale.ROOT,
+ "Map shredding column '%s' must use STRING key type but found '%s'.",
+ path,
+ mapType.getKeyType()));
+ }
+ return new ResolvedMapShreddingColumn(
+ path, segments, positions, nestedRowFieldCounts, mapType);
+ }
+ throw new IllegalArgumentException("Invalid map shredding column path: " + path);
+ }
+
+ public static boolean isMapShreddingPath(Map> dynamicKeys, String path) {
+ List keys = dynamicKeys.get(path);
+ return keys != null && !keys.isEmpty();
+ }
+
+ public static String sidecarColumnName(String mapColumn, int keyOrdinal) {
+ return DYNAMIC_COLUMN_NAME
+ + "_"
+ + sanitize(mapColumn)
+ + "_"
+ + DYNAMIC_VALUE_PREFIX
+ + keyOrdinal;
+ }
+
+ public static String path(String parentPath, String fieldName) {
+ return parentPath == null || parentPath.isEmpty()
+ ? fieldName
+ : parentPath + "." + fieldName;
+ }
+
+ public static Map toFooterMetadata(Map> dynamicKeys) {
+ Map metadata = new LinkedHashMap<>();
+ dynamicKeys.forEach(
+ (path, keys) -> {
+ if (!keys.isEmpty()) {
+ metadata.put(FOOTER_METADATA_PREFIX + path, String.join(",", keys));
+ }
+ });
+ return metadata;
+ }
+
+ public static Map> fromFooterMetadata(Map keyValueMeta) {
+ Map> result = new LinkedHashMap<>();
+ keyValueMeta.forEach(
+ (key, value) -> {
+ if (key.startsWith(FOOTER_METADATA_PREFIX)) {
+ String path = key.substring(FOOTER_METADATA_PREFIX.length());
+ result.put(path, parseColumnPaths(value));
+ }
+ });
+ return result;
+ }
+
+ @Nullable
+ public static InternalMap extractMap(InternalRow row, ResolvedMapShreddingColumn column) {
+ InternalRow current = row;
+ for (int i = 0; i < column.fieldPositions.length - 1; i++) {
+ int fieldPos = column.fieldPositions[i];
+ if (current.isNullAt(fieldPos)) {
+ return null;
+ }
+ current = current.getRow(fieldPos, column.nestedRowFieldCounts[i]);
+ }
+ int mapPos = column.fieldPositions[column.fieldPositions.length - 1];
+ if (current.isNullAt(mapPos)) {
+ return null;
+ }
+ return current.getMap(mapPos);
+ }
+
+ private static String sanitize(String value) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < value.length(); i++) {
+ char c = value.charAt(i);
+ if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')) {
+ builder.append(c);
+ } else {
+ builder.append('_');
+ }
+ }
+ return builder.toString();
+ }
+
+ private static boolean isStringType(DataType dataType) {
+ DataTypeRoot root = dataType.getTypeRoot();
+ return root == DataTypeRoot.VARCHAR || root == DataTypeRoot.CHAR;
+ }
+
+ /** Resolved map shredding logical column path. */
+ public static class ResolvedMapShreddingColumn {
+
+ private final String path;
+ private final String[] fieldNames;
+ private final int[] fieldPositions;
+ private final int[] nestedRowFieldCounts;
+ private final MapType mapType;
+
+ public ResolvedMapShreddingColumn(
+ String path,
+ String[] fieldNames,
+ int[] fieldPositions,
+ int[] nestedRowFieldCounts,
+ MapType mapType) {
+ this.path = path;
+ this.fieldNames = fieldNames;
+ this.fieldPositions = fieldPositions;
+ this.nestedRowFieldCounts = nestedRowFieldCounts;
+ this.mapType = mapType;
+ }
+
+ public String path() {
+ return path;
+ }
+
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public int[] fieldPositions() {
+ return fieldPositions;
+ }
+
+ public int[] nestedRowFieldCounts() {
+ return nestedRowFieldCounts;
+ }
+
+ public MapType mapType() {
+ return mapType;
+ }
+
+ public DataType valueType() {
+ return mapType.getValueType();
+ }
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index 9ccf88bd17ac..7584895dd0d4 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -69,13 +69,18 @@ public FormatReaderFactory createReaderFactory(
RowType projectedRowType,
@Nullable List filters) {
return new ParquetReaderFactory(
- options, projectedRowType, readBatchSize, ParquetFilters.convert(filters));
+ options,
+ dataSchemaRowType,
+ projectedRowType,
+ readBatchSize,
+ ParquetFilters.convert(filters));
}
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
ParquetWriterFactory baseFactory =
- new ParquetWriterFactory(new RowDataParquetBuilder(type, options));
+ new ParquetWriterFactory(
+ new RowDataParquetBuilder(type, options, formatContext.options()));
// Wrap with variant inference decorator
return new VariantInferenceWriterFactory(
baseFactory, new VariantInferenceConfig(type, formatContext.options()));
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 9a741a63ed73..a99f1096b427 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -56,6 +56,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -64,6 +65,9 @@
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.paimon.data.variant.VariantMetadataUtils.path;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_ELEMENT_NAME;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.PAIMON_SCHEMA;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType;
@@ -98,6 +102,15 @@ public class ParquetReaderFactory implements FormatReaderFactory {
public ParquetReaderFactory(
Options conf, RowType readType, int batchSize, @Nullable FilterCompat.Filter filter) {
+ this(conf, readType, readType, batchSize, filter);
+ }
+
+ public ParquetReaderFactory(
+ Options conf,
+ RowType dataType,
+ RowType readType,
+ int batchSize,
+ @Nullable FilterCompat.Filter filter) {
this.conf = conf;
this.readFields = readType.getFields().toArray(new DataField[0]);
this.batchSize = batchSize;
@@ -124,7 +137,10 @@ public FileRecordReader createReader(FormatReaderFactory.Context co
builder.build(),
context.selection());
MessageType fileSchema = reader.getFileMetaData().getSchema();
- RequestedSchema requestedSchema = getOrCreateRequestedSchema(fileSchema);
+ Map> dynamicMapKeys =
+ MapShreddingUtils.fromFooterMetadata(
+ reader.getFileMetaData().getKeyValueMetaData());
+ RequestedSchema requestedSchema = getOrCreateRequestedSchema(fileSchema, dynamicMapKeys);
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -147,26 +163,33 @@ public FileRecordReader createReader(FormatReaderFactory.Context co
context.fileIO());
}
- private RequestedSchema getOrCreateRequestedSchema(MessageType fileSchema) {
+ private RequestedSchema getOrCreateRequestedSchema(
+ MessageType fileSchema, Map> dynamicMapKeys) {
// clipParquetSchema and buildFieldsList are pure functions of (readFields, fileSchema).
// Cache the result keyed by fileSchema so that files sharing the same on-disk schema
// within this factory instance avoid redundant computation. Keying by fileSchema (rather
// than a simple "compute once" flag) correctly handles edge cases where different files
// read by the same factory instance may have different on-disk schemas, e.g. externally
// migrated Parquet files.
- return requestedSchemaCache.computeIfAbsent(fileSchema, this::createRequestedSchema);
+ if (!dynamicMapKeys.isEmpty()) {
+ return createRequestedSchema(fileSchema, dynamicMapKeys);
+ }
+ return requestedSchemaCache.computeIfAbsent(
+ fileSchema, schema -> createRequestedSchema(schema, dynamicMapKeys));
}
- private RequestedSchema createRequestedSchema(MessageType fileSchema) {
- MessageType rs = clipParquetSchema(fileSchema);
+ private RequestedSchema createRequestedSchema(
+ MessageType fileSchema, Map> dynamicMapKeys) {
+ MessageType rs = clipParquetSchema(fileSchema, dynamicMapKeys);
MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(rs);
- List f = buildFieldsList(readFields, columnIO, rs);
+ List f = buildFieldsList(readFields, columnIO, rs, dynamicMapKeys);
return new RequestedSchema(rs, f);
}
/** Clips `parquetSchema` according to `fieldNames`. */
- private MessageType clipParquetSchema(GroupType parquetSchema) {
- Type[] types = new Type[readFields.length];
+ private MessageType clipParquetSchema(
+ GroupType parquetSchema, Map> dynamicMapKeys) {
+ List types = new ArrayList<>();
for (int i = 0; i < readFields.length; ++i) {
String fieldName = readFields[i].name();
if (!parquetSchema.containsField(fieldName)) {
@@ -174,18 +197,39 @@ private MessageType clipParquetSchema(GroupType parquetSchema) {
"{} does not exist in {}, will fill the field with null.",
fieldName,
parquetSchema);
- types[i] = ParquetSchemaConverter.convertToParquetType(readFields[i]);
+ types.add(ParquetSchemaConverter.convertToParquetType(readFields[i]));
} else {
Type parquetType = parquetSchema.getType(fieldName);
- types[i] = clipParquetType(readFields[i].type(), parquetType);
+ types.add(
+ clipParquetType(
+ readFields[i].type(), parquetType, fieldName, dynamicMapKeys));
+ if (MapShreddingUtils.isMapShreddingPath(dynamicMapKeys, fieldName)) {
+ List keys = dynamicMapKeys.get(fieldName);
+ for (int keyOrdinal = 0; keyOrdinal < keys.size(); keyOrdinal++) {
+ String sidecarName =
+ MapShreddingUtils.sidecarColumnName(fieldName, keyOrdinal);
+ if (parquetSchema.containsField(sidecarName)) {
+ types.add(parquetSchema.getType(sidecarName));
+ }
+ }
+ }
}
}
- return Types.buildMessage().addFields(types).named(PAIMON_SCHEMA);
+ return Types.buildMessage().addFields(types.toArray(new Type[0])).named(PAIMON_SCHEMA);
}
/** Clips `parquetType` by `readType`. */
private Type clipParquetType(DataType readType, Type parquetType) {
+ return clipParquetType(
+ readType, parquetType, parquetType.getName(), Collections.emptyMap());
+ }
+
+ private Type clipParquetType(
+ DataType readType,
+ Type parquetType,
+ String path,
+ Map> dynamicMapKeys) {
switch (readType.getTypeRoot()) {
case ROW:
RowType rowType = (RowType) readType;
@@ -196,9 +240,21 @@ private Type clipParquetType(DataType readType, Type parquetType) {
List rowGroupFields = new ArrayList<>();
for (DataField field : rowType.getFields()) {
String fieldName = field.name();
+ String childPath = MapShreddingUtils.path(path, fieldName);
if (rowGroup.containsField(fieldName)) {
Type type = rowGroup.getType(fieldName);
- rowGroupFields.add(clipParquetType(field.type(), type));
+ rowGroupFields.add(
+ clipParquetType(field.type(), type, childPath, dynamicMapKeys));
+ if (MapShreddingUtils.isMapShreddingPath(dynamicMapKeys, childPath)) {
+ List keys = dynamicMapKeys.get(childPath);
+ for (int keyOrdinal = 0; keyOrdinal < keys.size(); keyOrdinal++) {
+ String sidecarName =
+ MapShreddingUtils.sidecarColumnName(childPath, keyOrdinal);
+ if (rowGroup.containsField(sidecarName)) {
+ rowGroupFields.add(rowGroup.getType(sidecarName));
+ }
+ }
+ }
} else {
// todo: support nested field missing
throw new RuntimeException("field " + fieldName + " is missing");
@@ -217,8 +273,16 @@ private Type clipParquetType(DataType readType, Type parquetType) {
mapGroup.getRepetition(),
mapGroup.getName(),
mapGroup.getType(0).getName(),
- clipParquetType(mapType.getKeyType(), keyValueType.getLeft()),
- clipParquetType(mapType.getValueType(), keyValueType.getRight()));
+ clipParquetType(
+ mapType.getKeyType(),
+ keyValueType.getLeft(),
+ path + "." + MAP_KEY_NAME,
+ dynamicMapKeys),
+ clipParquetType(
+ mapType.getValueType(),
+ keyValueType.getRight(),
+ path + "." + MAP_VALUE_NAME,
+ dynamicMapKeys));
case ARRAY:
ArrayType arrayType = (ArrayType) readType;
GroupType arrayGroup = (GroupType) parquetType;
@@ -232,7 +296,10 @@ private Type clipParquetType(DataType readType, Type parquetType) {
int level = arrayGroup.getType(0) instanceof GroupType ? 3 : 2;
Type elementType =
clipParquetType(
- arrayType.getElementType(), parquetListElementType(arrayGroup));
+ arrayType.getElementType(),
+ parquetListElementType(arrayGroup),
+ path + "." + LIST_ELEMENT_NAME,
+ dynamicMapKeys);
if (level == 3) {
// In case that the name in middle level is not "list".
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 37a69fe9aebd..e77c67dc99d9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -42,6 +42,7 @@
import org.apache.parquet.schema.Types;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
@@ -63,18 +64,75 @@ public static MessageType convertToParquetMessageType(RowType rowType) {
return new MessageType(PAIMON_SCHEMA, convertToParquetTypes(rowType));
}
+ public static MessageType convertToParquetMessageType(
+ RowType rowType, Map> dynamicMapKeys) {
+ if (dynamicMapKeys.isEmpty()) {
+ return convertToParquetMessageType(rowType);
+ }
+ return new MessageType(PAIMON_SCHEMA, convertToParquetTypes(rowType, dynamicMapKeys));
+ }
+
public static Type[] convertToParquetTypes(RowType rowType) {
return rowType.getFields().stream()
.map(ParquetSchemaConverter::convertToParquetType)
.toArray(Type[]::new);
}
+ public static Type[] convertToParquetTypes(
+ RowType rowType, Map> dynamicMapKeys) {
+ return convertToParquetTypes(rowType, dynamicMapKeys, "", 0);
+ }
+
+ private static Type[] convertToParquetTypes(
+ RowType rowType,
+ Map> dynamicMapKeys,
+ String parentPath,
+ int depth) {
+ List types = new java.util.ArrayList<>();
+ for (DataField field : rowType.getFields()) {
+ String path = MapShreddingUtils.path(parentPath, field.name());
+ types.add(
+ convertToParquetType(
+ field.name(), field.type(), field.id(), depth, path, dynamicMapKeys));
+ if (MapShreddingUtils.isMapShreddingPath(dynamicMapKeys, path)) {
+ MapType mapType = (MapType) field.type();
+ List keys = dynamicMapKeys.get(path);
+ for (int i = 0; i < keys.size(); i++) {
+ types.add(
+ convertToParquetType(
+ MapShreddingUtils.sidecarColumnName(path, i),
+ mapType.getValueType(),
+ SpecialFields.getMapValueFieldId(field.id(), 1)
+ + 1024
+ + i,
+ 0)
+ .withId(
+ SpecialFields.getMapValueFieldId(field.id(), 1)
+ + 1024
+ + i));
+ }
+ }
+ }
+ return types.toArray(new Type[0]);
+ }
+
/** Convert paimon {@link DataField} to parquet {@link Type}. */
public static Type convertToParquetType(DataField field) {
return convertToParquetType(field.name(), field.type(), field.id(), 0);
}
public static Type convertToParquetType(String name, DataType type, int fieldId, int depth) {
+ return convertToParquetType(
+ name, type, fieldId, depth, name, java.util.Collections.emptyMap());
+ }
+
+ private static Type convertToParquetType(
+ String name,
+ DataType type,
+ int fieldId,
+ int depth,
+ String path,
+ Map> dynamicMapKeys) {
Type.Repetition repetition =
type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
switch (type.getTypeRoot()) {
@@ -165,7 +223,9 @@ public static Type convertToParquetType(String name, DataType type, int fieldId,
LIST_ELEMENT_NAME,
arrayType.getElementType(),
fieldId,
- depth + 1)
+ depth + 1,
+ path + "." + LIST_ELEMENT_NAME,
+ dynamicMapKeys)
.withId(SpecialFields.getArrayElementFieldId(fieldId, depth + 1));
return ConversionPatterns.listOfElements(repetition, name, elementParquetType)
.withId(fieldId);
@@ -178,11 +238,22 @@ public static Type convertToParquetType(String name, DataType type, int fieldId,
keyType = keyType.copy(false);
}
Type mapKeyParquetType =
- convertToParquetType(MAP_KEY_NAME, keyType, fieldId, depth + 1)
+ convertToParquetType(
+ MAP_KEY_NAME,
+ keyType,
+ fieldId,
+ depth + 1,
+ path + "." + MAP_KEY_NAME,
+ dynamicMapKeys)
.withId(SpecialFields.getMapKeyFieldId(fieldId, depth + 1));
Type mapValueParquetType =
convertToParquetType(
- MAP_VALUE_NAME, mapType.getValueType(), fieldId, depth + 1)
+ MAP_VALUE_NAME,
+ mapType.getValueType(),
+ fieldId,
+ depth + 1,
+ path + "." + MAP_VALUE_NAME,
+ dynamicMapKeys)
.withId(SpecialFields.getMapValueFieldId(fieldId, depth + 1));
return ConversionPatterns.mapType(
repetition,
@@ -200,10 +271,22 @@ public static Type convertToParquetType(String name, DataType type, int fieldId,
elementType = elementType.copy(false);
}
Type multisetKeyParquetType =
- convertToParquetType(MAP_KEY_NAME, elementType, fieldId, depth + 1)
+ convertToParquetType(
+ MAP_KEY_NAME,
+ elementType,
+ fieldId,
+ depth + 1,
+ path + "." + MAP_KEY_NAME,
+ dynamicMapKeys)
.withId(SpecialFields.getMapKeyFieldId(fieldId, depth + 1));
Type multisetValueParquetType =
- convertToParquetType(MAP_VALUE_NAME, new IntType(false), fieldId, depth + 1)
+ convertToParquetType(
+ MAP_VALUE_NAME,
+ new IntType(false),
+ fieldId,
+ depth + 1,
+ path + "." + MAP_VALUE_NAME,
+ dynamicMapKeys)
.withId(SpecialFields.getMapValueFieldId(fieldId, depth + 1));
return ConversionPatterns.mapType(
repetition,
@@ -221,7 +304,7 @@ public static Type convertToParquetType(String name, DataType type, int fieldId,
// LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION));
// }
return groupTypeBuilder
- .addFields(convertToParquetTypes(rowType))
+ .addFields(convertToParquetTypes(rowType, dynamicMapKeys, path, depth + 1))
.named(name)
.withId(fieldId);
case VARIANT:
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
index 282805897a5f..93c3e6927089 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
@@ -52,11 +52,15 @@ public ParquetWriterFactory(ParquetBuilder writerBuilder) {
@Override
public FormatWriter create(PositionOutputStream stream, String compression) throws IOException {
- final OutputFile out = new StreamOutputFile(stream);
if (HadoopCompressionType.NONE.value().equals(compression)) {
compression = null;
}
+ if (writerBuilder instanceof RowDataParquetBuilder) {
+ return ((RowDataParquetBuilder) writerBuilder).createFormatWriter(stream, compression);
+ }
+
+ final OutputFile out = new StreamOutputFile(stream);
final ParquetWriter writer = writerBuilder.createWriter(out, compression);
return new ParquetBulkWriter(writer);
}
@@ -65,7 +69,6 @@ public FormatWriter create(PositionOutputStream stream, String compression) thro
public FormatWriter createWithShreddingSchema(
PositionOutputStream stream, String compression, RowType inferredShreddingSchema)
throws IOException {
- final OutputFile out = new StreamOutputFile(stream);
if (HadoopCompressionType.NONE.value().equals(compression)) {
compression = null;
}
@@ -73,6 +76,11 @@ public FormatWriter createWithShreddingSchema(
ParquetBuilder newBuilder =
((RowDataParquetBuilder) writerBuilder)
.withShreddingSchemas(inferredShreddingSchema);
+ if (newBuilder instanceof RowDataParquetBuilder) {
+ return ((RowDataParquetBuilder) newBuilder).createFormatWriter(stream, compression);
+ }
+
+ final OutputFile out = new StreamOutputFile(stream);
final ParquetWriter writer = newBuilder.createWriter(out, compression);
return new ParquetBulkWriter(writer);
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
index 37fc4272c6d3..8878b35a10d2 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java
@@ -18,17 +18,59 @@
package org.apache.paimon.format.parquet.reader;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.ArrayColumnVector;
+import org.apache.paimon.data.columnar.BooleanColumnVector;
+import org.apache.paimon.data.columnar.ByteColumnVector;
+import org.apache.paimon.data.columnar.BytesColumnVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.DecimalColumnVector;
+import org.apache.paimon.data.columnar.DoubleColumnVector;
+import org.apache.paimon.data.columnar.FloatColumnVector;
+import org.apache.paimon.data.columnar.IntColumnVector;
+import org.apache.paimon.data.columnar.LongColumnVector;
+import org.apache.paimon.data.columnar.MapColumnVector;
+import org.apache.paimon.data.columnar.RowColumnVector;
+import org.apache.paimon.data.columnar.ShortColumnVector;
+import org.apache.paimon.data.columnar.TimestampColumnVector;
import org.apache.paimon.data.columnar.heap.AbstractArrayBasedVector;
import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
+import org.apache.paimon.data.columnar.heap.HeapArrayVector;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.data.columnar.heap.HeapMapVector;
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
+import org.apache.paimon.data.columnar.writable.WritableByteVector;
+import org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableDoubleVector;
+import org.apache.paimon.data.columnar.writable.WritableFloatVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
+import org.apache.paimon.data.columnar.writable.WritableLongVector;
+import org.apache.paimon.data.columnar.writable.WritableShortVector;
+import org.apache.paimon.data.columnar.writable.WritableTimestampVector;
+import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.data.variant.PaimonShreddingUtils;
import org.apache.paimon.data.variant.PaimonShreddingUtils.FieldToExtract;
+import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.data.variant.VariantSchema;
+import org.apache.paimon.format.parquet.type.MapShreddingField;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetGroupField;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
@@ -97,6 +139,28 @@ public class ParquetColumnVector {
PaimonShreddingUtils.getFieldsToExtract(column.getType(), variantSchema);
repetitionLevels = contentVector.repetitionLevels;
definitionLevels = contentVector.definitionLevels;
+ } else if (column instanceof MapShreddingField) {
+ MapShreddingField mapField = (MapShreddingField) column;
+ WritableColumnVector residualMap =
+ ParquetReaderUtil.createWritableColumnVector(capacity, column.getType());
+ ParquetColumnVector residualVector =
+ new ParquetColumnVector(
+ mapField.residualMapField(),
+ residualMap,
+ capacity,
+ missingColumns,
+ false);
+ children.add(residualVector);
+ for (ParquetField sidecarField : mapField.sidecarFields()) {
+ WritableColumnVector sidecar =
+ ParquetReaderUtil.createWritableColumnVector(
+ capacity, sidecarField.getType());
+ children.add(
+ new ParquetColumnVector(
+ sidecarField, sidecar, capacity, missingColumns, true));
+ }
+ repetitionLevels = residualVector.repetitionLevels;
+ definitionLevels = residualVector.definitionLevels;
} else if (isPrimitive) {
if (column.getRepetitionLevel() > 0) {
repetitionLevels = new HeapIntVector(capacity);
@@ -184,6 +248,11 @@ void assemble() {
return;
}
+ if (column instanceof MapShreddingField) {
+ assembleMapShredding((MapShreddingField) column);
+ return;
+ }
+
// nothing to do if the column itself is missing
if (vector.isAllNull()) {
return;
@@ -356,6 +425,413 @@ private void assembleStruct() {
vector.addElementsAppended(rowId);
}
+ private void assembleMapShredding(MapShreddingField mapField) {
+ for (ParquetColumnVector child : children) {
+ child.assemble();
+ }
+
+ HeapMapVector residualMapVector = (HeapMapVector) children.get(0).getValueVector();
+ HeapMapVector targetMapVector = (HeapMapVector) vector;
+ HeapBytesVector targetKeys = (HeapBytesVector) targetMapVector.getKeys();
+ WritableColumnVector targetValues = (WritableColumnVector) targetMapVector.getValues();
+ MapType mapType = (MapType) mapField.getType();
+
+ int rowCount = residualMapVector.getElementsAppended();
+ for (int rowId = 0; rowId < rowCount; rowId++) {
+ InternalMap residualMap = residualMapVector.getMap(rowId);
+ int sidecarValueCount = sidecarValueCount(rowId);
+ if (residualMapVector.isNullAt(rowId) && sidecarValueCount == 0) {
+ targetMapVector.appendNull();
+ continue;
+ }
+
+ int residualSize = residualMapVector.isNullAt(rowId) ? 0 : residualMap.size();
+ targetMapVector.appendArray(residualSize + sidecarValueCount);
+ if (residualSize > 0) {
+ appendResidualMap(residualMap, mapType, targetKeys, targetValues);
+ }
+ appendSidecars(rowId, mapField, mapType, targetKeys, targetValues);
+ }
+ }
+
+ private int sidecarValueCount(int rowId) {
+ int count = 0;
+ for (int i = 1; i < children.size(); i++) {
+ if (!children.get(i).getValueVector().isNullAt(rowId)) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private void appendResidualMap(
+ InternalMap residualMap,
+ MapType mapType,
+ HeapBytesVector targetKeys,
+ WritableColumnVector targetValues) {
+ InternalArray keyArray = residualMap.keyArray();
+ InternalArray valueArray = residualMap.valueArray();
+ for (int i = 0; i < residualMap.size(); i++) {
+ appendString(targetKeys, keyArray.getString(i));
+ appendInternalArrayValue(valueArray, i, mapType.getValueType(), targetValues);
+ }
+ }
+
+ private void appendSidecars(
+ int rowId,
+ MapShreddingField mapField,
+ MapType mapType,
+ HeapBytesVector targetKeys,
+ WritableColumnVector targetValues) {
+ for (int i = 1; i < children.size(); i++) {
+ WritableColumnVector sidecarVector = children.get(i).getValueVector();
+ if (!sidecarVector.isNullAt(rowId)) {
+ appendString(targetKeys, mapField.sidecarKeys().get(i - 1));
+ appendColumnVectorValue(sidecarVector, rowId, mapType.getValueType(), targetValues);
+ }
+ }
+ }
+
+ private static void appendString(HeapBytesVector vector, BinaryString value) {
+ byte[] bytes = value.toBytes();
+ vector.appendByteArray(bytes, 0, bytes.length);
+ }
+
+ private static void appendColumnVectorValue(
+ ColumnVector source, int sourcePos, DataType type, WritableColumnVector target) {
+ if (source.isNullAt(sourcePos)) {
+ target.appendNull();
+ return;
+ }
+
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ ((WritableBooleanVector) target)
+ .appendBoolean(((BooleanColumnVector) source).getBoolean(sourcePos));
+ return;
+ case TINYINT:
+ ((WritableByteVector) target)
+ .appendByte(((ByteColumnVector) source).getByte(sourcePos));
+ return;
+ case SMALLINT:
+ ((WritableShortVector) target)
+ .appendShort(((ShortColumnVector) source).getShort(sourcePos));
+ return;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ ((WritableIntVector) target)
+ .appendInt(((IntColumnVector) source).getInt(sourcePos));
+ return;
+ case BIGINT:
+ ((WritableLongVector) target)
+ .appendLong(((LongColumnVector) source).getLong(sourcePos));
+ return;
+ case FLOAT:
+ ((WritableFloatVector) target)
+ .appendFloat(((FloatColumnVector) source).getFloat(sourcePos));
+ return;
+ case DOUBLE:
+ ((WritableDoubleVector) target)
+ .appendDouble(((DoubleColumnVector) source).getDouble(sourcePos));
+ return;
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ case BLOB:
+ BytesColumnVector.Bytes bytes = ((BytesColumnVector) source).getBytes(sourcePos);
+ ((WritableBytesVector) target).appendByteArray(bytes.data, bytes.offset, bytes.len);
+ return;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ if (source instanceof DecimalColumnVector) {
+ appendDecimal(
+ ((DecimalColumnVector) source)
+ .getDecimal(
+ sourcePos,
+ decimalType.getPrecision(),
+ decimalType.getScale()),
+ decimalType,
+ target);
+ } else if (decimalType.getPrecision() <= 9) {
+ ((WritableIntVector) target)
+ .appendInt(((IntColumnVector) source).getInt(sourcePos));
+ } else if (decimalType.getPrecision() <= 18) {
+ ((WritableLongVector) target)
+ .appendLong(((LongColumnVector) source).getLong(sourcePos));
+ } else {
+ BytesColumnVector.Bytes decimalBytes =
+ ((BytesColumnVector) source).getBytes(sourcePos);
+ ((WritableBytesVector) target)
+ .appendByteArray(
+ decimalBytes.data, decimalBytes.offset, decimalBytes.len);
+ }
+ return;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ int precision = DataTypeChecks.getPrecision(type);
+ if (source instanceof TimestampColumnVector) {
+ appendTimestamp(
+ type,
+ ((TimestampColumnVector) source).getTimestamp(sourcePos, precision),
+ target);
+ } else if (precision <= 6 && source instanceof LongColumnVector) {
+ ((WritableLongVector) target)
+ .appendLong(((LongColumnVector) source).getLong(sourcePos));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp vector for map shredding: " + source);
+ }
+ return;
+ case ARRAY:
+ appendInternalValue(((ArrayColumnVector) source).getArray(sourcePos), type, target);
+ return;
+ case MAP:
+ case MULTISET:
+ appendInternalValue(((MapColumnVector) source).getMap(sourcePos), type, target);
+ return;
+ case ROW:
+ appendInternalValue(((RowColumnVector) source).getRow(sourcePos), type, target);
+ return;
+ case VARIANT:
+ InternalRow variantRow = ((RowColumnVector) source).getRow(sourcePos);
+ appendInternalValue(
+ new GenericVariant(variantRow.getBinary(0), variantRow.getBinary(1)),
+ type,
+ target);
+ return;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported map shredding value type: " + type);
+ }
+ }
+
+ private static void appendInternalArrayValue(
+ InternalArray array, int sourcePos, DataType type, WritableColumnVector target) {
+ if (array.isNullAt(sourcePos)) {
+ target.appendNull();
+ return;
+ }
+
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ ((WritableBooleanVector) target).appendBoolean(array.getBoolean(sourcePos));
+ return;
+ case TINYINT:
+ ((WritableByteVector) target).appendByte(array.getByte(sourcePos));
+ return;
+ case SMALLINT:
+ ((WritableShortVector) target).appendShort(array.getShort(sourcePos));
+ return;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ ((WritableIntVector) target).appendInt(array.getInt(sourcePos));
+ return;
+ case BIGINT:
+ ((WritableLongVector) target).appendLong(array.getLong(sourcePos));
+ return;
+ case FLOAT:
+ ((WritableFloatVector) target).appendFloat(array.getFloat(sourcePos));
+ return;
+ case DOUBLE:
+ ((WritableDoubleVector) target).appendDouble(array.getDouble(sourcePos));
+ return;
+ case CHAR:
+ case VARCHAR:
+ appendString((HeapBytesVector) target, array.getString(sourcePos));
+ return;
+ case BINARY:
+ case VARBINARY:
+ case BLOB:
+ byte[] bytes = array.getBinary(sourcePos);
+ ((WritableBytesVector) target).appendByteArray(bytes, 0, bytes.length);
+ return;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ appendDecimal(
+ array.getDecimal(
+ sourcePos, decimalType.getPrecision(), decimalType.getScale()),
+ decimalType,
+ target);
+ return;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ appendTimestamp(
+ type,
+ array.getTimestamp(sourcePos, DataTypeChecks.getPrecision(type)),
+ target);
+ return;
+ case ARRAY:
+ appendInternalValue(array.getArray(sourcePos), type, target);
+ return;
+ case MAP:
+ case MULTISET:
+ appendInternalValue(array.getMap(sourcePos), type, target);
+ return;
+ case ROW:
+ appendInternalValue(
+ array.getRow(sourcePos, ((RowType) type).getFieldCount()), type, target);
+ return;
+ case VARIANT:
+ appendInternalValue(array.getVariant(sourcePos), type, target);
+ return;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported map shredding value type: " + type);
+ }
+ }
+
+ private static void appendInternalValue(
+ Object value, DataType type, WritableColumnVector target) {
+ if (value == null) {
+ target.appendNull();
+ return;
+ }
+
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ ((WritableBooleanVector) target).appendBoolean((boolean) value);
+ return;
+ case TINYINT:
+ ((WritableByteVector) target).appendByte((byte) value);
+ return;
+ case SMALLINT:
+ ((WritableShortVector) target).appendShort((short) value);
+ return;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ ((WritableIntVector) target).appendInt((int) value);
+ return;
+ case BIGINT:
+ ((WritableLongVector) target).appendLong((long) value);
+ return;
+ case FLOAT:
+ ((WritableFloatVector) target).appendFloat((float) value);
+ return;
+ case DOUBLE:
+ ((WritableDoubleVector) target).appendDouble((double) value);
+ return;
+ case CHAR:
+ case VARCHAR:
+ appendString((HeapBytesVector) target, (BinaryString) value);
+ return;
+ case BINARY:
+ case VARBINARY:
+ case BLOB:
+ byte[] bytes = (byte[]) value;
+ ((WritableBytesVector) target).appendByteArray(bytes, 0, bytes.length);
+ return;
+ case DECIMAL:
+ appendDecimal((Decimal) value, (DecimalType) type, target);
+ return;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ appendTimestamp(type, (Timestamp) value, target);
+ return;
+ case ARRAY:
+ appendInternalArray(
+ (InternalArray) value, (ArrayType) type, (HeapArrayVector) target);
+ return;
+ case MAP:
+ appendInternalMap((InternalMap) value, (MapType) type, (HeapMapVector) target);
+ return;
+ case MULTISET:
+ appendInternalMap(
+ (InternalMap) value,
+ new MapType(((MultisetType) type).getElementType(), new IntType(false)),
+ (HeapMapVector) target);
+ return;
+ case ROW:
+ appendInternalRow((InternalRow) value, (RowType) type, (HeapRowVector) target);
+ return;
+ case VARIANT:
+ appendVariant((Variant) value, (HeapRowVector) target);
+ return;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported map shredding value type: " + type);
+ }
+ }
+
+ private static void appendInternalArray(
+ InternalArray array, ArrayType type, HeapArrayVector target) {
+ WritableColumnVector child = (WritableColumnVector) target.getChildren()[0];
+ target.appendArray(array.size());
+ for (int i = 0; i < array.size(); i++) {
+ appendInternalArrayValue(array, i, type.getElementType(), child);
+ }
+ }
+
+ private static void appendInternalMap(InternalMap map, MapType type, HeapMapVector target) {
+ InternalArray keyArray = map.keyArray();
+ InternalArray valueArray = map.valueArray();
+ WritableColumnVector keyVector = (WritableColumnVector) target.getKeys();
+ WritableColumnVector valueVector = (WritableColumnVector) target.getValues();
+ target.appendArray(map.size());
+ for (int i = 0; i < map.size(); i++) {
+ appendInternalArrayValue(keyArray, i, type.getKeyType(), keyVector);
+ appendInternalArrayValue(valueArray, i, type.getValueType(), valueVector);
+ }
+ }
+
+ private static void appendInternalRow(InternalRow row, RowType type, HeapRowVector target) {
+ target.appendRow();
+ for (int i = 0; i < type.getFieldCount(); i++) {
+ WritableColumnVector child = (WritableColumnVector) target.getChildren()[i];
+ DataType fieldType = type.getTypeAt(i);
+ if (row.isNullAt(i)) {
+ child.appendNull();
+ } else {
+ appendInternalValue(
+ InternalRow.createFieldGetter(fieldType, i).getFieldOrNull(row),
+ fieldType,
+ child);
+ }
+ }
+ }
+
+ private static void appendVariant(Variant variant, HeapRowVector target) {
+ target.appendRow();
+ byte[] value = variant.value();
+ byte[] metadata = variant.metadata();
+ ((WritableBytesVector) target.getChildren()[0]).appendByteArray(value, 0, value.length);
+ ((WritableBytesVector) target.getChildren()[1])
+ .appendByteArray(metadata, 0, metadata.length);
+ }
+
+ private static void appendDecimal(
+ Decimal decimal, DecimalType type, WritableColumnVector target) {
+ if (decimal == null) {
+ target.appendNull();
+ } else if (type.getPrecision() <= 9) {
+ ((WritableIntVector) target).appendInt((int) decimal.toUnscaledLong());
+ } else if (type.getPrecision() <= 18) {
+ ((WritableLongVector) target).appendLong(decimal.toUnscaledLong());
+ } else {
+ byte[] bytes = decimal.toUnscaledBytes();
+ ((WritableBytesVector) target).appendByteArray(bytes, 0, bytes.length);
+ }
+ }
+
+ private static void appendTimestamp(
+ DataType type, Timestamp timestamp, WritableColumnVector target) {
+ if (timestamp == null) {
+ target.appendNull();
+ } else if (target instanceof WritableTimestampVector) {
+ ((WritableTimestampVector) target).appendTimestamp(timestamp);
+ } else if (DataTypeChecks.getPrecision(type) <= 3) {
+ ((WritableLongVector) target).appendLong(timestamp.getMillisecond());
+ } else if (DataTypeChecks.getPrecision(type) <= 6) {
+ ((WritableLongVector) target).appendLong(timestamp.toMicros());
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp vector for map shredding: " + target);
+ }
+ }
+
/**
* For a collection (i.e., array or map) element at index 'idx', returns the starting index of
* the next collection after it.
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
index a2741f869ab6..a704c815bc25 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.parquet.reader;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.heap.CastedArrayColumnVector;
import org.apache.paimon.data.columnar.heap.CastedMapColumnVector;
@@ -36,8 +37,10 @@
import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.variant.VariantMetadataUtils;
+import org.apache.paimon.format.parquet.MapShreddingUtils;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.paimon.format.parquet.VariantUtils;
+import org.apache.paimon.format.parquet.type.MapShreddingField;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.format.parquet.type.ParquetGroupField;
import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
@@ -60,6 +63,7 @@
import org.apache.parquet.io.GroupColumnIO;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.PrimitiveColumnIO;
+import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -67,8 +71,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
@@ -224,19 +231,103 @@ public static ColumnVector createReadableColumnVector(
public static List buildFieldsList(
DataField[] readFields, MessageColumnIO columnIO, MessageType requestedFileSchema) {
+ return buildFieldsList(readFields, columnIO, requestedFileSchema, Collections.emptyMap());
+ }
+
+ public static List buildFieldsList(
+ DataField[] readFields,
+ MessageColumnIO columnIO,
+ MessageType requestedFileSchema,
+ Map> dynamicMapKeys) {
+ return buildFieldsList(readFields, columnIO, requestedFileSchema, dynamicMapKeys, "");
+ }
+
+ private static List buildFieldsList(
+ DataField[] readFields,
+ GroupColumnIO columnIO,
+ Type requestedFileSchema,
+ Map> dynamicMapKeys,
+ String parentPath) {
List list = new ArrayList<>();
for (int i = 0; i < readFields.length; i++) {
+ DataField readField = readFields[i];
+ String path = MapShreddingUtils.path(parentPath, readField.name());
+ if (MapShreddingUtils.isMapShreddingPath(dynamicMapKeys, path)
+ && requestedFileSchema
+ .asGroupType()
+ .containsField(MapShreddingUtils.sidecarColumnName(path, 0))) {
+ list.add(
+ constructMapShreddingField(
+ readField,
+ columnIO,
+ requestedFileSchema.asGroupType(),
+ dynamicMapKeys.get(path),
+ path,
+ dynamicMapKeys));
+ continue;
+ }
list.add(
constructField(
- readFields[i],
- lookupColumnByName(columnIO, readFields[i].name()),
- requestedFileSchema.getType(i)));
+ readField,
+ lookupColumnByName(columnIO, readField.name()),
+ requestedFileSchema.asGroupType().getType(readField.name()),
+ path,
+ dynamicMapKeys));
}
return list;
}
+ private static ParquetField constructMapShreddingField(
+ DataField dataField,
+ GroupColumnIO columnIO,
+ GroupType requestedFileSchema,
+ List keys,
+ String path,
+ Map> dynamicMapKeys) {
+ ParquetGroupField residualMapField =
+ (ParquetGroupField)
+ constructField(
+ dataField,
+ lookupColumnByName(columnIO, dataField.name()),
+ requestedFileSchema.getType(dataField.name()),
+ path,
+ dynamicMapKeys);
+
+ List sidecarFields = new ArrayList<>();
+ List sidecarKeys = new ArrayList<>();
+ int keyOrdinal = 0;
+ while (keyOrdinal < keys.size()) {
+ String sidecarName = MapShreddingUtils.sidecarColumnName(path, keyOrdinal);
+ if (!requestedFileSchema.containsField(sidecarName)) {
+ break;
+ }
+ MapType mapType = (MapType) dataField.type();
+ sidecarFields.add(
+ constructField(
+ new DataField(0, sidecarName, mapType.getValueType()),
+ lookupColumnByName(columnIO, sidecarName),
+ requestedFileSchema.getType(sidecarName),
+ sidecarName,
+ dynamicMapKeys));
+ sidecarKeys.add(BinaryString.fromString(keys.get(keyOrdinal)));
+ keyOrdinal++;
+ }
+ return new MapShreddingField(
+ dataField.type(), residualMapField, sidecarFields, sidecarKeys);
+ }
+
private static ParquetField constructField(
DataField dataField, ColumnIO columnIO, Type parquetType) {
+ return constructField(
+ dataField, columnIO, parquetType, dataField.name(), Collections.emptyMap());
+ }
+
+ private static ParquetField constructField(
+ DataField dataField,
+ ColumnIO columnIO,
+ Type parquetType,
+ String path,
+ Map> dynamicMapKeys) {
boolean required = columnIO.getType().getRepetition() == REQUIRED;
int repetitionLevel = columnIO.getRepetitionLevel();
int definitionLevel = columnIO.getDefinitionLevel();
@@ -249,24 +340,20 @@ private static ParquetField constructField(
return constructVariantField(dataField, columnIO, parquetType);
}
- ImmutableList.Builder fieldsBuilder = ImmutableList.builder();
- List fieldNames = rowType.getFieldNames();
- List children = rowType.getFields();
- for (int i = 0; i < children.size(); i++) {
- String childName = fieldNames.get(i);
- fieldsBuilder.add(
- constructField(
- children.get(i),
- lookupColumnByName(groupColumnIO, childName),
- parquetType.asGroupType().getType(childName)));
- }
+ List fields =
+ buildFieldsList(
+ rowType.getFields().toArray(new DataField[0]),
+ groupColumnIO,
+ parquetType,
+ dynamicMapKeys,
+ path);
return new ParquetGroupField(
type,
repetitionLevel,
definitionLevel,
required,
- fieldsBuilder.build(),
+ fields,
groupColumnIO.getFieldPath());
}
@@ -283,12 +370,16 @@ private static ParquetField constructField(
constructField(
new DataField(0, "", mapType.getKeyType()),
keyValueColumnIO.getChild(0),
- keyValueType.getKey());
+ keyValueType.getKey(),
+ path + "." + MAP_KEY_NAME,
+ dynamicMapKeys);
ParquetField valueField =
constructField(
new DataField(0, "", mapType.getValueType()),
keyValueColumnIO.getChild(1),
- keyValueType.getValue());
+ keyValueType.getValue(),
+ path + "." + MAP_VALUE_NAME,
+ dynamicMapKeys);
return new ParquetGroupField(
type,
repetitionLevel,
@@ -307,12 +398,16 @@ private static ParquetField constructField(
constructField(
new DataField(0, "", multisetType.getElementType()),
keyValueColumnIO.getChild(0),
- keyValueType.getKey());
+ keyValueType.getKey(),
+ path + "." + MAP_KEY_NAME,
+ dynamicMapKeys);
ParquetField valueField =
constructField(
new DataField(0, "", new IntType()),
keyValueColumnIO.getChild(1),
- keyValueType.getValue());
+ keyValueType.getValue(),
+ path + "." + MAP_VALUE_NAME,
+ dynamicMapKeys);
return new ParquetGroupField(
type,
repetitionLevel,
@@ -349,7 +444,9 @@ private static ParquetField constructField(
constructField(
new DataField(0, "", arrayType.getElementType()),
getArrayElementColumn(elementTypeColumnIO),
- parquetListElementType(parquetType.asGroupType()));
+ parquetListElementType(parquetType.asGroupType()),
+ path + "." + ParquetSchemaConverter.LIST_ELEMENT_NAME,
+ dynamicMapKeys);
if (repetitionLevel == field.getRepetitionLevel()) {
repetitionLevel = columnIO.getParent().getRepetitionLevel();
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/MapShreddingField.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/MapShreddingField.java
new file mode 100644
index 000000000000..474dcdd0d1e0
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/MapShreddingField.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.type;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataType;
+
+import java.util.List;
+
+/**
+ * Parquet field for a logical map shredding assembled from residual map and sidecar value columns.
+ */
+public class MapShreddingField extends ParquetGroupField {
+
+ private final ParquetGroupField residualMapField;
+ private final List sidecarFields;
+ private final List sidecarKeys;
+
+ public MapShreddingField(
+ DataType type,
+ ParquetGroupField residualMapField,
+ List sidecarFields,
+ List sidecarKeys) {
+ super(
+ type,
+ residualMapField.getRepetitionLevel(),
+ residualMapField.getDefinitionLevel(),
+ residualMapField.isRequired(),
+ residualMapField.getChildren(),
+ residualMapField.path());
+ this.residualMapField = residualMapField;
+ this.sidecarFields = sidecarFields;
+ this.sidecarKeys = sidecarKeys;
+ }
+
+ public ParquetGroupField residualMapField() {
+ return residualMapField;
+ }
+
+ public List sidecarFields() {
+ return sidecarFields;
+ }
+
+ public List sidecarKeys() {
+ return sidecarKeys;
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
index 14970e548e9b..6cee8fd1bb46 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet.writer;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.parquet.MapShreddingUtils;
import org.apache.paimon.format.parquet.VariantUtils;
import org.apache.paimon.types.RowType;
@@ -31,7 +32,10 @@
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType;
@@ -40,13 +44,23 @@ public class ParquetRowDataBuilder
extends ParquetWriter.Builder {
private final RowType rowType;
+ private final Map> dynamicMapKeys;
@Nullable private final RowType shreddingSchemas;
public ParquetRowDataBuilder(
OutputFile path, RowType rowType, @Nullable RowType shreddingSchemas) {
+ this(path, rowType, shreddingSchemas, Collections.emptyMap());
+ }
+
+ public ParquetRowDataBuilder(
+ OutputFile path,
+ RowType rowType,
+ @Nullable RowType shreddingSchemas,
+ Map> dynamicMapKeys) {
super(path);
this.rowType = rowType;
this.shreddingSchemas = shreddingSchemas;
+ this.dynamicMapKeys = dynamicMapKeys;
}
@Override
@@ -70,19 +84,27 @@ private ParquetWriteSupport(Configuration conf) {
this.conf = conf;
this.schema =
convertToParquetMessageType(
- VariantUtils.replaceWithShreddingType(rowType, shreddingSchemas));
+ VariantUtils.replaceWithShreddingType(rowType, shreddingSchemas),
+ dynamicMapKeys);
}
@Override
public WriteContext init(Configuration configuration) {
- return new WriteContext(schema, new HashMap<>());
+ Map metadata = new HashMap<>();
+ metadata.putAll(MapShreddingUtils.toFooterMetadata(dynamicMapKeys));
+ return new WriteContext(schema, metadata);
}
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.writer =
new ParquetRowDataWriter(
- recordConsumer, rowType, schema, conf, shreddingSchemas);
+ recordConsumer,
+ rowType,
+ schema,
+ conf,
+ shreddingSchemas,
+ dynamicMapKeys);
}
@Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
index 80b788733342..273a42c3343f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
@@ -29,6 +29,7 @@
import org.apache.paimon.data.variant.PaimonShreddingUtils;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.data.variant.VariantSchema;
+import org.apache.paimon.format.parquet.MapShreddingUtils;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
@@ -53,7 +54,11 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
@@ -71,6 +76,7 @@ public class ParquetRowDataWriter {
private final RowWriter rowWriter;
private final RecordConsumer recordConsumer;
@Nullable private final RowType shreddingSchemas;
+ private final Map> dynamicMapKeys;
public ParquetRowDataWriter(
RecordConsumer recordConsumer,
@@ -78,10 +84,21 @@ public ParquetRowDataWriter(
GroupType schema,
Configuration conf,
@Nullable RowType shreddingSchemas) {
+ this(recordConsumer, rowType, schema, conf, shreddingSchemas, Collections.emptyMap());
+ }
+
+ public ParquetRowDataWriter(
+ RecordConsumer recordConsumer,
+ RowType rowType,
+ GroupType schema,
+ Configuration conf,
+ @Nullable RowType shreddingSchemas,
+ Map> dynamicMapKeys) {
this.conf = conf;
this.recordConsumer = recordConsumer;
this.shreddingSchemas = shreddingSchemas;
- this.rowWriter = new RowWriter(rowType, schema);
+ this.dynamicMapKeys = dynamicMapKeys;
+ this.rowWriter = new RowWriter(rowType, schema, dynamicMapKeys, "");
}
/**
@@ -96,6 +113,10 @@ public void write(final InternalRow record) {
}
private FieldWriter createWriter(DataType t, Type type) {
+ return createWriter(t, type, type.getName());
+ }
+
+ private FieldWriter createWriter(DataType t, Type type, String path) {
if (type.isPrimitive()) {
switch (t.getTypeRoot()) {
case CHAR:
@@ -144,13 +165,13 @@ private FieldWriter createWriter(DataType t, Type type) {
} else if (t instanceof MapType
&& annotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return new MapWriter(
- ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
+ ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType, null);
} else if (t instanceof MultisetType
&& annotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return new MapWriter(
- ((MultisetType) t).getElementType(), new IntType(false), groupType);
+ ((MultisetType) t).getElementType(), new IntType(false), groupType, null);
} else if (t instanceof RowType && type instanceof GroupType) {
- return new RowWriter((RowType) t, groupType);
+ return new RowWriter((RowType) t, groupType, dynamicMapKeys, path);
} else if (t instanceof VariantType && type instanceof GroupType) {
RowType shreddingSchema =
shreddingSchemas != null && shreddingSchemas.containsField(type.getName())
@@ -442,7 +463,14 @@ private class MapWriter implements FieldWriter {
private final FieldWriter keyWriter;
private final FieldWriter valueWriter;
- private MapWriter(DataType keyType, DataType valueType, GroupType groupType) {
+ @Nullable private final Set residualKeysToSkip;
+
+ private MapWriter(
+ DataType keyType,
+ DataType valueType,
+ GroupType groupType,
+ @Nullable Set residualKeysToSkip) {
+ this.residualKeysToSkip = residualKeysToSkip;
// Get the internal map structure (MAP_KEY_VALUE)
GroupType repeatedType = groupType.getType(0).asGroupType();
this.repeatedGroupName = repeatedType.getName();
@@ -471,12 +499,15 @@ public void write(InternalArray arrayData, int ordinal) {
private void writeMapData(InternalMap mapData) {
recordConsumer.startGroup();
- if (mapData != null && mapData.size() > 0) {
+ if (mapData != null && hasResidualEntry(mapData)) {
recordConsumer.startField(repeatedGroupName, 0);
InternalArray keyArray = mapData.keyArray();
InternalArray valueArray = mapData.valueArray();
for (int i = 0; i < keyArray.size(); i++) {
+ if (shouldSkipResidualEntry(keyArray, valueArray, i)) {
+ continue;
+ }
recordConsumer.startGroup();
if (!keyArray.isNullAt(i)) {
// write key element
@@ -503,6 +534,25 @@ private void writeMapData(InternalMap mapData) {
}
recordConsumer.endGroup();
}
+
+ private boolean hasResidualEntry(InternalMap mapData) {
+ InternalArray keyArray = mapData.keyArray();
+ InternalArray valueArray = mapData.valueArray();
+ for (int i = 0; i < keyArray.size(); i++) {
+ if (!shouldSkipResidualEntry(keyArray, valueArray, i)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean shouldSkipResidualEntry(
+ InternalArray keyArray, InternalArray valueArray, int ordinal) {
+ return residualKeysToSkip != null
+ && !keyArray.isNullAt(ordinal)
+ && !valueArray.isNullAt(ordinal)
+ && residualKeysToSkip.contains(keyArray.getString(ordinal));
+ }
}
/** It writes an array type field to parquet. */
@@ -557,38 +607,87 @@ private void writeArrayData(InternalArray arrayData) {
/** It writes a row type field to parquet. */
private class RowWriter implements FieldWriter {
- private final FieldWriter[] fieldWriters;
- private final String[] fieldNames;
- private final boolean[] isNullable;
+ private final RowFieldWriter[] rowFieldWriters;
+ private final int fieldCount;
public RowWriter(RowType rowType, GroupType groupType) {
- this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
+ this(rowType, groupType, null, "");
+ }
+
+ public RowWriter(
+ RowType rowType,
+ GroupType groupType,
+ @Nullable Map> mapShreddingKeys,
+ String parentPath) {
+ this.fieldCount = rowType.getFieldCount();
List fieldTypes = rowType.getFieldTypes();
- this.fieldWriters = new FieldWriter[rowType.getFieldCount()];
- this.isNullable = new boolean[rowType.getFieldCount()];
- for (int i = 0; i < fieldWriters.length; i++) {
- fieldWriters[i] = createWriter(fieldTypes.get(i), groupType.getType(i));
- isNullable[i] = fieldTypes.get(i).isNullable();
+ List writers = new java.util.ArrayList<>();
+ int physicalOrdinal = 0;
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ String fieldName = rowType.getFieldNames().get(i);
+ String path = MapShreddingUtils.path(parentPath, fieldName);
+ DataType fieldType = fieldTypes.get(i);
+ Type parquetType = groupType.getType(physicalOrdinal);
+ if (mapShreddingKeys != null
+ && MapShreddingUtils.isMapShreddingPath(mapShreddingKeys, path)) {
+ List keys = mapShreddingKeys.get(path);
+ Set keysToSkip = new HashSet<>();
+ for (String key : keys) {
+ keysToSkip.add(BinaryString.fromString(key));
+ }
+ MapType mapType = (MapType) fieldType;
+ writers.add(
+ new RowFieldWriter(
+ fieldName,
+ i,
+ fieldType.isNullable(),
+ new MapWriter(
+ mapType.getKeyType(),
+ mapType.getValueType(),
+ parquetType.asGroupType(),
+ keysToSkip)));
+ physicalOrdinal++;
+ for (int keyOrdinal = 0; keyOrdinal < keys.size(); keyOrdinal++) {
+ Type sidecarType = groupType.getType(physicalOrdinal);
+ writers.add(
+ new RowFieldWriter(
+ sidecarType.getName(),
+ i,
+ true,
+ new MapSidecarWriter(
+ keys.get(keyOrdinal),
+ createWriter(
+ mapType.getValueType(), sidecarType))));
+ physicalOrdinal++;
+ }
+ } else {
+ writers.add(
+ new RowFieldWriter(
+ fieldName,
+ i,
+ fieldType.isNullable(),
+ createWriter(fieldType, parquetType, path)));
+ physicalOrdinal++;
+ }
}
+ this.rowFieldWriters = writers.toArray(new RowFieldWriter[0]);
}
public void write(InternalRow row) {
- for (int i = 0; i < fieldWriters.length; i++) {
- if (!row.isNullAt(i)) {
- String fieldName = fieldNames[i];
- FieldWriter writer = fieldWriters[i];
-
- recordConsumer.startField(fieldName, i);
- writer.write(row, i);
- recordConsumer.endField(fieldName, i);
+ for (int i = 0; i < rowFieldWriters.length; i++) {
+ RowFieldWriter rowFieldWriter = rowFieldWriters[i];
+ if (rowFieldWriter.shouldWrite(row)) {
+ recordConsumer.startField(rowFieldWriter.fieldName, i);
+ rowFieldWriter.write(row);
+ recordConsumer.endField(rowFieldWriter.fieldName, i);
} else {
- if (!isNullable[i]) {
+ if (!rowFieldWriter.isNullable) {
throw new IllegalArgumentException(
String.format(
"Field '%s' expected not null but found null value. A possible cause is that the "
+ "table used %s or %s merge-engine and the aggregate function produced "
+ "null value when retracting.",
- fieldNames[i],
+ rowFieldWriter.fieldName,
CoreOptions.MergeEngine.PARTIAL_UPDATE,
CoreOptions.MergeEngine.AGGREGATE));
}
@@ -599,7 +698,7 @@ public void write(InternalRow row) {
@Override
public void write(InternalRow row, int ordinal) {
recordConsumer.startGroup();
- InternalRow rowData = row.getRow(ordinal, fieldWriters.length);
+ InternalRow rowData = row.getRow(ordinal, fieldCount);
write(rowData);
recordConsumer.endGroup();
}
@@ -607,12 +706,81 @@ public void write(InternalRow row, int ordinal) {
@Override
public void write(InternalArray arrayData, int ordinal) {
recordConsumer.startGroup();
- InternalRow rowData = arrayData.getRow(ordinal, fieldWriters.length);
+ InternalRow rowData = arrayData.getRow(ordinal, fieldCount);
write(rowData);
recordConsumer.endGroup();
}
}
+ private class RowFieldWriter {
+
+ private final String fieldName;
+ private final int logicalOrdinal;
+ private final boolean isNullable;
+ private final FieldWriter writer;
+
+ private RowFieldWriter(
+ String fieldName, int logicalOrdinal, boolean isNullable, FieldWriter writer) {
+ this.fieldName = fieldName;
+ this.logicalOrdinal = logicalOrdinal;
+ this.isNullable = isNullable;
+ this.writer = writer;
+ }
+
+ private boolean shouldWrite(InternalRow row) {
+ if (writer instanceof MapSidecarWriter) {
+ return !row.isNullAt(logicalOrdinal)
+ && ((MapSidecarWriter) writer).findValueIndex(row.getMap(logicalOrdinal))
+ >= 0;
+ }
+ return !row.isNullAt(logicalOrdinal);
+ }
+
+ private void write(InternalRow row) {
+ writer.write(row, logicalOrdinal);
+ }
+ }
+
+ private class MapSidecarWriter implements FieldWriter {
+
+ private final BinaryString key;
+ private final FieldWriter valueWriter;
+ private int valueIndex = -1;
+
+ private MapSidecarWriter(String key, FieldWriter valueWriter) {
+ this.key = BinaryString.fromString(key);
+ this.valueWriter = valueWriter;
+ }
+
+ @Override
+ public void write(InternalRow row, int ordinal) {
+ InternalMap map = row.getMap(ordinal);
+ int index = valueIndex >= 0 ? valueIndex : findValueIndex(map);
+ valueIndex = -1;
+ valueWriter.write(map.valueArray(), index);
+ }
+
+ @Override
+ public void write(InternalArray arrayData, int ordinal) {
+ throw new UnsupportedOperationException("Map sidecar writer only supports rows.");
+ }
+
+ private int findValueIndex(InternalMap map) {
+ InternalArray keyArray = map.keyArray();
+ InternalArray valueArray = map.valueArray();
+ for (int i = 0; i < keyArray.size(); i++) {
+ if (!keyArray.isNullAt(i)
+ && !valueArray.isNullAt(i)
+ && key.equals(keyArray.getString(i))) {
+ valueIndex = i;
+ return i;
+ }
+ }
+ valueIndex = -1;
+ return -1;
+ }
+ }
+
private class VariantWriter implements FieldWriter {
@Nullable private final VariantSchema variantSchema;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
index 1a66a55130e1..b218e4e01252 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
@@ -19,8 +19,12 @@
package org.apache.paimon.format.parquet.writer;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.parquet.ColumnConfigParser;
+import org.apache.paimon.format.parquet.MapShreddingKeyExtractor;
+import org.apache.paimon.format.parquet.MapShreddingUtils;
import org.apache.paimon.format.parquet.VariantUtils;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
@@ -34,17 +38,26 @@
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
/** A {@link ParquetBuilder} for {@link InternalRow}. */
public class RowDataParquetBuilder implements ParquetBuilder {
private final RowType rowType;
private final Configuration conf;
+ private final Options mapShreddingOptions;
@Nullable private RowType shreddingSchemas;
public RowDataParquetBuilder(RowType rowType, Options options) {
+ this(rowType, options, options);
+ }
+
+ public RowDataParquetBuilder(RowType rowType, Options options, Options mapShreddingOptions) {
this.rowType = rowType;
this.conf = new Configuration(false);
+ this.mapShreddingOptions = mapShreddingOptions;
this.shreddingSchemas = VariantUtils.shreddingSchemasFromOptions(options);
options.toMap().forEach(conf::set);
}
@@ -57,8 +70,23 @@ public RowDataParquetBuilder withShreddingSchemas(RowType shreddingSchemas) {
@Override
public ParquetWriter createWriter(OutputFile out, String compression)
throws IOException {
+ return createWriter(out, compression, Collections.emptyMap());
+ }
+
+ public FormatWriter createFormatWriter(PositionOutputStream stream, String compression)
+ throws IOException {
+ if (MapShreddingUtils.isMapShreddingEnabled(mapShreddingOptions)) {
+ return new MapShreddingFormatWriter(stream, compression);
+ }
+ OutputFile out = new StreamOutputFile(stream);
+ return new ParquetBulkWriter(createWriter(out, compression));
+ }
+
+ private ParquetWriter createWriter(
+ OutputFile out, String compression, Map> dynamicMapKeys)
+ throws IOException {
ParquetRowDataBuilder builder =
- new ParquetRowDataBuilder(out, rowType, shreddingSchemas)
+ new ParquetRowDataBuilder(out, rowType, shreddingSchemas, dynamicMapKeys)
.withConf(conf)
.withCompressionCodec(
CompressionCodecName.fromConf(getCompression(compression)))
@@ -121,4 +149,64 @@ public ParquetWriter createWriter(OutputFile out, String compressio
public String getCompression(String compression) {
return conf.get("parquet.compression", compression);
}
+
+ private class MapShreddingFormatWriter implements FormatWriter {
+
+ private final PositionOutputStream stream;
+ private final String compression;
+ private final MapShreddingKeyExtractor extractor;
+ private FormatWriter delegate;
+
+ private MapShreddingFormatWriter(PositionOutputStream stream, String compression) {
+ this.stream = stream;
+ this.compression = compression;
+ this.extractor = new MapShreddingKeyExtractor(rowType, mapShreddingOptions);
+ }
+
+ @Override
+ public void addElement(InternalRow row) throws IOException {
+ if (delegate != null) {
+ delegate.addElement(row);
+ return;
+ }
+
+ if (!extractor.finished()) {
+ extractor.add(row);
+ }
+ if (extractor.finished()) {
+ initializeDelegate();
+ }
+ }
+
+ @Override
+ public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
+ if (delegate != null) {
+ return delegate.reachTargetSize(suggestedCheck, targetSize);
+ }
+ return suggestedCheck && stream.getPos() >= targetSize;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (delegate == null) {
+ initializeDelegate();
+ }
+ delegate.close();
+ }
+
+ @Nullable
+ @Override
+ public Object writerMetadata() {
+ return delegate == null ? null : delegate.writerMetadata();
+ }
+
+ private void initializeDelegate() throws IOException {
+ Map> dynamicKeys = extractor.finish();
+ OutputFile out = new StreamOutputFile(stream);
+ delegate = new ParquetBulkWriter(createWriter(out, compression, dynamicKeys));
+ for (InternalRow bufferedRow : extractor.bufferedRows()) {
+ delegate.addElement(bufferedRow);
+ }
+ }
+ }
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/MapShreddingStorageBenchmark.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/MapShreddingStorageBenchmark.java
new file mode 100644
index 000000000000..108523887bc8
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/MapShreddingStorageBenchmark.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Benchmark for map shredding storage size. */
+public class MapShreddingStorageBenchmark {
+
+ private static final String MAP_FIELD = "headers";
+ private static final String MAP_SHREDDING_COLUMNS = "map.shredding.columns";
+ private static final String MAP_SHREDDING_MAX_KEYS = "map.shredding.maxKeys";
+ private static final String MAP_SHREDDING_MAX_INFER_BUFFER_MEMORY =
+ "map.shredding.maxInferBufferMemory";
+ private static final String MAP_SHREDDING_MAX_INFER_BUFFER_ROW =
+ "map.shredding.maxInferBufferRow";
+ private static final int DEFAULT_ROW_COUNT = 100_000;
+ private static final int DEFAULT_HOT_KEY_COUNT = 32;
+ private static final int DEFAULT_VALUE_LENGTH = 16;
+ private static final int DEFAULT_KEY_PADDING_LENGTH = 128;
+ private static final int DEFAULT_VALUE_RUN_LENGTH = 128;
+ private static final int DEFAULT_VALUE_CARDINALITY = 4;
+ private static final VarCharType STRING = new VarCharType(VarCharType.MAX_LENGTH);
+ private static final RowType ROW_TYPE =
+ RowType.builder()
+ .field("id", new IntType())
+ .field(MAP_FIELD, new MapType(STRING, STRING))
+ .build();
+
+ @TempDir File folder;
+
+ @Test
+ public void benchmarkLongHotKeyStorageSize() throws Exception {
+ int rowCount = intProperty("paimon.benchmark.map-shredding.rows", DEFAULT_ROW_COUNT);
+ int hotKeyCount =
+ intProperty("paimon.benchmark.map-shredding.hot-keys", DEFAULT_HOT_KEY_COUNT);
+ int valueLength =
+ intProperty("paimon.benchmark.map-shredding.value-length", DEFAULT_VALUE_LENGTH);
+ int keyPaddingLength =
+ intProperty(
+ "paimon.benchmark.map-shredding.key-padding-length",
+ DEFAULT_KEY_PADDING_LENGTH);
+
+ assertStorageSaving(
+ "Map shredding long header key storage benchmark",
+ Scenario.LONG_HOT_KEYS,
+ rowCount,
+ hotKeyCount,
+ valueLength,
+ keyPaddingLength,
+ 1,
+ 1,
+ false);
+ }
+
+ @Test
+ public void benchmarkColumnarValueStorageSize() throws Exception {
+ int rowCount = intProperty("paimon.benchmark.map-shredding.rows", DEFAULT_ROW_COUNT);
+ int hotKeyCount =
+ intProperty("paimon.benchmark.map-shredding.hot-keys", DEFAULT_HOT_KEY_COUNT);
+ int valueLength =
+ intProperty("paimon.benchmark.map-shredding.value-length", DEFAULT_VALUE_LENGTH);
+ int valueRunLength =
+ intProperty(
+ "paimon.benchmark.map-shredding.value-run-length",
+ DEFAULT_VALUE_RUN_LENGTH);
+ int valueCardinality =
+ intProperty(
+ "paimon.benchmark.map-shredding.value-cardinality",
+ DEFAULT_VALUE_CARDINALITY);
+
+ assertStorageSaving(
+ "Map shredding columnar header value storage benchmark",
+ Scenario.COLUMNAR_VALUES,
+ rowCount,
+ hotKeyCount,
+ valueLength,
+ 0,
+ valueRunLength,
+ valueCardinality,
+ true);
+ }
+
+ private int intProperty(String key, int defaultValue) {
+ String value = System.getProperties().getProperty(key);
+ return value == null ? defaultValue : Integer.parseInt(value);
+ }
+
+ private void assertStorageSaving(
+ String benchmarkName,
+ Scenario scenario,
+ int rowCount,
+ int hotKeyCount,
+ int valueLength,
+ int keyPaddingLength,
+ int valueRunLength,
+ int valueCardinality,
+ boolean dictionaryEnabled)
+ throws Exception {
+ long regularSize =
+ writeFile(
+ false,
+ scenario,
+ rowCount,
+ hotKeyCount,
+ valueLength,
+ keyPaddingLength,
+ valueRunLength,
+ valueCardinality,
+ dictionaryEnabled);
+ long mapShreddingSize =
+ writeFile(
+ true,
+ scenario,
+ rowCount,
+ hotKeyCount,
+ valueLength,
+ keyPaddingLength,
+ valueRunLength,
+ valueCardinality,
+ dictionaryEnabled);
+ long savedBytes = regularSize - mapShreddingSize;
+ double savedPercent = savedBytes * 100.0D / regularSize;
+
+ System.out.printf(
+ Locale.ROOT,
+ "%s: rows=%d, hotKeys=%d, valueLength=%d, keyPaddingLength=%d,"
+ + " valueRunLength=%d, valueCardinality=%d, dictionary=%s,"
+ + " regular=%d bytes, mapShredding=%d bytes, saved=%d bytes (%.2f%%)%n",
+ benchmarkName,
+ rowCount,
+ hotKeyCount,
+ valueLength,
+ keyPaddingLength,
+ valueRunLength,
+ valueCardinality,
+ dictionaryEnabled,
+ regularSize,
+ mapShreddingSize,
+ savedBytes,
+ savedPercent);
+
+ assertThat(mapShreddingSize).isLessThan(regularSize);
+ }
+
+ private Options options(boolean mapShredding, int hotKeyCount, boolean dictionaryEnabled) {
+ Options options = new Options();
+ options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
+ options.setInteger("parquet.block.size", 256 * 1024 * 1024);
+ options.set("parquet.enable.dictionary", Boolean.toString(dictionaryEnabled));
+ if (mapShredding) {
+ options.set(MAP_SHREDDING_COLUMNS, MAP_FIELD);
+ options.setInteger(MAP_SHREDDING_MAX_KEYS, hotKeyCount);
+ options.setInteger(MAP_SHREDDING_MAX_INFER_BUFFER_ROW, 10_000);
+ options.set(MAP_SHREDDING_MAX_INFER_BUFFER_MEMORY, "64 mb");
+ }
+ return options;
+ }
+
+ private long writeFile(
+ boolean mapShredding,
+ Scenario scenario,
+ int rowCount,
+ int hotKeyCount,
+ int valueLength,
+ int keyPaddingLength,
+ int valueRunLength,
+ int valueCardinality,
+ boolean dictionaryEnabled)
+ throws Exception {
+ Options options = options(mapShredding, hotKeyCount, dictionaryEnabled);
+ Path path =
+ new Path(
+ folder.getPath(),
+ (mapShredding ? "map-shredding-" : "regular-") + UUID.randomUUID());
+ ParquetWriterFactory factory =
+ new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE, options));
+ LocalFileIO fileIO = new LocalFileIO();
+ try (FormatWriter writer = factory.create(fileIO.newOutputStream(path, false), "snappy")) {
+ for (int rowId = 0; rowId < rowCount; rowId++) {
+ writer.addElement(
+ row(
+ scenario,
+ rowId,
+ hotKeyCount,
+ valueLength,
+ keyPaddingLength,
+ valueRunLength,
+ valueCardinality));
+ }
+ }
+ return fileIO.getFileSize(path);
+ }
+
+ private InternalRow row(
+ Scenario scenario,
+ int rowId,
+ int hotKeyCount,
+ int valueLength,
+ int keyPaddingLength,
+ int valueRunLength,
+ int valueCardinality) {
+ GenericRow row = new GenericRow(2);
+ row.setField(0, rowId);
+ row.setField(
+ 1,
+ map(
+ scenario,
+ rowId,
+ hotKeyCount,
+ valueLength,
+ keyPaddingLength,
+ valueRunLength,
+ valueCardinality));
+ return row;
+ }
+
+ private GenericMap map(
+ Scenario scenario,
+ int rowId,
+ int hotKeyCount,
+ int valueLength,
+ int keyPaddingLength,
+ int valueRunLength,
+ int valueCardinality) {
+ Map map = new LinkedHashMap<>();
+ for (int keyId = 0; keyId < hotKeyCount; keyId++) {
+ map.put(
+ hotKey(keyId, keyPaddingLength),
+ value(scenario, rowId, keyId, valueLength, valueRunLength, valueCardinality));
+ }
+ return new GenericMap(map);
+ }
+
+ private BinaryString hotKey(int keyId, int keyPaddingLength) {
+ String prefix = String.format(Locale.ROOT, "very_common_header_key_%03d_", keyId);
+ StringBuilder builder = new StringBuilder(prefix);
+ while (builder.length() < prefix.length() + keyPaddingLength) {
+ builder.append("0123456789abcdef");
+ }
+ int keyLength = prefix.length() + keyPaddingLength;
+ return BinaryString.fromString(builder.substring(0, Math.min(builder.length(), keyLength)));
+ }
+
+ private BinaryString value(
+ Scenario scenario,
+ int rowId,
+ int keyId,
+ int valueLength,
+ int valueRunLength,
+ int valueCardinality) {
+ int valueId =
+ scenario == Scenario.COLUMNAR_VALUES
+ ? rowId / valueRunLength % valueCardinality
+ : rowId;
+ String prefix = String.format(Locale.ROOT, "v_%03d_%03d_", keyId, valueId);
+ StringBuilder builder = new StringBuilder(valueLength);
+ while (builder.length() < valueLength) {
+ builder.append(prefix);
+ }
+ return BinaryString.fromString(builder.substring(0, valueLength));
+ }
+
+ private enum Scenario {
+ LONG_HOT_KEYS,
+ COLUMNAR_VALUES
+ }
+}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index 0f71fbbcd719..b9276498adff 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -18,13 +18,22 @@
package org.apache.paimon.format.parquet;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.variant.GenericVariant;
+import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -34,12 +43,19 @@
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.paimon.data.serializer.InternalMapSerializer.convertToJavaMap;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** A parquet {@link FormatReadWriteTest}. */
public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
@@ -89,4 +105,250 @@ public void testEnableBloomFilter(boolean enabled) throws Exception {
}
}
}
+
+ @Test
+ public void testMapShreddingRoundTrip() throws Exception {
+ Options options = new Options();
+ options.set("map.shredding.columns", "labels");
+ options.set("map.shredding.maxKeys", "2");
+ options.set("map.shredding.maxInferBufferRow", "10");
+ ParquetFileFormat format =
+ new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ 0,
+ "labels",
+ DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.STRING())));
+
+ PositionOutputStream out = fileIO.newOutputStream(file, false);
+ FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
+ writer.addElement(GenericRow.of(labels("job", "api", "namespace", "prod", "region", "us")));
+ writer.close();
+ out.close();
+
+ try (ParquetFileReader reader =
+ ParquetUtil.getParquetReader(
+ fileIO, file, fileIO.getFileSize(file), new Options())) {
+ assertThat(reader.getFooter().getFileMetaData().getSchema().containsField("labels"))
+ .isTrue();
+ assertThat(
+ reader.getFooter()
+ .getFileMetaData()
+ .getSchema()
+ .containsField("dynamic_column_labels_value_0"))
+ .isTrue();
+ assertThat(
+ reader.getFooter()
+ .getFileMetaData()
+ .getKeyValueMetaData()
+ .get("parquet.meta.dynamic.column.map.keys.of.labels")
+ .split(","))
+ .containsExactlyInAnyOrder("job", "namespace");
+ }
+
+ InternalMap logicalMap = readOne(format, rowType).getMap(0);
+ assertThat(toStringMap(logicalMap))
+ .containsExactlyInAnyOrderEntriesOf(
+ stringMap("job", "api", "namespace", "prod", "region", "us"));
+ }
+
+ @Test
+ public void testMapShreddingNonStringValueRoundTrip() throws Exception {
+ Options options = new Options();
+ options.set("map.shredding.columns", "headers");
+ options.set("map.shredding.maxKeys", "2");
+ options.set("map.shredding.maxInferBufferRow", "10");
+ ParquetFileFormat format =
+ new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ 0,
+ "headers",
+ DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.INT())));
+
+ PositionOutputStream out = fileIO.newOutputStream(file, false);
+ FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
+ writer.addElement(GenericRow.of(intLabels("status", 200, "retry", 3, "region", 1)));
+ writer.close();
+ out.close();
+
+ InternalMap logicalMap = readOne(format, rowType).getMap(0);
+ assertThat(toIntMap(logicalMap))
+ .containsExactlyInAnyOrderEntriesOf(intMap("status", 200, "retry", 3, "region", 1));
+ }
+
+ @Test
+ public void testMapShreddingVariantValueRoundTrip() throws Exception {
+ Options options = new Options();
+ options.set("map.shredding.columns", "params");
+ options.set("map.shredding.maxKeys", "2");
+ options.set("map.shredding.maxInferBufferRow", "10");
+ ParquetFileFormat format =
+ new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ 0,
+ "params",
+ DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.VARIANT())));
+
+ GenericVariant user = GenericVariant.fromJson("{\"id\":1,\"name\":\"alice\"}");
+ GenericVariant request = GenericVariant.fromJson("{\"id\":\"r1\"}");
+ GenericVariant extra = GenericVariant.fromJson("{\"debug\":true}");
+
+ PositionOutputStream out = fileIO.newOutputStream(file, false);
+ FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
+ writer.addElement(
+ GenericRow.of(variantLabels("user", user, "request", request, "extra", extra)));
+ writer.close();
+ out.close();
+
+ InternalMap logicalMap = readOne(format, rowType).getMap(0);
+ assertThat(toVariantMap(logicalMap))
+ .containsExactlyInAnyOrderEntriesOf(
+ variantMap("user", user, "request", request, "extra", extra));
+ }
+
+ @Test
+ public void testNestedMapShreddingRoundTrip() throws Exception {
+ Options options = new Options();
+ options.set("map.shredding.columns", "payload.labels");
+ options.set("map.shredding.maxKeys", "2");
+ options.set("map.shredding.maxInferBufferRow", "10");
+ ParquetFileFormat format =
+ new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+ RowType payloadType =
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ 0,
+ "labels",
+ DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.STRING())));
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "payload", payloadType));
+
+ PositionOutputStream out = fileIO.newOutputStream(file, false);
+ FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
+ writer.addElement(
+ GenericRow.of(
+ GenericRow.of(labels("job", "api", "namespace", "prod", "region", "us"))));
+ writer.close();
+ out.close();
+
+ try (ParquetFileReader reader =
+ ParquetUtil.getParquetReader(
+ fileIO, file, fileIO.getFileSize(file), new Options())) {
+ assertThat(
+ reader.getFooter()
+ .getFileMetaData()
+ .getSchema()
+ .getType("payload")
+ .asGroupType()
+ .containsField("dynamic_column_payload_labels_value_0"))
+ .isTrue();
+ assertThat(
+ reader.getFooter()
+ .getFileMetaData()
+ .getKeyValueMetaData()
+ .get("parquet.meta.dynamic.column.map.keys.of.payload.labels")
+ .split(","))
+ .containsExactlyInAnyOrder("job", "namespace");
+ }
+
+ InternalMap logicalMap = readOne(format, rowType).getRow(0, 1).getMap(0);
+ assertThat(toStringMap(logicalMap))
+ .containsExactlyInAnyOrderEntriesOf(
+ stringMap("job", "api", "namespace", "prod", "region", "us"));
+ }
+
+ private InternalRow readOne(FileFormat format, RowType rowType) throws Exception {
+ RecordReader reader =
+ format.createReaderFactory(rowType, rowType, new ArrayList<>())
+ .createReader(
+ new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)));
+ InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+ List result = new ArrayList<>();
+ reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+ assertThat(result).hasSize(1);
+ return result.get(0);
+ }
+
+ private static GenericMap labels(String... kvs) {
+ Map map = new LinkedHashMap<>();
+ for (int i = 0; i < kvs.length; i += 2) {
+ map.put(BinaryString.fromString(kvs[i]), BinaryString.fromString(kvs[i + 1]));
+ }
+ return new GenericMap(map);
+ }
+
+ private static GenericMap intLabels(Object... kvs) {
+ Map map = new LinkedHashMap<>();
+ for (int i = 0; i < kvs.length; i += 2) {
+ map.put(BinaryString.fromString((String) kvs[i]), (Integer) kvs[i + 1]);
+ }
+ return new GenericMap(map);
+ }
+
+ private static GenericMap variantLabels(Object... kvs) {
+ Map map = new LinkedHashMap<>();
+ for (int i = 0; i < kvs.length; i += 2) {
+ map.put(BinaryString.fromString((String) kvs[i]), (GenericVariant) kvs[i + 1]);
+ }
+ return new GenericMap(map);
+ }
+
+ private static Map toStringMap(InternalMap map) {
+ Map