[Bug] [Iceberg] Iceberg Source use multiple parallelism encountering lost data #5661

Closed
2 of 3 tasks
SamealD opened this issue Oct 19, 2023 · 2 comments · Fixed by #5732


SamealD commented Oct 19, 2023

Search before asking

  • I have searched the issues and found no similar issues.

What happened

The Iceberg source loses data when run with multiple parallelism.
With parallelism = 1, no data is lost. With parallelism = 2 or more, some of the data is lost.

SeaTunnel Version

SeaTunnel 2.3.3

SeaTunnel Config

env {
  parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 50000
}

source {
  Iceberg {
    catalog_name = "hadoop_prod"
    catalog_type = "hadoop"
    warehouse = "hdfs://***:8020/warehouse/hive/test-iceberg"
    namespace = "test01"
    table = "test_table01"
  }
}

sink {
  Console {
  }
}

Running Command

bin/seatunnel.sh --config jobconf/iceberg_to_local.conf

Error Exception

No exception was thrown.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

This is my Iceberg table's row count: (image failed to upload)

With parallelism = 1, the job statistics are:


       Job Statistic Information

Start Time : 2023-10-19 14:41:10
End Time : 2023-10-19 14:41:16
Total Time(s) : 5
Total Read Count : 2000002
Total Write Count : 2000002
Total Failed Count : 0


With parallelism = 2, the job statistics are:


       Job Statistic Information

Start Time : 2023-10-19 14:48:58
End Time : 2023-10-19 14:49:01
Total Time(s) : 3
Total Read Count : 1000001
Total Write Count : 1000001
Total Failed Count : 0


Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@SamealD SamealD added the bug label Oct 19, 2023
Author

SamealD commented Oct 19, 2023

The screenshot failed to upload. It showed that the original Iceberg table contains 2000002 rows.

Contributor

kangdw0x80 commented Oct 27, 2023

This is a bug.
The Iceberg connector assigns files to readers based on their path (in the addPendingSplits function):

int ownerReader = newSplit.splitId().hashCode() % numReaders;

In the Iceberg source, splitId() returns the file path:

    public String splitId() {
        return task.file().path().toString();
    }

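However, String.hashCode() on a long string such as a file path easily overflows int arithmetic, so the result can be negative. Because Java's % operator keeps the sign of the dividend, ownerReader can then be negative as well.

This value is used as the reader ID. Reader IDs run from 0 to numReaders - 1, so a split with a negative owner is never assigned to any reader and its file is never read. With parallelism = 1 the bug cannot occur, because any int % 1 is 0.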
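To see why only parallelism > 1 loses data, here is a minimal sketch. It is a simplified, hypothetical model rather than SeaTunnel's actual enumerator: `SplitAssignmentDemo`, `countUnassigned`, `samplePaths`, and the example paths are made up for illustration, and pending splits are modeled as a map from owner ID to split IDs that readers 0..numReaders-1 drain. Only the `hashCode() % numReaders` expression is taken from the code quoted above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitAssignmentDemo {

    // Same expression as the original addPendingSplits code.
    static int buggyOwner(String splitId, int numReaders) {
        return splitId.hashCode() % numReaders;
    }

    // Hypothetical simplified enumerator: group splits by owner ID, then let
    // readers 0..numReaders-1 each take only their own group.
    // Returns how many splits no reader ever picks up.
    static int countUnassigned(List<String> splitIds, int numReaders) {
        Map<Integer, List<String>> pending = new HashMap<>();
        for (String id : splitIds) {
            pending.computeIfAbsent(buggyOwner(id, numReaders), k -> new ArrayList<>()).add(id);
        }
        int read = 0;
        for (int reader = 0; reader < numReaders; reader++) {
            read += pending.getOrDefault(reader, List.of()).size();
        }
        return splitIds.size() - read;
    }

    // Hypothetical file paths standing in for Iceberg data files.
    static List<String> samplePaths(int count) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            paths.add("hdfs://example-host:8020/warehouse/example_db/example_table/data/file-" + i + ".parquet");
        }
        return paths;
    }

    public static void main(String[] args) {
        List<String> paths = samplePaths(1000);
        System.out.println("unassigned with 1 reader:  " + countUnassigned(paths, 1));
        System.out.println("unassigned with 2 readers: " + countUnassigned(paths, 2));
    }
}
```

With one reader every split lands on reader 0, so nothing is skipped. With two readers, any path whose hash is negative and odd maps to owner -1, and those splits are never read.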
Change the code to clear the sign bit before taking the remainder:

-   int ownerReader = newSplit.splitId().hashCode() % numReaders;
+   int ownerReader = (newSplit.splitId().hashCode() & Integer.MAX_VALUE) % numReaders;
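The sketch below is a quick way to check the patched expression against a few alternatives. It works on raw int hash values so the edge cases are explicit; `OwnerReaderFix` and its method names are hypothetical. Besides the mask proposed here, it shows `Math.floorMod` (another standard way to get a non-negative remainder, not part of this patch) and the `Math.abs` pitfall: `Math.abs(Integer.MIN_VALUE)` is still `Integer.MIN_VALUE`, so `Math.abs(hash) % numReaders` can remain negative.

```java
public class OwnerReaderFix {

    // Original expression: % keeps the sign of the dividend,
    // so a negative hash yields a negative reader ID.
    static int originalOwner(int hash, int numReaders) {
        return hash % numReaders;
    }

    // Patched expression: clearing the sign bit makes the dividend
    // non-negative, so the result is always in [0, numReaders).
    static int maskedOwner(int hash, int numReaders) {
        return (hash & Integer.MAX_VALUE) % numReaders;
    }

    // Alternative (not the proposed patch): Math.floorMod's result takes the
    // sign of the divisor, so it is also in [0, numReaders).
    static int floorModOwner(int hash, int numReaders) {
        return Math.floorMod(hash, numReaders);
    }

    // Pitfall: Math.abs(Integer.MIN_VALUE) overflows back to Integer.MIN_VALUE,
    // so an abs-based fix can still return a negative reader ID.
    static int absOwner(int hash, int numReaders) {
        return Math.abs(hash) % numReaders;
    }

    public static void main(String[] args) {
        int[] hashes = {-7, Integer.MIN_VALUE};
        for (int h : hashes) {
            System.out.printf("hash=%d original=%d masked=%d floorMod=%d abs=%d%n",
                    h, originalOwner(h, 3), maskedOwner(h, 3), floorModOwner(h, 3), absOwner(h, 3));
        }
    }
}
```

Both the mask and `Math.floorMod` keep every owner in range, but they can pick different readers for the same hash (for -7 with 3 readers, the mask gives 1 and `floorMod` gives 2). That is harmless as long as the enumerator uses one expression consistently.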
