Fix concurrent insertions for Iceberg tables #21250

Merged
merged 1 commit into from
Feb 14, 2024

Contributor

@pratyakshsharma pratyakshsharma commented Oct 26, 2023

Description

The number of successful concurrent inserts/updates to an Iceberg table is controlled by the table property commit.retry.num-retries, which defaults to 4 in Iceberg's TableProperties.java class. This PR aims to make it configurable.

Motivation and Context

#21251

Impact

Introduction of new user-facing config for iceberg connector: iceberg.commit-retries

Test Plan

Manual and unit tests

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

Iceberg Changes
* Add support for setting the commit.retry.num-retries table property at table creation, so that the number of attempts made in case of concurrent upserts is configurable.

@pratyakshsharma pratyakshsharma requested a review from a team as a code owner October 26, 2023 16:22
@linux-foundation-easycla

linux-foundation-easycla bot commented Oct 26, 2023

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: pratyakshsharma / name: Pratyaksh Sharma (cb449d2)

@agrawalreetika
Member

@pratyakshsharma Could you please squash the commits and update Iceberg documentation as well for this?

propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
Member

Since this is a commit retry property for Iceberg, I think it should also apply to Iceberg tables that are created using the Hadoop or Nessie catalog?

Contributor Author

Yes, it should be.

Member

@agrawalreetika agrawalreetika Oct 30, 2023

Then let's introduce this property in https://github.com/prestodb/presto/blob/master/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java#L157 as well.
Since the Table properties for both Metadata classes are the same, we can extract the common part for setting table properties and use it in both
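
A minimal sketch of the extraction suggested above, not the code that was merged. The class and method names are hypothetical; the two keys are the string values of Iceberg's `TableProperties.DEFAULT_FILE_FORMAT` and `TableProperties.COMMIT_NUM_RETRIES`, redeclared here so the example compiles without the Iceberg dependency.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the table properties that both metadata classes
// could pass to Iceberg when creating a table.
final class IcebergTablePropertiesSketch
{
    // String values of the corresponding Iceberg TableProperties constants.
    static final String DEFAULT_FILE_FORMAT = "write.format.default";
    static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries";

    private IcebergTablePropertiesSketch() {}

    static Map<String, String> commonTableProperties(String fileFormat, int commitRetries)
    {
        if (commitRetries < 0) {
            throw new IllegalArgumentException("commitRetries must be non-negative");
        }
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put(DEFAULT_FILE_FORMAT, fileFormat);
        properties.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
        return properties;
    }
}
```

Both the Hive-backed and the native (Hadoop/Nessie) metadata classes could then call the same helper, so a new table property only has to be added in one place.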

Contributor Author

Sure. I will run some simple tests with Nessie to verify this works, and then introduce it here.

Contributor Author

Nessie throws the exception below when the number of concurrent queries exceeds the table config value:

org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: Reference hash is out of date. Update the reference 'main' and try again
	at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:167)
	at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
	at org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$5(BaseTransaction.java:422)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:418)
	at org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:302)
	at com.facebook.presto.iceberg.IcebergAbstractMetadata.finishInsert(IcebergAbstractMetadata.java:278)
	at com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:452)
	at com.facebook.presto.metadata.MetadataManager.finishInsert(MetadataManager.java:858)
	at com.facebook.presto.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$3(LocalExecutionPlanner.java:3392)
	at com.facebook.presto.operator.TableFinishOperator.getOutput(TableFinishOperator.java:289)
	at com.facebook.presto.operator.Driver.processInternal(Driver.java:428)
	at com.facebook.presto.operator.Driver.lambda$processFor$9(Driver.java:311)
	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:732)
	at com.facebook.presto.operator.Driver.processFor(Driver.java:304)
	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1079)
	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)
	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:621)
	at com.facebook.presto.$gen.Presto_null__testversion____20231108_094807_1.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.projectnessie.error.NessieReferenceConflictException: Values of existing and expected content for key 'iceberg_db.test_nessie_iceberg' are different.
	at org.projectnessie.error.ErrorCode.lambda$asException$1(ErrorCode.java:66)
	at java.util.Optional.map(Optional.java:215)
	at org.projectnessie.error.ErrorCode.asException(ErrorCode.java:66)
	at org.projectnessie.client.rest.ResponseCheckFilter.checkResponse(ResponseCheckFilter.java:67)
	at org.projectnessie.client.rest.NessieHttpResponseFilter.filter(NessieHttpResponseFilter.java:29)
	at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.lambda$executeRequest$2(UrlConnectionRequest.java:86)
	at java.util.Collections$SingletonList.forEach(Collections.java:4824)
	at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.executeRequest(UrlConnectionRequest.java:86)
	at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.post(UrlConnectionRequest.java:120)
	at org.projectnessie.client.http.HttpTreeClient.commitMultipleOperations(HttpTreeClient.java:202)
	at sun.reflect.GeneratedMethodAccessor1216.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.projectnessie.client.http.NessieHttpClient$ExceptionRewriter.invoke(NessieHttpClient.java:79)
	at com.sun.proxy.$Proxy522.commitMultipleOperations(Unknown Source)
	at org.projectnessie.client.http.v1api.HttpCommitMultipleOperations.commit(HttpCommitMultipleOperations.java:35)
	at org.apache.iceberg.nessie.NessieIcebergClient.commitTable(NessieIcebergClient.java:479)
	at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:160)
	... 24 more

Contributor

Nessie throws the exception below when the number of concurrent queries exceeds the table config value

Does it work if the number of concurrent queries does not exceed the table config value?

Contributor Author

Yes, I have added a test case for this.

@github-actions

github-actions bot commented Oct 30, 2023

Codenotify: Notifying subscribers in CODENOTIFY files for diff 1a4e945...cb449d2.

Notify File(s)
@steveburnett presto-docs/src/main/sphinx/connector/iceberg.rst

@pratyakshsharma
Contributor Author

pratyakshsharma commented Oct 30, 2023

The worker seems to crash as part of the failures in linux-presto-e2e-tests. This has now happened twice in a row in this PR.
cc @tdcmeehan @wanglinsong

@tdcmeehan
Contributor

The worker seems to crash as part of the failures in linux-presto-e2e-tests. This has now happened twice in a row in this PR. cc @tdcmeehan @wanglinsong

Can you please ask about this in the #presto-cpp channel in Slack?

Contributor

@steveburnett steveburnett left a comment

LGTM! (docs)

@pratyakshsharma pratyakshsharma force-pushed the iceberg-concurrency branch 2 times, most recently from 8461a75 to 05615f6 Compare November 9, 2023 07:40
@pratyakshsharma pratyakshsharma changed the title Fix concurrent insertions for hive and glue metastores Fix concurrent insertions for Iceberg tables Nov 9, 2023
@agrawalreetika
Member

@pratyakshsharma Thanks for verifying it for Nessie as well.
A few things:

  1. Let's modify the tests in https://github.com/prestodb/presto/blob/master/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java#L59 to use concurrency > 5 (since 5 is the success case) and cover both the failure and success scenarios there. If possible, we can add similar tests for Nessie as well.
  2. I think we can modify the commit message to something like "Introduce iceberg table property commit_retries to handle concurrent insertions"?
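
One way to read the "concurrency > 5" threshold is a worst-case model, sketched below. It is an assumption for illustration, not Iceberg's actual scheduling: every writer starts from the same snapshot, exactly one pending writer commits per round, and the property counts retries after the first attempt (so the default of 4 allows 5 attempts in total). The class name is hypothetical.

```java
// Worst-case model: in each round exactly one pending writer commits and all
// others hit a conflict and must retry.
final class WorstCaseCommitModel
{
    private WorstCaseCommitModel() {}

    // Returns how many writers eventually commit when each writer may retry
    // at most numRetries times after its first failed attempt.
    static int successfulWriters(int writers, int numRetries)
    {
        int pending = writers;
        int committed = 0;
        // Round r (0-based) is each pending writer's (r + 1)-th attempt, so a
        // writer can take part in rounds 0..numRetries.
        for (int round = 0; round <= numRetries && pending > 0; round++) {
            committed++;
            pending--;
        }
        return committed;
    }

    public static void main(String[] args)
    {
        System.out.println(successfulWriters(5, 4)); // prints 5
        System.out.println(successfulWriters(6, 4)); // prints 5
    }
}
```

Under this model, 5 writers all commit with 4 retries, while a sixth writer exhausts its attempts. Real runs can do better, because writers rarely all collide in every round.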

@pratyakshsharma
Contributor Author

@agrawalreetika Yes, I am already working on fixing the test cases. Sure, I will update the commit message.

@pratyakshsharma
Contributor Author

I have commented out the test case covering the failure scenario for concurrent insertions, since it is failing and I am working on fixing it. I have added the expectedExceptions tag, but the test still fails even though that exception is thrown.

@pratyakshsharma
Contributor Author

@yingsu00 As per our internal discussions, I checked the JMeter codebase, where they spawn new threads for load testing. They use ThreadPoolExecutor to manage the test plan, which in turn starts the new threads according to the provided configuration. ThreadPoolExecutor is just an implementation of ExecutorService. I tried following the same approach for my test case, but surprisingly it did not help.
We can get on a quick call to discuss more.

final CountDownLatch countDownLatch = new CountDownLatch(commitRetries);
AtomicInteger value = new AtomicInteger(0);
Set<Throwable> errors = new CopyOnWriteArraySet<>();
List<Thread> threads = Stream.generate(() -> new Thread(() -> {
Contributor

There are too many tests that try to accomplish similar tasks through multi-threading. Ultimately, this style of test worries me because such tests end up being highly environment-specific: susceptible to GC pauses, noisy neighbors, and such. Can we think of a different way to accomplish this that isn't time dependent?
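
One possible shape for a test that is not time dependent, sketched under assumptions: the conflict is injected by a fake commit that fails a scripted number of times, so no threads are involved. The class, the retry loop, and the fake are hypothetical stand-ins and do not use Iceberg or Presto APIs.

```java
import java.util.function.BooleanSupplier;

final class DeterministicRetrySketch
{
    private DeterministicRetrySketch() {}

    // Runs commitAttempt once, then retries up to numRetries more times.
    // Returns true as soon as an attempt succeeds, false if all attempts fail.
    static boolean commitWithRetries(BooleanSupplier commitAttempt, int numRetries)
    {
        for (int attempt = 0; attempt <= numRetries; attempt++) {
            if (commitAttempt.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    // Fake commit that reports a conflict for its first `conflicts` calls and
    // succeeds on every call after that.
    static BooleanSupplier failingFirst(int conflicts)
    {
        int[] calls = {0};
        return () -> ++calls[0] > conflicts;
    }
}
```

With 4 retries, `commitWithRetries(failingFirst(4), 4)` succeeds on the fifth attempt and `commitWithRetries(failingFirst(5), 4)` fails, which covers both the success and failure scenarios without depending on thread timing.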

Contributor Author

I am currently working on this. Let's see if I am able to come up with something.

Contributor

I would recommend just removing the test, since this is a table property and a functionality of the core Iceberg library. Adding a potentially flaky test to our suites is just not worth the potential problems.

Contributor Author

@tdcmeehan With the current approach of submitting 100 queries, the test case passes consistently. I am now only trying to fine-tune the number of threads and other parameters, so as to reduce the test run time without introducing flakiness.
Let me know if you still think otherwise. We can probably discuss this internally.
cc @agrawalreetika @imjalpreet @yingsu00 @nmahadevuni

@pratyakshsharma pratyakshsharma force-pushed the iceberg-concurrency branch 4 times, most recently from 4997d80 to d0b8e63 Compare January 26, 2024 08:16
@pratyakshsharma
Contributor Author

@yingsu00 @agrawalreetika This is good to review now.

@@ -45,7 +45,7 @@ jobs:
- ":presto-tests -P ci-only-distributed-queries"
- ":presto-tests -P ci-only-aggregation-queries"
- ":presto-tests -P ci-only-plan-determinism"
- ":presto-tests -P ci-only-resource-manager"
- ":presto-tests -P ci-only-resource-manager"
Member

Change is not related?

Contributor

@pratyakshsharma Can you please remove this change? It seems unrelated.

@steveburnett
Contributor

Consider changing the release note entry in the Description. Following the release note guidelines, would something like this be a better description of this PR?

== RELEASE NOTES ==

Iceberg Connector Changes
* Improve the commit.retry.num-retries table property to make configurable the number of attempts to make in case of concurrent upserts.

Contributor

@yingsu00 yingsu00 left a comment

@tdcmeehan The test was removed. Do you want to review this PR again? Thanks!

@pratyakshsharma
Contributor Author

@steveburnett

We are not improving existing support; rather, we are adding it from scratch in the presto-iceberg connector. Please let me know if you have any more questions.

@pratyakshsharma
Contributor Author

@yingsu00 This is ready for another pass

@steveburnett
Contributor

@steveburnett

We are not improving existing support; rather, we are adding it from scratch in the presto-iceberg connector. Please let me know if you have any more questions.

Thanks for the explanation! The release note guidelines Order of Changes section suggests "Additions: line starts with Add...". Maybe this suggestion works better?

== RELEASE NOTES ==

Iceberg Connector Changes
* Add the commit.retry.num-retries table property to make configurable the number of attempts to make in case of concurrent upserts.

@tdcmeehan
Contributor

Agreed with @steveburnett, this is a major change that users of Iceberg connector will be interested in and may want to use. We should add a release note for it.

@@ -73,6 +76,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
"Format version for the table",
null,
false))
.add(integerProperty(
CONCURRENCY_RETRIES,
"Determines the number of attempts in case of concurrent upserts",
Contributor

Concurrent updates

Contributor Author

@pratyakshsharma pratyakshsharma Feb 8, 2024

I think it is valid in the case of inserts as well. Ultimately this boils down to some writer finding, when it tries to commit, that the metadata file has been updated compared to what it saw at the beginning. So "upserts" seems more suitable.
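
The conflict described here can be sketched as a compare-and-swap on the table's metadata pointer. This is a simplified illustration with hypothetical names, using an integer version in place of a metadata file location; it is not how any particular Iceberg catalog implements its commit.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class OptimisticCommitSketch
{
    // Stand-in for the catalog's pointer to the current metadata file.
    private final AtomicInteger metadataVersion = new AtomicInteger(0);

    int currentVersion()
    {
        return metadataVersion.get();
    }

    // The commit succeeds only if the table is still at the version the writer
    // saw when it started (baseVersion). Otherwise another writer committed
    // first, and this writer has to refresh and retry.
    boolean tryCommit(int baseVersion)
    {
        return metadataVersion.compareAndSet(baseVersion, baseVersion + 1);
    }
}
```

If two writers both read version 0, the first `tryCommit(0)` succeeds and the second returns false. The second writer then refreshes with `currentVersion()` and tries again, and the retry property bounds how many times it may do so. The same check applies whether the write is an insert, an update, or a delete.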

Contributor

By update, I am trying to convey a more general meaning than upsert, one that may include deletes. Feel free to use any language that encapsulates this, but I feel upsert does not.

Contributor Author

I can probably make it upserts and deletes then.

@pratyakshsharma
Contributor Author

Updated release notes @steveburnett @tdcmeehan

@steveburnett
Contributor

Updated release notes @steveburnett @tdcmeehan

@pratyakshsharma, it looks great! Thank you.

@pratyakshsharma
Contributor Author

The singlestore test failure is unrelated. Raised an issue here - #21892

@pratyakshsharma
Contributor Author

This is good for final pass @yingsu00

@pratyakshsharma pratyakshsharma force-pushed the iceberg-concurrency branch 2 times, most recently from 0493689 to 35fad55 Compare February 13, 2024 12:12
@yingsu00
Contributor

@tdcmeehan Do you have more comments?

Introduce new iceberg table property `commit_retries` to handle
concurrent insertions across catalogs.
@pratyakshsharma
Contributor Author

@tdcmeehan please take another pass

@pratyakshsharma
Contributor Author

@yingsu00 this is good to merge.

@yingsu00 yingsu00 merged commit e579a3f into prestodb:master Feb 14, 2024
57 checks passed
@wanglinsong wanglinsong mentioned this pull request May 1, 2024
48 tasks