Flink Iceberg sink may produce duplicates when the data file write and commit succeed but the checkpoint fails #10765

Closed
3 tasks
maekchi opened this issue Jul 23, 2024 · 9 comments
Labels: bug, stale

Comments


maekchi commented Jul 23, 2024

Apache Iceberg version

1.4.3

Query engine

Flink

Please describe the bug 🐞

Duplicates appear to occur, very rarely, when writing to Iceberg from Flink.

Let me explain the situation I experienced:

  • log with timeline
04:57:33 Triggering checkpoint 19516 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1721764651219 for job ba65ea243c487f4f0fd52c158e4ed985.
04:57:33 Completed checkpoint 19516 for job ba65ea243c487f4f0fd52c158e4ed985 (6182 bytes, checkpointDuration=389 ms, finalizationTime=60 ms).
04:58:36 Triggering checkpoint 19517 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1721764711219 for job ba65ea243c487f4f0fd52c158e4ed985.
04:58:42 Triggering Checkpoint 19517 for job ba65ea243c487f4f0fd52c158e4ed985 failed due to java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://[email protected]:6122/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
04:58:42 Failed to trigger or complete checkpoint 19517 for job ba65ea243c487f4f0fd52c158e4ed985. (0 consecutive failed attempts so far)
04:59:00 Job ~~~ (ba65ea243c487f4f0fd52c158e4ed985) switched from state RESTARTING to RUNNING.
04:59:00 Clearing resource requirements of job ba65ea243c487f4f0fd52c158e4ed985
04:59:00 Restoring job ba65ea243c487f4f0fd52c158e4ed985 from Checkpoint 19516 @ 1721764651219 for ba65ea243c487f4f0fd52c158e4ed985 located at [hdfs-path]/ff5df181-7682-4153-bafc-8e489c506d92/checkpoints/ba65ea243c487f4f0fd52c158e4ed985/chk-19516.
04:59:45 Failed to trigger checkpoint for job ba65ea243c487f4f0fd52c158e4ed985 since Checkpoint triggering task freezer-IcebergFilesCommitter -> Sink: IcebergSink hive.[tablename] (1/1) of job ba65ea243c487f4f0fd52c158e4ed985 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running..
05:00:46 Triggering checkpoint 19518 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1721764840957 for job ba65ea243c487f4f0fd52c158e4ed985.
  • taskmanager pod timeout at 04:59:19

result

  • metadata.json
{
    "sequence-number" : 201719,
    "snapshot-id" : 8203882888081487848,
    "parent-snapshot-id" : 7556868946872881546,
    "timestamp-ms" : 1721764676985,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",
      "added-data-files" : "1",
      "added-records" : "17554",
      "added-files-size" : "664840",
      "changed-partition-count" : "1",
      "total-records" : "3966880804",
      "total-files-size" : "241007398466",
      "total-data-files" : "774",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-8203882888081487848-1-354fd0bb-38d9-4706-8483-8a4276888dc3.avro",
    "schema-id" : 2
  }, {
    "sequence-number" : 201720,
    "snapshot-id" : 3289453546560274810,
    "parent-snapshot-id" : 8203882888081487848,
    "timestamp-ms" : 1721764798149,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",
      "added-data-files" : "1",
      "added-records" : "17554",
      "added-files-size" : "664840",
      "changed-partition-count" : "1",
      "total-records" : "3966898358",
      "total-files-size" : "241008063306",
      "total-data-files" : "775",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-3289453546560274810-2-e0983626-a2a5-49f2-988b-dc432f100451.avro",
    "schema-id" : 2
  },
  • snap-8203882888081487848-1-354fd0bb-38d9-4706-8483-8a4276888dc3.avro
image
  • snap-3289453546560274810-2-e0983626-a2a5-49f2-988b-dc432f100451.avro
image
  • 354fd0bb-38d9-4706-8483-8a4276888dc3-m0.avro
    • snapshot-id : 8203882888081487848
    • data_file path : hdfs://~~~~/data/xxx/00000-0-01327dd4-6162-483d-b2e0-bb0694402807-00513.parquet
  • e0983626-a2a5-49f2-988b-dc432f100451-m0.avro
    • snapshot-id : 3289453546560274810
    • data_file path : hdfs://~~~~/data/xxx/00000-0-01327dd4-6162-483d-b2e0-bb0694402807-00513.parquet

Looking at the timeline, the job was restored from the last completed checkpoint (19516), and the commit for that checkpoint was then performed again.
As a result, the commit for checkpoint id 19516 was performed twice, with both snapshots pointing to the same data file.

When I read the table in Trino, the same data file is read twice, so the rows appear duplicated.
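
The duplication can be cross-checked from the metadata alone. The following is a minimal Python sketch with the manifest entries and `total-records` values from the report hard-coded; the data layout and the helper name `find_duplicate_paths` are assumptions made for illustration and are not part of any Iceberg API.

```python
from collections import defaultdict

# (snapshot-id, data_file path) pairs copied from the two -m0.avro manifests
# in the report. Paths are redacted exactly as in the report.
MANIFEST_ENTRIES = [
    (8203882888081487848,
     "hdfs://~~~~/data/xxx/00000-0-01327dd4-6162-483d-b2e0-bb0694402807-00513.parquet"),
    (3289453546560274810,
     "hdfs://~~~~/data/xxx/00000-0-01327dd4-6162-483d-b2e0-bb0694402807-00513.parquet"),
]

# total-records values from the two snapshot summaries in metadata.json.
TOTAL_RECORDS = {
    8203882888081487848: 3966880804,
    3289453546560274810: 3966898358,
}


def find_duplicate_paths(entries):
    """Return {path: [snapshot ids]} for paths added by more than one snapshot."""
    by_path = defaultdict(list)
    for snapshot_id, path in entries:
        by_path[path].append(snapshot_id)
    return {path: ids for path, ids in by_path.items() if len(ids) > 1}


duplicates = find_duplicate_paths(MANIFEST_ENTRIES)
for path, ids in duplicates.items():
    print(f"{path} was added by snapshots {ids}")

# Growth of total-records between the two snapshots.
delta = TOTAL_RECORDS[3289453546560274810] - TOTAL_RECORDS[8203882888081487848]
print(delta)
```

The difference in `total-records` is 17554, which equals the `added-records` value of the single data file, so the second snapshot counted the same file's rows again.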

I tried to delete the duplicated rows to resolve this, but Trino failed with the following error:
Found more deleted rows than exist in the file

I also ran rewrite_data_files:

CALL spark_catalog.system.rewrite_data_files(table => 'table', where => 'partition="xxxxxx"');
24/07/24 06:38:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/07/24 06:44:02 WARN ManifestFilterManager: Deleting a duplicate path from manifest hdfs://~~~~~/metadata/f0b56e5f-9b32-48d2-ba77-ddb93081c881-m1.avro: hdfs://~~~~/data/parttiion/00000-0-01327dd4-6162-483d-b2e0-bb0694402807-00513.parquet
398     1       373764540       0
Time taken: 313.164 seconds, Fetched 1 row(s)

However, contrary to the message, the data duplication was not actually resolved.
Expiring snapshots did not help either.

As a last resort, I will try to modify the manifest directly.

(Addition) After the rewrite, I found that the data can be deleted without error.
So I created a backup table and backed up the data into it without the duplicates.
Then I deleted the duplicated data in the original table and copied it back from the backup table.

Is there a better solution in this situation?
Also, please look into how this situation can happen.

Thanks.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@maekchi maekchi added the bug Something isn't working label Jul 23, 2024
Contributor

pvary commented Jul 24, 2024

Is this similar to #10526?

Contributor

zhongqishang commented Jul 24, 2024

As a result, the commit for checkpoint id 19516 was performed twice, pointing to the same data file.

@maekchi I don't think committing the same file repeatedly generates duplicate data. Can you check the snapshot information for checkpoint id = 19518? If possible, you can use time travel to confirm which specific commit caused the duplicate data.

Contributor

pvary commented Jul 24, 2024

I have taken another look at the info you have provided.
This is definitely different from #10526, as this is an append-only change.

Do you have more detailed logs, or any way to reproduce the case?

As you highlighted there are 2 commits with the same data:

      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",

This should be prevented by the maxCommittedCheckpointId check. See:
https://github.com/apache/iceberg/blob/apache-iceberg-1.4.3/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L189
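
As a rough illustration of what such a guard does, here is a simplified Python model. It is not the actual Java code in IcebergFilesCommitter; the class and method names (`CommitterModel`, `snapshot_state`, `notify_checkpoint_complete`) are made up for this sketch, and it assumes only that the committer remembers the highest checkpoint id it has committed and skips any completion notification at or below that id.

```python
INITIAL_CHECKPOINT_ID = -1  # placeholder meaning "nothing committed yet"


class CommitterModel:
    """Toy committer that guards commits with a max committed checkpoint id."""

    def __init__(self):
        self.max_committed_checkpoint_id = INITIAL_CHECKPOINT_ID
        self.pending = {}   # checkpoint id -> data files staged for that checkpoint
        self.commits = []   # (checkpoint id, files) in the order they were committed

    def snapshot_state(self, checkpoint_id, files):
        # Stage the files written since the previous checkpoint.
        self.pending[checkpoint_id] = list(files)

    def notify_checkpoint_complete(self, checkpoint_id):
        # The guard: a checkpoint at or below the committed maximum is skipped.
        if checkpoint_id <= self.max_committed_checkpoint_id:
            return False
        ready = sorted(cp for cp in self.pending if cp <= checkpoint_id)
        files = [f for cp in ready for f in self.pending.pop(cp)]
        self.commits.append((checkpoint_id, files))
        self.max_committed_checkpoint_id = checkpoint_id
        return True


committer = CommitterModel()
committer.snapshot_state(19516, ["00513.parquet"])  # shortened file name
first = committer.notify_checkpoint_complete(19516)
second = committer.notify_checkpoint_complete(19516)
print(first, second, len(committer.commits))
```

Within a single running task this guard makes a repeated notification for 19516 a no-op. After a restart the in-memory maximum is gone, so the value has to be rebuilt from the table itself.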

Author

maekchi commented Jul 25, 2024

@pvary
I also think this is different from #10526.
I am only appending data,
and the job runs in exactly-once mode in Flink.

Other Flink apps write to the same logs, so there were too many entries; I am attaching additional logs filtered by table name.

Jul 24, 2024 @ 04:57:32.562	Committing append for checkpoint 19516 to table hive.table branch main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=17554, dataFilesByteCount=664840, deleteFilesCount=0, deleteFilesRecordCount=0, deleteFilesByteCount=0}	90	task_manager
Jul 24, 2024 @ 04:57:32.562	Start to flush snapshot state to state backend, table: hive.table, checkpointId: 19516	90	task_manager
Jul 24, 2024 @ 04:59:00.153	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (a035c92a2f05258b27a3986798fcde66) switched from CANCELING to CANCELED.	25,369	job_manager
Jul 24, 2024 @ 04:59:00.153	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (eb7fcaa1da87cd760c8e489b36528ac7) switched from CREATED to SCHEDULED.	25,371	job_manager
Jul 24, 2024 @ 04:59:00.153	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (a035c92a2f05258b27a3986798fcde66) switched from RUNNING to CANCELING.	25,369	job_manager
Jul 24, 2024 @ 04:59:19.156	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (eb7fcaa1da87cd760c8e489b36528ac7) switched from SCHEDULED to DEPLOYING.	25,360	job_manager
Jul 24, 2024 @ 04:59:19.156	Deploying freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (attempt #1) with attempt id eb7fcaa1da87cd760c8e489b36528ac7 and vertex id 9135501d46e54bf84710f477c1eb5f38_0 to 172.24.71.24:6122-16ff02 @ 172.24.71.24 (dataPort=45067) with allocation id fa75b71da4f9b96484bc5435b5f55b00	25,360	job_manager
Jul 24, 2024 @ 04:59:20.156	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (eb7fcaa1da87cd760c8e489b36528ac7) switched from DEPLOYING to INITIALIZING.	25,366	job_manager
Jul 24, 2024 @ 04:59:20.780	Loading JAR files for task freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7) [DEPLOYING].	3,208	task_manager
Jul 24, 2024 @ 04:59:20.780	Received task freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7), deploy into slot with allocation id fa75b71da4f9b96484bc5435b5f55b00.	3,196	task_manager
Jul 24, 2024 @ 04:59:20.780	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7) switched from CREATED to DEPLOYING.	3,208	task_manager
Jul 24, 2024 @ 04:59:20.781	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7) switched from DEPLOYING to INITIALIZING.	3,208	task_manager
Jul 24, 2024 @ 04:59:22.783	Table loaded by catalog: hive.table	3,208	task_manager
Jul 24, 2024 @ 04:59:22.783	Committing append for checkpoint 19516 to table hive.table branch main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=17554, dataFilesByteCount=664840, deleteFilesCount=0, deleteFilesRecordCount=0, deleteFilesByteCount=0}	3,208	task_manager
Jul 24, 2024 @ 04:59:45.158	Failed to trigger checkpoint for job ba65ea243c487f4f0fd52c158e4ed985 since Checkpoint triggering task freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) of job ba65ea243c487f4f0fd52c158e4ed985 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running..	173	job_manager
Jul 24, 2024 @ 04:59:59.159	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (eb7fcaa1da87cd760c8e489b36528ac7) switched from INITIALIZING to RUNNING.	25,397	job_manager
Jul 24, 2024 @ 05:00:00.788	freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7) switched from INITIALIZING to RUNNING.	3,208	task_manager
Jul 24, 2024 @ 05:00:00.788	Successfully committed to table hive.table in 130 ms	3,208	task_manager
Jul 24, 2024 @ 05:00:00.788	Committed to table hive.table with the new metadata location hdfs://~~~~~/metadata/202296-54e38fd0-c3c5-4431-b072-962402eedb7b.metadata.json	3,208	task_manager
Jul 24, 2024 @ 05:00:00.788	Committed append to table: hive.table, branch: main, checkpointId 19516 in 36033 ms	3,208	task_manager
Jul 24, 2024 @ 05:00:00.788	Received metrics report: CommitReport{tableName=hive.table, snapshotId=3289453546560274810, sequenceNumber=201720, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT36.012583152S, count=1}, attempts=CounterResult{unit=COUNT, value=2}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=775}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=2}, addedRecords=CounterResult{unit=COUNT, value=17554}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=3966898358}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=664840}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=241008063306}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=18608}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=1.15.2, engine-name=flink, iceberg-version=Apache Iceberg 1.4.3 (commit 9a5d24fee239352021a9a73f6a4cad8ecf464f01)}}	3,208	task_manager
Jul 24, 2024 @ 05:00:09.567	Committed to table hive.table with the new metadata location hdfs://~~~~~/metadata/202295-d906b795-214d-46a6-b2dd-5353f134a9a6.metadata.json	90	task_manager
Jul 24, 2024 @ 05:00:45.789	Committed append to table: hive.table, branch: main, checkpointId 19518 in 1112 ms	3,208	task_manager
Jul 24, 2024 @ 05:00:45.789	Table loaded by catalog: hive.table	3,208	task_manager
Jul 24, 2024 @ 05:00:45.789	Start to flush snapshot state to state backend, table: hive.table, checkpointId: 19518	3,208	task_manager

Checking the log, there is a "Committing" entry for checkpoint 19516 at 04:57, but no corresponding "Committed" entry.

Contributor

pvary commented Jul 26, 2024

Which version of Flink are you using, btw?

In the logs I see Committing append for checkpoint 19516, so the commit was started, but I can't see the corresponding Committed {} to table: {}, branch: {}, checkpointId {} in {} ms

This means that Flink doesn't know if 19516 (snapshot 8203882888081487848) was successful or not. Let's assume it was successful behind the scenes.

So when it recovers it will find the metadata in the state for checkpoint 19516.

So we have to check the recovery codepath to understand what is happening. https://github.com/apache/iceberg/blob/apache-iceberg-1.4.3/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L144

maxCommittedCheckpointId (https://github.com/apache/iceberg/blob/apache-iceberg-1.4.3/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L493) should return 19516 if the previous commit was successful, and the recovery code should then prevent it from being committed again.
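
To make the expected behaviour concrete, here is a simplified Python model of that recovery lookup. It is a sketch under stated assumptions, not the Java implementation: it assumes the lookup walks snapshot summaries from the branch head through `parent-snapshot-id` and returns the first `flink.max-committed-checkpoint-id` recorded for the same job and operator. The function names are hypothetical, and only the one snapshot summary shown in this issue is included, so older snapshots are treated as unknown.

```python
INITIAL_CHECKPOINT_ID = -1  # fallback when no matching snapshot is found

JOB_ID = "ba65ea243c487f4f0fd52c158e4ed985"
OPERATOR_ID = "9135501d46e54bf84710f477c1eb5f38"

# Only the first 19516 snapshot from the report. Its parent's summary is not
# shown in the issue, so the parent is deliberately left out of this model.
SNAPSHOTS = {
    8203882888081487848: {
        "parent-snapshot-id": 7556868946872881546,
        "summary": {
            "flink.job-id": JOB_ID,
            "flink.operator-id": OPERATOR_ID,
            "flink.max-committed-checkpoint-id": "19516",
        },
    },
}


def lookup_max_committed(snapshots, head_id, job_id, operator_id):
    """Walk the ancestry from head_id and return the first recorded checkpoint id."""
    current = head_id
    while current in snapshots:
        snap = snapshots[current]
        summary = snap["summary"]
        same_job = summary.get("flink.job-id") == job_id
        same_operator = summary.get("flink.operator-id") in (None, operator_id)
        value = summary.get("flink.max-committed-checkpoint-id")
        if same_job and same_operator and value is not None:
            return int(value)
        current = snap.get("parent-snapshot-id")
    return INITIAL_CHECKPOINT_ID


def files_to_recommit(restored_state, max_committed):
    """Keep only restored checkpoints strictly newer than max_committed."""
    return {cp: files for cp, files in restored_state.items() if cp > max_committed}


# State restored from checkpoint 19516 still holds that checkpoint's file.
RESTORED_STATE = {19516: ["00513.parquet"]}  # shortened file name

# Case 1: the table head seen during recovery is the first 19516 snapshot.
visible = lookup_max_committed(SNAPSHOTS, 8203882888081487848, JOB_ID, OPERATOR_ID)
# Case 2 (hypothetical): the head seen during recovery is still the parent.
not_visible = lookup_max_committed(SNAPSHOTS, 7556868946872881546, JOB_ID, OPERATOR_ID)

print(files_to_recommit(RESTORED_STATE, visible))
print(files_to_recommit(RESTORED_STATE, not_visible))
```

In the first case nothing is left to commit. The second case is only a hypothetical scenario, not a diagnosis: if the table as loaded during recovery did not yet show snapshot 8203882888081487848, the lookup would return a value below 19516 and the restored files would be committed again.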

What Catalog are you using? Is there any cache, or something which might return wrong data for the table?

Thanks,
Peter

Author

maekchi commented Jul 29, 2024

@pvary

Which version of Flink are you using, btw?

I use Flink version 1.15.4.

What Catalog are you using? Is there any cache, or something which might return wrong data for the table?

We use the Hive catalog with 'engine.hive.lock-enabled=true' set.
There is no cache and no wrong data (we filter out all wrong data).


By the way, there seems to be one more strange thing in the log.

After the log entries for checkpoint 19516, there should be entries somewhere for checkpoint 19517, but there are none at all.

Because our system checkpoints every minute, checkpoint 19517 should have been performed around 04:58:32, one minute after checkpoint 19516 at 04:57:32.
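
The epoch-millisecond values in the JobManager log and in metadata.json can be lined up with these wall-clock times. The sketch below is plain Python; the +9 hour offset is an assumption inferred from comparing the epoch values with the log timestamps, and the helper name `to_log_time` is made up for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the log timestamps are in UTC+9.
LOG_TZ = timezone(timedelta(hours=9))

# "Triggering checkpoint N ... @ <epoch ms>" values from the JobManager log.
TRIGGERS_MS = {
    19516: 1721764651219,
    19517: 1721764711219,
    19518: 1721764840957,
}

# "timestamp-ms" values of the three snapshots in metadata.json.
SNAPSHOTS_MS = {
    8203882888081487848: 1721764676985,
    3289453546560274810: 1721764798149,
    3232659717465048464: 1721764843143,
}


def to_log_time(epoch_ms):
    """Format an epoch-millisecond value as HH:MM:SS in the assumed log time zone."""
    return datetime.fromtimestamp(epoch_ms // 1000, tz=LOG_TZ).strftime("%H:%M:%S")


for checkpoint_id, ms in TRIGGERS_MS.items():
    print("checkpoint", checkpoint_id, "triggered at", to_log_time(ms))

for snapshot_id, ms in SNAPSHOTS_MS.items():
    print("snapshot", snapshot_id, "created at", to_log_time(ms))

interval_ms = TRIGGERS_MS[19517] - TRIGGERS_MS[19516]
print("interval between 19516 and 19517:", interval_ms, "ms")
```

Under that assumption the triggers for 19516 and 19517 fall at 04:57:31 and 04:58:31, exactly 60000 ms apart, which matches the one-minute checkpoint interval. The two 19516 snapshots carry timestamps of 04:57:56 and 04:59:58.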

However, there is no related log, because the taskmanager was shut down around that time.

After that, a commit for 19516 was attempted at 04:59:22, and the log at 05:00:00 shows that the 19516 snapshot was committed successfully. But there is no log for 19517 anywhere.

There is no record of 19517 in metadata.json either! Is this normal during recovery?

{
    "sequence-number" : 201719,
    "snapshot-id" : 8203882888081487848,
    "parent-snapshot-id" : 7556868946872881546,
    "timestamp-ms" : 1721764676985,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",
      "added-data-files" : "1",
      "added-records" : "17554",
      "added-files-size" : "664840",
      "changed-partition-count" : "1",
      "total-records" : "3966880804",
      "total-files-size" : "241007398466",
      "total-data-files" : "774",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-8203882888081487848-1-354fd0bb-38d9-4706-8483-8a4276888dc3.avro",
    "schema-id" : 2
  }, {
    "sequence-number" : 201720,
    "snapshot-id" : 3289453546560274810,
    "parent-snapshot-id" : 8203882888081487848,
    "timestamp-ms" : 1721764798149,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",
      "added-data-files" : "1",
      "added-records" : "17554",
      "added-files-size" : "664840",
      "changed-partition-count" : "1",
      "total-records" : "3966898358",
      "total-files-size" : "241008063306",
      "total-data-files" : "775",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-3289453546560274810-2-e0983626-a2a5-49f2-988b-dc432f100451.avro",
    "schema-id" : 2
  }, {
    "sequence-number" : 201721,
    "snapshot-id" : 3232659717465048464,
    "parent-snapshot-id" : 3289453546560274810,
    "timestamp-ms" : 1721764843143,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19518",
      "added-data-files" : "1",
      "added-records" : "56759",
      "added-files-size" : "2237712",
      "changed-partition-count" : "1",
      "total-records" : "3966955117",
      "total-files-size" : "241010301018",
      "total-data-files" : "776",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-3232659717465048464-1-8c5a3ab7-9303-45e5-910c-41d47be08142.avro",
    "schema-id" : 2
  },

Thanks!

Contributor

pvary commented Jul 29, 2024

I think this is the log for 19517:

Jul 24, 2024 @ 04:59:45.158	Failed to trigger checkpoint for job ba65ea243c487f4f0fd52c158e4ed985 since Checkpoint triggering task freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) of job ba65ea243c487f4f0fd52c158e4ed985 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running..

The job was recovering, and not all of the operators were running, so checkpoint 19517 failed.


This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Jan 26, 2025

github-actions bot commented Feb 9, 2025

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

@github-actions github-actions bot closed this as not planned Feb 9, 2025