From 842dfb777ea86fb9b66c857d746ac343f04a0cb8 Mon Sep 17 00:00:00 2001 From: maxim_konstantinov Date: Sun, 20 Oct 2024 19:55:39 -0700 Subject: [PATCH] add a draft of column renaming feature to ParquetRewriter --- .../hadoop/rewrite/ParquetRewriter.java | 104 ++++++++-- .../hadoop/rewrite/RewriteOptions.java | 40 +++- .../hadoop/rewrite/ParquetRewriterTest.java | 184 +++++++++++++----- 3 files changed, 264 insertions(+), 64 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 2ff9c0ea34..8fb4794428 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -109,7 +109,7 @@ * Please note the schema of all inputFiles must be the same, otherwise the rewrite will fail. *
<p>
 * <h2>Applying column transformations</h2>
- * Some supported column transformations: pruning, masking, encrypting, changing a codec. + * Some supported column transformations: pruning, masking, renaming, encrypting, changing a codec. * See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full list with description. *
<p>
 * <h2>Joining with extra files with a different schema</h2>
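For reviewers, a minimal usage sketch of the rename option added by this patch (illustrative only, not part of the diff): it mirrors the new testMergeTwoFilesOnlyRenameColumn test below; the input/output paths and the "Name" -> "NameRenamed" mapping are made up, and it assumes the pre-existing RewriteOptions.Builder(Configuration, Path, Path) constructor. The rename map can be combined with the other builder options (prune, mask, codec translation) exercised elsewhere in this patch.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RenameColumnsExample {
        public static void main(String[] args) throws Exception {
            // Illustrative paths; the map keys are original top-level column names.
            Map<String, String> renames = Collections.singletonMap("Name", "NameRenamed");
            RewriteOptions options = new RewriteOptions.Builder(
                            new Configuration(),
                            new Path("/tmp/input.parquet"),
                            new Path("/tmp/output.parquet"))
                    .renameColumns(renames)
                    .build();
            // ParquetRewriter is Closeable; processBlocks() copies the data under the new column names.
            try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
                rewriter.processBlocks();
            }
        }
    }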
@@ -145,15 +145,18 @@ public class ParquetRewriter implements Closeable { private final Queue inputFiles = new LinkedList<>(); private final Queue inputFilesToJoin = new LinkedList<>(); private final MessageType outSchema; + private final MessageType outSchemaWithRenamedColumns; // The index cache strategy private final IndexCache.CacheStrategy indexCacheStrategy; private final boolean overwriteInputWithJoinColumns; private final InternalFileEncryptor nullColumnEncryptor; + private final Map renamedColumns; public ParquetRewriter(RewriteOptions options) throws IOException { this.newCodecName = options.getNewCodecName(); this.indexCacheStrategy = options.getIndexCacheStrategy(); this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns(); + this.renamedColumns = options.gerRenameColumns(); ParquetConfiguration conf = options.getParquetConfiguration(); OutputFile out = options.getParquetOutputFile(); inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf)); @@ -169,6 +172,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException { out); this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns()); + // TODO check a requirement that all renamed column should be present in outSchema + this.outSchemaWithRenamedColumns = getSchemaWithRenamedColumns(this.outSchema); this.extraMetaData = getExtraMetadata(options); if (options.getMaskColumns() != null) { @@ -186,7 +191,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException { ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE; writer = new ParquetFileWriter( out, - outSchema, + outSchemaWithRenamedColumns != null ? outSchemaWithRenamedColumns : outSchema, writerMode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, @@ -222,6 +227,7 @@ public ParquetRewriter( MaskMode maskMode) { this.writer = writer; this.outSchema = outSchema; + this.outSchemaWithRenamedColumns = null; this.newCodecName = codecName; extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData()); extraMetaData.put( @@ -239,6 +245,7 @@ public ParquetRewriter( this.indexCacheStrategy = IndexCache.CacheStrategy.NONE; this.overwriteInputWithJoinColumns = false; this.nullColumnEncryptor = null; + this.renamedColumns = new HashMap<>(); } private MessageType getSchema() { @@ -266,6 +273,27 @@ private MessageType getSchema() { } } + private MessageType getSchemaWithRenamedColumns(MessageType schema) { + List fields = schema.getFields().stream() + .map(type -> { + if (renamedColumns == null || !renamedColumns.containsKey(type.getName())) { + return type; + } else if (type.isPrimitive()) { + return new PrimitiveType( + type.getRepetition(), + type.asPrimitiveType().getPrimitiveTypeName(), + renamedColumns.get(type.getName())); + } else { + return new GroupType( + type.getRepetition(), + renamedColumns.get(type.getName()), + type.asGroupType().getFields()); + } + }) + .collect(Collectors.toList()); + return new MessageType(schema.getName(), fields); + } + private Map getExtraMetadata(RewriteOptions options) { List allFiles; if (options.getIgnoreJoinFilesMetadata()) { @@ -421,6 +449,27 @@ public void processBlocks() throws IOException { if (readerToJoin != null) readerToJoin.close(); } + private ColumnPath renameFieldsInPath(ColumnPath path) { + if (renamedColumns == null) { + return path; + } else { + String[] pathArray = path.toArray(); + pathArray[0] = renamedColumns.getOrDefault(pathArray[0], pathArray[0]); + return ColumnPath.get(pathArray); + } + } + + private PrimitiveType 
renameNameInType(PrimitiveType type) { + if (renamedColumns == null) { + return type; + } else { + return new PrimitiveType( + type.getRepetition(), + type.asPrimitiveType().getPrimitiveTypeName(), + renamedColumns.getOrDefault(type.getName(), type.getName())); + } + } + private void processBlock( TransParquetFileReader reader, int blockIdx, @@ -431,7 +480,27 @@ private void processBlock( if (chunk.isEncrypted()) { throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted"); } - ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx); + + ColumnChunkMetaData chunkColumnsRenamed = chunk; + if (renamedColumns != null && !renamedColumns.isEmpty()) { + chunkColumnsRenamed = ColumnChunkMetaData.get( + renameFieldsInPath(chunk.getPath()), + renameNameInType(chunk.getPrimitiveType()), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + chunk.getFirstDataPageOffset(), + chunk.getDictionaryPageOffset(), + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize(), + chunk.getSizeStatistics()); + } + + ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx); + ColumnDescriptor descriptorRenamed = + outSchemaWithRenamedColumns.getColumns().get(outColumnIdx); BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx); String originalCreatedBy = reader.getFileMetaData().getCreatedBy(); @@ -443,13 +512,21 @@ private void processBlock( // Mask column and compress it again. MaskMode maskMode = maskColumns.get(chunk.getPath()); if (maskMode.equals(MaskMode.NULLIFY)) { - Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition(); + Type.Repetition repetition = + descriptorOriginal.getPrimitiveType().getRepetition(); if (repetition.equals(Type.Repetition.REQUIRED)) { - throw new IOException( - "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified"); + throw new IOException("Required column [" + + descriptorOriginal.getPrimitiveType().getName() + "] cannot be nullified"); } nullifyColumn( - reader, blockIdx, descriptor, chunk, writer, newCodecName, encryptColumn, originalCreatedBy); + reader, + blockIdx, + descriptorOriginal, + chunk, + writer, + newCodecName, + encryptColumn, + originalCreatedBy); } else { throw new UnsupportedOperationException("Only nullify is supported for now"); } @@ -462,7 +539,7 @@ private void processBlock( } // Translate compression and/or encryption - writer.startColumn(descriptor, chunk.getValueCount(), newCodecName); + writer.startColumn(descriptorRenamed, chunk.getValueCount(), newCodecName); processChunk( reader, blockMetaData.getRowCount(), @@ -480,7 +557,8 @@ private void processBlock( BloomFilter bloomFilter = indexCache.getBloomFilter(chunk); ColumnIndex columnIndex = indexCache.getColumnIndex(chunk); OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk); - writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex); + writer.appendColumnChunk( + descriptorRenamed, reader.getStream(), chunkColumnsRenamed, bloomFilter, columnIndex, offsetIndex); } } @@ -522,7 +600,7 @@ private void processChunk( } if (bloomFilter != null) { - writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter); + writer.addBloomFilter(renameFieldsInPath(chunk.getPath()).toDotString(), bloomFilter); } reader.setStreamPosition(chunk.getStartingPos()); @@ -580,7 +658,7 @@ private void processChunk( dataPageAAD); statistics = convertStatistics( 
originalCreatedBy, - chunk.getPrimitiveType(), + renameNameInType(chunk.getPrimitiveType()), headerV1.getStatistics(), columnIndex, pageOrdinal, @@ -648,7 +726,7 @@ private void processChunk( dataPageAAD); statistics = convertStatistics( originalCreatedBy, - chunk.getPrimitiveType(), + renameNameInType(chunk.getPrimitiveType()), headerV2.getStatistics(), columnIndex, pageOrdinal, @@ -887,7 +965,7 @@ private void nullifyColumn( CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(newCodecName); // Create new schema that only has the current column - MessageType newSchema = newSchema(outSchema, descriptor); + MessageType newSchema = getSchemaWithRenamedColumns(newSchema(outSchema, descriptor)); ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore( compressor, newSchema, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index a69403f464..710109f6c2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -49,6 +49,7 @@ public class RewriteOptions { private final List pruneColumns; private final CompressionCodecName newCodecName; private final Map maskColumns; + private final Map renameColumns; private final List encryptColumns; private final FileEncryptionProperties fileEncryptionProperties; private final IndexCache.CacheStrategy indexCacheStrategy; @@ -63,6 +64,7 @@ private RewriteOptions( List pruneColumns, CompressionCodecName newCodecName, Map maskColumns, + Map renameColumns, List encryptColumns, FileEncryptionProperties fileEncryptionProperties, IndexCache.CacheStrategy indexCacheStrategy, @@ -75,6 +77,7 @@ private RewriteOptions( this.pruneColumns = pruneColumns; this.newCodecName = newCodecName; this.maskColumns = maskColumns; + this.renameColumns = renameColumns; this.encryptColumns = encryptColumns; this.fileEncryptionProperties = fileEncryptionProperties; this.indexCacheStrategy = indexCacheStrategy; @@ -192,6 +195,10 @@ public Map getMaskColumns() { return maskColumns; } + public Map gerRenameColumns() { + return renameColumns; + } + public List getEncryptColumns() { return encryptColumns; } @@ -221,6 +228,7 @@ public static class Builder { private List pruneColumns; private CompressionCodecName newCodecName; private Map maskColumns; + private Map renameColumns; private List encryptColumns; private FileEncryptionProperties fileEncryptionProperties; private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE; @@ -432,6 +440,19 @@ public Builder mask(Map maskColumns) { return this; } + /** + * Set the columns to be renamed. + *
<p>
+ * Note that nested columns can't be renamed; in the case of a GroupType column, only the top-level column name can be renamed. + * + * @param renameColumns map where keys are the original column names and values are the new names + * @return self + */ + public Builder renameColumns(Map<String, String> renameColumns) { + this.renameColumns = renameColumns; + return this; + } + /** * Set the columns to encrypt. *
<p>
@@ -561,13 +582,29 @@ public RewriteOptions build() { !maskColumns.containsKey(pruneColumn), "Cannot prune and mask same column"); } } - if (encryptColumns != null) { for (String pruneColumn : pruneColumns) { Preconditions.checkArgument( !encryptColumns.contains(pruneColumn), "Cannot prune and encrypt same column"); } } + if (renameColumns != null) { + for (Map.Entry entry : renameColumns.entrySet()) { + Preconditions.checkArgument( + !encryptColumns.contains(entry.getKey()), "Cannot prune and rename same column"); + } + } + } + + if (renameColumns != null && !renameColumns.isEmpty()) { + for (Map.Entry entry : renameColumns.entrySet()) { + Preconditions.checkArgument( + entry.getValue() != null && !entry.getValue().trim().isEmpty(), + "Renamed column target name can't be empty"); + Preconditions.checkArgument( + !entry.getKey().contains(".") && !entry.getValue().contains("."), + "Renamed column name can't be nested, in case of GroupType column only a top level column can be renamed"); + } } if (encryptColumns != null && !encryptColumns.isEmpty()) { @@ -590,6 +627,7 @@ public RewriteOptions build() { pruneColumns, newCodecName, maskColumns, + renameColumns, encryptColumns, fileEncryptionProperties, indexCacheStrategy, diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index 34c90a4641..40c525339b 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.hadoop.rewrite; +import static java.util.Collections.emptyMap; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; @@ -181,10 +182,10 @@ private void testPruneSingleColumnTranslateCodec(List inputPaths) throws E null); // Verify the data are not changed for the columns not pruned - validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false); + validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, emptyMap()); // Verify the page index - validatePageIndex(new HashSet<>(), false); + validatePageIndex(new HashSet<>(), false, emptyMap()); // Verify original.created.by is preserved validateCreatedBy(); @@ -199,7 +200,7 @@ public void setUp() { @Test public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { add(new Path(inputFiles.get(0).getFileName())); @@ -210,8 +211,8 @@ public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception { @Test public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { add(new Path(inputFiles.get(0).getFileName())); @@ -252,10 +253,10 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except null); // Verify the data are not changed for the columns not pruned - validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null, false); + validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null, false, emptyMap()); // Verify 
the page index - validatePageIndex(ImmutableSet.of("Links.Forward"), false); + validatePageIndex(ImmutableSet.of("Links.Forward"), false, emptyMap()); // Verify original.created.by is preserved validateCreatedBy(); @@ -264,7 +265,7 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except @Test public void testPruneNullifyTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -276,8 +277,8 @@ public void testPruneNullifyTranslateCodecSingleFile() throws Exception { @Test public void testPruneNullifyTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -327,7 +328,8 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except fileDecryptionProperties); // Verify the data are not changed for the columns not pruned - validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false); + validateColumnData( + new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, emptyMap()); // Verify column encryption ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties); @@ -349,7 +351,7 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except @Test public void testPruneEncryptTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -361,8 +363,8 @@ public void testPruneEncryptTranslateCodecSingleFile() throws Exception { @Test public void testPruneEncryptTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -488,10 +490,10 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception // Verify the data are not changed for non-encrypted and non-masked columns. // Also make sure the masked column is nullified. - validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false); + validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false, emptyMap()); // Verify the page index - validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false); + validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false, emptyMap()); // Verify the column is encrypted ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties); @@ -511,7 +513,7 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception @Test public void testNullifyEncryptSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -523,8 +525,8 @@ public void testNullifyEncryptSingleFile() throws Exception { @Test public void testNullifyEncryptTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -537,8 +539,8 @@ public void testNullifyEncryptTwoFiles() throws Exception { @Test public void testMergeTwoFilesOnly() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); // Only merge two files but do not change anything. 
List inputPaths = new ArrayList<>(); @@ -571,10 +573,59 @@ public void testMergeTwoFilesOnly() throws Exception { null); // Verify the merged data are not changed - validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false); + validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, emptyMap()); + + // Verify the page index + validatePageIndex(new HashSet<>(), false, emptyMap()); + + // Verify original.created.by is preserved + validateCreatedBy(); + validateRowGroupRowCount(); + } + + @Test + public void testMergeTwoFilesOnlyRenameColumn() throws Exception { + addGzipInputFile(); + addUncompressedInputFile(); + + // Only merge two files but do not change anything. + List inputPaths = new ArrayList<>(); + for (EncryptionTestFile inputFile : inputFiles) { + inputPaths.add(new Path(inputFile.getFileName())); + } + Map renameColumns = ImmutableMap.of("Name", "NameRenamed"); + RewriteOptions.Builder builder = createBuilder(inputPaths); + RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy) + .renameColumns(ImmutableMap.of("Name", "NameRenamed")) + .build(); + + rewriter = new ParquetRewriter(options); + rewriter.processBlocks(); + rewriter.close(); + + // Verify the schema is not changed + ParquetMetadata pmd = + ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER); + MessageType schema = pmd.getFileMetaData().getSchema(); + MessageType expectSchema = createSchemaWithRenamed(); + assertEquals(expectSchema, schema); + + // Verify codec has not been translated + verifyCodec( + outputFile, + new HashSet() { + { + add(CompressionCodecName.GZIP); + add(CompressionCodecName.UNCOMPRESSED); + } + }, + null); + + // Verify the merged data are not changed + validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, renameColumns); // Verify the page index - validatePageIndex(new HashSet<>(), false); + validatePageIndex(new HashSet<>(), false, renameColumns); // Verify original.created.by is preserved validateCreatedBy(); @@ -648,7 +699,7 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput @Test public void testRewriteFileWithMultipleBlocks() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -823,12 +874,13 @@ public void testOneInputFileManyInputFilesToJoinSetup(boolean joinColumnsOverwri new HashSet<>(pruneColumns), maskColumns.keySet(), fileDecryptionProperties, - joinColumnsOverwrite); // Verify data + joinColumnsOverwrite, + emptyMap()); // Verify data validateSchemaWithGenderColumnPruned(true); // Verify schema validateCreatedBy(); // Verify original.created.by assertEquals(inputBloomFilters.keySet(), rBloomFilters); // Verify bloom filters verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.ZSTD), fileDecryptionProperties); // Verify codec - validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite); + validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite, emptyMap()); } private void testOneInputFileManyInputFilesToJoinSetup() throws IOException { @@ -884,11 +936,27 @@ private MessageType createSchemaToJoin() { new PrimitiveType(REPEATED, BINARY, "Forward"))); } + private MessageType createSchemaWithRenamed() { + return new MessageType( + "schema", + new PrimitiveType(OPTIONAL, INT64, "DocId"), + new PrimitiveType(REQUIRED, BINARY, "NameRenamed"), + new PrimitiveType(OPTIONAL, BINARY, "Gender"), + new PrimitiveType(REPEATED, FLOAT, 
"FloatFraction"), + new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"), + new GroupType( + OPTIONAL, + "Links", + new PrimitiveType(REPEATED, BINARY, "Backward"), + new PrimitiveType(REPEATED, BINARY, "Forward"))); + } + private void validateColumnData( Set prunePaths, Set nullifiedPaths, FileDecryptionProperties fileDecryptionProperties, - Boolean joinColumnsOverwrite) + Boolean joinColumnsOverwrite, + Map renameColumns) throws IOException { ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) .withConf(conf) @@ -901,7 +969,7 @@ private void validateColumnData( List filesJoined = inputFilesToJoin.stream() .flatMap(x -> Arrays.stream(x.getFileContent())) .collect(Collectors.toList()); - BiFunction groups = (name, rowIdx) -> { + BiFunction groupsExpected = (name, rowIdx) -> { if (!filesMain.get(0).getType().containsField(name) || joinColumnsOverwrite && !filesJoined.isEmpty() @@ -915,50 +983,53 @@ private void validateColumnData( int totalRows = inputFiles.stream().mapToInt(x -> x.getFileContent().length).sum(); for (int i = 0; i < totalRows; i++) { - Group group = reader.read(); - assertNotNull(group); + Group groupActual = reader.read(); + assertNotNull(groupActual); if (!prunePaths.contains("DocId")) { if (nullifiedPaths.contains("DocId")) { - assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0)); + assertThrows(RuntimeException.class, () -> groupActual.getLong("DocId", 0)); } else { assertEquals( - group.getLong("DocId", 0), groups.apply("DocId", i).getLong("DocId", 0)); + groupActual.getLong("DocId", 0), + groupsExpected.apply("DocId", i).getLong("DocId", 0)); } } if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) { + String colName = renameColumns.getOrDefault("Name", "Name"); assertArrayEquals( - group.getBinary("Name", 0).getBytes(), - groups.apply("Name", i).getBinary("Name", 0).getBytes()); + groupActual.getBinary(colName, 0).getBytes(), + groupsExpected.apply("Name", i).getBinary("Name", 0).getBytes()); } if (!prunePaths.contains("Gender") && !nullifiedPaths.contains("Gender")) { assertArrayEquals( - group.getBinary("Gender", 0).getBytes(), - groups.apply("Gender", i).getBinary("Gender", 0).getBytes()); + groupActual.getBinary("Gender", 0).getBytes(), + groupsExpected.apply("Gender", i).getBinary("Gender", 0).getBytes()); } if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) { assertEquals( - group.getFloat("FloatFraction", 0), - groups.apply("FloatFraction", i).getFloat("FloatFraction", 0), + groupActual.getFloat("FloatFraction", 0), + groupsExpected.apply("FloatFraction", i).getFloat("FloatFraction", 0), 0); } if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) { assertEquals( - group.getDouble("DoubleFraction", 0), - groups.apply("DoubleFraction", i).getDouble("DoubleFraction", 0), + groupActual.getDouble("DoubleFraction", 0), + groupsExpected.apply("DoubleFraction", i).getDouble("DoubleFraction", 0), 0); } - Group subGroup = group.getGroup("Links", 0); + Group subGroup = groupActual.getGroup("Links", 0); if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) { assertArrayEquals( subGroup.getBinary("Backward", 0).getBytes(), - groups.apply("Links", i) + groupsExpected + .apply("Links", i) .getGroup("Links", 0) .getBinary("Backward", 0) .getBytes()); @@ -970,7 +1041,8 @@ private void validateColumnData( } else { assertArrayEquals( subGroup.getBinary("Forward", 0).getBytes(), - 
groups.apply("Links", i) + groupsExpected + .apply("Links", i) .getGroup("Links", 0) .getBinary("Forward", 0) .getBytes()); @@ -1014,13 +1086,22 @@ interface CheckedFunction { R apply(T t) throws IOException; } + private ColumnPath renameFieldsInPath(ColumnPath path, Map renameColumns) { + String[] pathArray = path.toArray(); + if (renameColumns != null) { + pathArray[0] = renameColumns.getOrDefault(pathArray[0], pathArray[0]); + } + return ColumnPath.get(pathArray); + } + /** * Verify the page index is correct. * * @param exclude the columns to exclude from comparison, for example because they were nullified. * @param joinColumnsOverwrite whether a join columns overwrote existing overlapping columns. */ - private void validatePageIndex(Set exclude, boolean joinColumnsOverwrite) throws Exception { + private void validatePageIndex(Set exclude, boolean joinColumnsOverwrite, Map renameColumns) + throws Exception { class BlockMeta { final TransParquetFileReader reader; final BlockMetaData blockMeta; @@ -1058,6 +1139,8 @@ class BlockMeta { List inBlocksJoined = blockMetaExtractor.apply( inputFilesToJoin.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList())); List outBlocks = blockMetaExtractor.apply(ImmutableList.of(outputFile)); + Map renameColumnsInverted = + renameColumns.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); for (int blockIdx = 0; blockIdx < outBlocks.size(); blockIdx++) { BlockMetaData outBlockMeta = outBlocks.get(blockIdx).blockMeta; TransParquetFileReader outReader = outBlocks.get(blockIdx).reader; @@ -1066,17 +1149,18 @@ class BlockMeta { TransParquetFileReader inReader; BlockMetaData inBlockMeta; ColumnChunkMetaData inChunk; - if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath()) + ColumnPath colPath = renameFieldsInPath(outChunk.getPath(), renameColumnsInverted); + if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(colPath) || joinColumnsOverwrite && !inBlocksJoined.isEmpty() - && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())) { + && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(colPath)) { inReader = inBlocksJoined.get(blockIdx).reader; inBlockMeta = inBlocksJoined.get(blockIdx).blockMeta; - inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(outChunk.getPath()); + inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(colPath); } else { inReader = inBlocksMain.get(blockIdx).reader; inBlockMeta = inBlocksMain.get(blockIdx).blockMeta; - inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(outChunk.getPath()); + inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(colPath); } ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk); @@ -1284,13 +1368,13 @@ conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER) assertEquals(expectSchema, actualSchema); } - private void ensureContainsGzipFile() { + private void addGzipInputFile() { if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) { inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn); } } - private void ensureContainsUncompressedFile() { + private void addUncompressedInputFile() { if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) { inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn); }
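A schema-level sketch of what the rename is expected to do under this draft (illustrative, not part of the diff): only top-level field names are rewritten, nested fields inside a GroupType keep their names, and a dotted key such as "Links.Backward" is rejected by RewriteOptions.build().

    import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
    import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
    import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
    import static org.apache.parquet.schema.Type.Repetition.REPEATED;

    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType;

    public class RenameSchemaSketch {
        public static void main(String[] args) {
            // Input schema: a primitive column plus a nested group.
            MessageType input = new MessageType(
                    "schema",
                    new PrimitiveType(OPTIONAL, INT64, "DocId"),
                    new GroupType(
                            OPTIONAL,
                            "Links",
                            new PrimitiveType(REPEATED, BINARY, "Backward"),
                            new PrimitiveType(REPEATED, BINARY, "Forward")));

            // With renameColumns = {"Links" -> "LinksRenamed"} the rewriter should emit the same
            // structure with only the top-level group name changed; "Backward" and "Forward" are
            // untouched because nested renames are not supported in this draft.
            MessageType expectedOutput = new MessageType(
                    "schema",
                    new PrimitiveType(OPTIONAL, INT64, "DocId"),
                    new GroupType(
                            OPTIONAL,
                            "LinksRenamed",
                            new PrimitiveType(REPEATED, BINARY, "Backward"),
                            new PrimitiveType(REPEATED, BINARY, "Forward")));

            System.out.println(input);
            System.out.println(expectedOutput);
        }
    }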