-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix concurrent insertions for Iceberg tables #21250
Conversation
|
e6456d6
to
42e97e8
Compare
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
Outdated
Show resolved
Hide resolved
@pratyakshsharma Could you please squash the commits and update Iceberg documentation as well for this? |
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString()); | ||
propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is commit retry property for Iceberg, then I think this should be applicable even for Iceberg tables that are created using the Hadoop
or Nessie
catalog?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes it should be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then let's introduce this property in https://github.com/prestodb/presto/blob/master/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java#L157 as well.
Since the Table properties for both Metadata classes are the same, we can extract the common part for setting table properties and use it in both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I will perform simple tests with nessie to verify this works, and then will introduce it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nessie throws below exception when concurrent queries exceed the table config value -
org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: Reference hash is out of date. Update the reference 'main' and try again
at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:167)
at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
at org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$5(BaseTransaction.java:422)
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
at org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:418)
at org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:302)
at com.facebook.presto.iceberg.IcebergAbstractMetadata.finishInsert(IcebergAbstractMetadata.java:278)
at com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:452)
at com.facebook.presto.metadata.MetadataManager.finishInsert(MetadataManager.java:858)
at com.facebook.presto.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$3(LocalExecutionPlanner.java:3392)
at com.facebook.presto.operator.TableFinishOperator.getOutput(TableFinishOperator.java:289)
at com.facebook.presto.operator.Driver.processInternal(Driver.java:428)
at com.facebook.presto.operator.Driver.lambda$processFor$9(Driver.java:311)
at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:732)
at com.facebook.presto.operator.Driver.processFor(Driver.java:304)
at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1079)
at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)
at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:621)
at com.facebook.presto.$gen.Presto_null__testversion____20231108_094807_1.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.projectnessie.error.NessieReferenceConflictException: Values of existing and expected content for key 'iceberg_db.test_nessie_iceberg' are different.
at org.projectnessie.error.ErrorCode.lambda$asException$1(ErrorCode.java:66)
at java.util.Optional.map(Optional.java:215)
at org.projectnessie.error.ErrorCode.asException(ErrorCode.java:66)
at org.projectnessie.client.rest.ResponseCheckFilter.checkResponse(ResponseCheckFilter.java:67)
at org.projectnessie.client.rest.NessieHttpResponseFilter.filter(NessieHttpResponseFilter.java:29)
at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.lambda$executeRequest$2(UrlConnectionRequest.java:86)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.executeRequest(UrlConnectionRequest.java:86)
at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.post(UrlConnectionRequest.java:120)
at org.projectnessie.client.http.HttpTreeClient.commitMultipleOperations(HttpTreeClient.java:202)
at sun.reflect.GeneratedMethodAccessor1216.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.projectnessie.client.http.NessieHttpClient$ExceptionRewriter.invoke(NessieHttpClient.java:79)
at com.sun.proxy.$Proxy522.commitMultipleOperations(Unknown Source)
at org.projectnessie.client.http.v1api.HttpCommitMultipleOperations.commit(HttpCommitMultipleOperations.java:35)
at org.apache.iceberg.nessie.NessieIcebergClient.commitTable(NessieIcebergClient.java:479)
at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:160)
... 24 more
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nessie throws below exception when concurrent queries exceed the table config value
Does it work if concurrent queries not exceeding the table config value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, have added a test case for the same.
d2bacf8
to
8d53d7d
Compare
Codenotify: Notifying subscribers in CODENOTIFY files for diff 1a4e945...cb449d2.
|
the worker seems to crash as part of the failures in |
Can you please ask about this in the #presto-cpp channel in Slack? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! (docs)
8461a75
to
05615f6
Compare
@pratyakshsharma Thanks for verifying it for Nessie as well.
|
@agrawalreetika yes already working on fixing test cases. Sure will update the commit message. |
05615f6
to
7ad3425
Compare
I have commented the test case covering failure scenario for concurrent insertions, since it is failing and I am working on fixing it. I have added the expectedExceptions tag, but the test is failing despite that exception getting thrown. |
@yingsu00 As per our internal discussions, I checked JMeter codebase where they spawn new threads for load testing. They use |
final CountDownLatch countDownLatch = new CountDownLatch(commitRetries); | ||
AtomicInteger value = new AtomicInteger(0); | ||
Set<Throwable> errors = new CopyOnWriteArraySet<>(); | ||
List<Thread> threads = Stream.generate(() -> new Thread(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are too many tests that try to accomplish similar tasks through multi-threading. Ultimately, these style of tests worry me because they end up being highly environment-specific--suspectible to GC pauses, noisy neighbors and such. Can we think of a different way to accomplish this that isn't time depenendent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am currently working on this. Lets see if I am able to come up with something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend just removing the test, since this is a table property and a functionality of the core Iceberg library. Adding a potentially flaky test to our suites is just not worth the potential problems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdcmeehan So with the current approach of submitting 100 queries, the test case is able to pass consistently. I am now only trying to fine tune the number of threads and other parameters, so as to reduce the test run time and not introduce flakiness overall.
Let me know if you still think otherwise. We can probably discuss internally on this.
cc @agrawalreetika @imjalpreet @yingsu00 @nmahadevuni
4997d80
to
d0b8e63
Compare
98b9024
to
32b7c1f
Compare
@yingsu00 @agrawalreetika This is good to review now. |
.github/workflows/tests.yml
Outdated
@@ -45,7 +45,7 @@ jobs: | |||
- ":presto-tests -P ci-only-distributed-queries" | |||
- ":presto-tests -P ci-only-aggregation-queries" | |||
- ":presto-tests -P ci-only-plan-determinism" | |||
- ":presto-tests -P ci-only-resource-manager" | |||
- ":presto-tests -P ci-only-resource-manager" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change is not related?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pratyakshsharma Can you please remove this change? It seems unrelated.
Consider changing the release note entry in the Description. Following the release note guidelines, would something like this be a better description of this PR?
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdcmeehan The test was removed. Do you want to review this PR again? Thanks!
.github/workflows/tests.yml
Outdated
@@ -45,7 +45,7 @@ jobs: | |||
- ":presto-tests -P ci-only-distributed-queries" | |||
- ":presto-tests -P ci-only-aggregation-queries" | |||
- ":presto-tests -P ci-only-plan-determinism" | |||
- ":presto-tests -P ci-only-resource-manager" | |||
- ":presto-tests -P ci-only-resource-manager" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pratyakshsharma Can you please remove this change? It seems unrelated.
32b7c1f
to
6c7e025
Compare
So we are not improving the support, rather adding it from scratch in presto-iceberg connector. Please let me know in case of any more queries. |
@yingsu00 This is ready for another pass |
Thanks for the explanation! The release note guidelines Order of Changes section suggests "Additions: line starts with Add...". Maybe this suggestion works better?
|
Agreed with @steveburnett, this is a major change that users of Iceberg connector will be interested in and may want to use. We should add a release note for it. |
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java
Outdated
Show resolved
Hide resolved
@@ -73,6 +76,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig) | |||
"Format version for the table", | |||
null, | |||
false)) | |||
.add(integerProperty( | |||
CONCURRENCY_RETRIES, | |||
"Determines the number of attempts in case of concurrent upserts", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concurrent updates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is valid in case of inserts as well. Ultimately this boils down to some writer finding the metadata file updated, as compared to what it saw in the beginning, when it tries to commit. So upserts
seems more suitable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By update, I am trying to convey a more general meaning that upset, which may include deletes. Feel free to use any language that encapsulates this, but I feel upsert does not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can probably make it upserts and deletes then.
Updated release notes @steveburnett @tdcmeehan |
f2030b8
to
d0fdec1
Compare
@pratyakshsharma, it looks great! Thank you. |
d0fdec1
to
3cac4d1
Compare
The singlestore test failure is unrelated. Raised an issue here - #21892 |
This is good for final pass @yingsu00 |
0493689
to
35fad55
Compare
@tdcmeehan Do you have more comments? |
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java
Outdated
Show resolved
Hide resolved
Introduce new iceberg table property `commit_retries` to handle concurrent insertions across catalogs.
35fad55
to
cb449d2
Compare
@tdcmeehan please take another pass |
@yingsu00 this is good to merge. |
Description
Number of successful concurrent inserts/updates to an iceberg table is controlled by the table property
commit.retry.num-retries
which is by default set to 4 in TableProperties.java class in Iceberg. This PR aims to make this configurable.Motivation and Context
#21251
Impact
Introduction of new user-facing config for iceberg connector:
iceberg.commit-retries
Test Plan
Manual and unit tests
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.