diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 2ba0d553f58c7..5b54151dc4cbd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -495,7 +495,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
       preCommit(metadata);
     }
     // Update table's metadata (table)
-    writeTableMetadata(table, clusteringInstant.getTimestamp(), metadata, writeStatuses.orElse(context.emptyHoodieData()));
+    writeTableMetadata(table, clusteringInstant.getTimestamp(), metadata, writeStatuses.orElseGet(context::emptyHoodieData));
 
     LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
 
@@ -1016,7 +1016,7 @@ private List getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
   @Deprecated
   public boolean rollback(final String commitInstantTime, Option pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
     final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp())
-        .orElse(createNewInstantTime(!skipLocking));
+        .orElseGet(() -> createNewInstantTime(!skipLocking));
     return rollback(commitInstantTime, pendingRollbackInfo, rollbackInstantTime, skipLocking);
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index a3aa669902758..a510dd0bf4a8d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -297,7 +297,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
     InternalSchema internalSchema;
     Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
     if (historySchemaStr.isEmpty()) {
-      internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElse(AvroInternalSchemaConverter.convert(avroSchema));
+      internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> AvroInternalSchemaConverter.convert(avroSchema));
       internalSchema.setSchemaId(Long.parseLong(instantTime));
     } else {
       internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index 1bea51721c8b3..0277492ca0965 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -81,7 +81,7 @@ public static Option resolveWriteConflictIfAny(
               table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant),
           completedInstantsDuringCurrentWriteOperation);
-      final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata()));
+      final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElseGet(HoodieCommitMetadata::new));
       instantStream.forEach(instant -> {
         try {
           ConcurrentOperation otherOperation = new ConcurrentOperation(instant, table.getMetaClient());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index e4afb885ec49e..ee0006b0b05e8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -641,7 +641,7 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
                                        Function> getPendingRollbackInstantFunc) {
     final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry -> entry.getRollbackInstant().getTimestamp())
-        .orElse(getMetaClient().createNewInstantTime());
+        .orElseGet(() -> getMetaClient().createNewInstantTime());
     scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(), false);
     rollback(context, commitTime, inflightInstant, false, false);
@@ -657,7 +657,7 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
   public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Function> getPendingRollbackInstantFunc) {
     final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry -> entry.getRollbackInstant().getTimestamp())
-        .orElse(getMetaClient().createNewInstantTime());
+        .orElseGet(() -> getMetaClient().createNewInstantTime());
     scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(), false);
     rollback(context, commitTime, inflightInstant, true, false);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java
index 0cddcbd116c79..2adb7dddeb4e4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java
@@ -45,7 +45,7 @@ public BaseHoodieFunctionalIndexClient() {
   public void register(HoodieTableMetaClient metaClient, String indexName, String indexType, Map> columns, Map options) {
     LOG.info("Registering index {} of using {}", indexName, indexType);
     String indexMetaPath = metaClient.getTableConfig().getIndexDefinitionPath()
-        .orElse(metaClient.getMetaPath() + Path.SEPARATOR + HoodieTableMetaClient.INDEX_DEFINITION_FOLDER_NAME + Path.SEPARATOR + HoodieTableMetaClient.INDEX_DEFINITION_FILE_NAME);
+        .orElseGet(() -> metaClient.getMetaPath() + Path.SEPARATOR + HoodieTableMetaClient.INDEX_DEFINITION_FOLDER_NAME + Path.SEPARATOR + HoodieTableMetaClient.INDEX_DEFINITION_FILE_NAME);
     // build HoodieFunctionalIndexMetadata and then add to index definition file
     metaClient.buildFunctionalIndexDefinition(indexMetaPath, indexName, indexType, columns, options);
     // update table config if necessary
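For context on the recurring `orElse` to `orElseGet` substitution in this patch: `orElse(x)` evaluates its default eagerly even when a value is present, while `orElseGet(supplier)` defers that work until the default is actually needed. A minimal, self-contained sketch using `java.util.Optional` (Hudi's `Option` mirrors these two methods; the class below is hypothetical, not Hudi code):

    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical demo class, not Hudi code.
    public class OrElseVsOrElseGet {
      static final AtomicInteger CALLS = new AtomicInteger();

      static String expensiveDefault() {
        CALLS.incrementAndGet();  // side effect we can count
        return "default";
      }

      public static void main(String[] args) {
        Optional<String> present = Optional.of("value");

        present.orElse(expensiveDefault());                      // default is built even though a value exists
        present.orElseGet(OrElseVsOrElseGet::expensiveDefault);  // supplier is never invoked

        System.out.println(CALLS.get());  // prints 1: only the eager orElse paid the cost
      }
    }

In the hunks above and below, the defaults allocate metadata objects, file-system views, or new instant times, so deferring them avoids that cost on the common path where a value already exists.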
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 29da31b478cbb..1e0330a4defc2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -90,7 +90,7 @@ public HoodieSavepointMetadata execute() {
         } catch (IOException e) {
           throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
         }
-      }).orElse(table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp());
+      }).orElseGet(() -> table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp());
       // Cannot allow savepoint time on a commit that could have been cleaned
       ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 03a207c4feb83..7d4b0b2979403 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -133,7 +133,7 @@ protected void completeClustering(
       // commit to data table after committing to metadata table.
       // We take the lock here to ensure all writes to metadata table happens within a single lock (single writer).
       // Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      writeTableMetadata(table, clusteringCommitTime, metadata, writeStatuses.orElse(context.emptyHoodieData()));
+      writeTableMetadata(table, clusteringCommitTime, metadata, writeStatuses.orElseGet(context::emptyHoodieData));
       LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
       table.getActiveTimeline().transitionReplaceInflightToComplete(
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 45010bdf230af..5503573656c66 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -78,7 +78,7 @@ public HoodieWriteMetadata> bulkInsert(final List JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
     // write new files
     List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 4ae8552f6c079..20fc7e9f47992 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -220,7 +220,7 @@ private BulkInsertPartitioner getPartitioner(Map strategy
         default:
           throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
       }
-    }).orElse(isRowPartitioner
+    }).orElseGet(() -> isRowPartitioner
         ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)
         : BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index fc4b8bf100624..2f57f6bb18b67 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -74,7 +74,7 @@ public HoodieWriteMetadata> bulkInsert(final HoodieData<
             executor.getCommitActionType(), instantTime), Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
-    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(table, config));
+    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElseGet(() -> BulkInsertInternalPartitionerFactory.get(table, config));
     // Write new files
     HoodieData writeStatuses =
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 788e1040783f0..ac84475bfa412 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -71,7 +71,7 @@ public HoodieWriteMetadata> execute() {
   protected Partitioner getPartitioner(WorkloadProfile profile) {
     return table.getStorageLayout().layoutPartitionerClass()
         .map(c -> getLayoutPartitioner(profile, c))
-        .orElse(new SparkInsertOverwritePartitioner(profile, context, table, config));
+        .orElseGet(() -> new SparkInsertOverwritePartitioner(profile, context, table, config));
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index d84679eaf923a..55877938f8cb5 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -97,19 +97,15 @@ object AvroConversionUtils {
    * TODO convert directly from GenericRecord into InternalRow instead
    */
   def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = {
-    if (rdd.isEmpty()) {
-      ss.emptyDataFrame
-    } else {
-      ss.createDataFrame(rdd.mapPartitions { records =>
-        if (records.isEmpty) Iterator.empty
-        else {
-          val schema = new Schema.Parser().parse(schemaStr)
-          val dataType = convertAvroSchemaToStructType(schema)
-          val converter = createConverterToRow(schema, dataType)
-          records.map { r => converter(r) }
-        }
-      }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
-    }
+    ss.createDataFrame(rdd.mapPartitions { records =>
+      if (records.isEmpty) Iterator.empty
+      else {
+        val schema = new Schema.Parser().parse(schemaStr)
+        val dataType = convertAvroSchemaToStructType(schema)
+        val converter = createConverterToRow(schema, dataType)
+        records.map { r => converter(r) }
+      }
+    }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
   }
 
   /**
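On the AvroConversionUtils.scala hunk just above: `RDD.isEmpty()` is a Spark action, so the removed guard launched an extra job only to decide whether to return `ss.emptyDataFrame`; the `mapPartitions` path stays lazy and already yields an empty Dataset, and it keeps the converted schema, which `ss.emptyDataFrame` would not. A rough Java sketch of the same idea, assuming a local SparkSession (hypothetical demo, not Hudi code):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    import java.util.Collections;

    // Hypothetical demo, not Hudi code.
    public class EmptyRddDataFrameSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<String> input = jsc.parallelize(Collections.<String>emptyList());
        StructType schema = new StructType().add("value", DataTypes.StringType);

        // input.isEmpty() here would run a Spark job before any data is needed.
        // Building the DataFrame directly stays lazy and still returns an empty, schema-aware result.
        Dataset<Row> df = spark.createDataFrame(input.map(RowFactory::create), schema);
        System.out.println(df.count()); // 0

        spark.stop();
      }
    }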
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 824a94abab4bd..bf7e25393c86e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -144,7 +144,7 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
                                   Option beginInstantTime,
                                   Option endInstantTime) {
     this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
-        .orElse(new String[0]);
+        .orElseGet(() -> new String[0]);
 
     this.metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(configProperties)
@@ -284,7 +284,7 @@ private Map> loadFileSlicesForPartitions(List
                 fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
             )
-            .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
+            .orElseGet(() -> fileSystemView.getLatestFileSlices(partitionPath.path))
             .collect(Collectors.toList())
         ));
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index cd438aa965cd1..85d00ecb18d77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -164,7 +164,7 @@ public Integer getInt(ConfigProperty configProperty) {
   public Integer getIntOrDefault(ConfigProperty configProperty) {
     Option rawValue = getRawValue(configProperty);
     return rawValue.map(v -> Integer.parseInt(v.toString()))
-        .orElse(Integer.parseInt(configProperty.defaultValue().toString()));
+        .orElseGet(() -> Integer.parseInt(configProperty.defaultValue().toString()));
   }
 
   public Boolean getBoolean(ConfigProperty configProperty) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 17c77d807c051..4fc7996c87371 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -956,7 +956,7 @@ private Pair, Schema> getRecordsIterator(
         .orElse(Function.identity());
 
     Schema schema = schemaEvolutionTransformerOpt.map(Pair::getRight)
-        .orElse(dataBlock.getSchema());
+        .orElseGet(dataBlock::getSchema);
 
     return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, transformer), schema);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java
index 86011e865dc04..20b9c802f6051 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java
@@ -131,7 +131,7 @@ private CompletableFuture startConsumingAsync() {
               return (Void) null;
             }, consumerExecutorService)
         )
-        .orElse(CompletableFuture.completedFuture(null));
+        .orElseGet(() -> CompletableFuture.completedFuture(null));
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java b/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java
index cece36291dffc..5e86570d2917c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java
@@ -108,14 +108,14 @@ public Expression visitPredicate(Predicate predicate) {
       Predicates.IsNull isNull = (Predicates.IsNull) predicate;
       return Option.ofNullable(isNull.child.accept(this))
           .map(expr -> (Expression)Predicates.isNull(expr))
-          .orElse(alwaysTrue());
+          .orElseGet(this::alwaysTrue);
     }
 
     if (predicate instanceof Predicates.IsNotNull) {
       Predicates.IsNotNull isNotNull = (Predicates.IsNotNull) predicate;
       return Option.ofNullable(isNotNull.child.accept(this))
           .map(expr -> (Expression)Predicates.isNotNull(expr))
-          .orElse(alwaysTrue());
+          .orElseGet(this::alwaysTrue);
     }
 
     if (predicate instanceof Predicates.StringStartsWith) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
index 9c6d92821dd63..6b508da77f8dd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
@@ -116,7 +116,7 @@ public void create(
     List newSecondaryIndexes = secondaryIndexes.map(h -> {
       h.add(secondaryIndexToAdd);
       return h;
-    }).orElse(Collections.singletonList(secondaryIndexToAdd));
+    }).orElseGet(() -> Collections.singletonList(secondaryIndexToAdd));
     newSecondaryIndexes.sort(new HoodieSecondaryIndex.HoodieIndexCompactor());
 
     // Persistence secondary indexes' metadata to hoodie.properties file
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 1b7c2db2daa12..ccb0968b169c4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -358,7 +358,7 @@ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
             throw new HoodieIOException("Failed to extract file-statuses from the payload", e);
           }
         })
-        .orElse(new FileStatus[0]);
+        .orElseGet(() -> new FileStatus[0]);
 
     LOG.info("Listed file in partition from metadata: partition=" + relativePartitionPath + ", #files=" + statuses.length);
     return statuses;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 2bb8e0a59eca1..57cc08ab59ff6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -577,7 +577,7 @@ public HoodieTableFileSystemView getMetadataFileSystemView() {
   public Map stats() {
     Set allMetadataPartitionPaths = Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
-    return metrics.map(m -> m.getStats(true, metadataMetaClient, this, allMetadataPartitionPaths)).orElse(new HashMap<>());
+    return metrics.map(m -> m.getStats(true, metadataMetaClient, this, allMetadataPartitionPaths)).orElseGet(HashMap::new);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 839a7ed41a31a..076aac0454fa3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1029,7 +1029,7 @@ private static List getPartitionFileSlices(HoodieTableMetaClient meta
                                                      Option fileSystemView,
                                                      String partition,
                                                      boolean mergeFileSlices) {
-    HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
+    HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() -> getFileSystemView(metaClient));
     Stream fileSliceStream;
     if (mergeFileSlices) {
       if (metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
@@ -1057,7 +1057,7 @@ private static List getPartitionFileSlices(HoodieTableMetaClient meta
   public static List getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient,
                                                                     Option fileSystemView,
                                                                     String partition) {
-    HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
+    HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() -> getFileSystemView(metaClient));
     Stream fileSliceStream = fsView.fetchLatestFileSlicesIncludingInflight(partition);
     return fileSliceStream
         .sorted(Comparator.comparing(FileSlice::getFileId))
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index b4ef68a39392d..5d1b01981dd83 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -176,12 +176,12 @@ public HoodieTableSource(
     this.dataPruner = dataPruner;
     this.partitionPruner = partitionPruner;
     this.dataBucket = dataBucket;
-    this.requiredPos = Optional.ofNullable(requiredPos).orElse(IntStream.range(0, this.tableRowType.getFieldCount()).toArray());
+    this.requiredPos = Optional.ofNullable(requiredPos).orElseGet(() -> IntStream.range(0, this.tableRowType.getFieldCount()).toArray());
     this.limit = Optional.ofNullable(limit).orElse(NO_LIMIT_CONSTANT);
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-    this.metaClient = Optional.ofNullable(metaClient).orElse(StreamerUtil.metaClientForReader(conf, hadoopConf));
+    this.metaClient = Optional.ofNullable(metaClient).orElseGet(() -> StreamerUtil.metaClientForReader(conf, hadoopConf));
     this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
-    this.internalSchemaManager = Optional.ofNullable(internalSchemaManager).orElse(InternalSchemaManager.get(this.conf, this.metaClient));
+    this.internalSchemaManager = Optional.ofNullable(internalSchemaManager).orElseGet(() -> InternalSchemaManager.get(this.conf, this.metaClient));
   }
 
   @Override
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 04b65d8878aa8..88302ec74ce0f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -48,8 +48,11 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
@@ -60,8 +63,6 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
@@ -290,7 +291,7 @@ private List listStatusForSnapshotMode(JobConf job,
         List fileSlices = queryInstant.map(
             instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant))
-            .orElse(fsView.getLatestFileSlices(relativePartitionPath))
+            .orElseGet(() -> fsView.getLatestFileSlices(relativePartitionPath))
             .collect(Collectors.toList());
 
         filteredFileSlices.addAll(fileSlices);
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 68f425fd998ff..15d983ee48bf9 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -289,7 +289,7 @@ private File getLogTempFile(long startTime, long endTime, String diskType) {
     return Arrays.stream(new File("/tmp").listFiles())
        .filter(f -> f.isDirectory() && f.getName().startsWith("hudi-" + diskType) && f.lastModified() > startTime && f.lastModified() < endTime)
        .findFirst()
-       .orElse(new File(""));
+       .orElseGet(() -> new File(""));
   }
 
   @Test
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index 1e27b29ae2d5b..cce507b9fca35 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -189,7 +189,7 @@ public static String getPartitionColumns(KeyGenerator keyGenerator, TypedPropert
     if (keyGenerator instanceof CustomAvroKeyGenerator) {
       return ((BaseKeyGenerator) keyGenerator).getPartitionPathFields().stream().map(
           pathField -> Arrays.stream(pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX))
-              .findFirst().orElse("Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
+              .findFirst().orElseGet(() -> "Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
           .collect(Collectors.joining(",")); 
     }
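One nuance in the KafkaConnectUtils hunk above: the fallback is a constant Java string (the `$pathField` and `${c.getClass.getSimpleName}` placeholders are never interpolated in Java), so wrapping it in a supplier changes nothing at runtime beyond keeping the style uniform. A tiny sketch of that point (hypothetical demo, not Hudi code):

    // Hypothetical demo, not Hudi code: Java string literals are not interpolated,
    // so the fallback message above is a compile-time constant and orElse vs orElseGet
    // makes no practical difference for it.
    public class ConstantFallbackSketch {
      public static void main(String[] args) {
        String pathField = "region:SIMPLE";
        String message = "Illegal partition path field format: '$pathField'";
        System.out.println(message); // prints the literal "$pathField", not "region:SIMPLE"
        System.out.println(String.format("Illegal partition path field format: '%s'", pathField));
      }
    }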
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index 1e20e4ab663da..6719b7356e18d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -82,7 +82,7 @@ private HoodieWriteMetadata> buildHoodieWriteMetadata(Optio
       hoodieWriteMetadata.setWriteStatuses(HoodieJavaRDD.getJavaRDD(statuses));
       hoodieWriteMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
       return hoodieWriteMetadata;
-    }).orElse(new HoodieWriteMetadata<>());
+    }).orElseGet(HoodieWriteMetadata::new);
   }
 
   public final HoodieWriteResult execute(Dataset records, boolean isTablePartitioned) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
index 69dd8ea795a70..9783113117ce1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
@@ -277,7 +277,7 @@ public static SparkRDDWriteClient createHoodieClient(JavaSp
     HoodieCompactionConfig compactionConfig = compactionStrategyClass
         .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
-        .orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
+        .orElseGet(() -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withParallelism(parallelism, parallelism)
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index c2b739c9f8bbc..4a4226724f8bc 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -97,7 +97,7 @@ public List getLatestFileSlicesStateless(String basePath, String p
   public List getLatestFileSlice(String basePath, String partitionPath, String fileId) {
     return viewManager.getFileSystemView(basePath).getLatestFileSlice(partitionPath, fileId)
-        .map(FileSliceDTO::fromFileSlice).map(Arrays::asList).orElse(new ArrayList<>());
+        .map(FileSliceDTO::fromFileSlice).map(Arrays::asList).orElse(Collections.emptyList());
   }
 
   public List getPendingCompactionOperations(String basePath) {
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/TimelineHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/TimelineHandler.java
index 5d788ac74fc18..b9a721aae363f 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/TimelineHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/TimelineHandler.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -43,7 +43,7 @@ public TimelineHandler(Configuration conf, TimelineService.Config timelineServic
   public List getLastInstant(String basePath) {
     return viewManager.getFileSystemView(basePath).getLastInstant().map(InstantDTO::fromInstant)
-        .map(Arrays::asList).orElse(new ArrayList<>());
+        .map(Arrays::asList).orElse(Collections.emptyList());
   }
 
   public TimelineDTO getTimeline(String basePath) {
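The two timeline-service hunks above swap `new ArrayList<>()` for `Collections.emptyList()`: the empty default becomes a shared immutable instance, so nothing is allocated on the miss path, and plain `orElse` remains appropriate because the default is a constant. A small sketch (hypothetical class, not Hudi code):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;

    // Hypothetical demo, not Hudi code.
    public class EmptyListDefaultSketch {
      static List<String> lookup(Optional<String> hit) {
        // A constant default is fine with plain orElse: nothing is allocated per call.
        return hit.map(Collections::singletonList).orElse(Collections.emptyList());
      }

      public static void main(String[] args) {
        System.out.println(lookup(Optional.of("slice-1"))); // [slice-1]
        System.out.println(lookup(Optional.empty()));       // []
        // Callers that need to mutate the result should copy it first:
        List<String> mutable = new ArrayList<>(lookup(Optional.empty()));
        mutable.add("ok");
      }
    }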
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
index 794de225a5e67..9f892ab8f0e33 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/JsonToAvroSchemaConverter.java
@@ -78,12 +78,12 @@ public String convert(String jsonSchema) throws IOException {
   }
 
   private static ArrayNode convertProperties(JsonNode jsonProperties, Set required) {
-    List avroFields = new ArrayList<>();
+    List avroFields = new ArrayList<>(jsonProperties.size());
     jsonProperties.fieldNames().forEachRemaining(name -> avroFields.add(tryConvertNestedProperty(name, jsonProperties.get(name))
-        .or(tryConvertArrayProperty(name, jsonProperties.get(name)))
-        .or(tryConvertEnumProperty(name, jsonProperties.get(name)))
-        .orElse(convertProperty(name, jsonProperties.get(name), required.contains(name)))));
+        .or(() -> tryConvertArrayProperty(name, jsonProperties.get(name)))
+        .or(() -> tryConvertEnumProperty(name, jsonProperties.get(name)))
+        .orElseGet(() -> convertProperty(name, jsonProperties.get(name), required.contains(name)))));
     return MAPPER.createArrayNode().addAll(avroFields);
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
index 64da4f4f50f5d..e658bde5853c4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
@@ -47,7 +47,7 @@ protected InputBatch> fetchNewData(Option lastCkptStr, l
         pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
     return selPathsWithMaxModificationTime.getLeft()
         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selPathsWithMaxModificationTime.getRight()))
-        .orElse(new InputBatch<>(Option.empty(), selPathsWithMaxModificationTime.getRight()));
+        .orElseGet(() -> new InputBatch<>(Option.empty(), selPathsWithMaxModificationTime.getRight()));
   }
 
   private JavaRDD fromFiles(String pathStr) {
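On the JsonToAvroSchemaConverter hunk above: moving the alternatives into supplier-based `or(...)` calls means each fallback converter runs only if the previous one produced nothing. The sketch below shows the same shape with `java.util.Optional.or` (Java 9+); the diff shows Hudi's `Option` accepting the same supplier-based `or`, and the converter names here are made up for illustration:

    import java.util.Optional;

    // Hypothetical demo, not Hudi code.
    public class FallbackChainSketch {
      static Optional<String> tryNested(String s) { return s.startsWith("{") ? Optional.of("nested") : Optional.empty(); }
      static Optional<String> tryArray(String s)  { return s.startsWith("[") ? Optional.of("array") : Optional.empty(); }
      static String convertPlain(String s)        { return "plain:" + s; }

      static String convert(String property) {
        return tryNested(property)
            .or(() -> tryArray(property))             // evaluated only if tryNested was empty
            .orElseGet(() -> convertPlain(property)); // last resort, also lazy
      }

      public static void main(String[] args) {
        System.out.println(convert("{...}")); // nested
        System.out.println(convert("[1,2]")); // array
        System.out.println(convert("42"));    // plain:42
      }
    }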
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index ff2debc8dccdf..c4839cc43120b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -612,7 +612,7 @@ private InputBatch fetchNextBatchFromSource(Option resumeCheckpointStr,
           AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), getAvroRecordQualifiedName(cfg.targetTableName)));
       schemaProvider = incomingSchemaOpt.map(incomingSchema -> getDeducedSchemaProvider(incomingSchema, dataAndCheckpoint.getSchemaProvider(), metaClient))
-          .orElse(dataAndCheckpoint.getSchemaProvider());
+          .orElseGet(dataAndCheckpoint::getSchemaProvider);
 
       if (useRowWriter) {
         inputBatchForWriter = new InputBatch(transformed, checkpointStr, schemaProvider);
@@ -900,12 +900,12 @@ private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instant
     instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
 
     if (useRowWriter) {
-      Dataset df = (Dataset) inputBatch.getBatch().orElse(hoodieSparkContext.getSqlContext().emptyDataFrame());
+      Dataset df = (Dataset) inputBatch.getBatch().orElseGet(() -> hoodieSparkContext.getSqlContext().emptyDataFrame());
       HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
       BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
     } else {
-      JavaRDD records = (JavaRDD) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
+      JavaRDD records = (JavaRDD) inputBatch.getBatch().orElseGet(() -> hoodieSparkContext.emptyRDD());
       // filter dupes if needed
       if (cfg.filterDupes) {
         records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), records, writeClient.getConfig());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index 367448533b315..4ff7dd6e1c2ac 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -124,12 +124,8 @@ private StructType getExpectedTransformedSchema(TransformerInfo transformerInfo,
       throw new HoodieTransformPlanException("Either source schema or source dataset should be available to fetch the schema");
     }
     StructType incomingStruct = incomingStructOpt
-        .orElse(sourceSchemaOpt.isPresent() ? AvroConversionUtils.convertAvroSchemaToStructType(sourceSchemaOpt.get()) : rowDatasetOpt.get().schema());
-    try {
-      return transformerInfo.getTransformer().transformedSchema(jsc, sparkSession, incomingStruct, properties).asNullable();
-    } catch (Exception e) {
-      throw e;
-    }
+        .orElseGet(() -> sourceSchemaOpt.isPresent() ? AvroConversionUtils.convertAvroSchemaToStructType(sourceSchemaOpt.get()) : rowDatasetOpt.get().schema());
+    return transformerInfo.getTransformer().transformedSchema(jsc, sparkSession, incomingStruct, properties).asNullable();
   }
 
   @Override
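Finally, the ChainedTransformer hunk drops a `try`/`catch` that only rethrew the same exception, which is behaviorally a no-op. A before/after sketch (hypothetical interface, not Hudi code):

    // Hypothetical demo, not Hudi code.
    public class RethrowSketch {
      interface SchemaSource { String transformedSchema() throws Exception; }

      // Before: the catch clause neither logs, wraps, nor cleans up, so it is pure noise.
      static String before(SchemaSource source) throws Exception {
        try {
          return source.transformedSchema();
        } catch (Exception e) {
          throw e;
        }
      }

      // After: identical behavior, one statement.
      static String after(SchemaSource source) throws Exception {
        return source.transformedSchema();
      }

      public static void main(String[] args) throws Exception {
        SchemaSource ok = () -> "struct<id:int>";
        System.out.println(before(ok).equals(after(ok))); // true
      }
    }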