Skip to content

Commit

Permalink
support folders in S3 uri in CS tiers (#13337)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Jan 15, 2025
1 parent 9829467 commit 6a230d5
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 44 deletions.
48 changes: 5 additions & 43 deletions ydb/core/tx/tiering/tier/object.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "object.h"
#include "s3_uri.h"

#include <library/cpp/json/writer/json_value.h>
#include <library/cpp/protobuf/json/proto2json.h>
Expand Down Expand Up @@ -46,50 +47,11 @@ TConclusionStatus TTierConfig::DeserializeFromProto(const NKikimrSchemeOp::TExte
}
}

NUri::TUri url;
if (url.Parse(proto.GetLocation(), NUri::TFeature::FeaturesAll) != NUri::TState::EParsed::ParsedOK) {
return TConclusionStatus::Fail("Cannot parse url: " + proto.GetLocation());
}

switch (url.GetScheme()) {
case NUri::TScheme::SchemeEmpty:
break;
case NUri::TScheme::SchemeHTTP:
ProtoConfig.SetScheme(::NKikimrSchemeOp::TS3Settings_EScheme_HTTP);
break;
case NUri::TScheme::SchemeHTTPS:
ProtoConfig.SetScheme(::NKikimrSchemeOp::TS3Settings_EScheme_HTTPS);
break;
default:
return TConclusionStatus::Fail("Unknown schema in url");
}

{
TStringBuf endpoint;
TStringBuf bucket;

TStringBuf host = url.GetHost();
TStringBuf path = url.GetField(NUri::TField::FieldPath);
if (!path.Empty()) {
endpoint = host;
bucket = path;
bucket.SkipPrefix("/");
if (bucket.Contains("/")) {
return TConclusionStatus::Fail(TStringBuilder() << "Not a bucket (contains directories): " << bucket);
}
} else {
if (!path.TrySplit('.', endpoint, bucket)) {
return TConclusionStatus::Fail(TStringBuilder() << "Bucket is not specified in URL: " << path);
}
}

if (url.GetField(NUri::TField::FieldPort)) {
ProtoConfig.SetEndpoint(TStringBuilder() << endpoint << ":" << url.GetPort());
} else {
ProtoConfig.SetEndpoint(TString(endpoint));
}
ProtoConfig.SetBucket(TString(bucket));
auto parsedUri = TS3Uri::ParseUri(proto.GetLocation());
if (parsedUri.IsFail()) {
return parsedUri;
}
parsedUri->FillSettings(ProtoConfig);

return TConclusionStatus::Success();
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/tiering/tier/s3_uri.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#include "s3_uri.h"

namespace NKikimr::NColumnShard::NTiers {
}
179 changes: 179 additions & 0 deletions ydb/core/tx/tiering/tier/s3_uri.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/result.h>

#include <library/cpp/uri/uri.h>
#include <util/string/builder.h>

namespace NKikimr::NColumnShard::NTiers {

class TS3Uri {
private:
YDB_READONLY_DEF(std::optional<NKikimrSchemeOp::TS3Settings_EScheme>, Scheme);
YDB_READONLY_DEF(TString, Bucket);
YDB_READONLY_DEF(TString, Host);
YDB_READONLY_DEF(std::optional<ui16>, Port);
YDB_READONLY_DEF(std::optional<TString>, Folder);

enum TUriStyle {
PATH_STYLE = 1,
VIRTUAL_HOSTED_STYLE = 2,
};

inline static const std::vector<TString> BucketHostSeparators = { ".s3.", ".s3-" };

private:
static TStringBuf StripPath(const TStringBuf& path) {
TStringBuf stripped = path;
while (stripped.SkipPrefix("/")) {
}
while (stripped.ChopSuffix("/")) {
}
return stripped;
}

static std::optional<TUriStyle> DeduceUriStyle(const NUri::TUri& uri) {
const bool hasSubdomain = std::count(uri.GetHost().begin(), uri.GetHost().end(), '.') >= 2;
const bool hasPath = !StripPath(uri.GetField(NUri::TField::FieldPath)).Empty();
if (hasSubdomain && !hasPath) {
return VIRTUAL_HOSTED_STYLE;
}
if (!hasSubdomain && hasPath) {
return PATH_STYLE;
}

// URI style deduction copied from AWS SDK for Java
for (const TString& sep : BucketHostSeparators) {
if (uri.GetHost().StartsWith(sep.substr(1))) {
return PATH_STYLE;
}
if (uri.GetHost().Contains(sep)) {
return VIRTUAL_HOSTED_STYLE;
}
}

return std::nullopt;
}

static TConclusion<TS3Uri> ParsePathStyleUri(const NUri::TUri& input) {
TS3Uri result;

TStringBuf path = StripPath(input.GetField(NUri::TField::FieldPath));

if (path.Empty()) {
return TConclusionStatus::Fail(TStringBuilder() << "Missing bucket in path-style S3 uri: " << input.Serialize());
}

TStringBuf folder;
TStringBuf bucket;
if (path.TryRSplit('/', folder, bucket)) {
result.Folder = folder;
result.Bucket = bucket;
} else {
result.Bucket = path;
}

result.Host = input.GetHost();

if (auto status = result.FillStyleAgnosticFields(input); status.IsFail()) {
return status;
}
return result;
}

static TConclusion<TS3Uri> ParseVirtualHostedStyleUri(const NUri::TUri& input) {
TS3Uri result;

for (const TString& sep : BucketHostSeparators) {
if (const ui64 findSep = input.GetHost().find(sep); findSep != TStringBuf::npos) {
result.Bucket = input.GetHost().SubStr(0, findSep);
result.Host = input.GetHost().SubStr(findSep + 1);
break;
}
}
if (result.Host.empty()) {
TStringBuf host;
TStringBuf bucket;
if (input.GetHost().TrySplit('.', bucket, host)) {
result.Host = host;
result.Bucket = bucket;
} else {
return TConclusionStatus::Fail(TStringBuilder() << "Missing bucket in virtual-hosted style S3 uri: " << input.Serialize());
}
}

if (TStringBuf path = StripPath(input.GetField(NUri::TField::FieldPath))) {
result.Folder = path;
}

if (auto status = result.FillStyleAgnosticFields(input); status.IsFail()) {
return status;
}
return result;
}

TConclusionStatus FillStyleAgnosticFields(const NUri::TUri& from) {
if (from.GetField(NUri::TField::FieldPort)) {
Port = from.GetPort();
}

switch (from.GetScheme()) {
case NUri::TScheme::SchemeEmpty:
break;
case NUri::TScheme::SchemeHTTP:
Scheme = NKikimrSchemeOp::TS3Settings_EScheme_HTTP;
break;
case NUri::TScheme::SchemeHTTPS:
Scheme = NKikimrSchemeOp::TS3Settings_EScheme_HTTPS;
break;
default:
return TConclusionStatus::Fail(TStringBuilder() << "Unexpected scheme in url: " << from.Serialize());
}

return TConclusionStatus::Success();
}

public:
static TConclusion<TS3Uri> ParseUri(const TString& input) {
NUri::TUri uri;
if (uri.Parse(input, NUri::TFeature::NewFeaturesRecommended) != NUri::TState::EParsed::ParsedOK) {
return TConclusionStatus::Fail("Cannot parse URI: " + input);
}

TUriStyle uriStyle;
if (const auto deducedStyle = DeduceUriStyle(uri)) {
uriStyle = *deducedStyle;
} else {
uriStyle = PATH_STYLE;
}

switch (uriStyle) {
case PATH_STYLE:
return ParsePathStyleUri(uri);
case VIRTUAL_HOSTED_STYLE:
return ParseVirtualHostedStyleUri(uri);
}
}

TString GetEndpoint() const {
TString endpoint = Host;
if (Port) {
endpoint += TStringBuilder() << ':' << *Port;
}
if (Folder) {
endpoint += TStringBuilder() << '/' << *Folder;
}
return endpoint;
}

void FillSettings(NKikimrSchemeOp::TS3Settings& settings) const {
settings.SetEndpoint(GetEndpoint());
settings.SetBucket(Bucket);
if (Scheme) {
settings.SetScheme(*Scheme);
}
}
};

} // namespace NKikimr::NColumnShard::NTiers
2 changes: 2 additions & 0 deletions ydb/core/tx/tiering/tier/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ LIBRARY()

SRCS(
object.cpp
s3_uri.cpp
)

PEERDIR(
ydb/library/conclusion
ydb/services/metadata/secret/accessor
contrib/restricted/aws/aws-crt-cpp
)

YQL_LAST_ABI_VERSION()
Expand Down
40 changes: 39 additions & 1 deletion ydb/core/tx/tiering/ut/ut_object.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#include <ydb/core/tx/tiering/tier/object.h>
#include <ydb/core/tx/tiering/tier/s3_uri.h>

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr {

using namespace NColumnShard;

Y_UNIT_TEST_SUITE(S3SettingsConvertion) {
Y_UNIT_TEST_SUITE(S3SettingsConversion) {
void ValidateConversion(
const NKikimrSchemeOp::TExternalDataSourceDescription& input, TConclusion<const NKikimrSchemeOp::TS3Settings> expectedResult) {
NTiers::TTierConfig config;
Expand Down Expand Up @@ -69,6 +70,43 @@ Y_UNIT_TEST_SUITE(S3SettingsConvertion) {
)", &output));
ValidateConversion(input, output);
}

Y_UNIT_TEST(FoldersStrictStyle) {
std::vector<TString> uris = {
"http://s3.yandexcloud.net:8080/my-folder/subfolder/bucket",
"http://bucket.s3.yandexcloud.net:8080/my-folder/subfolder",
};
for (const auto& input : uris) {
NTiers::TS3Uri uri = NTiers::TS3Uri::ParseUri(input).DetachResult();
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetEndpoint(), "s3.yandexcloud.net:8080/my-folder/subfolder", input);
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetBucket(), "bucket", input);
}
}

Y_UNIT_TEST(FoldersStyleDeduction) {
std::vector<TString> uris = {
"http://storage.yandexcloud.net:8080/my-folder/subfolder/bucket",
"http://storage.yandexcloud.net:8080///my-folder/subfolder/bucket//",
};
for (const auto& input : uris) {
NTiers::TS3Uri uri = NTiers::TS3Uri::ParseUri(input).DetachResult();
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetEndpoint(), "storage.yandexcloud.net:8080/my-folder/subfolder", input);
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetBucket(), "bucket", input);
}
}

Y_UNIT_TEST(StyleDeduction) {
std::vector<TString> uris = {
"http://storage.yandexcloud.net/bucket",
"http://my-s3.net/bucket",
"http://bucket.my-s3.net",
"http://bucket.my-s3.net/",
};
for (const auto& input : uris) {
NTiers::TS3Uri uri = NTiers::TS3Uri::ParseUri(input).DetachResult();
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetBucket(), "bucket", input);
}
}
}

} // namespace NKikimr

0 comments on commit 6a230d5

Please sign in to comment.