diff --git a/docs/en/connector-v2/source/Iceberg.md b/docs/en/connector-v2/source/Iceberg.md index 72dacc076ec3..ff1195db860d 100644 --- a/docs/en/connector-v2/source/Iceberg.md +++ b/docs/en/connector-v2/source/Iceberg.md @@ -182,3 +182,4 @@ Some versions of the hive-exec package do not have libfb303-xxx.jar, so you also ### next version - [Feature] Support Hadoop3.x ([3046](https://github.com/apache/incubator-seatunnel/pull/3046)) +- [Improve] Imporve iceberg source connector([2865](https://github.com/apache/incubator-seatunnel/pull/2865)) \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java index e84abdc84063..7554dbbac3fa 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java @@ -24,7 +24,6 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import org.apache.iceberg.Table; import java.io.IOException; import java.util.ArrayList; @@ -58,52 +57,43 @@ public void open() { icebergTableLoader.open(); } - @Override - public void run() { - refreshPendingSplits(); - assignPendingSplits(context.registeredReaders()); - } - - @Override - public void close() throws IOException { - icebergTableLoader.close(); - } - @Override public void addSplitsBack(List splits, int subtaskId) { + log.debug("Add back splits {} to IcebergSourceEnumerator.", + splits); addPendingSplits(splits); if (context.registeredReaders().contains(subtaskId)) { assignPendingSplits(Collections.singleton(subtaskId)); } } - @Override - public int currentUnassignedSplitSize() { - return pendingSplits.size(); - } - @Override public void registerReader(int subtaskId) { - log.debug("Adding reader {} to IcebergSourceEnumerator.", + log.debug("Register reader {} to IcebergSourceEnumerator.", subtaskId); assignPendingSplits(Collections.singleton(subtaskId)); } + @Override + public int currentUnassignedSplitSize() { + return pendingSplits.size(); + } + @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { } - protected void refreshPendingSplits() { - List newSplits = loadNewSplits(icebergTableLoader.loadTable()); - addPendingSplits(newSplits); + @Override + public void close() throws IOException { + icebergTableLoader.close(); } - protected abstract List loadNewSplits(Table table); + protected void addPendingSplits(Collection newSplits) { + log.debug("Add splits {} to pendingSplits", newSplits.size()); - private void addPendingSplits(Collection newSplits) { int numReaders = context.currentParallelism(); for (IcebergFileScanTaskSplit newSplit : newSplits) { - int ownerReader = newSplit.splitId().hashCode() % numReaders; + int ownerReader = (newSplit.splitId().hashCode() & Integer.MAX_VALUE) % numReaders; pendingSplits .computeIfAbsent(ownerReader, r -> new ArrayList<>()) .add(newSplit); @@ -112,6 +102,8 @@ private void addPendingSplits(Collection newSplits) { } protected void assignPendingSplits(Set pendingReaders) { + log.debug("Assign pendingSplits to readers {}", pendingReaders); + for (int pendingReader : pendingReaders) { List pendingAssignmentForReader = pendingSplits.remove(pendingReader); if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) { diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergBatchSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergBatchSplitEnumerator.java index 111964f63cdb..c351f5619ea6 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergBatchSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergBatchSplitEnumerator.java @@ -35,6 +35,8 @@ public class IcebergBatchSplitEnumerator extends AbstractSplitEnumerator { private final IcebergScanContext icebergScanContext; + private final boolean shouldEnumerate; + private volatile boolean splitLoadReady; public IcebergBatchSplitEnumerator(@NonNull SourceSplitEnumerator.Context context, @NonNull IcebergScanContext icebergScanContext, @@ -43,29 +45,39 @@ public IcebergBatchSplitEnumerator(@NonNull SourceSplitEnumerator.Context readers = context.registeredReaders(); - log.debug("No more splits to assign." + - " Sending NoMoreSplitsEvent to reader {}.", readers); + if (shouldEnumerate) { + loadAllSplitsToPendingSplits(icebergTableLoader.loadTable()); + assignPendingSplits(readers); + } + + log.debug("No more splits to assign. Sending NoMoreSplitsEvent to readers {}.", readers); readers.forEach(context::signalNoMoreSplits); } @Override public IcebergSplitEnumeratorState snapshotState(long checkpointId) { - return new IcebergSplitEnumeratorState(null, pendingSplits); + if (splitLoadReady) { + return new IcebergSplitEnumeratorState(null, pendingSplits); + } + return null; } @Override public void handleSplitRequest(int subtaskId) { + throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId); } - @Override - protected List loadNewSplits(Table table) { - return IcebergScanSplitPlanner.planSplits(table, icebergScanContext); + private void loadAllSplitsToPendingSplits(Table table) { + List newSplits = IcebergScanSplitPlanner.planSplits( + table, icebergScanContext); + addPendingSplits(newSplits); + splitLoadReady = true; } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java index 5b610323bfd6..eff808318f3e 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java @@ -28,7 +28,6 @@ import org.apache.iceberg.Table; import java.util.Collections; -import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -51,35 +50,41 @@ public IcebergStreamSplitEnumerator(@NonNull SourceSplitEnumerator.Context loadNewSplits(Table table) { + private void loadNewSplitsToPendingSplits(Table table) { IcebergEnumerationResult result = IcebergScanSplitPlanner.planStreamSplits( table, icebergScanContext, enumeratorPosition.get()); if (!Objects.equals(result.getFromPosition(), enumeratorPosition.get())) { - log.info("Skip {} loaded splits because the scan starting position doesn't match " + + log.warn("Skip {} loaded splits because the scan starting position doesn't match " + "the current enumerator position: enumerator position = {}, scan starting position = {}", result.getSplits().size(), enumeratorPosition.get(), result.getFromPosition()); - return Collections.emptyList(); - } else { - enumeratorPosition.set(result.getToPosition()); - log.debug("Update enumerator position to {}", result.getToPosition()); - return result.getSplits(); + return; + } + + addPendingSplits(result.getSplits()); + + enumeratorPosition.set(result.getToPosition()); + if (!Objects.equals(enumeratorPosition.get(), result.getToPosition())) { + log.info("Update enumerator position to {}", result.getToPosition()); } } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java index c170dbd90ef6..63f779def9e3 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java @@ -41,11 +41,7 @@ public CloseableIterator open(@NonNull IcebergFileScanTaskSplit sp OffsetSeekIterator seekIterator = new OffsetSeekIterator(iterator); seekIterator.seek(split.getRecordOffset()); - return CloseableIterator.transform(seekIterator, record -> { - SeaTunnelRow seaTunnelRow = deserializer.deserialize(record); - split.setRecordOffset(split.getRecordOffset() + 1); - return seaTunnelRow; - }); + return CloseableIterator.transform(seekIterator, record -> deserializer.deserialize(record)); } @Override diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java index f432c5e2235e..bc5d5124f8e2 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java @@ -100,13 +100,18 @@ public void pollNext(Collector output) throws Exception { currentReadSplit = pendingSplit; try (CloseableIterator rowIterator = icebergFileScanTaskSplitReader.open(currentReadSplit)) { while (rowIterator.hasNext()) { - output.collect(rowIterator.next()); + synchronized (output.getCheckpointLock()) { + currentReadSplit.setRecordOffset(currentReadSplit.getRecordOffset() + 1); + output.collect(rowIterator.next()); + } } } } - if (noMoreSplitsAssignment && Boundedness.BOUNDED.equals(context.getBoundedness())) { - context.signalNoMoreElement(); + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + if (noMoreSplitsAssignment) { + context.signalNoMoreElement(); + } } else { context.sendSplitRequest(); if (pendingSplits.isEmpty()) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java index 1aa116e53415..786682e46f80 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; @@ -53,6 +54,7 @@ import org.testcontainers.containers.Container; import org.testcontainers.utility.MountableFile; +import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -124,6 +126,7 @@ public void testIcebergSource(TestContainer container) throws IOException, Inter } private void initializeIcebergTable() { + FileUtil.fullyDelete(new File(CATALOG_DIR)); CATALOG = new IcebergCatalogFactory(CATALOG_NAME, CATALOG_TYPE, WAREHOUSE, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java index c4ee74c42543..a29161153d8b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; @@ -53,6 +54,7 @@ import org.testcontainers.containers.Container; import org.testcontainers.utility.MountableFile; +import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -124,6 +126,7 @@ public void testIcebergSource(TestContainer container) throws IOException, Inter } private void initializeIcebergTable() { + FileUtil.fullyDelete(new File(CATALOG_DIR)); CATALOG = new IcebergCatalogFactory(CATALOG_NAME, CATALOG_TYPE, WAREHOUSE,