From 77cfb3a51dabffdbfc67ba0ac85cfa129277c166 Mon Sep 17 00:00:00 2001 From: YueZhang <69956021+zhangyue19921010@users.noreply.github.com> Date: Wed, 29 Nov 2023 09:46:53 +0800 Subject: [PATCH] [HUDI-7147] Fix CDC write flush bug (#10186) * Using iterator instead of values to avoid unsupported operation exception * check style --- .../org/apache/hudi/io/HoodieCDCLogger.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index cab978164d8f..1e2fa7c59e41 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -53,10 +53,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE; import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER; @@ -84,7 +84,7 @@ public class HoodieCDCLogger implements Closeable { private final Schema cdcSchema; // the cdc data - private final Map cdcData; + private final ExternalSpillableMap cdcData; private final Map cdcDataBlockHeader; @@ -183,15 +183,16 @@ public void put(HoodieRecord hoodieRecord, private void flushIfNeeded(Boolean force) { if (force || numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize) { try { - List records = cdcData.values().stream() - .map(record -> { - try { - return new HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get()); - } catch (IOException e) { - throw new HoodieIOException("Failed to get cdc record", e); - } - }).collect(Collectors.toList()); - + ArrayList records = new ArrayList<>(); + Iterator recordIter = cdcData.iterator(); + while (recordIter.hasNext()) { + HoodieAvroPayload record = recordIter.next(); + try { + records.add(new HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get())); + } catch (IOException e) { + throw new HoodieIOException("Failed to get cdc record", e); + } + } HoodieLogBlock block = new HoodieCDCDataBlock(records, cdcDataBlockHeader, keyField); AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block));