[Improve][Connector-V2] Improve iceberg source connector
* Lock the output row and update the record offset inside the `pollNext` method to support at-least-once delivery (see the sketch below)
* Add `synchronized` to the `IcebergStreamSplitEnumerator#run` method to avoid update conflicts with readers calling `handleSplitRequest`
* Fix duplicate split reads when a batch job restores from state
* Fix split owner assignment (the computed owner index could be a negative number)
hailin0 committed Apr 1, 2023
1 parent f037533 commit 06071cd
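The first bullet describes moving offset bookkeeping under the collector's checkpoint lock. Below is a minimal sketch of that pattern, with hypothetical stand-ins for SeaTunnel's `Collector#getCheckpointLock()` and the split's record offset (not the connector's actual classes): offset update and row emission happen atomically under the lock, so a checkpoint never captures an offset for a row that was not emitted.

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the at-least-once pattern (hypothetical names): the reader
// thread and the checkpoint thread synchronize on the same lock.
public class AtLeastOncePollSketch {
    static final Object checkpointLock = new Object(); // Collector#getCheckpointLock stand-in
    static final AtomicLong recordOffset = new AtomicLong(); // split record offset stand-in

    static void pollNext(List<String> rows) {
        for (String row : rows) {
            synchronized (checkpointLock) {
                recordOffset.incrementAndGet();        // update the split offset...
                System.out.println("collect " + row); // ...and emit in one atomic step
            }
        }
    }

    static long snapshotState() {
        synchronized (checkpointLock) { // checkpoint thread takes the same lock
            return recordOffset.get();
        }
    }

    public static void main(String[] args) {
        pollNext(List.of("row-1", "row-2"));
        System.out.println("checkpointed offset = " + snapshotState());
    }
}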
Showing 9 changed files with 76 additions and 61 deletions.
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Iceberg.md
@@ -203,4 +203,4 @@ Some versions of the hive-exec package do not have libfb303-xxx.jar, so you also

- [Feature] Support Hadoop3.x ([3046](https://github.com/apache/incubator-seatunnel/pull/3046))
- [improve][api] Refactoring schema parse ([4157](https://github.com/apache/incubator-seatunnel/pull/4157))

- [Improve] Improve iceberg source connector ([2865](https://github.com/apache/incubator-seatunnel/pull/2865))
2 changes: 2 additions & 0 deletions release-note.md
@@ -50,6 +50,8 @@
- [Jdbc] Add database field to sink config #4199
- [Doris] Refactor some Doris Sink code as well as support 2pc and cdc #4235
- [SelectDB Cloud] Refactor some SelectDB Cloud Sink code as well as support copy into batch and async flush and cdc #4312
- [Iceberg] Improve iceberg source connector #2865

### Zeta Engine
- [Chore] Remove unnecessary dependencies #3795
- [Core] Improve job restart of all node down #3784
@@ -22,8 +22,6 @@
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;

import org.apache.iceberg.Table;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@@ -61,56 +59,47 @@ public void open() {
icebergTableLoader.open();
}

@Override
public void run() {
refreshPendingSplits();
assignPendingSplits(context.registeredReaders());
}

@Override
public void close() throws IOException {
icebergTableLoader.close();
}

@Override
public void addSplitsBack(List<IcebergFileScanTaskSplit> splits, int subtaskId) {
log.debug("Add back splits {} to IcebergSourceEnumerator.", splits);
addPendingSplits(splits);
if (context.registeredReaders().contains(subtaskId)) {
assignPendingSplits(Collections.singleton(subtaskId));
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void registerReader(int subtaskId) {
log.debug("Adding reader {} to IcebergSourceEnumerator.", subtaskId);
log.debug("Register reader {} to IcebergSourceEnumerator.", subtaskId);
assignPendingSplits(Collections.singleton(subtaskId));
}

public int currentUnassignedSplitSize() {
return pendingSplits.size();
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {}

protected void refreshPendingSplits() {
List<IcebergFileScanTaskSplit> newSplits = loadNewSplits(icebergTableLoader.loadTable());
addPendingSplits(newSplits);
@Override
public void close() throws IOException {
icebergTableLoader.close();
}

protected abstract List<IcebergFileScanTaskSplit> loadNewSplits(Table table);
protected void addPendingSplits(Collection<IcebergFileScanTaskSplit> newSplits) {
log.debug("Add splits {} to pendingSplits", newSplits.size());

private void addPendingSplits(Collection<IcebergFileScanTaskSplit> newSplits) {
int numReaders = context.currentParallelism();
for (IcebergFileScanTaskSplit newSplit : newSplits) {
int ownerReader = newSplit.splitId().hashCode() % numReaders;
int ownerReader = (newSplit.splitId().hashCode() & Integer.MAX_VALUE) % numReaders;
pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(newSplit);
log.info("Assigning {} to {} reader.", newSplit, ownerReader);
}
}
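For context on the owner computation above: in Java, `String#hashCode` can be negative and `%` keeps the dividend's sign, so the old `hashCode() % numReaders` could yield a negative reader index. A small standalone demo of the fix (the split id literal is made up):

// Demo of the negative-owner bug: masking the sign bit, as the diff
// does, or Math.floorMod keeps the index in [0, numReaders).
public class OwnerIndexDemo {
    public static void main(String[] args) {
        int numReaders = 4;
        int hash = "some-split-id".hashCode() | Integer.MIN_VALUE; // force a negative hash

        System.out.println(hash % numReaders);                       // may print a negative index
        System.out.println((hash & Integer.MAX_VALUE) % numReaders); // always in 0..3
        System.out.println(Math.floorMod(hash, numReaders));         // another non-negative option
    }
}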

protected void assignPendingSplits(Set<Integer> pendingReaders) {
log.debug("Assign pendingSplits to readers {}", pendingReaders);

for (int pendingReader : pendingReaders) {
List<IcebergFileScanTaskSplit> pendingAssignmentForReader =
pendingSplits.remove(pendingReader);
@@ -36,6 +36,8 @@
public class IcebergBatchSplitEnumerator extends AbstractSplitEnumerator {

private final IcebergScanContext icebergScanContext;
private final boolean shouldEnumerate;
private volatile boolean splitLoadReady;

public IcebergBatchSplitEnumerator(
@NonNull SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context,
@@ -47,28 +49,38 @@ public IcebergBatchSplitEnumerator(
sourceConfig,
restoreState != null ? restoreState.getPendingSplits() : Collections.emptyMap());
this.icebergScanContext = icebergScanContext;
// split enumeration is not needed during restore scenario
this.shouldEnumerate = restoreState == null;
}

@Override
public void run() {
super.run();

public synchronized void run() {
Set<Integer> readers = context.registeredReaders();
log.debug(
"No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
if (shouldEnumerate) {
loadAllSplitsToPendingSplits(icebergTableLoader.loadTable());
assignPendingSplits(readers);
}

log.debug("No more splits to assign. Sending NoMoreSplitsEvent to readers {}.", readers);
readers.forEach(context::signalNoMoreSplits);
}

@Override
public IcebergSplitEnumeratorState snapshotState(long checkpointId) {
return new IcebergSplitEnumeratorState(null, pendingSplits);
if (splitLoadReady) {
return new IcebergSplitEnumeratorState(null, pendingSplits);
}
return null;
}

@Override
public void handleSplitRequest(int subtaskId) {}
public void handleSplitRequest(int subtaskId) {
throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId);
}

@Override
protected List<IcebergFileScanTaskSplit> loadNewSplits(Table table) {
return IcebergScanSplitPlanner.planSplits(table, icebergScanContext);
private void loadAllSplitsToPendingSplits(Table table) {
List<IcebergFileScanTaskSplit> newSplits =
IcebergScanSplitPlanner.planSplits(table, icebergScanContext);
addPendingSplits(newSplits);
splitLoadReady = true;
}
}
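The `shouldEnumerate` flag is the heart of the duplicate-read fix: a restored enumerator redistributes only the checkpointed `pendingSplits` and never re-plans the table. A self-contained sketch of that guard, using plain strings in place of `IcebergFileScanTaskSplit` and hypothetical names throughout:

import java.util.*;

// Sketch of the batch restore guard: splits are planned only on a
// fresh start; on restore, the checkpointed pendingSplits are reused
// as-is, so already-read splits are not assigned a second time.
public class RestoreGuardSketch {
    static List<String> planAllSplits() {
        return Arrays.asList("split-1", "split-2", "split-3");
    }

    static Map<Integer, List<String>> start(Map<Integer, List<String>> restoreState) {
        boolean shouldEnumerate = (restoreState == null);
        Map<Integer, List<String>> pendingSplits =
                restoreState != null ? new HashMap<>(restoreState) : new HashMap<>();
        if (shouldEnumerate) {
            pendingSplits.put(0, new ArrayList<>(planAllSplits()));
        }
        return pendingSplits;
    }

    public static void main(String[] args) {
        System.out.println(start(null)); // fresh start: plans all splits

        Map<Integer, List<String>> checkpoint = new HashMap<>();
        checkpoint.put(0, Arrays.asList("split-3")); // only the unfinished split
        System.out.println(start(checkpoint)); // restore: no re-planning
    }
}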
@@ -29,7 +29,6 @@
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

@@ -55,38 +54,44 @@ public IcebergStreamSplitEnumerator(
}
}

@Override
public synchronized void run() {
loadNewSplitsToPendingSplits(icebergTableLoader.loadTable());
assignPendingSplits(context.registeredReaders());
}

@Override
public IcebergSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
return new IcebergSplitEnumeratorState(enumeratorPosition.get(), pendingSplits);
}

@Override
public void handleSplitRequest(int subtaskId) {
synchronized (this) {
if (pendingSplits.isEmpty() || pendingSplits.get(subtaskId) == null) {
refreshPendingSplits();
}
assignPendingSplits(Collections.singleton(subtaskId));
public synchronized void handleSplitRequest(int subtaskId) {
if (pendingSplits.isEmpty() || pendingSplits.get(subtaskId) == null) {
loadNewSplitsToPendingSplits(icebergTableLoader.loadTable());
}
assignPendingSplits(Collections.singleton(subtaskId));
}
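Both `run` (periodic split discovery) and `handleSplitRequest` (reader RPCs) mutate the shared `pendingSplits` map, so marking both methods `synchronized` serializes them on the enumerator instance. A stripped-down sketch of the two racing entry points (hypothetical stand-ins, not the SeaTunnel API):

import java.util.*;
import java.util.concurrent.*;

// Sketch of why both enumerator entry points are synchronized: the
// discovery thread and reader RPC threads would otherwise race on the
// shared pendingSplits map.
public class EnumeratorLockSketch {
    private final Map<Integer, List<String>> pendingSplits = new HashMap<>();

    public synchronized void run() { // discovery thread
        pendingSplits.computeIfAbsent(0, r -> new ArrayList<>()).add("new-split");
    }

    public synchronized void handleSplitRequest(int subtaskId) { // reader RPC thread
        List<String> splits = pendingSplits.remove(subtaskId);
        System.out.println("assign " + splits + " to reader " + subtaskId);
    }

    public static void main(String[] args) throws InterruptedException {
        EnumeratorLockSketch enumerator = new EnumeratorLockSketch();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(enumerator::run);
        pool.submit(() -> enumerator.handleSplitRequest(0));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}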

@Override
protected List<IcebergFileScanTaskSplit> loadNewSplits(Table table) {
private void loadNewSplitsToPendingSplits(Table table) {
IcebergEnumerationResult result =
IcebergScanSplitPlanner.planStreamSplits(
table, icebergScanContext, enumeratorPosition.get());
if (!Objects.equals(result.getFromPosition(), enumeratorPosition.get())) {
log.info(
log.warn(
"Skip {} loaded splits because the scan starting position doesn't match "
+ "the current enumerator position: enumerator position = {}, scan starting position = {}",
result.getSplits().size(),
enumeratorPosition.get(),
result.getFromPosition());
return Collections.emptyList();
} else {
enumeratorPosition.set(result.getToPosition());
log.debug("Update enumerator position to {}", result.getToPosition());
return result.getSplits();
return;
}

addPendingSplits(result.getSplits());

if (!Objects.equals(enumeratorPosition.get(), result.getToPosition())) {
enumeratorPosition.set(result.getToPosition());
log.info("Update enumerator position to {}", result.getToPosition());
}
}
}
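The warn-and-return branch above protects incremental planning: a plan is applied only when it started from the position the enumerator currently holds; otherwise its splits are dropped and the next discovery cycle re-plans from the up-to-date position. Roughly, with `Long` values standing in for Iceberg snapshot positions (hypothetical types):

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the stale-plan guard: only a plan whose starting position
// matches the enumerator's current position may advance it; anything
// else is discarded and re-planned later.
public class StalePlanGuardSketch {
    static final AtomicReference<Long> enumeratorPosition = new AtomicReference<>(100L);

    static void apply(Long fromPosition, Long toPosition) {
        if (!Objects.equals(fromPosition, enumeratorPosition.get())) {
            System.out.println("skip stale plan starting at " + fromPosition);
            return;
        }
        enumeratorPosition.set(toPosition);
        System.out.println("advance position to " + toPosition);
    }

    public static void main(String[] args) {
        apply(90L, 110L);  // planned from an outdated snapshot: skipped
        apply(100L, 110L); // planned from the current position: applied
    }
}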
@@ -45,12 +45,7 @@ public CloseableIterator<SeaTunnelRow> open(@NonNull IcebergFileScanTaskSplit sp
seekIterator.seek(split.getRecordOffset());

return CloseableIterator.transform(
seekIterator,
record -> {
SeaTunnelRow seaTunnelRow = deserializer.deserialize(record);
split.setRecordOffset(split.getRecordOffset() + 1);
return seaTunnelRow;
});
seekIterator, record -> deserializer.deserialize(record));
}

@Override
@@ -106,13 +106,18 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
try (CloseableIterator<SeaTunnelRow> rowIterator =
icebergFileScanTaskSplitReader.open(currentReadSplit)) {
while (rowIterator.hasNext()) {
output.collect(rowIterator.next());
synchronized (output.getCheckpointLock()) {
currentReadSplit.setRecordOffset(currentReadSplit.getRecordOffset() + 1);
output.collect(rowIterator.next());
}
}
}
}

if (noMoreSplitsAssignment && Boundedness.BOUNDED.equals(context.getBoundedness())) {
context.signalNoMoreElement();
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
if (noMoreSplitsAssignment) {
context.signalNoMoreElement();
}
} else {
context.sendSplitRequest();
if (pendingSplits.isEmpty()) {
@@ -26,6 +26,7 @@
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -53,6 +54,7 @@

import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -133,6 +135,8 @@ public void testIcebergSource(TestContainer container)
}

private void initializeIcebergTable() {
FileUtil.fullyDelete(new File(CATALOG_DIR));
CATALOG = new IcebergCatalogFactory(CATALOG_NAME, CATALOG_TYPE, WAREHOUSE, null).create();
if (!CATALOG.tableExists(TABLE)) {
CATALOG.createTable(TABLE, SCHEMA);
@@ -26,6 +26,7 @@
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -53,6 +54,7 @@

import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -133,6 +135,7 @@ public void testIcebergSource(TestContainer container)
}

private void initializeIcebergTable() {
FileUtil.fullyDelete(new File(CATALOG_DIR));
CATALOG = new IcebergCatalogFactory(CATALOG_NAME, CATALOG_TYPE, WAREHOUSE, null).create();
if (!CATALOG.tableExists(TABLE)) {
CATALOG.createTable(TABLE, SCHEMA);
