Skip to content

Commit

Permalink
[Improve][Connector-V2] Improve iceberg source connector
Browse files Browse the repository at this point in the history
* Lock the output row and update the offset in the `pollNext` method to support at-least-once semantics
* Add `synchronized` to the `IcebergStreamSplitEnumerator#run` method to avoid update conflicts when readers call `handleSplitRequest`
* Fix duplicate split reads when a batch job is restored from state
* Fix split owner assignment (the computed owner could be a negative number)
  • Loading branch information
hailin0 committed Jan 10, 2023
1 parent 74fd886 commit cce8cbb
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 57 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/Iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,4 @@ Some versions of the hive-exec package do not have libfb303-xxx.jar, so you also
### next version

- [Feature] Support Hadoop3.x ([3046](https://github.com/apache/incubator-seatunnel/pull/3046))
- [Improve] Improve iceberg source connector ([2865](https://github.com/apache/incubator-seatunnel/pull/2865))
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.Table;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -58,52 +57,43 @@ public void open() {
icebergTableLoader.open();
}

@Override
public void run() {
refreshPendingSplits();
assignPendingSplits(context.registeredReaders());
}

@Override
public void close() throws IOException {
icebergTableLoader.close();
}

@Override
public void addSplitsBack(List<IcebergFileScanTaskSplit> splits, int subtaskId) {
log.debug("Add back splits {} to IcebergSourceEnumerator.",
splits);
addPendingSplits(splits);
if (context.registeredReaders().contains(subtaskId)) {
assignPendingSplits(Collections.singleton(subtaskId));
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void registerReader(int subtaskId) {
log.debug("Adding reader {} to IcebergSourceEnumerator.",
log.debug("Register reader {} to IcebergSourceEnumerator.",
subtaskId);
assignPendingSplits(Collections.singleton(subtaskId));
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
}

protected void refreshPendingSplits() {
List<IcebergFileScanTaskSplit> newSplits = loadNewSplits(icebergTableLoader.loadTable());
addPendingSplits(newSplits);
@Override
public void close() throws IOException {
icebergTableLoader.close();
}

protected abstract List<IcebergFileScanTaskSplit> loadNewSplits(Table table);
protected void addPendingSplits(Collection<IcebergFileScanTaskSplit> newSplits) {
log.debug("Add splits {} to pendingSplits", newSplits.size());

private void addPendingSplits(Collection<IcebergFileScanTaskSplit> newSplits) {
int numReaders = context.currentParallelism();
for (IcebergFileScanTaskSplit newSplit : newSplits) {
int ownerReader = newSplit.splitId().hashCode() % numReaders;
int ownerReader = (newSplit.splitId().hashCode() & Integer.MAX_VALUE) % numReaders;
pendingSplits
.computeIfAbsent(ownerReader, r -> new ArrayList<>())
.add(newSplit);
Expand All @@ -112,6 +102,8 @@ private void addPendingSplits(Collection<IcebergFileScanTaskSplit> newSplits) {
}

protected void assignPendingSplits(Set<Integer> pendingReaders) {
log.debug("Assign pendingSplits to readers {}", pendingReaders);

for (int pendingReader : pendingReaders) {
List<IcebergFileScanTaskSplit> pendingAssignmentForReader = pendingSplits.remove(pendingReader);
if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
public class IcebergBatchSplitEnumerator extends AbstractSplitEnumerator {

private final IcebergScanContext icebergScanContext;
private final boolean shouldEnumerate;
private volatile boolean splitLoadReady;

public IcebergBatchSplitEnumerator(@NonNull SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context,
@NonNull IcebergScanContext icebergScanContext,
Expand All @@ -43,29 +45,39 @@ public IcebergBatchSplitEnumerator(@NonNull SourceSplitEnumerator.Context<Iceber
super(context, sourceConfig, restoreState != null ?
restoreState.getPendingSplits() : Collections.emptyMap());
this.icebergScanContext = icebergScanContext;
// split enumeration is not needed during restore scenario
this.shouldEnumerate = restoreState == null;
}

@Override
public void run() {
super.run();

public synchronized void run() {
Set<Integer> readers = context.registeredReaders();
log.debug("No more splits to assign." +
" Sending NoMoreSplitsEvent to reader {}.", readers);
if (shouldEnumerate) {
loadAllSplitsToPendingSplits(icebergTableLoader.loadTable());
assignPendingSplits(readers);
}

log.debug("No more splits to assign. Sending NoMoreSplitsEvent to readers {}.", readers);
readers.forEach(context::signalNoMoreSplits);
}

@Override
public IcebergSplitEnumeratorState snapshotState(long checkpointId) {
return new IcebergSplitEnumeratorState(null, pendingSplits);
if (splitLoadReady) {
return new IcebergSplitEnumeratorState(null, pendingSplits);
}
return null;
}

@Override
public void handleSplitRequest(int subtaskId) {
throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId);
}

@Override
protected List<IcebergFileScanTaskSplit> loadNewSplits(Table table) {
return IcebergScanSplitPlanner.planSplits(table, icebergScanContext);
private void loadAllSplitsToPendingSplits(Table table) {
List<IcebergFileScanTaskSplit> newSplits = IcebergScanSplitPlanner.planSplits(
table, icebergScanContext);
addPendingSplits(newSplits);
splitLoadReady = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.iceberg.Table;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -51,35 +50,41 @@ public IcebergStreamSplitEnumerator(@NonNull SourceSplitEnumerator.Context<Icebe
}
}

@Override
public synchronized void run() {
loadNewSplitsToPendingSplits(icebergTableLoader.loadTable());
assignPendingSplits(context.registeredReaders());
}

@Override
public IcebergSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
return new IcebergSplitEnumeratorState(enumeratorPosition.get(), pendingSplits);
}

@Override
public void handleSplitRequest(int subtaskId) {
synchronized (this) {
if (pendingSplits.isEmpty() ||
pendingSplits.get(subtaskId) == null) {
refreshPendingSplits();
}
assignPendingSplits(Collections.singleton(subtaskId));
public synchronized void handleSplitRequest(int subtaskId) {
if (pendingSplits.isEmpty() ||
pendingSplits.get(subtaskId) == null) {
loadNewSplitsToPendingSplits(icebergTableLoader.loadTable());
}
assignPendingSplits(Collections.singleton(subtaskId));
}

@Override
protected List<IcebergFileScanTaskSplit> loadNewSplits(Table table) {
private void loadNewSplitsToPendingSplits(Table table) {
IcebergEnumerationResult result = IcebergScanSplitPlanner.planStreamSplits(
table, icebergScanContext, enumeratorPosition.get());
if (!Objects.equals(result.getFromPosition(), enumeratorPosition.get())) {
log.info("Skip {} loaded splits because the scan starting position doesn't match " +
log.warn("Skip {} loaded splits because the scan starting position doesn't match " +
"the current enumerator position: enumerator position = {}, scan starting position = {}",
result.getSplits().size(), enumeratorPosition.get(), result.getFromPosition());
return Collections.emptyList();
} else {
enumeratorPosition.set(result.getToPosition());
log.debug("Update enumerator position to {}", result.getToPosition());
return result.getSplits();
return;
}

addPendingSplits(result.getSplits());

enumeratorPosition.set(result.getToPosition());
if (!Objects.equals(enumeratorPosition.get(), result.getToPosition())) {
log.info("Update enumerator position to {}", result.getToPosition());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ public CloseableIterator<SeaTunnelRow> open(@NonNull IcebergFileScanTaskSplit sp
OffsetSeekIterator<Record> seekIterator = new OffsetSeekIterator(iterator);
seekIterator.seek(split.getRecordOffset());

return CloseableIterator.transform(seekIterator, record -> {
SeaTunnelRow seaTunnelRow = deserializer.deserialize(record);
split.setRecordOffset(split.getRecordOffset() + 1);
return seaTunnelRow;
});
return CloseableIterator.transform(seekIterator, record -> deserializer.deserialize(record));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,18 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
currentReadSplit = pendingSplit;
try (CloseableIterator<SeaTunnelRow> rowIterator = icebergFileScanTaskSplitReader.open(currentReadSplit)) {
while (rowIterator.hasNext()) {
output.collect(rowIterator.next());
synchronized (output.getCheckpointLock()) {
currentReadSplit.setRecordOffset(currentReadSplit.getRecordOffset() + 1);
output.collect(rowIterator.next());
}
}
}
}

if (noMoreSplitsAssignment && Boundedness.BOUNDED.equals(context.getBoundedness())) {
context.signalNoMoreElement();
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
if (noMoreSplitsAssignment) {
context.signalNoMoreElement();
}
} else {
context.sendSplitRequest();
if (pendingSplits.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
Expand All @@ -53,6 +54,7 @@
import org.testcontainers.containers.Container;
import org.testcontainers.utility.MountableFile;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -124,6 +126,7 @@ public void testIcebergSource(TestContainer container) throws IOException, Inter
}

private void initializeIcebergTable() {
FileUtil.fullyDelete(new File(CATALOG_DIR));
CATALOG = new IcebergCatalogFactory(CATALOG_NAME,
CATALOG_TYPE,
WAREHOUSE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
Expand All @@ -53,6 +54,7 @@
import org.testcontainers.containers.Container;
import org.testcontainers.utility.MountableFile;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -124,6 +126,7 @@ public void testIcebergSource(TestContainer container) throws IOException, Inter
}

private void initializeIcebergTable() {
FileUtil.fullyDelete(new File(CATALOG_DIR));
CATALOG = new IcebergCatalogFactory(CATALOG_NAME,
CATALOG_TYPE,
WAREHOUSE,
Expand Down

0 comments on commit cce8cbb

Please sign in to comment.