diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 05cc17f34e05..14eebf948563 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -867,6 +867,30 @@ Integer Level threshold of lookup to generate remote lookup files. Level files below this threshold will not generate remote lookup files. + +
map.shredding.columns
+ (none) + String + Comma-separated MAP<STRING, T> column paths to write with map shredding. + + +
map.shredding.maxInferBufferMemory
+ 32 mb + MemorySize + Maximum memory used to buffer rows while inferring map shredding keys. + + +
map.shredding.maxInferBufferRow
+ 10000 + Integer + Maximum number of rows to buffer for map shredding key inference. + + +
map.shredding.maxKeys
+ 64 + Integer + Maximum number of hot keys extracted for each map. +
manifest.compression
"zstd" diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 75eb4744dbf7..40f44d605d43 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -436,6 +436,33 @@ public InlineElement getDescription() { .defaultValue(4096) .withDescription("Maximum number of rows to buffer for schema inference."); + public static final ConfigOption MAP_SHREDDING_COLUMNS = + key("map.shredding.columns") + .stringType() + .noDefaultValue() + .withDescription( + "Comma-separated MAP column paths to write with map shredding."); + + public static final ConfigOption MAP_SHREDDING_MAX_KEYS = + key("map.shredding.maxKeys") + .intType() + .defaultValue(64) + .withDescription("Maximum number of hot keys extracted for each map."); + + public static final ConfigOption MAP_SHREDDING_MAX_INFER_BUFFER_MEMORY = + key("map.shredding.maxInferBufferMemory") + .memoryType() + .defaultValue(MemorySize.ofMebiBytes(32)) + .withDescription( + "Maximum memory used to buffer rows while inferring map shredding keys."); + + public static final ConfigOption MAP_SHREDDING_MAX_INFER_BUFFER_ROW = + key("map.shredding.maxInferBufferRow") + .intType() + .defaultValue(10000) + .withDescription( + "Maximum number of rows to buffer for map shredding key inference."); + public static final ConfigOption MANIFEST_FORMAT = key("manifest.format") .stringType() diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingKeyExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingKeyExtractor.java new file mode 100644 index 000000000000..580bd2531a74 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingKeyExtractor.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.parquet; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.DataGetters; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.InternalRowToSizeVisitor; +import org.apache.paimon.types.RowType; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** Extract file-level hot keys for map shredding columns. */ +public class MapShreddingKeyExtractor { + + private final List columns; + private final InternalRowSerializer serializer; + private final int maxKeys; + private final long memoryLimit; + private final int rowLimit; + private final List bufferedRows; + private final Map> keyValueSizes; + private final List> rowFieldSizeVisitors; + + private boolean finished; + private long currentBufferedSize; + + public MapShreddingKeyExtractor(RowType rowType, Options options) { + this.columns = + MapShreddingUtils.resolveColumns( + rowType, + MapShreddingUtils.parseColumnPaths( + options.get(CoreOptions.MAP_SHREDDING_COLUMNS))); + this.serializer = new InternalRowSerializer(rowType); + this.maxKeys = options.get(CoreOptions.MAP_SHREDDING_MAX_KEYS); + this.memoryLimit = + options.get(CoreOptions.MAP_SHREDDING_MAX_INFER_BUFFER_MEMORY).getBytes(); + this.rowLimit = options.get(CoreOptions.MAP_SHREDDING_MAX_INFER_BUFFER_ROW); + this.bufferedRows = new ArrayList<>(); + this.keyValueSizes = new LinkedHashMap<>(); + this.rowFieldSizeVisitors = createRowFieldSizeVisitors(rowType); + + if (columns.isEmpty() || maxKeys <= 0 || memoryLimit <= 0 || rowLimit <= 0) { + this.finished = true; + } else { + columns.forEach(column -> keyValueSizes.put(column.path(), new LinkedHashMap<>())); + } + } + + public boolean finished() { + return finished; + } + + public List bufferedRows() { + return bufferedRows; + } + + public void add(InternalRow row) { + if (finished) { + throw new IllegalStateException("MapShreddingKeyExtractor is already finished."); + } + + InternalRow copied = serializer.copy(row); + bufferedRows.add(copied); + currentBufferedSize += estimateRowSize(copied); + + for (MapShreddingUtils.ResolvedMapShreddingColumn column : columns) { + InternalMap map = MapShreddingUtils.extractMap(copied, column); + if (map == null) { + continue; + } + + InternalArray keyArray = map.keyArray(); + InternalArray valueArray = map.valueArray(); + BiFunction valueSizer = + column.valueType().accept(new InternalRowToSizeVisitor()); + Map sizes = keyValueSizes.get(column.path()); + for (int i = 0; i < map.size(); i++) { + if (keyArray.isNullAt(i)) { + continue; + } + BinaryString key = keyArray.getString(i); + long valueSize = valueSizer.apply(valueArray, i); + sizes.merge(key.toString(), valueSize, Long::sum); + } + } + + if (bufferedRows.size() >= rowLimit || currentBufferedSize >= memoryLimit) { + populate(); + } + } + + public Map> populate() { + Map> result = currentDynamicKeys(); + finished = true; + return result; + } + + public Map> finish() { + return finished ? currentDynamicKeys() : populate(); + } + + public Map> currentDynamicKeys() { + Map> result = new LinkedHashMap<>(); + keyValueSizes.forEach( + (path, sizes) -> + result.put( + path, + sizes.entrySet().stream() + .sorted( + Map.Entry.comparingByValue() + .reversed()) + .limit(maxKeys) + .map(Map.Entry::getKey) + .collect(Collectors.toList()))); + return result; + } + + private long estimateRowSize(InternalRow row) { + long size = 0L; + for (int i = 0; i < rowFieldSizeVisitors.size(); i++) { + size += rowFieldSizeVisitors.get(i).apply(row, i); + } + return size; + } + + private List> createRowFieldSizeVisitors( + RowType rowType) { + List> visitors = new ArrayList<>(); + for (DataType fieldType : rowType.getFieldTypes()) { + BiFunction visitor = + fieldType.accept(new InternalRowToSizeVisitor()); + visitors.add((row, pos) -> visitor.apply(row, pos)); + } + return visitors; + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingUtils.java new file mode 100644 index 000000000000..9ae541f1cde5 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/MapShreddingUtils.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.parquet; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +/** Utilities shared by map shredding writer and reader. */ +public class MapShreddingUtils { + + public static final String DYNAMIC_VALUE_PREFIX = "value_"; + public static final String DYNAMIC_COLUMN_NAME = "dynamic_column"; + public static final String FOOTER_METADATA_PREFIX = "parquet.meta.dynamic.column.map.keys.of."; + + private MapShreddingUtils() {} + + public static boolean isMapShreddingEnabled(Options options) { + String configured = options.get(CoreOptions.MAP_SHREDDING_COLUMNS); + return configured != null && !configured.trim().isEmpty(); + } + + public static List parseColumnPaths(@Nullable String configuredColumns) { + if (configuredColumns == null || configuredColumns.trim().isEmpty()) { + return Collections.emptyList(); + } + + return Arrays.stream(configuredColumns.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .distinct() + .collect(Collectors.toList()); + } + + public static List resolveColumns( + RowType rowType, List paths) { + List resolved = new ArrayList<>(paths.size()); + for (String path : paths) { + resolved.add(resolveColumn(rowType, path)); + } + return resolved; + } + + public static ResolvedMapShreddingColumn resolveColumn(RowType rowType, String path) { + String[] segments = path.split("\\."); + if (segments.length == 0) { + throw new IllegalArgumentException("Invalid map shredding column path: " + path); + } + + RowType current = rowType; + int[] positions = new int[segments.length]; + int[] nestedRowFieldCounts = new int[Math.max(segments.length - 1, 0)]; + for (int i = 0; i < segments.length; i++) { + String segment = segments[i]; + if (!current.containsField(segment)) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Cannot resolve map shredding column '%s' from row type '%s'.", + path, + rowType)); + } + + positions[i] = current.getFieldIndex(segment); + DataField field = current.getField(segment); + DataType fieldType = field.type(); + if (i < segments.length - 1) { + if (!(fieldType instanceof RowType)) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Map shredding column '%s' expects ROW on intermediate path '%s' but found '%s'.", + path, + segment, + fieldType)); + } + nestedRowFieldCounts[i] = ((RowType) fieldType).getFieldCount(); + current = (RowType) fieldType; + continue; + } + + if (!(fieldType instanceof MapType)) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Map shredding column '%s' must be MAP but found '%s'.", + path, + fieldType)); + } + + MapType mapType = (MapType) fieldType; + if (!isStringType(mapType.getKeyType())) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Map shredding column '%s' must use STRING key type but found '%s'.", + path, + mapType.getKeyType())); + } + return new ResolvedMapShreddingColumn( + path, segments, positions, nestedRowFieldCounts, mapType); + } + throw new IllegalArgumentException("Invalid map shredding column path: " + path); + } + + public static boolean isMapShreddingPath(Map> dynamicKeys, String path) { + List keys = dynamicKeys.get(path); + return keys != null && !keys.isEmpty(); + } + + public static String sidecarColumnName(String mapColumn, int keyOrdinal) { + return DYNAMIC_COLUMN_NAME + + "_" + + sanitize(mapColumn) + + "_" + + DYNAMIC_VALUE_PREFIX + + keyOrdinal; + } + + public static String path(String parentPath, String fieldName) { + return parentPath == null || parentPath.isEmpty() + ? fieldName + : parentPath + "." + fieldName; + } + + public static Map toFooterMetadata(Map> dynamicKeys) { + Map metadata = new LinkedHashMap<>(); + dynamicKeys.forEach( + (path, keys) -> { + if (!keys.isEmpty()) { + metadata.put(FOOTER_METADATA_PREFIX + path, String.join(",", keys)); + } + }); + return metadata; + } + + public static Map> fromFooterMetadata(Map keyValueMeta) { + Map> result = new LinkedHashMap<>(); + keyValueMeta.forEach( + (key, value) -> { + if (key.startsWith(FOOTER_METADATA_PREFIX)) { + String path = key.substring(FOOTER_METADATA_PREFIX.length()); + result.put(path, parseColumnPaths(value)); + } + }); + return result; + } + + @Nullable + public static InternalMap extractMap(InternalRow row, ResolvedMapShreddingColumn column) { + InternalRow current = row; + for (int i = 0; i < column.fieldPositions.length - 1; i++) { + int fieldPos = column.fieldPositions[i]; + if (current.isNullAt(fieldPos)) { + return null; + } + current = current.getRow(fieldPos, column.nestedRowFieldCounts[i]); + } + int mapPos = column.fieldPositions[column.fieldPositions.length - 1]; + if (current.isNullAt(mapPos)) { + return null; + } + return current.getMap(mapPos); + } + + private static String sanitize(String value) { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')) { + builder.append(c); + } else { + builder.append('_'); + } + } + return builder.toString(); + } + + private static boolean isStringType(DataType dataType) { + DataTypeRoot root = dataType.getTypeRoot(); + return root == DataTypeRoot.VARCHAR || root == DataTypeRoot.CHAR; + } + + /** Resolved map shredding logical column path. */ + public static class ResolvedMapShreddingColumn { + + private final String path; + private final String[] fieldNames; + private final int[] fieldPositions; + private final int[] nestedRowFieldCounts; + private final MapType mapType; + + public ResolvedMapShreddingColumn( + String path, + String[] fieldNames, + int[] fieldPositions, + int[] nestedRowFieldCounts, + MapType mapType) { + this.path = path; + this.fieldNames = fieldNames; + this.fieldPositions = fieldPositions; + this.nestedRowFieldCounts = nestedRowFieldCounts; + this.mapType = mapType; + } + + public String path() { + return path; + } + + public String[] fieldNames() { + return fieldNames; + } + + public int[] fieldPositions() { + return fieldPositions; + } + + public int[] nestedRowFieldCounts() { + return nestedRowFieldCounts; + } + + public MapType mapType() { + return mapType; + } + + public DataType valueType() { + return mapType.getValueType(); + } + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java index 9ccf88bd17ac..7584895dd0d4 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java @@ -69,13 +69,18 @@ public FormatReaderFactory createReaderFactory( RowType projectedRowType, @Nullable List filters) { return new ParquetReaderFactory( - options, projectedRowType, readBatchSize, ParquetFilters.convert(filters)); + options, + dataSchemaRowType, + projectedRowType, + readBatchSize, + ParquetFilters.convert(filters)); } @Override public FormatWriterFactory createWriterFactory(RowType type) { ParquetWriterFactory baseFactory = - new ParquetWriterFactory(new RowDataParquetBuilder(type, options)); + new ParquetWriterFactory( + new RowDataParquetBuilder(type, options, formatContext.options())); // Wrap with variant inference decorator return new VariantInferenceWriterFactory( baseFactory, new VariantInferenceConfig(type, formatContext.options())); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 9a741a63ed73..a99f1096b427 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,6 +65,9 @@ import java.util.concurrent.ConcurrentHashMap; import static org.apache.paimon.data.variant.VariantMetadataUtils.path; +import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_ELEMENT_NAME; +import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME; +import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.PAIMON_SCHEMA; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType; @@ -98,6 +102,15 @@ public class ParquetReaderFactory implements FormatReaderFactory { public ParquetReaderFactory( Options conf, RowType readType, int batchSize, @Nullable FilterCompat.Filter filter) { + this(conf, readType, readType, batchSize, filter); + } + + public ParquetReaderFactory( + Options conf, + RowType dataType, + RowType readType, + int batchSize, + @Nullable FilterCompat.Filter filter) { this.conf = conf; this.readFields = readType.getFields().toArray(new DataField[0]); this.batchSize = batchSize; @@ -124,7 +137,10 @@ public FileRecordReader createReader(FormatReaderFactory.Context co builder.build(), context.selection()); MessageType fileSchema = reader.getFileMetaData().getSchema(); - RequestedSchema requestedSchema = getOrCreateRequestedSchema(fileSchema); + Map> dynamicMapKeys = + MapShreddingUtils.fromFooterMetadata( + reader.getFileMetaData().getKeyValueMetaData()); + RequestedSchema requestedSchema = getOrCreateRequestedSchema(fileSchema, dynamicMapKeys); if (LOG.isDebugEnabled()) { LOG.debug( @@ -147,26 +163,33 @@ public FileRecordReader createReader(FormatReaderFactory.Context co context.fileIO()); } - private RequestedSchema getOrCreateRequestedSchema(MessageType fileSchema) { + private RequestedSchema getOrCreateRequestedSchema( + MessageType fileSchema, Map> dynamicMapKeys) { // clipParquetSchema and buildFieldsList are pure functions of (readFields, fileSchema). // Cache the result keyed by fileSchema so that files sharing the same on-disk schema // within this factory instance avoid redundant computation. Keying by fileSchema (rather // than a simple "compute once" flag) correctly handles edge cases where different files // read by the same factory instance may have different on-disk schemas, e.g. externally // migrated Parquet files. - return requestedSchemaCache.computeIfAbsent(fileSchema, this::createRequestedSchema); + if (!dynamicMapKeys.isEmpty()) { + return createRequestedSchema(fileSchema, dynamicMapKeys); + } + return requestedSchemaCache.computeIfAbsent( + fileSchema, schema -> createRequestedSchema(schema, dynamicMapKeys)); } - private RequestedSchema createRequestedSchema(MessageType fileSchema) { - MessageType rs = clipParquetSchema(fileSchema); + private RequestedSchema createRequestedSchema( + MessageType fileSchema, Map> dynamicMapKeys) { + MessageType rs = clipParquetSchema(fileSchema, dynamicMapKeys); MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(rs); - List f = buildFieldsList(readFields, columnIO, rs); + List f = buildFieldsList(readFields, columnIO, rs, dynamicMapKeys); return new RequestedSchema(rs, f); } /** Clips `parquetSchema` according to `fieldNames`. */ - private MessageType clipParquetSchema(GroupType parquetSchema) { - Type[] types = new Type[readFields.length]; + private MessageType clipParquetSchema( + GroupType parquetSchema, Map> dynamicMapKeys) { + List types = new ArrayList<>(); for (int i = 0; i < readFields.length; ++i) { String fieldName = readFields[i].name(); if (!parquetSchema.containsField(fieldName)) { @@ -174,18 +197,39 @@ private MessageType clipParquetSchema(GroupType parquetSchema) { "{} does not exist in {}, will fill the field with null.", fieldName, parquetSchema); - types[i] = ParquetSchemaConverter.convertToParquetType(readFields[i]); + types.add(ParquetSchemaConverter.convertToParquetType(readFields[i])); } else { Type parquetType = parquetSchema.getType(fieldName); - types[i] = clipParquetType(readFields[i].type(), parquetType); + types.add( + clipParquetType( + readFields[i].type(), parquetType, fieldName, dynamicMapKeys)); + if (MapShreddingUtils.isMapShreddingPath(dynamicMapKeys, fieldName)) { + List keys = dynamicMapKeys.get(fieldName); + for (int keyOrdinal = 0; keyOrdinal < keys.size(); keyOrdinal++) { + String sidecarName = + MapShreddingUtils.sidecarColumnName(fieldName, keyOrdinal); + if (parquetSchema.containsField(sidecarName)) { + types.add(parquetSchema.getType(sidecarName)); + } + } + } } } - return Types.buildMessage().addFields(types).named(PAIMON_SCHEMA); + return Types.buildMessage().addFields(types.toArray(new Type[0])).named(PAIMON_SCHEMA); } /** Clips `parquetType` by `readType`. */ private Type clipParquetType(DataType readType, Type parquetType) { + return clipParquetType( + readType, parquetType, parquetType.getName(), Collections.emptyMap()); + } + + private Type clipParquetType( + DataType readType, + Type parquetType, + String path, + Map> dynamicMapKeys) { switch (readType.getTypeRoot()) { case ROW: RowType rowType = (RowType) readType; @@ -196,9 +240,21 @@ private Type clipParquetType(DataType readType, Type parquetType) { List rowGroupFields = new ArrayList<>(); for (DataField field : rowType.getFields()) { String fieldName = field.name(); + String childPath = MapShreddingUtils.path(path, fieldName); if (rowGroup.containsField(fieldName)) { Type type = rowGroup.getType(fieldName); - rowGroupFields.add(clipParquetType(field.type(), type)); + rowGroupFields.add( + clipParquetType(field.type(), type, childPath, dynamicMapKeys)); + if (MapShreddingUtils.isMapShreddingPath(dynamicMapKeys, childPath)) { + List keys = dynamicMapKeys.get(childPath); + for (int keyOrdinal = 0; keyOrdinal < keys.size(); keyOrdinal++) { + String sidecarName = + MapShreddingUtils.sidecarColumnName(childPath, keyOrdinal); + if (rowGroup.containsField(sidecarName)) { + rowGroupFields.add(rowGroup.getType(sidecarName)); + } + } + } } else { // todo: support nested field missing throw new RuntimeException("field " + fieldName + " is missing"); @@ -217,8 +273,16 @@ private Type clipParquetType(DataType readType, Type parquetType) { mapGroup.getRepetition(), mapGroup.getName(), mapGroup.getType(0).getName(), - clipParquetType(mapType.getKeyType(), keyValueType.getLeft()), - clipParquetType(mapType.getValueType(), keyValueType.getRight())); + clipParquetType( + mapType.getKeyType(), + keyValueType.getLeft(), + path + "." + MAP_KEY_NAME, + dynamicMapKeys), + clipParquetType( + mapType.getValueType(), + keyValueType.getRight(), + path + "." + MAP_VALUE_NAME, + dynamicMapKeys)); case ARRAY: ArrayType arrayType = (ArrayType) readType; GroupType arrayGroup = (GroupType) parquetType; @@ -232,7 +296,10 @@ private Type clipParquetType(DataType readType, Type parquetType) { int level = arrayGroup.getType(0) instanceof GroupType ? 3 : 2; Type elementType = clipParquetType( - arrayType.getElementType(), parquetListElementType(arrayGroup)); + arrayType.getElementType(), + parquetListElementType(arrayGroup), + path + "." + LIST_ELEMENT_NAME, + dynamicMapKeys); if (level == 3) { // In case that the name in middle level is not "list". diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index 37a69fe9aebd..e77c67dc99d9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -42,6 +42,7 @@ import org.apache.parquet.schema.Types; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; @@ -63,18 +64,75 @@ public static MessageType convertToParquetMessageType(RowType rowType) { return new MessageType(PAIMON_SCHEMA, convertToParquetTypes(rowType)); } + public static MessageType convertToParquetMessageType( + RowType rowType, Map> dynamicMapKeys) { + if (dynamicMapKeys.isEmpty()) { + return convertToParquetMessageType(rowType); + } + return new MessageType(PAIMON_SCHEMA, convertToParquetTypes(rowType, dynamicMapKeys)); + } + public static Type[] convertToParquetTypes(RowType rowType) { return rowType.getFields().stream() .map(ParquetSchemaConverter::convertToParquetType) .toArray(Type[]::new); } + public static Type[] convertToParquetTypes( + RowType rowType, Map> dynamicMapKeys) { + return convertToParquetTypes(rowType, dynamicMapKeys, "", 0); + } + + private static Type[] convertToParquetTypes( + RowType rowType, + Map> dynamicMapKeys, + String parentPath, + int depth) { + List types = new java.util.ArrayList<>(); + for (DataField field : rowType.getFields()) { + String path = MapShreddingUtils.path(parentPath, field.name()); + types.add( + convertToParquetType( + field.name(), field.type(), field.id(), depth, path, dynamicMapKeys)); + if (MapShreddingUtils.isMapShreddingPath(dynamicMapKeys, path)) { + MapType mapType = (MapType) field.type(); + List keys = dynamicMapKeys.get(path); + for (int i = 0; i < keys.size(); i++) { + types.add( + convertToParquetType( + MapShreddingUtils.sidecarColumnName(path, i), + mapType.getValueType(), + SpecialFields.getMapValueFieldId(field.id(), 1) + + 1024 + + i, + 0) + .withId( + SpecialFields.getMapValueFieldId(field.id(), 1) + + 1024 + + i)); + } + } + } + return types.toArray(new Type[0]); + } + /** Convert paimon {@link DataField} to parquet {@link Type}. */ public static Type convertToParquetType(DataField field) { return convertToParquetType(field.name(), field.type(), field.id(), 0); } public static Type convertToParquetType(String name, DataType type, int fieldId, int depth) { + return convertToParquetType( + name, type, fieldId, depth, name, java.util.Collections.emptyMap()); + } + + private static Type convertToParquetType( + String name, + DataType type, + int fieldId, + int depth, + String path, + Map> dynamicMapKeys) { Type.Repetition repetition = type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED; switch (type.getTypeRoot()) { @@ -165,7 +223,9 @@ public static Type convertToParquetType(String name, DataType type, int fieldId, LIST_ELEMENT_NAME, arrayType.getElementType(), fieldId, - depth + 1) + depth + 1, + path + "." + LIST_ELEMENT_NAME, + dynamicMapKeys) .withId(SpecialFields.getArrayElementFieldId(fieldId, depth + 1)); return ConversionPatterns.listOfElements(repetition, name, elementParquetType) .withId(fieldId); @@ -178,11 +238,22 @@ public static Type convertToParquetType(String name, DataType type, int fieldId, keyType = keyType.copy(false); } Type mapKeyParquetType = - convertToParquetType(MAP_KEY_NAME, keyType, fieldId, depth + 1) + convertToParquetType( + MAP_KEY_NAME, + keyType, + fieldId, + depth + 1, + path + "." + MAP_KEY_NAME, + dynamicMapKeys) .withId(SpecialFields.getMapKeyFieldId(fieldId, depth + 1)); Type mapValueParquetType = convertToParquetType( - MAP_VALUE_NAME, mapType.getValueType(), fieldId, depth + 1) + MAP_VALUE_NAME, + mapType.getValueType(), + fieldId, + depth + 1, + path + "." + MAP_VALUE_NAME, + dynamicMapKeys) .withId(SpecialFields.getMapValueFieldId(fieldId, depth + 1)); return ConversionPatterns.mapType( repetition, @@ -200,10 +271,22 @@ public static Type convertToParquetType(String name, DataType type, int fieldId, elementType = elementType.copy(false); } Type multisetKeyParquetType = - convertToParquetType(MAP_KEY_NAME, elementType, fieldId, depth + 1) + convertToParquetType( + MAP_KEY_NAME, + elementType, + fieldId, + depth + 1, + path + "." + MAP_KEY_NAME, + dynamicMapKeys) .withId(SpecialFields.getMapKeyFieldId(fieldId, depth + 1)); Type multisetValueParquetType = - convertToParquetType(MAP_VALUE_NAME, new IntType(false), fieldId, depth + 1) + convertToParquetType( + MAP_VALUE_NAME, + new IntType(false), + fieldId, + depth + 1, + path + "." + MAP_VALUE_NAME, + dynamicMapKeys) .withId(SpecialFields.getMapValueFieldId(fieldId, depth + 1)); return ConversionPatterns.mapType( repetition, @@ -221,7 +304,7 @@ public static Type convertToParquetType(String name, DataType type, int fieldId, // LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION)); // } return groupTypeBuilder - .addFields(convertToParquetTypes(rowType)) + .addFields(convertToParquetTypes(rowType, dynamicMapKeys, path, depth + 1)) .named(name) .withId(fieldId); case VARIANT: diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java index 282805897a5f..93c3e6927089 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java @@ -52,11 +52,15 @@ public ParquetWriterFactory(ParquetBuilder writerBuilder) { @Override public FormatWriter create(PositionOutputStream stream, String compression) throws IOException { - final OutputFile out = new StreamOutputFile(stream); if (HadoopCompressionType.NONE.value().equals(compression)) { compression = null; } + if (writerBuilder instanceof RowDataParquetBuilder) { + return ((RowDataParquetBuilder) writerBuilder).createFormatWriter(stream, compression); + } + + final OutputFile out = new StreamOutputFile(stream); final ParquetWriter writer = writerBuilder.createWriter(out, compression); return new ParquetBulkWriter(writer); } @@ -65,7 +69,6 @@ public FormatWriter create(PositionOutputStream stream, String compression) thro public FormatWriter createWithShreddingSchema( PositionOutputStream stream, String compression, RowType inferredShreddingSchema) throws IOException { - final OutputFile out = new StreamOutputFile(stream); if (HadoopCompressionType.NONE.value().equals(compression)) { compression = null; } @@ -73,6 +76,11 @@ public FormatWriter createWithShreddingSchema( ParquetBuilder newBuilder = ((RowDataParquetBuilder) writerBuilder) .withShreddingSchemas(inferredShreddingSchema); + if (newBuilder instanceof RowDataParquetBuilder) { + return ((RowDataParquetBuilder) newBuilder).createFormatWriter(stream, compression); + } + + final OutputFile out = new StreamOutputFile(stream); final ParquetWriter writer = newBuilder.createWriter(out, compression); return new ParquetBulkWriter(writer); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java index 37fc4272c6d3..8878b35a10d2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetColumnVector.java @@ -18,17 +18,59 @@ package org.apache.paimon.format.parquet.reader; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.columnar.ArrayColumnVector; +import org.apache.paimon.data.columnar.BooleanColumnVector; +import org.apache.paimon.data.columnar.ByteColumnVector; +import org.apache.paimon.data.columnar.BytesColumnVector; +import org.apache.paimon.data.columnar.ColumnVector; +import org.apache.paimon.data.columnar.DecimalColumnVector; +import org.apache.paimon.data.columnar.DoubleColumnVector; +import org.apache.paimon.data.columnar.FloatColumnVector; +import org.apache.paimon.data.columnar.IntColumnVector; +import org.apache.paimon.data.columnar.LongColumnVector; +import org.apache.paimon.data.columnar.MapColumnVector; +import org.apache.paimon.data.columnar.RowColumnVector; +import org.apache.paimon.data.columnar.ShortColumnVector; +import org.apache.paimon.data.columnar.TimestampColumnVector; import org.apache.paimon.data.columnar.heap.AbstractArrayBasedVector; import org.apache.paimon.data.columnar.heap.CastedRowColumnVector; +import org.apache.paimon.data.columnar.heap.HeapArrayVector; +import org.apache.paimon.data.columnar.heap.HeapBytesVector; import org.apache.paimon.data.columnar.heap.HeapIntVector; +import org.apache.paimon.data.columnar.heap.HeapMapVector; +import org.apache.paimon.data.columnar.heap.HeapRowVector; +import org.apache.paimon.data.columnar.writable.WritableBooleanVector; +import org.apache.paimon.data.columnar.writable.WritableByteVector; +import org.apache.paimon.data.columnar.writable.WritableBytesVector; import org.apache.paimon.data.columnar.writable.WritableColumnVector; +import org.apache.paimon.data.columnar.writable.WritableDoubleVector; +import org.apache.paimon.data.columnar.writable.WritableFloatVector; import org.apache.paimon.data.columnar.writable.WritableIntVector; +import org.apache.paimon.data.columnar.writable.WritableLongVector; +import org.apache.paimon.data.columnar.writable.WritableShortVector; +import org.apache.paimon.data.columnar.writable.WritableTimestampVector; +import org.apache.paimon.data.variant.GenericVariant; import org.apache.paimon.data.variant.PaimonShreddingUtils; import org.apache.paimon.data.variant.PaimonShreddingUtils.FieldToExtract; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.data.variant.VariantSchema; +import org.apache.paimon.format.parquet.type.MapShreddingField; import org.apache.paimon.format.parquet.type.ParquetField; import org.apache.paimon.format.parquet.type.ParquetGroupField; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeChecks; import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; @@ -97,6 +139,28 @@ public class ParquetColumnVector { PaimonShreddingUtils.getFieldsToExtract(column.getType(), variantSchema); repetitionLevels = contentVector.repetitionLevels; definitionLevels = contentVector.definitionLevels; + } else if (column instanceof MapShreddingField) { + MapShreddingField mapField = (MapShreddingField) column; + WritableColumnVector residualMap = + ParquetReaderUtil.createWritableColumnVector(capacity, column.getType()); + ParquetColumnVector residualVector = + new ParquetColumnVector( + mapField.residualMapField(), + residualMap, + capacity, + missingColumns, + false); + children.add(residualVector); + for (ParquetField sidecarField : mapField.sidecarFields()) { + WritableColumnVector sidecar = + ParquetReaderUtil.createWritableColumnVector( + capacity, sidecarField.getType()); + children.add( + new ParquetColumnVector( + sidecarField, sidecar, capacity, missingColumns, true)); + } + repetitionLevels = residualVector.repetitionLevels; + definitionLevels = residualVector.definitionLevels; } else if (isPrimitive) { if (column.getRepetitionLevel() > 0) { repetitionLevels = new HeapIntVector(capacity); @@ -184,6 +248,11 @@ void assemble() { return; } + if (column instanceof MapShreddingField) { + assembleMapShredding((MapShreddingField) column); + return; + } + // nothing to do if the column itself is missing if (vector.isAllNull()) { return; @@ -356,6 +425,413 @@ private void assembleStruct() { vector.addElementsAppended(rowId); } + private void assembleMapShredding(MapShreddingField mapField) { + for (ParquetColumnVector child : children) { + child.assemble(); + } + + HeapMapVector residualMapVector = (HeapMapVector) children.get(0).getValueVector(); + HeapMapVector targetMapVector = (HeapMapVector) vector; + HeapBytesVector targetKeys = (HeapBytesVector) targetMapVector.getKeys(); + WritableColumnVector targetValues = (WritableColumnVector) targetMapVector.getValues(); + MapType mapType = (MapType) mapField.getType(); + + int rowCount = residualMapVector.getElementsAppended(); + for (int rowId = 0; rowId < rowCount; rowId++) { + InternalMap residualMap = residualMapVector.getMap(rowId); + int sidecarValueCount = sidecarValueCount(rowId); + if (residualMapVector.isNullAt(rowId) && sidecarValueCount == 0) { + targetMapVector.appendNull(); + continue; + } + + int residualSize = residualMapVector.isNullAt(rowId) ? 0 : residualMap.size(); + targetMapVector.appendArray(residualSize + sidecarValueCount); + if (residualSize > 0) { + appendResidualMap(residualMap, mapType, targetKeys, targetValues); + } + appendSidecars(rowId, mapField, mapType, targetKeys, targetValues); + } + } + + private int sidecarValueCount(int rowId) { + int count = 0; + for (int i = 1; i < children.size(); i++) { + if (!children.get(i).getValueVector().isNullAt(rowId)) { + count++; + } + } + return count; + } + + private void appendResidualMap( + InternalMap residualMap, + MapType mapType, + HeapBytesVector targetKeys, + WritableColumnVector targetValues) { + InternalArray keyArray = residualMap.keyArray(); + InternalArray valueArray = residualMap.valueArray(); + for (int i = 0; i < residualMap.size(); i++) { + appendString(targetKeys, keyArray.getString(i)); + appendInternalArrayValue(valueArray, i, mapType.getValueType(), targetValues); + } + } + + private void appendSidecars( + int rowId, + MapShreddingField mapField, + MapType mapType, + HeapBytesVector targetKeys, + WritableColumnVector targetValues) { + for (int i = 1; i < children.size(); i++) { + WritableColumnVector sidecarVector = children.get(i).getValueVector(); + if (!sidecarVector.isNullAt(rowId)) { + appendString(targetKeys, mapField.sidecarKeys().get(i - 1)); + appendColumnVectorValue(sidecarVector, rowId, mapType.getValueType(), targetValues); + } + } + } + + private static void appendString(HeapBytesVector vector, BinaryString value) { + byte[] bytes = value.toBytes(); + vector.appendByteArray(bytes, 0, bytes.length); + } + + private static void appendColumnVectorValue( + ColumnVector source, int sourcePos, DataType type, WritableColumnVector target) { + if (source.isNullAt(sourcePos)) { + target.appendNull(); + return; + } + + switch (type.getTypeRoot()) { + case BOOLEAN: + ((WritableBooleanVector) target) + .appendBoolean(((BooleanColumnVector) source).getBoolean(sourcePos)); + return; + case TINYINT: + ((WritableByteVector) target) + .appendByte(((ByteColumnVector) source).getByte(sourcePos)); + return; + case SMALLINT: + ((WritableShortVector) target) + .appendShort(((ShortColumnVector) source).getShort(sourcePos)); + return; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + ((WritableIntVector) target) + .appendInt(((IntColumnVector) source).getInt(sourcePos)); + return; + case BIGINT: + ((WritableLongVector) target) + .appendLong(((LongColumnVector) source).getLong(sourcePos)); + return; + case FLOAT: + ((WritableFloatVector) target) + .appendFloat(((FloatColumnVector) source).getFloat(sourcePos)); + return; + case DOUBLE: + ((WritableDoubleVector) target) + .appendDouble(((DoubleColumnVector) source).getDouble(sourcePos)); + return; + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + case BLOB: + BytesColumnVector.Bytes bytes = ((BytesColumnVector) source).getBytes(sourcePos); + ((WritableBytesVector) target).appendByteArray(bytes.data, bytes.offset, bytes.len); + return; + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + if (source instanceof DecimalColumnVector) { + appendDecimal( + ((DecimalColumnVector) source) + .getDecimal( + sourcePos, + decimalType.getPrecision(), + decimalType.getScale()), + decimalType, + target); + } else if (decimalType.getPrecision() <= 9) { + ((WritableIntVector) target) + .appendInt(((IntColumnVector) source).getInt(sourcePos)); + } else if (decimalType.getPrecision() <= 18) { + ((WritableLongVector) target) + .appendLong(((LongColumnVector) source).getLong(sourcePos)); + } else { + BytesColumnVector.Bytes decimalBytes = + ((BytesColumnVector) source).getBytes(sourcePos); + ((WritableBytesVector) target) + .appendByteArray( + decimalBytes.data, decimalBytes.offset, decimalBytes.len); + } + return; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + int precision = DataTypeChecks.getPrecision(type); + if (source instanceof TimestampColumnVector) { + appendTimestamp( + type, + ((TimestampColumnVector) source).getTimestamp(sourcePos, precision), + target); + } else if (precision <= 6 && source instanceof LongColumnVector) { + ((WritableLongVector) target) + .appendLong(((LongColumnVector) source).getLong(sourcePos)); + } else { + throw new UnsupportedOperationException( + "Unsupported timestamp vector for map shredding: " + source); + } + return; + case ARRAY: + appendInternalValue(((ArrayColumnVector) source).getArray(sourcePos), type, target); + return; + case MAP: + case MULTISET: + appendInternalValue(((MapColumnVector) source).getMap(sourcePos), type, target); + return; + case ROW: + appendInternalValue(((RowColumnVector) source).getRow(sourcePos), type, target); + return; + case VARIANT: + InternalRow variantRow = ((RowColumnVector) source).getRow(sourcePos); + appendInternalValue( + new GenericVariant(variantRow.getBinary(0), variantRow.getBinary(1)), + type, + target); + return; + default: + throw new UnsupportedOperationException( + "Unsupported map shredding value type: " + type); + } + } + + private static void appendInternalArrayValue( + InternalArray array, int sourcePos, DataType type, WritableColumnVector target) { + if (array.isNullAt(sourcePos)) { + target.appendNull(); + return; + } + + switch (type.getTypeRoot()) { + case BOOLEAN: + ((WritableBooleanVector) target).appendBoolean(array.getBoolean(sourcePos)); + return; + case TINYINT: + ((WritableByteVector) target).appendByte(array.getByte(sourcePos)); + return; + case SMALLINT: + ((WritableShortVector) target).appendShort(array.getShort(sourcePos)); + return; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + ((WritableIntVector) target).appendInt(array.getInt(sourcePos)); + return; + case BIGINT: + ((WritableLongVector) target).appendLong(array.getLong(sourcePos)); + return; + case FLOAT: + ((WritableFloatVector) target).appendFloat(array.getFloat(sourcePos)); + return; + case DOUBLE: + ((WritableDoubleVector) target).appendDouble(array.getDouble(sourcePos)); + return; + case CHAR: + case VARCHAR: + appendString((HeapBytesVector) target, array.getString(sourcePos)); + return; + case BINARY: + case VARBINARY: + case BLOB: + byte[] bytes = array.getBinary(sourcePos); + ((WritableBytesVector) target).appendByteArray(bytes, 0, bytes.length); + return; + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + appendDecimal( + array.getDecimal( + sourcePos, decimalType.getPrecision(), decimalType.getScale()), + decimalType, + target); + return; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + appendTimestamp( + type, + array.getTimestamp(sourcePos, DataTypeChecks.getPrecision(type)), + target); + return; + case ARRAY: + appendInternalValue(array.getArray(sourcePos), type, target); + return; + case MAP: + case MULTISET: + appendInternalValue(array.getMap(sourcePos), type, target); + return; + case ROW: + appendInternalValue( + array.getRow(sourcePos, ((RowType) type).getFieldCount()), type, target); + return; + case VARIANT: + appendInternalValue(array.getVariant(sourcePos), type, target); + return; + default: + throw new UnsupportedOperationException( + "Unsupported map shredding value type: " + type); + } + } + + private static void appendInternalValue( + Object value, DataType type, WritableColumnVector target) { + if (value == null) { + target.appendNull(); + return; + } + + switch (type.getTypeRoot()) { + case BOOLEAN: + ((WritableBooleanVector) target).appendBoolean((boolean) value); + return; + case TINYINT: + ((WritableByteVector) target).appendByte((byte) value); + return; + case SMALLINT: + ((WritableShortVector) target).appendShort((short) value); + return; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + ((WritableIntVector) target).appendInt((int) value); + return; + case BIGINT: + ((WritableLongVector) target).appendLong((long) value); + return; + case FLOAT: + ((WritableFloatVector) target).appendFloat((float) value); + return; + case DOUBLE: + ((WritableDoubleVector) target).appendDouble((double) value); + return; + case CHAR: + case VARCHAR: + appendString((HeapBytesVector) target, (BinaryString) value); + return; + case BINARY: + case VARBINARY: + case BLOB: + byte[] bytes = (byte[]) value; + ((WritableBytesVector) target).appendByteArray(bytes, 0, bytes.length); + return; + case DECIMAL: + appendDecimal((Decimal) value, (DecimalType) type, target); + return; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + appendTimestamp(type, (Timestamp) value, target); + return; + case ARRAY: + appendInternalArray( + (InternalArray) value, (ArrayType) type, (HeapArrayVector) target); + return; + case MAP: + appendInternalMap((InternalMap) value, (MapType) type, (HeapMapVector) target); + return; + case MULTISET: + appendInternalMap( + (InternalMap) value, + new MapType(((MultisetType) type).getElementType(), new IntType(false)), + (HeapMapVector) target); + return; + case ROW: + appendInternalRow((InternalRow) value, (RowType) type, (HeapRowVector) target); + return; + case VARIANT: + appendVariant((Variant) value, (HeapRowVector) target); + return; + default: + throw new UnsupportedOperationException( + "Unsupported map shredding value type: " + type); + } + } + + private static void appendInternalArray( + InternalArray array, ArrayType type, HeapArrayVector target) { + WritableColumnVector child = (WritableColumnVector) target.getChildren()[0]; + target.appendArray(array.size()); + for (int i = 0; i < array.size(); i++) { + appendInternalArrayValue(array, i, type.getElementType(), child); + } + } + + private static void appendInternalMap(InternalMap map, MapType type, HeapMapVector target) { + InternalArray keyArray = map.keyArray(); + InternalArray valueArray = map.valueArray(); + WritableColumnVector keyVector = (WritableColumnVector) target.getKeys(); + WritableColumnVector valueVector = (WritableColumnVector) target.getValues(); + target.appendArray(map.size()); + for (int i = 0; i < map.size(); i++) { + appendInternalArrayValue(keyArray, i, type.getKeyType(), keyVector); + appendInternalArrayValue(valueArray, i, type.getValueType(), valueVector); + } + } + + private static void appendInternalRow(InternalRow row, RowType type, HeapRowVector target) { + target.appendRow(); + for (int i = 0; i < type.getFieldCount(); i++) { + WritableColumnVector child = (WritableColumnVector) target.getChildren()[i]; + DataType fieldType = type.getTypeAt(i); + if (row.isNullAt(i)) { + child.appendNull(); + } else { + appendInternalValue( + InternalRow.createFieldGetter(fieldType, i).getFieldOrNull(row), + fieldType, + child); + } + } + } + + private static void appendVariant(Variant variant, HeapRowVector target) { + target.appendRow(); + byte[] value = variant.value(); + byte[] metadata = variant.metadata(); + ((WritableBytesVector) target.getChildren()[0]).appendByteArray(value, 0, value.length); + ((WritableBytesVector) target.getChildren()[1]) + .appendByteArray(metadata, 0, metadata.length); + } + + private static void appendDecimal( + Decimal decimal, DecimalType type, WritableColumnVector target) { + if (decimal == null) { + target.appendNull(); + } else if (type.getPrecision() <= 9) { + ((WritableIntVector) target).appendInt((int) decimal.toUnscaledLong()); + } else if (type.getPrecision() <= 18) { + ((WritableLongVector) target).appendLong(decimal.toUnscaledLong()); + } else { + byte[] bytes = decimal.toUnscaledBytes(); + ((WritableBytesVector) target).appendByteArray(bytes, 0, bytes.length); + } + } + + private static void appendTimestamp( + DataType type, Timestamp timestamp, WritableColumnVector target) { + if (timestamp == null) { + target.appendNull(); + } else if (target instanceof WritableTimestampVector) { + ((WritableTimestampVector) target).appendTimestamp(timestamp); + } else if (DataTypeChecks.getPrecision(type) <= 3) { + ((WritableLongVector) target).appendLong(timestamp.getMillisecond()); + } else if (DataTypeChecks.getPrecision(type) <= 6) { + ((WritableLongVector) target).appendLong(timestamp.toMicros()); + } else { + throw new UnsupportedOperationException( + "Unsupported timestamp vector for map shredding: " + target); + } + } + /** * For a collection (i.e., array or map) element at index 'idx', returns the starting index of * the next collection after it. diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java index a2741f869ab6..a704c815bc25 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java @@ -18,6 +18,7 @@ package org.apache.paimon.format.parquet.reader; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.columnar.ColumnVector; import org.apache.paimon.data.columnar.heap.CastedArrayColumnVector; import org.apache.paimon.data.columnar.heap.CastedMapColumnVector; @@ -36,8 +37,10 @@ import org.apache.paimon.data.columnar.heap.HeapTimestampVector; import org.apache.paimon.data.columnar.writable.WritableColumnVector; import org.apache.paimon.data.variant.VariantMetadataUtils; +import org.apache.paimon.format.parquet.MapShreddingUtils; import org.apache.paimon.format.parquet.ParquetSchemaConverter; import org.apache.paimon.format.parquet.VariantUtils; +import org.apache.paimon.format.parquet.type.MapShreddingField; import org.apache.paimon.format.parquet.type.ParquetField; import org.apache.paimon.format.parquet.type.ParquetGroupField; import org.apache.paimon.format.parquet.type.ParquetPrimitiveField; @@ -60,6 +63,7 @@ import org.apache.parquet.io.GroupColumnIO; import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.io.PrimitiveColumnIO; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -67,8 +71,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; +import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME; +import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType; import static org.apache.parquet.schema.Type.Repetition.REPEATED; @@ -224,19 +231,103 @@ public static ColumnVector createReadableColumnVector( public static List buildFieldsList( DataField[] readFields, MessageColumnIO columnIO, MessageType requestedFileSchema) { + return buildFieldsList(readFields, columnIO, requestedFileSchema, Collections.emptyMap()); + } + + public static List buildFieldsList( + DataField[] readFields, + MessageColumnIO columnIO, + MessageType requestedFileSchema, + Map> dynamicMapKeys) { + return buildFieldsList(readFields, columnIO, requestedFileSchema, dynamicMapKeys, ""); + } + + private static List buildFieldsList( + DataField[] readFields, + GroupColumnIO columnIO, + Type requestedFileSchema, + Map> dynamicMapKeys, + String parentPath) { List list = new ArrayList<>(); for (int i = 0; i < readFields.length; i++) { + DataField readField = readFields[i]; + String path = MapShreddingUtils.path(parentPath, readField.name()); + if (MapShreddingUtils.isMapShreddingPath(dynamicMapKeys, path) + && requestedFileSchema + .asGroupType() + .containsField(MapShreddingUtils.sidecarColumnName(path, 0))) { + list.add( + constructMapShreddingField( + readField, + columnIO, + requestedFileSchema.asGroupType(), + dynamicMapKeys.get(path), + path, + dynamicMapKeys)); + continue; + } list.add( constructField( - readFields[i], - lookupColumnByName(columnIO, readFields[i].name()), - requestedFileSchema.getType(i))); + readField, + lookupColumnByName(columnIO, readField.name()), + requestedFileSchema.asGroupType().getType(readField.name()), + path, + dynamicMapKeys)); } return list; } + private static ParquetField constructMapShreddingField( + DataField dataField, + GroupColumnIO columnIO, + GroupType requestedFileSchema, + List keys, + String path, + Map> dynamicMapKeys) { + ParquetGroupField residualMapField = + (ParquetGroupField) + constructField( + dataField, + lookupColumnByName(columnIO, dataField.name()), + requestedFileSchema.getType(dataField.name()), + path, + dynamicMapKeys); + + List sidecarFields = new ArrayList<>(); + List sidecarKeys = new ArrayList<>(); + int keyOrdinal = 0; + while (keyOrdinal < keys.size()) { + String sidecarName = MapShreddingUtils.sidecarColumnName(path, keyOrdinal); + if (!requestedFileSchema.containsField(sidecarName)) { + break; + } + MapType mapType = (MapType) dataField.type(); + sidecarFields.add( + constructField( + new DataField(0, sidecarName, mapType.getValueType()), + lookupColumnByName(columnIO, sidecarName), + requestedFileSchema.getType(sidecarName), + sidecarName, + dynamicMapKeys)); + sidecarKeys.add(BinaryString.fromString(keys.get(keyOrdinal))); + keyOrdinal++; + } + return new MapShreddingField( + dataField.type(), residualMapField, sidecarFields, sidecarKeys); + } + private static ParquetField constructField( DataField dataField, ColumnIO columnIO, Type parquetType) { + return constructField( + dataField, columnIO, parquetType, dataField.name(), Collections.emptyMap()); + } + + private static ParquetField constructField( + DataField dataField, + ColumnIO columnIO, + Type parquetType, + String path, + Map> dynamicMapKeys) { boolean required = columnIO.getType().getRepetition() == REQUIRED; int repetitionLevel = columnIO.getRepetitionLevel(); int definitionLevel = columnIO.getDefinitionLevel(); @@ -249,24 +340,20 @@ private static ParquetField constructField( return constructVariantField(dataField, columnIO, parquetType); } - ImmutableList.Builder fieldsBuilder = ImmutableList.builder(); - List fieldNames = rowType.getFieldNames(); - List children = rowType.getFields(); - for (int i = 0; i < children.size(); i++) { - String childName = fieldNames.get(i); - fieldsBuilder.add( - constructField( - children.get(i), - lookupColumnByName(groupColumnIO, childName), - parquetType.asGroupType().getType(childName))); - } + List fields = + buildFieldsList( + rowType.getFields().toArray(new DataField[0]), + groupColumnIO, + parquetType, + dynamicMapKeys, + path); return new ParquetGroupField( type, repetitionLevel, definitionLevel, required, - fieldsBuilder.build(), + fields, groupColumnIO.getFieldPath()); } @@ -283,12 +370,16 @@ private static ParquetField constructField( constructField( new DataField(0, "", mapType.getKeyType()), keyValueColumnIO.getChild(0), - keyValueType.getKey()); + keyValueType.getKey(), + path + "." + MAP_KEY_NAME, + dynamicMapKeys); ParquetField valueField = constructField( new DataField(0, "", mapType.getValueType()), keyValueColumnIO.getChild(1), - keyValueType.getValue()); + keyValueType.getValue(), + path + "." + MAP_VALUE_NAME, + dynamicMapKeys); return new ParquetGroupField( type, repetitionLevel, @@ -307,12 +398,16 @@ private static ParquetField constructField( constructField( new DataField(0, "", multisetType.getElementType()), keyValueColumnIO.getChild(0), - keyValueType.getKey()); + keyValueType.getKey(), + path + "." + MAP_KEY_NAME, + dynamicMapKeys); ParquetField valueField = constructField( new DataField(0, "", new IntType()), keyValueColumnIO.getChild(1), - keyValueType.getValue()); + keyValueType.getValue(), + path + "." + MAP_VALUE_NAME, + dynamicMapKeys); return new ParquetGroupField( type, repetitionLevel, @@ -349,7 +444,9 @@ private static ParquetField constructField( constructField( new DataField(0, "", arrayType.getElementType()), getArrayElementColumn(elementTypeColumnIO), - parquetListElementType(parquetType.asGroupType())); + parquetListElementType(parquetType.asGroupType()), + path + "." + ParquetSchemaConverter.LIST_ELEMENT_NAME, + dynamicMapKeys); if (repetitionLevel == field.getRepetitionLevel()) { repetitionLevel = columnIO.getParent().getRepetitionLevel(); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/MapShreddingField.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/MapShreddingField.java new file mode 100644 index 000000000000..474dcdd0d1e0 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/MapShreddingField.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.parquet.type; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.types.DataType; + +import java.util.List; + +/** + * Parquet field for a logical map shredding assembled from residual map and sidecar value columns. + */ +public class MapShreddingField extends ParquetGroupField { + + private final ParquetGroupField residualMapField; + private final List sidecarFields; + private final List sidecarKeys; + + public MapShreddingField( + DataType type, + ParquetGroupField residualMapField, + List sidecarFields, + List sidecarKeys) { + super( + type, + residualMapField.getRepetitionLevel(), + residualMapField.getDefinitionLevel(), + residualMapField.isRequired(), + residualMapField.getChildren(), + residualMapField.path()); + this.residualMapField = residualMapField; + this.sidecarFields = sidecarFields; + this.sidecarKeys = sidecarKeys; + } + + public ParquetGroupField residualMapField() { + return residualMapField; + } + + public List sidecarFields() { + return sidecarFields; + } + + public List sidecarKeys() { + return sidecarKeys; + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java index 14970e548e9b..6cee8fd1bb46 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.parquet.writer; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.parquet.MapShreddingUtils; import org.apache.paimon.format.parquet.VariantUtils; import org.apache.paimon.types.RowType; @@ -31,7 +32,10 @@ import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType; @@ -40,13 +44,23 @@ public class ParquetRowDataBuilder extends ParquetWriter.Builder { private final RowType rowType; + private final Map> dynamicMapKeys; @Nullable private final RowType shreddingSchemas; public ParquetRowDataBuilder( OutputFile path, RowType rowType, @Nullable RowType shreddingSchemas) { + this(path, rowType, shreddingSchemas, Collections.emptyMap()); + } + + public ParquetRowDataBuilder( + OutputFile path, + RowType rowType, + @Nullable RowType shreddingSchemas, + Map> dynamicMapKeys) { super(path); this.rowType = rowType; this.shreddingSchemas = shreddingSchemas; + this.dynamicMapKeys = dynamicMapKeys; } @Override @@ -70,19 +84,27 @@ private ParquetWriteSupport(Configuration conf) { this.conf = conf; this.schema = convertToParquetMessageType( - VariantUtils.replaceWithShreddingType(rowType, shreddingSchemas)); + VariantUtils.replaceWithShreddingType(rowType, shreddingSchemas), + dynamicMapKeys); } @Override public WriteContext init(Configuration configuration) { - return new WriteContext(schema, new HashMap<>()); + Map metadata = new HashMap<>(); + metadata.putAll(MapShreddingUtils.toFooterMetadata(dynamicMapKeys)); + return new WriteContext(schema, metadata); } @Override public void prepareForWrite(RecordConsumer recordConsumer) { this.writer = new ParquetRowDataWriter( - recordConsumer, rowType, schema, conf, shreddingSchemas); + recordConsumer, + rowType, + schema, + conf, + shreddingSchemas, + dynamicMapKeys); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java index 80b788733342..273a42c3343f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java @@ -29,6 +29,7 @@ import org.apache.paimon.data.variant.PaimonShreddingUtils; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.data.variant.VariantSchema; +import org.apache.paimon.format.parquet.MapShreddingUtils; import org.apache.paimon.format.parquet.ParquetSchemaConverter; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; @@ -53,7 +54,11 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision; @@ -71,6 +76,7 @@ public class ParquetRowDataWriter { private final RowWriter rowWriter; private final RecordConsumer recordConsumer; @Nullable private final RowType shreddingSchemas; + private final Map> dynamicMapKeys; public ParquetRowDataWriter( RecordConsumer recordConsumer, @@ -78,10 +84,21 @@ public ParquetRowDataWriter( GroupType schema, Configuration conf, @Nullable RowType shreddingSchemas) { + this(recordConsumer, rowType, schema, conf, shreddingSchemas, Collections.emptyMap()); + } + + public ParquetRowDataWriter( + RecordConsumer recordConsumer, + RowType rowType, + GroupType schema, + Configuration conf, + @Nullable RowType shreddingSchemas, + Map> dynamicMapKeys) { this.conf = conf; this.recordConsumer = recordConsumer; this.shreddingSchemas = shreddingSchemas; - this.rowWriter = new RowWriter(rowType, schema); + this.dynamicMapKeys = dynamicMapKeys; + this.rowWriter = new RowWriter(rowType, schema, dynamicMapKeys, ""); } /** @@ -96,6 +113,10 @@ public void write(final InternalRow record) { } private FieldWriter createWriter(DataType t, Type type) { + return createWriter(t, type, type.getName()); + } + + private FieldWriter createWriter(DataType t, Type type, String path) { if (type.isPrimitive()) { switch (t.getTypeRoot()) { case CHAR: @@ -144,13 +165,13 @@ private FieldWriter createWriter(DataType t, Type type) { } else if (t instanceof MapType && annotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) { return new MapWriter( - ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType); + ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType, null); } else if (t instanceof MultisetType && annotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) { return new MapWriter( - ((MultisetType) t).getElementType(), new IntType(false), groupType); + ((MultisetType) t).getElementType(), new IntType(false), groupType, null); } else if (t instanceof RowType && type instanceof GroupType) { - return new RowWriter((RowType) t, groupType); + return new RowWriter((RowType) t, groupType, dynamicMapKeys, path); } else if (t instanceof VariantType && type instanceof GroupType) { RowType shreddingSchema = shreddingSchemas != null && shreddingSchemas.containsField(type.getName()) @@ -442,7 +463,14 @@ private class MapWriter implements FieldWriter { private final FieldWriter keyWriter; private final FieldWriter valueWriter; - private MapWriter(DataType keyType, DataType valueType, GroupType groupType) { + @Nullable private final Set residualKeysToSkip; + + private MapWriter( + DataType keyType, + DataType valueType, + GroupType groupType, + @Nullable Set residualKeysToSkip) { + this.residualKeysToSkip = residualKeysToSkip; // Get the internal map structure (MAP_KEY_VALUE) GroupType repeatedType = groupType.getType(0).asGroupType(); this.repeatedGroupName = repeatedType.getName(); @@ -471,12 +499,15 @@ public void write(InternalArray arrayData, int ordinal) { private void writeMapData(InternalMap mapData) { recordConsumer.startGroup(); - if (mapData != null && mapData.size() > 0) { + if (mapData != null && hasResidualEntry(mapData)) { recordConsumer.startField(repeatedGroupName, 0); InternalArray keyArray = mapData.keyArray(); InternalArray valueArray = mapData.valueArray(); for (int i = 0; i < keyArray.size(); i++) { + if (shouldSkipResidualEntry(keyArray, valueArray, i)) { + continue; + } recordConsumer.startGroup(); if (!keyArray.isNullAt(i)) { // write key element @@ -503,6 +534,25 @@ private void writeMapData(InternalMap mapData) { } recordConsumer.endGroup(); } + + private boolean hasResidualEntry(InternalMap mapData) { + InternalArray keyArray = mapData.keyArray(); + InternalArray valueArray = mapData.valueArray(); + for (int i = 0; i < keyArray.size(); i++) { + if (!shouldSkipResidualEntry(keyArray, valueArray, i)) { + return true; + } + } + return false; + } + + private boolean shouldSkipResidualEntry( + InternalArray keyArray, InternalArray valueArray, int ordinal) { + return residualKeysToSkip != null + && !keyArray.isNullAt(ordinal) + && !valueArray.isNullAt(ordinal) + && residualKeysToSkip.contains(keyArray.getString(ordinal)); + } } /** It writes an array type field to parquet. */ @@ -557,38 +607,87 @@ private void writeArrayData(InternalArray arrayData) { /** It writes a row type field to parquet. */ private class RowWriter implements FieldWriter { - private final FieldWriter[] fieldWriters; - private final String[] fieldNames; - private final boolean[] isNullable; + private final RowFieldWriter[] rowFieldWriters; + private final int fieldCount; public RowWriter(RowType rowType, GroupType groupType) { - this.fieldNames = rowType.getFieldNames().toArray(new String[0]); + this(rowType, groupType, null, ""); + } + + public RowWriter( + RowType rowType, + GroupType groupType, + @Nullable Map> mapShreddingKeys, + String parentPath) { + this.fieldCount = rowType.getFieldCount(); List fieldTypes = rowType.getFieldTypes(); - this.fieldWriters = new FieldWriter[rowType.getFieldCount()]; - this.isNullable = new boolean[rowType.getFieldCount()]; - for (int i = 0; i < fieldWriters.length; i++) { - fieldWriters[i] = createWriter(fieldTypes.get(i), groupType.getType(i)); - isNullable[i] = fieldTypes.get(i).isNullable(); + List writers = new java.util.ArrayList<>(); + int physicalOrdinal = 0; + for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = rowType.getFieldNames().get(i); + String path = MapShreddingUtils.path(parentPath, fieldName); + DataType fieldType = fieldTypes.get(i); + Type parquetType = groupType.getType(physicalOrdinal); + if (mapShreddingKeys != null + && MapShreddingUtils.isMapShreddingPath(mapShreddingKeys, path)) { + List keys = mapShreddingKeys.get(path); + Set keysToSkip = new HashSet<>(); + for (String key : keys) { + keysToSkip.add(BinaryString.fromString(key)); + } + MapType mapType = (MapType) fieldType; + writers.add( + new RowFieldWriter( + fieldName, + i, + fieldType.isNullable(), + new MapWriter( + mapType.getKeyType(), + mapType.getValueType(), + parquetType.asGroupType(), + keysToSkip))); + physicalOrdinal++; + for (int keyOrdinal = 0; keyOrdinal < keys.size(); keyOrdinal++) { + Type sidecarType = groupType.getType(physicalOrdinal); + writers.add( + new RowFieldWriter( + sidecarType.getName(), + i, + true, + new MapSidecarWriter( + keys.get(keyOrdinal), + createWriter( + mapType.getValueType(), sidecarType)))); + physicalOrdinal++; + } + } else { + writers.add( + new RowFieldWriter( + fieldName, + i, + fieldType.isNullable(), + createWriter(fieldType, parquetType, path))); + physicalOrdinal++; + } } + this.rowFieldWriters = writers.toArray(new RowFieldWriter[0]); } public void write(InternalRow row) { - for (int i = 0; i < fieldWriters.length; i++) { - if (!row.isNullAt(i)) { - String fieldName = fieldNames[i]; - FieldWriter writer = fieldWriters[i]; - - recordConsumer.startField(fieldName, i); - writer.write(row, i); - recordConsumer.endField(fieldName, i); + for (int i = 0; i < rowFieldWriters.length; i++) { + RowFieldWriter rowFieldWriter = rowFieldWriters[i]; + if (rowFieldWriter.shouldWrite(row)) { + recordConsumer.startField(rowFieldWriter.fieldName, i); + rowFieldWriter.write(row); + recordConsumer.endField(rowFieldWriter.fieldName, i); } else { - if (!isNullable[i]) { + if (!rowFieldWriter.isNullable) { throw new IllegalArgumentException( String.format( "Field '%s' expected not null but found null value. A possible cause is that the " + "table used %s or %s merge-engine and the aggregate function produced " + "null value when retracting.", - fieldNames[i], + rowFieldWriter.fieldName, CoreOptions.MergeEngine.PARTIAL_UPDATE, CoreOptions.MergeEngine.AGGREGATE)); } @@ -599,7 +698,7 @@ public void write(InternalRow row) { @Override public void write(InternalRow row, int ordinal) { recordConsumer.startGroup(); - InternalRow rowData = row.getRow(ordinal, fieldWriters.length); + InternalRow rowData = row.getRow(ordinal, fieldCount); write(rowData); recordConsumer.endGroup(); } @@ -607,12 +706,81 @@ public void write(InternalRow row, int ordinal) { @Override public void write(InternalArray arrayData, int ordinal) { recordConsumer.startGroup(); - InternalRow rowData = arrayData.getRow(ordinal, fieldWriters.length); + InternalRow rowData = arrayData.getRow(ordinal, fieldCount); write(rowData); recordConsumer.endGroup(); } } + private class RowFieldWriter { + + private final String fieldName; + private final int logicalOrdinal; + private final boolean isNullable; + private final FieldWriter writer; + + private RowFieldWriter( + String fieldName, int logicalOrdinal, boolean isNullable, FieldWriter writer) { + this.fieldName = fieldName; + this.logicalOrdinal = logicalOrdinal; + this.isNullable = isNullable; + this.writer = writer; + } + + private boolean shouldWrite(InternalRow row) { + if (writer instanceof MapSidecarWriter) { + return !row.isNullAt(logicalOrdinal) + && ((MapSidecarWriter) writer).findValueIndex(row.getMap(logicalOrdinal)) + >= 0; + } + return !row.isNullAt(logicalOrdinal); + } + + private void write(InternalRow row) { + writer.write(row, logicalOrdinal); + } + } + + private class MapSidecarWriter implements FieldWriter { + + private final BinaryString key; + private final FieldWriter valueWriter; + private int valueIndex = -1; + + private MapSidecarWriter(String key, FieldWriter valueWriter) { + this.key = BinaryString.fromString(key); + this.valueWriter = valueWriter; + } + + @Override + public void write(InternalRow row, int ordinal) { + InternalMap map = row.getMap(ordinal); + int index = valueIndex >= 0 ? valueIndex : findValueIndex(map); + valueIndex = -1; + valueWriter.write(map.valueArray(), index); + } + + @Override + public void write(InternalArray arrayData, int ordinal) { + throw new UnsupportedOperationException("Map sidecar writer only supports rows."); + } + + private int findValueIndex(InternalMap map) { + InternalArray keyArray = map.keyArray(); + InternalArray valueArray = map.valueArray(); + for (int i = 0; i < keyArray.size(); i++) { + if (!keyArray.isNullAt(i) + && !valueArray.isNullAt(i) + && key.equals(keyArray.getString(i))) { + valueIndex = i; + return i; + } + } + valueIndex = -1; + return -1; + } + } + private class VariantWriter implements FieldWriter { @Nullable private final VariantSchema variantSchema; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java index 1a66a55130e1..b218e4e01252 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java @@ -19,8 +19,12 @@ package org.apache.paimon.format.parquet.writer; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatWriter; import org.apache.paimon.format.parquet.ColumnConfigParser; +import org.apache.paimon.format.parquet.MapShreddingKeyExtractor; +import org.apache.paimon.format.parquet.MapShreddingUtils; import org.apache.paimon.format.parquet.VariantUtils; +import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.Options; import org.apache.paimon.types.RowType; @@ -34,17 +38,26 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; /** A {@link ParquetBuilder} for {@link InternalRow}. */ public class RowDataParquetBuilder implements ParquetBuilder { private final RowType rowType; private final Configuration conf; + private final Options mapShreddingOptions; @Nullable private RowType shreddingSchemas; public RowDataParquetBuilder(RowType rowType, Options options) { + this(rowType, options, options); + } + + public RowDataParquetBuilder(RowType rowType, Options options, Options mapShreddingOptions) { this.rowType = rowType; this.conf = new Configuration(false); + this.mapShreddingOptions = mapShreddingOptions; this.shreddingSchemas = VariantUtils.shreddingSchemasFromOptions(options); options.toMap().forEach(conf::set); } @@ -57,8 +70,23 @@ public RowDataParquetBuilder withShreddingSchemas(RowType shreddingSchemas) { @Override public ParquetWriter createWriter(OutputFile out, String compression) throws IOException { + return createWriter(out, compression, Collections.emptyMap()); + } + + public FormatWriter createFormatWriter(PositionOutputStream stream, String compression) + throws IOException { + if (MapShreddingUtils.isMapShreddingEnabled(mapShreddingOptions)) { + return new MapShreddingFormatWriter(stream, compression); + } + OutputFile out = new StreamOutputFile(stream); + return new ParquetBulkWriter(createWriter(out, compression)); + } + + private ParquetWriter createWriter( + OutputFile out, String compression, Map> dynamicMapKeys) + throws IOException { ParquetRowDataBuilder builder = - new ParquetRowDataBuilder(out, rowType, shreddingSchemas) + new ParquetRowDataBuilder(out, rowType, shreddingSchemas, dynamicMapKeys) .withConf(conf) .withCompressionCodec( CompressionCodecName.fromConf(getCompression(compression))) @@ -121,4 +149,64 @@ public ParquetWriter createWriter(OutputFile out, String compressio public String getCompression(String compression) { return conf.get("parquet.compression", compression); } + + private class MapShreddingFormatWriter implements FormatWriter { + + private final PositionOutputStream stream; + private final String compression; + private final MapShreddingKeyExtractor extractor; + private FormatWriter delegate; + + private MapShreddingFormatWriter(PositionOutputStream stream, String compression) { + this.stream = stream; + this.compression = compression; + this.extractor = new MapShreddingKeyExtractor(rowType, mapShreddingOptions); + } + + @Override + public void addElement(InternalRow row) throws IOException { + if (delegate != null) { + delegate.addElement(row); + return; + } + + if (!extractor.finished()) { + extractor.add(row); + } + if (extractor.finished()) { + initializeDelegate(); + } + } + + @Override + public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException { + if (delegate != null) { + return delegate.reachTargetSize(suggestedCheck, targetSize); + } + return suggestedCheck && stream.getPos() >= targetSize; + } + + @Override + public void close() throws IOException { + if (delegate == null) { + initializeDelegate(); + } + delegate.close(); + } + + @Nullable + @Override + public Object writerMetadata() { + return delegate == null ? null : delegate.writerMetadata(); + } + + private void initializeDelegate() throws IOException { + Map> dynamicKeys = extractor.finish(); + OutputFile out = new StreamOutputFile(stream); + delegate = new ParquetBulkWriter(createWriter(out, compression, dynamicKeys)); + for (InternalRow bufferedRow : extractor.bufferedRows()) { + delegate.addElement(bufferedRow); + } + } + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/MapShreddingStorageBenchmark.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/MapShreddingStorageBenchmark.java new file mode 100644 index 000000000000..108523887bc8 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/MapShreddingStorageBenchmark.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.parquet; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Benchmark for map shredding storage size. */ +public class MapShreddingStorageBenchmark { + + private static final String MAP_FIELD = "headers"; + private static final String MAP_SHREDDING_COLUMNS = "map.shredding.columns"; + private static final String MAP_SHREDDING_MAX_KEYS = "map.shredding.maxKeys"; + private static final String MAP_SHREDDING_MAX_INFER_BUFFER_MEMORY = + "map.shredding.maxInferBufferMemory"; + private static final String MAP_SHREDDING_MAX_INFER_BUFFER_ROW = + "map.shredding.maxInferBufferRow"; + private static final int DEFAULT_ROW_COUNT = 100_000; + private static final int DEFAULT_HOT_KEY_COUNT = 32; + private static final int DEFAULT_VALUE_LENGTH = 16; + private static final int DEFAULT_KEY_PADDING_LENGTH = 128; + private static final int DEFAULT_VALUE_RUN_LENGTH = 128; + private static final int DEFAULT_VALUE_CARDINALITY = 4; + private static final VarCharType STRING = new VarCharType(VarCharType.MAX_LENGTH); + private static final RowType ROW_TYPE = + RowType.builder() + .field("id", new IntType()) + .field(MAP_FIELD, new MapType(STRING, STRING)) + .build(); + + @TempDir File folder; + + @Test + public void benchmarkLongHotKeyStorageSize() throws Exception { + int rowCount = intProperty("paimon.benchmark.map-shredding.rows", DEFAULT_ROW_COUNT); + int hotKeyCount = + intProperty("paimon.benchmark.map-shredding.hot-keys", DEFAULT_HOT_KEY_COUNT); + int valueLength = + intProperty("paimon.benchmark.map-shredding.value-length", DEFAULT_VALUE_LENGTH); + int keyPaddingLength = + intProperty( + "paimon.benchmark.map-shredding.key-padding-length", + DEFAULT_KEY_PADDING_LENGTH); + + assertStorageSaving( + "Map shredding long header key storage benchmark", + Scenario.LONG_HOT_KEYS, + rowCount, + hotKeyCount, + valueLength, + keyPaddingLength, + 1, + 1, + false); + } + + @Test + public void benchmarkColumnarValueStorageSize() throws Exception { + int rowCount = intProperty("paimon.benchmark.map-shredding.rows", DEFAULT_ROW_COUNT); + int hotKeyCount = + intProperty("paimon.benchmark.map-shredding.hot-keys", DEFAULT_HOT_KEY_COUNT); + int valueLength = + intProperty("paimon.benchmark.map-shredding.value-length", DEFAULT_VALUE_LENGTH); + int valueRunLength = + intProperty( + "paimon.benchmark.map-shredding.value-run-length", + DEFAULT_VALUE_RUN_LENGTH); + int valueCardinality = + intProperty( + "paimon.benchmark.map-shredding.value-cardinality", + DEFAULT_VALUE_CARDINALITY); + + assertStorageSaving( + "Map shredding columnar header value storage benchmark", + Scenario.COLUMNAR_VALUES, + rowCount, + hotKeyCount, + valueLength, + 0, + valueRunLength, + valueCardinality, + true); + } + + private int intProperty(String key, int defaultValue) { + String value = System.getProperties().getProperty(key); + return value == null ? defaultValue : Integer.parseInt(value); + } + + private void assertStorageSaving( + String benchmarkName, + Scenario scenario, + int rowCount, + int hotKeyCount, + int valueLength, + int keyPaddingLength, + int valueRunLength, + int valueCardinality, + boolean dictionaryEnabled) + throws Exception { + long regularSize = + writeFile( + false, + scenario, + rowCount, + hotKeyCount, + valueLength, + keyPaddingLength, + valueRunLength, + valueCardinality, + dictionaryEnabled); + long mapShreddingSize = + writeFile( + true, + scenario, + rowCount, + hotKeyCount, + valueLength, + keyPaddingLength, + valueRunLength, + valueCardinality, + dictionaryEnabled); + long savedBytes = regularSize - mapShreddingSize; + double savedPercent = savedBytes * 100.0D / regularSize; + + System.out.printf( + Locale.ROOT, + "%s: rows=%d, hotKeys=%d, valueLength=%d, keyPaddingLength=%d," + + " valueRunLength=%d, valueCardinality=%d, dictionary=%s," + + " regular=%d bytes, mapShredding=%d bytes, saved=%d bytes (%.2f%%)%n", + benchmarkName, + rowCount, + hotKeyCount, + valueLength, + keyPaddingLength, + valueRunLength, + valueCardinality, + dictionaryEnabled, + regularSize, + mapShreddingSize, + savedBytes, + savedPercent); + + assertThat(mapShreddingSize).isLessThan(regularSize); + } + + private Options options(boolean mapShredding, int hotKeyCount, boolean dictionaryEnabled) { + Options options = new Options(); + options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET); + options.setInteger("parquet.block.size", 256 * 1024 * 1024); + options.set("parquet.enable.dictionary", Boolean.toString(dictionaryEnabled)); + if (mapShredding) { + options.set(MAP_SHREDDING_COLUMNS, MAP_FIELD); + options.setInteger(MAP_SHREDDING_MAX_KEYS, hotKeyCount); + options.setInteger(MAP_SHREDDING_MAX_INFER_BUFFER_ROW, 10_000); + options.set(MAP_SHREDDING_MAX_INFER_BUFFER_MEMORY, "64 mb"); + } + return options; + } + + private long writeFile( + boolean mapShredding, + Scenario scenario, + int rowCount, + int hotKeyCount, + int valueLength, + int keyPaddingLength, + int valueRunLength, + int valueCardinality, + boolean dictionaryEnabled) + throws Exception { + Options options = options(mapShredding, hotKeyCount, dictionaryEnabled); + Path path = + new Path( + folder.getPath(), + (mapShredding ? "map-shredding-" : "regular-") + UUID.randomUUID()); + ParquetWriterFactory factory = + new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE, options)); + LocalFileIO fileIO = new LocalFileIO(); + try (FormatWriter writer = factory.create(fileIO.newOutputStream(path, false), "snappy")) { + for (int rowId = 0; rowId < rowCount; rowId++) { + writer.addElement( + row( + scenario, + rowId, + hotKeyCount, + valueLength, + keyPaddingLength, + valueRunLength, + valueCardinality)); + } + } + return fileIO.getFileSize(path); + } + + private InternalRow row( + Scenario scenario, + int rowId, + int hotKeyCount, + int valueLength, + int keyPaddingLength, + int valueRunLength, + int valueCardinality) { + GenericRow row = new GenericRow(2); + row.setField(0, rowId); + row.setField( + 1, + map( + scenario, + rowId, + hotKeyCount, + valueLength, + keyPaddingLength, + valueRunLength, + valueCardinality)); + return row; + } + + private GenericMap map( + Scenario scenario, + int rowId, + int hotKeyCount, + int valueLength, + int keyPaddingLength, + int valueRunLength, + int valueCardinality) { + Map map = new LinkedHashMap<>(); + for (int keyId = 0; keyId < hotKeyCount; keyId++) { + map.put( + hotKey(keyId, keyPaddingLength), + value(scenario, rowId, keyId, valueLength, valueRunLength, valueCardinality)); + } + return new GenericMap(map); + } + + private BinaryString hotKey(int keyId, int keyPaddingLength) { + String prefix = String.format(Locale.ROOT, "very_common_header_key_%03d_", keyId); + StringBuilder builder = new StringBuilder(prefix); + while (builder.length() < prefix.length() + keyPaddingLength) { + builder.append("0123456789abcdef"); + } + int keyLength = prefix.length() + keyPaddingLength; + return BinaryString.fromString(builder.substring(0, Math.min(builder.length(), keyLength))); + } + + private BinaryString value( + Scenario scenario, + int rowId, + int keyId, + int valueLength, + int valueRunLength, + int valueCardinality) { + int valueId = + scenario == Scenario.COLUMNAR_VALUES + ? rowId / valueRunLength % valueCardinality + : rowId; + String prefix = String.format(Locale.ROOT, "v_%03d_%03d_", keyId, valueId); + StringBuilder builder = new StringBuilder(valueLength); + while (builder.length() < valueLength) { + builder.append(prefix); + } + return BinaryString.fromString(builder.substring(0, valueLength)); + } + + private enum Scenario { + LONG_HOT_KEYS, + COLUMNAR_VALUES + } +} diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java index 0f71fbbcd719..b9276498adff 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java @@ -18,13 +18,22 @@ package org.apache.paimon.format.parquet; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericMap; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.data.variant.Variant; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory; import org.apache.paimon.format.FormatReadWriteTest; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatWriter; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -34,12 +43,19 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.paimon.data.serializer.InternalMapSerializer.convertToJavaMap; +import static org.assertj.core.api.Assertions.assertThat; + /** A parquet {@link FormatReadWriteTest}. */ public class ParquetFormatReadWriteTest extends FormatReadWriteTest { @@ -89,4 +105,250 @@ public void testEnableBloomFilter(boolean enabled) throws Exception { } } } + + @Test + public void testMapShreddingRoundTrip() throws Exception { + Options options = new Options(); + options.set("map.shredding.columns", "labels"); + options.set("map.shredding.maxKeys", "2"); + options.set("map.shredding.maxInferBufferRow", "10"); + ParquetFileFormat format = + new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "labels", + DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.STRING()))); + + PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd"); + writer.addElement(GenericRow.of(labels("job", "api", "namespace", "prod", "region", "us"))); + writer.close(); + out.close(); + + try (ParquetFileReader reader = + ParquetUtil.getParquetReader( + fileIO, file, fileIO.getFileSize(file), new Options())) { + assertThat(reader.getFooter().getFileMetaData().getSchema().containsField("labels")) + .isTrue(); + assertThat( + reader.getFooter() + .getFileMetaData() + .getSchema() + .containsField("dynamic_column_labels_value_0")) + .isTrue(); + assertThat( + reader.getFooter() + .getFileMetaData() + .getKeyValueMetaData() + .get("parquet.meta.dynamic.column.map.keys.of.labels") + .split(",")) + .containsExactlyInAnyOrder("job", "namespace"); + } + + InternalMap logicalMap = readOne(format, rowType).getMap(0); + assertThat(toStringMap(logicalMap)) + .containsExactlyInAnyOrderEntriesOf( + stringMap("job", "api", "namespace", "prod", "region", "us")); + } + + @Test + public void testMapShreddingNonStringValueRoundTrip() throws Exception { + Options options = new Options(); + options.set("map.shredding.columns", "headers"); + options.set("map.shredding.maxKeys", "2"); + options.set("map.shredding.maxInferBufferRow", "10"); + ParquetFileFormat format = + new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "headers", + DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.INT()))); + + PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd"); + writer.addElement(GenericRow.of(intLabels("status", 200, "retry", 3, "region", 1))); + writer.close(); + out.close(); + + InternalMap logicalMap = readOne(format, rowType).getMap(0); + assertThat(toIntMap(logicalMap)) + .containsExactlyInAnyOrderEntriesOf(intMap("status", 200, "retry", 3, "region", 1)); + } + + @Test + public void testMapShreddingVariantValueRoundTrip() throws Exception { + Options options = new Options(); + options.set("map.shredding.columns", "params"); + options.set("map.shredding.maxKeys", "2"); + options.set("map.shredding.maxInferBufferRow", "10"); + ParquetFileFormat format = + new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "params", + DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.VARIANT()))); + + GenericVariant user = GenericVariant.fromJson("{\"id\":1,\"name\":\"alice\"}"); + GenericVariant request = GenericVariant.fromJson("{\"id\":\"r1\"}"); + GenericVariant extra = GenericVariant.fromJson("{\"debug\":true}"); + + PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd"); + writer.addElement( + GenericRow.of(variantLabels("user", user, "request", request, "extra", extra))); + writer.close(); + out.close(); + + InternalMap logicalMap = readOne(format, rowType).getMap(0); + assertThat(toVariantMap(logicalMap)) + .containsExactlyInAnyOrderEntriesOf( + variantMap("user", user, "request", request, "extra", extra)); + } + + @Test + public void testNestedMapShreddingRoundTrip() throws Exception { + Options options = new Options(); + options.set("map.shredding.columns", "payload.labels"); + options.set("map.shredding.maxKeys", "2"); + options.set("map.shredding.maxInferBufferRow", "10"); + ParquetFileFormat format = + new ParquetFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + + RowType payloadType = + DataTypes.ROW( + DataTypes.FIELD( + 0, + "labels", + DataTypes.MAP(DataTypes.STRING().notNull(), DataTypes.STRING()))); + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "payload", payloadType)); + + PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd"); + writer.addElement( + GenericRow.of( + GenericRow.of(labels("job", "api", "namespace", "prod", "region", "us")))); + writer.close(); + out.close(); + + try (ParquetFileReader reader = + ParquetUtil.getParquetReader( + fileIO, file, fileIO.getFileSize(file), new Options())) { + assertThat( + reader.getFooter() + .getFileMetaData() + .getSchema() + .getType("payload") + .asGroupType() + .containsField("dynamic_column_payload_labels_value_0")) + .isTrue(); + assertThat( + reader.getFooter() + .getFileMetaData() + .getKeyValueMetaData() + .get("parquet.meta.dynamic.column.map.keys.of.payload.labels") + .split(",")) + .containsExactlyInAnyOrder("job", "namespace"); + } + + InternalMap logicalMap = readOne(format, rowType).getRow(0, 1).getMap(0); + assertThat(toStringMap(logicalMap)) + .containsExactlyInAnyOrderEntriesOf( + stringMap("job", "api", "namespace", "prod", "region", "us")); + } + + private InternalRow readOne(FileFormat format, RowType rowType) throws Exception { + RecordReader reader = + format.createReaderFactory(rowType, rowType, new ArrayList<>()) + .createReader( + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file))); + InternalRowSerializer serializer = new InternalRowSerializer(rowType); + List result = new ArrayList<>(); + reader.forEachRemaining(row -> result.add(serializer.copy(row))); + assertThat(result).hasSize(1); + return result.get(0); + } + + private static GenericMap labels(String... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put(BinaryString.fromString(kvs[i]), BinaryString.fromString(kvs[i + 1])); + } + return new GenericMap(map); + } + + private static GenericMap intLabels(Object... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put(BinaryString.fromString((String) kvs[i]), (Integer) kvs[i + 1]); + } + return new GenericMap(map); + } + + private static GenericMap variantLabels(Object... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put(BinaryString.fromString((String) kvs[i]), (GenericVariant) kvs[i + 1]); + } + return new GenericMap(map); + } + + private static Map toStringMap(InternalMap map) { + Map raw = convertToJavaMap(map, DataTypes.STRING(), DataTypes.STRING()); + Map result = new LinkedHashMap<>(); + for (Map.Entry entry : raw.entrySet()) { + result.put(entry.getKey().toString(), entry.getValue().toString()); + } + return result; + } + + private static Map toIntMap(InternalMap map) { + Map raw = convertToJavaMap(map, DataTypes.STRING(), DataTypes.INT()); + Map result = new LinkedHashMap<>(); + for (Map.Entry entry : raw.entrySet()) { + result.put(entry.getKey().toString(), (Integer) entry.getValue()); + } + return result; + } + + private static Map toVariantMap(InternalMap map) { + Map raw = convertToJavaMap(map, DataTypes.STRING(), DataTypes.VARIANT()); + Map result = new LinkedHashMap<>(); + for (Map.Entry entry : raw.entrySet()) { + result.put(entry.getKey().toString(), (Variant) entry.getValue()); + } + return result; + } + + private static Map stringMap(String... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put(kvs[i], kvs[i + 1]); + } + return map; + } + + private static Map intMap(Object... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put((String) kvs[i], (Integer) kvs[i + 1]); + } + return map; + } + + private static Map variantMap(Object... kvs) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < kvs.length; i += 2) { + map.put((String) kvs[i], (Variant) kvs[i + 1]); + } + return map; + } }