[MINOR] Fix usages of orElse (#10435)
the-other-tim-brown authored and yihua committed Feb 27, 2024
1 parent c0e59e9 commit 26df317
Showing 31 changed files with 57 additions and 61 deletions.
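
The change is mechanical but meaningful: orElse(x) evaluates x before the call, so the fallback is computed even when the Option already holds a value, while orElseGet(supplier) only invokes the supplier when the Option is empty. A minimal sketch of the difference, using java.util.Optional, whose orElse/orElseGet contract Hudi's Option mirrors (OrElseDemo and expensiveDefault are illustrative names, not part of the patch):

    import java.util.Optional;

    public class OrElseDemo {
      // Stand-in for the costly fallbacks in this patch, such as
      // context.emptyHoodieData() or AvroInternalSchemaConverter.convert(avroSchema).
      static String expensiveDefault() {
        System.out.println("computing fallback...");
        return "fallback";
      }

      public static void main(String[] args) {
        Optional<String> present = Optional.of("value");

        // Prints "computing fallback..." even though the result is discarded:
        // the argument is evaluated before orElse ever runs.
        String a = present.orElse(expensiveDefault());

        // Prints nothing: the Supplier is invoked only if the Optional is empty.
        String b = present.orElseGet(OrElseDemo::expensiveDefault);
      }
    }

Most hunks below apply exactly this substitution, rewriting the eager argument as a lambda (() -> ...) or, where the fallback is a zero-argument call, a method reference (context::emptyHoodieData, HoodieActiveTimeline::createNewInstantTime).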
@@ -494,7 +494,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
preCommit(metadata);
}
// Update table's metadata (table)
- writeTableMetadata(table, clusteringInstant.getTimestamp(), metadata, writeStatuses.orElse(context.emptyHoodieData()));
+ writeTableMetadata(table, clusteringInstant.getTimestamp(), metadata, writeStatuses.orElseGet(context::emptyHoodieData));

LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);

@@ -1008,7 +1008,8 @@ private List<String> getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
*/
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
- final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
+ final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp())
+     .orElseGet(HoodieActiveTimeline::createNewInstantTime);
return rollback(commitInstantTime, pendingRollbackInfo, rollbackInstantTime, skipLocking);
}

@@ -297,7 +297,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
if (historySchemaStr.isEmpty()) {
- internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElse(AvroInternalSchemaConverter.convert(avroSchema));
+ internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> AvroInternalSchemaConverter.convert(avroSchema));
internalSchema.setSchemaId(Long.parseLong(instantTime));
} else {
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
@@ -79,7 +79,7 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant),
completedInstantsDuringCurrentWriteOperation);

- final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata()));
+ final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElseGet(HoodieCommitMetadata::new));
instantStream.forEach(instant -> {
try {
ConcurrentOperation otherOperation = new ConcurrentOperation(instant, table.getMetaClient());
@@ -643,7 +643,8 @@ public void rollbackInflightClustering(HoodieInstant inflightInstant,
private void rollbackInflightInstant(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-     -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
+     -> entry.getRollbackInstant().getTimestamp())
+     .orElseGet(HoodieActiveTimeline::createNewInstantTime);
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
rollback(context, commitTime, inflightInstant, false, false);
@@ -658,7 +659,8 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
*/
public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-     -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
+     -> entry.getRollbackInstant().getTimestamp())
+     .orElseGet(HoodieActiveTimeline::createNewInstantTime);
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
rollback(context, commitTime, inflightInstant, true, false);
@@ -90,7 +90,7 @@ public HoodieSavepointMetadata execute() {
} catch (IOException e) {
throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
}
- }).orElse(table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp());
+ }).orElseGet(() -> table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp());

// Cannot allow savepoint time on a commit that could have been cleaned
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
@@ -133,7 +133,7 @@ protected void completeClustering(
// commit to data table after committing to metadata table.
// We take the lock here to ensure all writes to metadata table happens within a single lock (single writer).
// Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
- writeTableMetadata(table, clusteringCommitTime, metadata, writeStatuses.orElse(context.emptyHoodieData()));
+ writeTableMetadata(table, clusteringCommitTime, metadata, writeStatuses.orElseGet(context::emptyHoodieData));

LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
@@ -78,7 +78,7 @@ public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(final List<HoodieRecord
config.shouldAllowMultiWriteOnSameInstant());
}

- BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
+ BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElseGet(() -> JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));

// write new files
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
@@ -219,7 +219,7 @@ private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategy
default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
- }).orElse(isRowPartitioner
+ }).orElseGet(() -> isRowPartitioner
? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)
: BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));
}
@@ -74,7 +74,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());

- BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(table, config));
+ BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElseGet(() -> BulkInsertInternalPartitionerFactory.get(table, config));

// Write new files
HoodieData<WriteStatus> writeStatuses =
@@ -71,7 +71,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
protected Partitioner getPartitioner(WorkloadProfile profile) {
return table.getStorageLayout().layoutPartitionerClass()
.map(c -> getLayoutPartitioner(profile, c))
-     .orElse(new SparkInsertOverwritePartitioner(profile, context, table, config));
+     .orElseGet(() -> new SparkInsertOverwritePartitioner(profile, context, table, config));
}

@Override
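One change goes beyond the orElse pattern: in AvroConversionUtils.createDataFrame, the rdd.isEmpty() fast path is removed outright, presumably because RDD.isEmpty() launches a Spark job just to probe for records, and ss.emptyDataFrame would also discard the Avro-derived schema; the mapPartitions body below already yields an empty iterator for empty partitions, so the guard bought nothing.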
@@ -97,19 +97,15 @@ object AvroConversionUtils {
* TODO convert directly from GenericRecord into InternalRow instead
*/
def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = {
- if (rdd.isEmpty()) {
-   ss.emptyDataFrame
- } else {
-   ss.createDataFrame(rdd.mapPartitions { records =>
-     if (records.isEmpty) Iterator.empty
-     else {
-       val schema = new Schema.Parser().parse(schemaStr)
-       val dataType = convertAvroSchemaToStructType(schema)
-       val converter = createConverterToRow(schema, dataType)
-       records.map { r => converter(r) }
-     }
-   }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
- }
+ ss.createDataFrame(rdd.mapPartitions { records =>
+   if (records.isEmpty) Iterator.empty
+   else {
+     val schema = new Schema.Parser().parse(schemaStr)
+     val dataType = convertAvroSchemaToStructType(schema)
+     val converter = createConverterToRow(schema, dataType)
+     records.map { r => converter(r) }
+   }
+ }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
}

/**
@@ -144,7 +144,7 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
Option<String> beginInstantTime,
Option<String> endInstantTime) {
this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
-     .orElse(new String[0]);
+     .orElseGet(() -> new String[0]);

this.metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(configProperties)
@@ -284,7 +284,7 @@ private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<Par
queryInstant.map(instant ->
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
)
-     .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
+     .orElseGet(() -> fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
));
}
@@ -160,7 +160,7 @@ public <T> Integer getInt(ConfigProperty<T> configProperty) {
public <T> Integer getIntOrDefault(ConfigProperty<T> configProperty) {
Option<Object> rawValue = getRawValue(configProperty);
return rawValue.map(v -> Integer.parseInt(v.toString()))
-     .orElse(Integer.parseInt(configProperty.defaultValue().toString()));
+     .orElseGet(() -> Integer.parseInt(configProperty.defaultValue().toString()));
}

public <T> Boolean getBoolean(ConfigProperty<T> configProperty) {
@@ -968,7 +968,7 @@ private Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
.orElse(Function.identity());

Schema schema = schemaEvolutionTransformerOpt.map(Pair::getRight)
-     .orElse(dataBlock.getSchema());
+     .orElseGet(dataBlock::getSchema);

return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, transformer), schema);
}
@@ -131,7 +131,7 @@ private CompletableFuture<Void> startConsumingAsync() {
return (Void) null;
}, consumerExecutorService)
)
-     .orElse(CompletableFuture.completedFuture(null));
+     .orElseGet(() -> CompletableFuture.completedFuture(null));
}

@Override
@@ -108,14 +108,14 @@ public Expression visitPredicate(Predicate predicate) {
Predicates.IsNull isNull = (Predicates.IsNull) predicate;
return Option.ofNullable(isNull.child.accept(this))
.map(expr -> (Expression)Predicates.isNull(expr))
-     .orElse(alwaysTrue());
+     .orElseGet(this::alwaysTrue);
}

if (predicate instanceof Predicates.IsNotNull) {
Predicates.IsNotNull isNotNull = (Predicates.IsNotNull) predicate;
return Option.ofNullable(isNotNull.child.accept(this))
.map(expr -> (Expression)Predicates.isNotNull(expr))
-     .orElse(alwaysTrue());
+     .orElseGet(this::alwaysTrue);
}

if (predicate instanceof Predicates.StringStartsWith) {
@@ -358,7 +358,7 @@ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
throw new HoodieIOException("Failed to extract file-statuses from the payload", e);
}
})
-     .orElse(new FileStatus[0]);
+     .orElseGet(() -> new FileStatus[0]);

LOG.info("Listed file in partition from metadata: partition=" + relativePartitionPath + ", #files=" + statuses.length);
return statuses;
@@ -574,7 +574,7 @@ public HoodieTableFileSystemView getMetadataFileSystemView() {

public Map<String, String> stats() {
Set<String> allMetadataPartitionPaths = Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
- return metrics.map(m -> m.getStats(true, metadataMetaClient, this, allMetadataPartitionPaths)).orElse(new HashMap<>());
+ return metrics.map(m -> m.getStats(true, metadataMetaClient, this, allMetadataPartitionPaths)).orElseGet(HashMap::new);
}

@Override
@@ -1000,7 +1000,7 @@ private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient meta
Option<HoodieTableFileSystemView> fileSystemView,
String partition,
boolean mergeFileSlices) {
- HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
+ HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() -> getFileSystemView(metaClient));
Stream<FileSlice> fileSliceStream;
if (mergeFileSlices) {
if (metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
@@ -1026,7 +1026,7 @@ private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient meta
public static List<FileSlice> getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient,
Option<HoodieTableFileSystemView> fileSystemView,
String partition) {
- HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
+ HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() -> getFileSystemView(metaClient));
Stream<FileSlice> fileSliceStream = fsView.fetchLatestFileSlicesIncludingInflight(partition);
return fileSliceStream
.sorted(Comparator.comparing(FileSlice::getFileId))
@@ -118,7 +118,7 @@ public void create(
List<HoodieSecondaryIndex> newSecondaryIndexes = secondaryIndexes.map(h -> {
h.add(secondaryIndexToAdd);
return h;
- }).orElse(Collections.singletonList(secondaryIndexToAdd));
+ }).orElseGet(() -> Collections.singletonList(secondaryIndexToAdd));
newSecondaryIndexes.sort(new HoodieSecondaryIndex.HoodieIndexCompactor());

// Persistence secondary indexes' metadata to hoodie.properties file
@@ -45,8 +45,11 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -56,8 +59,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

import static org.apache.hudi.common.util.ValidationUtils.checkState;

@@ -288,7 +288,7 @@ private File getLogTempFile(long startTime, long endTime, String diskType) {
return Arrays.stream(new File("/tmp").listFiles())
.filter(f -> f.isDirectory() && f.getName().startsWith("hudi-" + diskType) && f.lastModified() > startTime && f.lastModified() < endTime)
.findFirst()
-     .orElse(new File(""));
+     .orElseGet(() -> new File(""));
}

@Test
@@ -189,7 +189,7 @@ public static String getPartitionColumns(KeyGenerator keyGenerator, TypedPropert
if (keyGenerator instanceof CustomAvroKeyGenerator) {
return ((BaseKeyGenerator) keyGenerator).getPartitionPathFields().stream().map(
pathField -> Arrays.stream(pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX))
-     .findFirst().orElse("Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
+     .findFirst().orElseGet(() -> "Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
.collect(Collectors.joining(","));
}

@@ -82,7 +82,7 @@ private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildHoodieWriteMetadata(Optio
hoodieWriteMetadata.setWriteStatuses(HoodieJavaRDD.getJavaRDD(statuses));
hoodieWriteMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
return hoodieWriteMetadata;
- }).orElse(new HoodieWriteMetadata<>());
+ }).orElseGet(HoodieWriteMetadata::new);
}

public final HoodieWriteResult execute(Dataset<Row> records, boolean isTablePartitioned) {
@@ -277,7 +277,7 @@ public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSp
HoodieCompactionConfig compactionConfig = compactionStrategyClass
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
-     .orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
+     .orElseGet(() -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(parallelism, parallelism)
@@ -31,8 +31,8 @@
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;
- import java.util.ArrayList;
import java.util.Arrays;
+ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -97,7 +97,7 @@ public List<FileSliceDTO> getLatestFileSlicesStateless(String basePath, String p

public List<FileSliceDTO> getLatestFileSlice(String basePath, String partitionPath, String fileId) {
return viewManager.getFileSystemView(basePath).getLatestFileSlice(partitionPath, fileId)
-     .map(FileSliceDTO::fromFileSlice).map(Arrays::asList).orElse(new ArrayList<>());
+     .map(FileSliceDTO::fromFileSlice).map(Arrays::asList).orElse(Collections.emptyList());
}

public List<CompactionOpDTO> getPendingCompactionOperations(String basePath) {
@@ -27,8 +27,8 @@
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;
- import java.util.ArrayList;
import java.util.Arrays;
+ import java.util.Collections;
import java.util.List;

/**
@@ -43,7 +43,7 @@ public TimelineHandler(Configuration conf, TimelineService.Config timelineServic

public List<InstantDTO> getLastInstant(String basePath) {
return viewManager.getFileSystemView(basePath).getLastInstant().map(InstantDTO::fromInstant)
-     .map(Arrays::asList).orElse(new ArrayList<>());
+     .map(Arrays::asList).orElse(Collections.emptyList());
}

public TimelineDTO getTimeline(String basePath) {
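
Note that these last two timeline-service handlers keep orElse rather than switching to orElseGet: the fix here only replaces new ArrayList<>() with Collections.emptyList(), a shared immutable constant that costs nothing to evaluate eagerly, so a supplier would add no useful laziness.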
… (remaining changed files not shown)
