
[Iceberg]Support specifying S3 path as warehouse dir for Hadoop catalog #24221

Draft · wants to merge 1 commit into master from support_s3_as_warehouse_dir
Conversation

hantangwangd (Member) commented Dec 9, 2024

Description

This PR enables the Iceberg connector to use an S3 location as the warehouse directory for the Hadoop catalog.

By configuring the S3 properties described in https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration, we can specify an S3 location as the warehouse directory of the Hadoop catalog. This way, both the metadata and the data of Iceberg tables are maintained on S3 storage.

An example configuration includes:

    connector.name=iceberg
    iceberg.catalog.type=hadoop
    iceberg.catalog.warehouse=s3://iceberg_bucket/warehouse

    hive.s3.use-instance-credentials=false
    hive.s3.aws-access-key=accesskey
    hive.s3.aws-secret-key=secretkey
    hive.s3.endpoint=http://192.168.0.103:9878
    hive.s3.path-style-access=true

Motivation and Context

Support specifying an S3 location directly as the warehouse directory for Iceberg with the Hadoop catalog.

Impact

N/A

Test Plan

  • Manually confirmed in a local environment (including Ozone and MinIO)
  • Made sure this change does not affect any existing test cases
  • Added a new test class that runs the test cases in IcebergDistributedTestBase and TestIcebergDistributedQueries on S3 storage provided by dockerized MinIO

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

== RELEASE NOTES ==

Iceberg Connector Changes
* Enable the Iceberg connector to use an S3 location as the warehouse directory for the Hadoop catalog. :pr:`24221`

@hantangwangd hantangwangd requested a review from a team as a code owner December 9, 2024 16:47
@tdcmeehan tdcmeehan self-assigned this Dec 9, 2024
@hantangwangd hantangwangd force-pushed the support_s3_as_warehouse_dir branch from bbb4740 to 0ed5c96 on December 13, 2024 07:25
@hantangwangd hantangwangd force-pushed the support_s3_as_warehouse_dir branch 2 times, most recently from adb7573 to 976ee9c on December 13, 2024 15:47
@hantangwangd hantangwangd marked this pull request as draft December 14, 2024 02:51
@hantangwangd hantangwangd force-pushed the support_s3_as_warehouse_dir branch 18 times, most recently from cfaa548 to a1e1104 on December 16, 2024 00:16
@hantangwangd hantangwangd changed the title [WIP][Iceberg]Support s3 path as warehouse dir for hive and hadoop catalog [Iceberg]Support specifying S3 path as warehouse dir for Hadoop catalog Dec 16, 2024
@hantangwangd hantangwangd force-pushed the support_s3_as_warehouse_dir branch from a1e1104 to 925d3d6 on December 16, 2024 01:25
@hantangwangd hantangwangd marked this pull request as ready for review December 16, 2024 02:26
steveburnett (Contributor) previously approved these changes Jan 6, 2025

LGTM! (docs)
Pull updated branch, new local doc build, looks good. Thanks!

steveburnett (Contributor) previously approved these changes Jan 9, 2025

LGTM! (doc)
Pull updated branch, new local doc build, looks good. Thanks!

ZacBlanco (Contributor) left a comment

One minor question, otherwise it looks good. The testing infrastructure you added here is great!

    @@ -396,8 +396,7 @@ private static boolean isDirectory(PrestoS3ObjectMetadata metadata)
        }

        return mediaType.is(X_DIRECTORY_MEDIA_TYPE) ||
                (mediaType.is(OCTET_STREAM_MEDIA_TYPE)

Contributor: Why is this condition removed?

hantangwangd (Member, Author) commented Jan 10, 2025

Object storage systems do not necessarily return the content type application/octet-stream; for example, Ozone returns binary/octet-stream, see: https://github.com/apache/ozone/blob/master/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java#L645-L647. Besides, the sample responses in the Amazon S3 API documentation also use binary/octet-stream as the content type; see the Sample Response examples for general purpose buckets in https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html.

Moreover, I believe the remaining two conditions already confirm that this is a directory, so I removed this condition in order to support as many object storage systems as possible.
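For concreteness, here is a minimal sketch (not the exact Presto source; only the identifiers quoted in the diff and in the reply above are taken from the PR) of what the check reduces to once the OCTET_STREAM_MEDIA_TYPE clause is dropped:

    // Sketch only: mediaType, metadata, and objectMetadata are assumed to be
    // derived from the PrestoS3ObjectMetadata argument as in the original
    // isDirectory(...) method. A key is treated as a directory when the store
    // reports the explicit directory media type, or when it is a zero-length
    // key that needs a trailing path separator, regardless of whether the
    // store reports application/octet-stream or binary/octet-stream.
    return mediaType.is(X_DIRECTORY_MEDIA_TYPE) ||
            (metadata.isKeyNeedsPathSeparator() && objectMetadata.getContentLength() == 0);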

ZacBlanco (Contributor) commented Jan 10, 2025

I think this makes sense, but I'm not sure if there are additional implications. If our tests pass, then I am OK with it. Though I will defer to @imjalpreet for any additional thoughts; he is the one who implemented this check previously.

imjalpreet (Member) left a comment

@hantangwangd Could we please verify whether it is possible to create an external table on an empty directory in MinIO after removing the current condition?

I added this condition because we noticed that some systems, such as MinIO, were creating empty directories with a content type of application/octet-stream instead of application/x-directory.

For your reference: #20603

hantangwangd (Member, Author) left a comment

@imjalpreet Thanks for providing the relevant note and issue link. I have verified that we can create an external table on an empty directory in MinIO after removing the condition 'mediaType.is(OCTET_STREAM_MEDIA_TYPE)'; the remaining condition 'metadata.isKeyNeedsPathSeparator() && objectMetadata.getContentLength() == 0' is capable of handling the directory identification problem described in issue #20310.

hantangwangd (Member, Author) left a comment

Meanwhile, if we do not remove the condition 'mediaType.is(OCTET_STREAM_MEDIA_TYPE)', we get an exception "External location must be a directory" when trying to create an external table on an empty directory in some other object storage systems, such as Ozone.

ZacBlanco (Contributor) left a comment

Is the empty directory table covered by any test cases that run in CI? If not, we should add one. Aside from that, I think this PR is good to go.

hantangwangd (Member, Author) left a comment

OK, I'll check it out.

hantangwangd (Member, Author) left a comment

@ZacBlanco @imjalpreet I have added the test method testCreateExternalTableOnEmptyS3Directory() in BaseTestHiveInsertOverwrite; it verifies the behavior of creating an external Hive table on an empty S3 directory. Please take a look when convenient, thanks.
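As an illustration only, such a test could take roughly the following shape (a hypothetical sketch, not the method actually added in this PR; the bucket name, table name, and use of the external_location table property together with the assertUpdate/assertQuery helpers of the Presto test framework are assumptions):

    @Test
    public void testCreateExternalTableOnEmptyS3Directory()
    {
        // Point an external Hive table at an empty S3 "directory". Before this
        // change, stores that report an octet-stream content type for the empty
        // prefix could fail with "External location must be a directory".
        assertUpdate("CREATE TABLE test_empty_s3_dir (id bigint) " +
                "WITH (external_location = 's3a://test-bucket/empty-dir/')");
        assertQuery("SELECT count(*) FROM test_empty_s3_dir", "SELECT 0");
        assertUpdate("DROP TABLE test_empty_s3_dir");
    }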

ZacBlanco (Contributor) previously approved these changes Jan 12, 2025

LGTM, thank you for the tests!

imjalpreet (Member) left a comment

Just a couple of nits, otherwise LGTM, thank you.

    super.updateConfiguration(config);

    if (this.icebergConfig.getCatalogType().equals(HADOOP)) {
        // re-map filesystem schemes to match Amazon Elastic MapReduce

imjalpreet (Member): I was a bit unsure about what this comment meant.

hantangwangd (Member, Author) left a comment

Thanks for pointing this out. It seems the original comment in PrestoS3ConfigurationUpdater was copied from some other framework. I have fixed the comment in both places; please take a look when available.

imjalpreet (Member) left a comment

Thank you for the PR, LGTM!

hantangwangd (Member, Author) commented

Hi @tdcmeehan, this PR is ready for a final review, could you please take a look when convenient? Thanks a lot!

tdcmeehan (Contributor) commented

Thanks for the PR @hantangwangd. S3 now supports conditional writes, but does this PR take advantage of conditional writes to ensure catalog consistency? If not, what protects the integrity of Iceberg's ACID transactions, given that it is the responsibility of the catalog to ensure the table metadata is atomically updated?

hantangwangd (Member, Author) commented

@tdcmeehan Thanks for your question. As I understand it, HadoopCatalog in the Iceberg library requires that the underlying file system support atomic rename in order to maintain atomic commits of table transactions; see the comment here: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java#L77. More specifically, it only requires that the underlying file system be able to atomically rename a single file, that is, rename a temp metadata file to a final metadata file, see: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java#L161-L162. So in order to ensure the consistency of Iceberg transactions, we just need to make sure that our PrestoS3FileSystem can atomically rename a single file.

After checking the corresponding code in PrestoS3FileSystem, I found that it splits a single file's rename into two steps, as follows:

    s3.copyObject(getBucketName(uri), keyFromPath(src), getBucketName(uri), keyFromPath(dst));
    delete(src, true);

S3 ensures that copyObject() is a single atomic action. Therefore, combined with the implementation logic in HadoopTableOperations.commit(...), this copyObject() action already ensures the atomicity of the transaction commit: if the copy fails, an exception is thrown and the transaction commit fails as well; if the copy succeeds, the transaction is committed successfully, regardless of whether the deletion of the temp metadata file succeeds.

So, as I understand it, it seems we don't necessarily need conditional writes to ensure the Hadoop catalog's consistency, although using conditional writes might lead to a better implementation.

What's your opinion? If I have any misunderstanding, please let me know, thanks.

tdcmeehan (Contributor) commented Jan 14, 2025

@hantangwangd Consider two different Presto engines attempting to insert into the same table, which attempt to commit the insertion at exactly the same time, to the effect that it is arbitrary which one should succeed. While the copy operation itself is atomic, wouldn't the later of the two copy operations overwrite the first one, leading to a corrupt timeline? As far as I know, the only way to guard against this is to lock externally using some sort of metastore or external catalog that supports atomic updates, or in S3 to use conditional writes (basically, the copy operation should not succeed if the destination already exists).

hantangwangd (Member, Author) commented

@tdcmeehan If we look deeper into the implementation of HadoopTableOperations.renameToFinal(...), we find that it already performs the concurrency control you described. First, it uses strictly monotonically increasing version numbers to prevent a later commit based on an older version from overwriting an earlier commit based on a newer version. Second, it has a LockManager that ensures the rename for a given destination path is synchronized; within that synchronized section it checks again whether the target path already exists, and if so, the rename fails and the current transaction may be resubmitted on top of the latest metadata version if possible.

I think these measures ensure the isolation and consistency of concurrent transactions, so the scenario you described does not lead to corruption or inconsistency. What do you think?
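For readers following the thread, here is a rough sketch of the protection described above, paraphrasing the shape of HadoopTableOperations.renameToFinal(...) rather than copying Iceberg's code (the lockManager field, exception type, and messages are assumptions):

    private void renameToFinal(FileSystem fs, Path tempMetadataFile, Path finalMetadataFile, int nextVersion)
    {
        try {
            // Serialize commits that target the same final metadata path.
            if (!lockManager.acquire(finalMetadataFile.toString(), tempMetadataFile.toString())) {
                throw new CommitFailedException("Failed to acquire lock on file: %s", finalMetadataFile);
            }
            // Re-check under the lock: if another writer already committed this
            // version, fail so the caller can refresh and retry on the new base.
            if (fs.exists(finalMetadataFile)) {
                throw new CommitFailedException("Version %d already exists: %s", nextVersion, finalMetadataFile);
            }
            if (!fs.rename(tempMetadataFile, finalMetadataFile)) {
                throw new CommitFailedException("Failed to commit changes using rename: %s", finalMetadataFile);
            }
        }
        catch (IOException e) {
            throw new CommitFailedException(e, "Failed to commit changes using rename: %s", finalMetadataFile);
        }
        finally {
            lockManager.release(finalMetadataFile.toString(), tempMetadataFile.toString());
        }
    }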

hantangwangd (Member, Author) commented

Sorry, I didn't notice that you were referring to two different Presto engines. In that case, the memory-based lock manager in HadoopTableOperations cannot synchronize between them, and the later copy would overwrite the earlier one.

After checking the S3 documentation for conditional writes and copyObject, I found that S3 does not support conditional writes for copyObject; it only supports them for PutObject and CompleteMultipartUpload. So it seems we cannot use conditional writes in this scenario.

However, Iceberg provides a DynamoDB-based implementation of LockManager, see: https://iceberg.apache.org/docs/1.6.0/aws/#dynamodb-lock-manager. I think configuring this lock manager in all independent Presto engines that may handle the same Iceberg table could solve the problem you mentioned. Any thoughts or concerns, please let me know.
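For reference, the Iceberg documentation linked above configures that lock manager through catalog properties along the following lines (the table name is an example; whether and how the Presto Iceberg connector exposes these catalog properties is an assumption, not something this PR adds):

    lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager
    lock.table=iceberg_commit_lock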

tdcmeehan (Contributor) commented

@hantangwangd Agreed that we need some sort of consistent external locking mechanism, such as a database, to ensure transactional integrity.

My concern with using the Hadoop catalog in general over an object storage system that doesn't support atomic swaps is: how do users configure it? What is happening in S3, apparently even with conditional write support as you pointed out above, is that the rename isn't really atomic, because two versions may both succeed, with the later of the two operations overwriting the first. This requires some external solution outside of the object store, which is another infrastructure component. Could a Spark engine also be configured, but the user forgets to configure a DynamoDB-based lock manager, and eventually this causes a corrupt timeline?

Another thought: what if we implement a custom S3-only catalog that properly leverages an algorithm ensuring transactional integrity of writes (it seems a Hadoop-based solution can't currently do this)? The problem with this is, how do we test it against third-party object stores that implement the S3 API? Do all of them now support conditional writes?

Note that the Iceberg community seems pessimistic about supporting a file-only catalog: https://lists.apache.org/thread/v7x65kxrrozwlvsgstobm7685541lf5w. So a final worry I have is that we are going against the grain with this approach, thinking of Presto in the context of larger engine interoperability.

hantangwangd (Member, Author) commented Jan 15, 2025

@tdcmeehan Thanks for the information you provided. I also read the discussions from when the Iceberg community decided to deprecate HadoopCatalog and HadoopTableOperations, see: https://lists.apache.org/thread/oohcjfp1vpo005h2r0f6gfpsp6op0qps. I agree that your concerns are reasonable; currently, running HadoopCatalog directly on an object storage system is not safe.

As I learned from those conversations, it seems the Iceberg community mainly wants to move away from the implementation details of a file-based catalog, so they mostly converged toward the REST catalog, even though HadoopCatalog works well on HDFS and conditional writes could make transaction atomicity a non-issue on object storage systems. I also noticed that many people do not want to give up the file-based HadoopCatalog; they hope to improve it so that it can run safely on object storage systems, especially now that S3 conditional writes exist, so it may evolve as an independent module or even project. Additionally, there are also discussions about building a dedicated catalog only for object storage systems that provide conditional writes, which you mentioned above.

I will convert this PR to a draft and do some more in-depth investigation, looking at the direction this evolves in after the emergence of conditional writes. Thank you for the whole discussion; it has been very helpful to me.
