Skip to content

Commit

Permalink
[HUDI-7147] Fix CDC write flush bug (apache#10186)
Browse files Browse the repository at this point in the history
* Use `cdcData.iterator()` instead of `cdcData.values()` when flushing CDC records, to avoid an UnsupportedOperationException

* Fix checkstyle issues
  • Loading branch information
zhangyue19921010 authored Nov 29, 2023
1 parent 675abf1 commit 77cfb3a
Showing 1 changed file with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE;
import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER;
Expand Down Expand Up @@ -84,7 +84,7 @@ public class HoodieCDCLogger implements Closeable {
private final Schema cdcSchema;

// the cdc data
private final Map<String, HoodieAvroPayload> cdcData;
private final ExternalSpillableMap<String, HoodieAvroPayload> cdcData;

private final Map<HoodieLogBlock.HeaderMetadataType, String> cdcDataBlockHeader;

Expand Down Expand Up @@ -183,15 +183,16 @@ public void put(HoodieRecord hoodieRecord,
private void flushIfNeeded(Boolean force) {
if (force || numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize) {
try {
List<HoodieRecord> records = cdcData.values().stream()
.map(record -> {
try {
return new HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get());
} catch (IOException e) {
throw new HoodieIOException("Failed to get cdc record", e);
}
}).collect(Collectors.toList());

ArrayList<HoodieRecord> records = new ArrayList<>();
Iterator<HoodieAvroPayload> recordIter = cdcData.iterator();
while (recordIter.hasNext()) {
HoodieAvroPayload record = recordIter.next();
try {
records.add(new HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get()));
} catch (IOException e) {
throw new HoodieIOException("Failed to get cdc record", e);
}
}
HoodieLogBlock block = new HoodieCDCDataBlock(records, cdcDataBlockHeader, keyField);
AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block));

Expand Down

0 comments on commit 77cfb3a

Please sign in to comment.