From 7f3b4be075e8e00f2f94ed7f5b69371befeda374 Mon Sep 17 00:00:00 2001 From: Dennis Date: Sun, 29 Oct 2023 00:20:39 +0900 Subject: [PATCH] [BUG][Connector-V2] Iceberg source lost data with parallelism option (#5732) --- .../iceberg/source/enumerator/AbstractSplitEnumerator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java index 199c56eb159..26a971cc2e1 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java @@ -108,7 +108,7 @@ protected void refreshPendingSplits() { private void addPendingSplits(Collection newSplits) { int numReaders = context.currentParallelism(); for (IcebergFileScanTaskSplit newSplit : newSplits) { - int ownerReader = newSplit.splitId().hashCode() % numReaders; + int ownerReader = (newSplit.splitId().hashCode() & Integer.MAX_VALUE) % numReaders; pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(newSplit); log.info("Assigning {} to {} reader.", newSplit, ownerReader); }