[HUDI-8800] Introduce SingleSparkConsistentBucketClusteringExecutionStrategy to improve performance #12537

Open

wants to merge 8 commits into base: master
@@ -61,6 +61,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
public static final String SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy";
public static final String SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy";
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
public static final String PLAN_PARTITION_FILTER_MODE =
@@ -653,9 +655,11 @@ private void validate() {
ValidationUtils.checkArgument(
planStrategy.equalsIgnoreCase(SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
"Consistent hashing bucket index only supports clustering plan strategy : " + SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
String clusteringConfigString = clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME);
ValidationUtils.checkArgument(
clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY),
"Consistent hashing bucket index only supports clustering execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
clusteringConfigString.equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY) || clusteringConfigString.equals(SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY),
"Consistent hashing bucket index only supports clustering execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY + " or "
+ SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY);
}
}
}
@@ -683,7 +687,7 @@ private String getDefaultPlanStrategyClassName(EngineType engineType) {
private String getDefaultExecutionStrategyClassName(EngineType engineType) {
switch (engineType) {
case SPARK:
return isConsistentHashingBucketIndex() ? SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY : SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
return isConsistentHashingBucketIndex() ? SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY : SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
case FLINK:
case JAVA:
return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY;
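A minimal sketch of how a job could opt into the new strategy once this change lands. The property keys below are the standard Hudi clustering and bucket-index options and are assumed here rather than taken from this diff; only the new execution strategy class name comes from the patch itself.

import java.util.Properties;

public class SingleJobClusteringConfigSketch {
  public static void main(String[] args) {
    // Illustrative only: property keys are assumed to match Hudi's documented names.
    Properties props = new Properties();
    props.setProperty("hoodie.index.type", "BUCKET");
    props.setProperty("hoodie.index.bucket.engine", "CONSISTENT_HASHING");
    // With this patch the single-Spark-job strategy becomes the default for consistent
    // hashing, but it can also be pinned explicitly:
    props.setProperty("hoodie.clustering.execution.strategy.class",
        "org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}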
@@ -18,17 +18,31 @@

package org.apache.hudi.table.action.cluster.strategy;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;

/**
@@ -41,12 +55,14 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Serializable {
private final transient HoodieEngineContext engineContext;
protected final HoodieWriteConfig writeConfig;
protected final HoodieRecordType recordType;
protected final Schema readerSchemaWithMetaFields;

public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
this.hoodieTable = table;
this.engineContext = engineContext;
this.recordType = table.getConfig().getRecordMerger().getRecordType();
this.readerSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
}

/**
@@ -67,4 +83,52 @@ protected HoodieEngineContext getEngineContext() {
protected HoodieWriteConfig getWriteConfig() {
return this.writeConfig;
}

protected ClosableIterator<HoodieRecord<T>> getRecordIteratorWithLogFiles(ClusteringOperation operation, String instantTime, long maxMemory,
Option<BaseKeyGenerator> keyGeneratorOpt, Option<HoodieFileReader> baseFileReaderOpt) {
HoodieWriteConfig config = getWriteConfig();
HoodieTable table = getHoodieTable();
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(table.getStorage())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(operation.getDeltaFilePaths())
.withReaderSchema(readerSchemaWithMetaFields)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemory)
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(operation.getPartitionPath())
.withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
.withTableMetaClient(table.getMetaClient())
.build();

try {
return new HoodieFileSliceReader(baseFileReaderOpt, scanner, readerSchemaWithMetaFields, tableConfig.getPreCombineField(), config.getRecordMerger(),
tableConfig.getProps(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading file slices", e);
}
}

protected ClosableIterator<HoodieRecord<T>> getRecordIteratorWithBaseFileOnly(Option<BaseKeyGenerator> keyGeneratorOpt, HoodieFileReader baseFileReader) {
// NOTE: Records have to be cloned here to make sure that, if a record holds a low-level
// engine-specific payload pointing into a shared, mutable (underlying) buffer, we get a
// clean copy of it, since these records will be shuffled later.
ClosableIterator<HoodieRecord> baseRecordsIterator;
try {
baseRecordsIterator = baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading base file", e);
}
return new CloseableMappingIterator(
baseRecordsIterator,
rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields, writeConfig.getProps(), keyGeneratorOpt));
}
}
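The cloning note above is the key constraint on getRecordIteratorWithBaseFileOnly: if the underlying reader recycles one mutable buffer, every record handed downstream would alias the same bytes. A Hudi-independent sketch of that hazard (all names here are illustrative, not part of this patch):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CopyOnReadSketch {
  public static void main(String[] args) {
    byte[] shared = new byte[8];            // stand-in for a reader's reused buffer
    List<byte[]> aliased = new ArrayList<>();
    List<byte[]> copied = new ArrayList<>();
    for (String payload : new String[] {"rec-0001", "rec-0002"}) {
      byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
      System.arraycopy(bytes, 0, shared, 0, bytes.length);
      aliased.add(shared);                  // keeps a pointer into the shared buffer
      copied.add(shared.clone());           // analogous to copy() before shuffling
    }
    // The aliased list silently lost the first record; the copied list kept it.
    System.out.println(new String(aliased.get(0), StandardCharsets.UTF_8)); // rec-0002
    System.out.println(new String(copied.get(0), StandardCharsets.UTF_8));  // rec-0001
  }
}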
@@ -526,7 +526,7 @@ public void testConsistentBucketIndexDefaultClusteringConfig() {
.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build())
.build();
assertEquals(HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY, writeConfig.getClusteringPlanStrategyClass());
assertEquals(HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY, writeConfig.getClusteringExecutionStrategyClass());
assertEquals(HoodieClusteringConfig.SINGLE_SPARK_JOB_CONSISTENT_HASHING_EXECUTION_STRATEGY, writeConfig.getClusteringExecutionStrategyClass());
}

@Test
@@ -24,30 +24,23 @@
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -57,12 +50,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
@@ -167,55 +158,19 @@ private List<HoodieRecord<T>> readRecordsForGroup(HoodieClusteringGroup clusteri
private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps,
String instantTime) {
HoodieWriteConfig config = getWriteConfig();
HoodieTable table = getHoodieTable();
List<HoodieRecord<T>> records = new ArrayList<>();

clusteringOps.forEach(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
Option<HoodieFileReader> baseFileReader = Option.empty();
HoodieMergedLogRecordScanner scanner = null;
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(table.getStorage())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
.withReaderSchema(readerSchema)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
.build();

baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieIOFactory.getIOFactory(table.getStorage()).getReaderFactory(recordType)
.getFileReader(config, new StoragePath(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
Option<BaseKeyGenerator> keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
Iterator<HoodieRecord<T>> fileSliceReader = new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), writeConfig.getRecordMerger(),
tableConfig.getProps(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
fileSliceReader.forEachRemaining(records::add);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
} finally {
if (scanner != null) {
scanner.close();
}
if (baseFileReader.isPresent()) {
baseFileReader.get().close();
}
}
});
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);

List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>(clusteringOps.size());
clusteringOps.forEach(op -> suppliers.add(() -> {
Option<HoodieFileReader> baseFileReader = ClusteringUtils.getBaseFileReader(getHoodieTable().getStorage(), recordType, getWriteConfig(), op.getDataFilePath());
return getRecordIteratorWithLogFiles(op, instantTime, maxMemoryPerCompaction, Option.empty(), baseFileReader);
}));
LazyConcatenatingIterator<HoodieRecord<T>> lazyIterator = new LazyConcatenatingIterator<>(suppliers);

lazyIterator.forEachRemaining(records::add);
lazyIterator.close();
return records;
}

@@ -224,21 +179,17 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
*/
private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
List<HoodieRecord<T>> records = new ArrayList<>();
clusteringOps.forEach(clusteringOp -> {
try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(getHoodieTable().getStorage())
.getReaderFactory(recordType)
.getFileReader(getHoodieTable().getConfig(), new StoragePath(clusteringOp.getDataFilePath()))) {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into the records(List).
recordIterator.forEachRemaining(record -> records.add(record.copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, new Properties(), Option.empty())));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
});
List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>(clusteringOps.size());
clusteringOps.forEach(
op -> suppliers.add(() -> {
Option<HoodieFileReader> baseFileReaderOpt = ClusteringUtils.getBaseFileReader(getHoodieTable().getStorage(), recordType, getWriteConfig(), op.getDataFilePath());
ValidationUtils.checkArgument(baseFileReaderOpt.isPresent(), "Base file reader should be present for base file only read.");
return getRecordIteratorWithBaseFileOnly(Option.empty(), baseFileReaderOpt.get());
}));
LazyConcatenatingIterator<HoodieRecord<T>> lazyIterator = new LazyConcatenatingIterator<>(suppliers);

lazyIterator.forEachRemaining(records::add);
lazyIterator.close();
return records;
}
}
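The rewrite above replaces the eager per-operation read loop with suppliers handed to LazyConcatenatingIterator, so a file group's log scanner and base-file reader are only opened when iteration actually reaches that group. A Hudi-independent sketch of the lazy-concatenation pattern (an illustration under that assumption, not Hudi's LazyConcatenatingIterator):

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

public class LazyConcatSketch<T> implements Iterator<T> {
  private final Iterator<Supplier<Iterator<T>>> suppliers;
  private Iterator<T> current = Collections.emptyIterator();

  public LazyConcatSketch(List<Supplier<Iterator<T>>> sources) {
    this.suppliers = sources.iterator();
  }

  @Override
  public boolean hasNext() {
    // Open the next source only when the previous one is exhausted.
    while (!current.hasNext() && suppliers.hasNext()) {
      current = suppliers.next().get();
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }

  public static void main(String[] args) {
    List<Supplier<Iterator<Integer>>> sources = List.of(
        () -> List.of(1, 2).iterator(),   // in the strategy these would open file readers
        () -> List.of(3).iterator(),
        () -> List.of(4, 5).iterator());
    new LazyConcatSketch<>(sources).forEachRemaining(System.out::println); // 1 2 3 4 5
  }
}

Keeping the sources behind suppliers means at most one clustering operation's readers are open at a time while records are drained into the result list, which is why both the log-file and base-file-only paths are routed through the same iterator.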