Skip to content

Commit

Permalink
s3 listing strategy has been fixed (#9499)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored Sep 25, 2024
1 parent 32809cc commit 041ec57
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 11 deletions.
20 changes: 10 additions & 10 deletions ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& option

namespace {

// Derives the "base path" of `path`: the result of TStringBuf::RBefore('/')
// applied to it. When that result is identical to `path` and does not end
// with '/', an empty string is returned instead.
// NOTE(review): presumably RBefore yields the whole input when `path` has no
// '/', which would make the empty result the "no directory part" case — confirm.
TString ParseBasePath(const TString& path) {
    const TStringBuf beforeLastSlash = TStringBuf{path}.RBefore('/');
    const bool nothingWasCut = beforeLastSlash == path;
    if (nothingWasCut && !beforeLastSlash.EndsWith('/')) {
        return TString{};
    }
    return TString{beforeLastSlash};
}

using namespace NThreading;
using namespace NS3Lister;

Expand Down Expand Up @@ -497,15 +502,10 @@ class TBFSDirectoryResolverIterator : public IS3Lister {
return NextDirectoryListeningChunk;
}

// Derives the "base path" of `path`: the result of TStringBuf::RBefore('/')
// applied to it. When that result is identical to `path` and does not end
// with '/', an empty string is returned instead.
// NOTE(review): presumably RBefore yields the whole input when `path` has no
// '/' — confirm against the TStringBuf implementation.
static TString ParseBasePath(const TString& path) {
    const TStringBuf beforeLastSlash = TStringBuf{path}.RBefore('/');
    const bool nothingWasCut = beforeLastSlash == path;
    if (nothingWasCut && !beforeLastSlash.EndsWith('/')) {
        return TString{};
    }
    return TString{beforeLastSlash};
}

// Appends directory entries to `result.Directories` for `sourcePrefix` and
// for every prefix held in DirectoryPrefixQueue, then empties the queue.
void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) {
// The source prefix is recorded through ParseBasePath, not verbatim.
result.Directories.push_back({.Path = ParseBasePath(sourcePrefix)});
for (auto& directoryPrefix : DirectoryPrefixQueue) {
// NOTE(review): the two push_back lines below differ only in wrapping the
// prefix with ParseBasePath; they look like the removed and added sides of
// a single diff line — confirm that only one of them exists in the real file.
result.Directories.push_back({.Path = directoryPrefix});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)});
}
// Every queued prefix has been copied into the result above.
DirectoryPrefixQueue.clear();
}
Expand All @@ -524,10 +524,10 @@ class TBFSDirectoryResolverIterator : public IS3Lister {
}
} else {
for (auto& directoryPrefix : listingResult.Directories) {
result.Directories.push_back({.Path = directoryPrefix.Path});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix.Path)});
}
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)});
}
DirectoryPrefixQueue.clear();
}
Expand Down Expand Up @@ -775,10 +775,10 @@ class TConcurrentBFSDirectoryResolverIterator : public IS3Lister {
// TODO: add verification
auto result = TListEntries{.Objects = Objects, .ListedObjectSize = ListedObjectSize};
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)});
}
for (auto& directoryPrefix: InProgressPaths) {
result.Directories.push_back({.Path = directoryPrefix});
result.Directories.push_back({.Path = ParseBasePath(directoryPrefix)});
}
for (auto& directoryEntry : Directories) {
result.Directories.push_back(directoryEntry);
Expand Down
4 changes: 3 additions & 1 deletion ydb/tests/fq/s3/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

from ydb.tests.tools.fq_runner.fq_client import FederatedQueryClient
from ydb.tests.tools.fq_runner.custom_hooks import * # noqa: F401,F403 Adding custom hooks for YQv2 support
from ydb.tests.tools.fq_runner.kikimr_utils import AddInflightExtension
from ydb.tests.tools.fq_runner.kikimr_utils import AddAllowConcurrentListingsExtension
from ydb.tests.tools.fq_runner.kikimr_utils import AddDataInflightExtension
from ydb.tests.tools.fq_runner.kikimr_utils import AddFormatSizeLimitExtension
from ydb.tests.tools.fq_runner.kikimr_utils import AddInflightExtension
from ydb.tests.tools.fq_runner.kikimr_utils import DefaultConfigExtension
from ydb.tests.tools.fq_runner.kikimr_utils import YQv2Extension
from ydb.tests.tools.fq_runner.kikimr_utils import ComputeExtension
Expand Down Expand Up @@ -89,6 +90,7 @@ def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external
return [
AddFormatSizeLimitExtension(),
AddInflightExtension(),
AddAllowConcurrentListingsExtension(),
AddDataInflightExtension(),
DefaultConfigExtension(s3.s3_url),
YQv2Extension(yq_version, kikimr_settings.get("is_replace_if_exists", False)),
Expand Down
78 changes: 78 additions & 0 deletions ydb/tests/fq/s3/test_s3_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,3 +557,81 @@ def test_top_level_listing(self, kikimr, s3, client, runtime_listing, unique_pre
assert result_set.rows[5].items[1].int32_value == 15
assert result_set.rows[5].items[2].int32_value == 33
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("runtime_listing", ["false", "true"])
@pytest.mark.parametrize("kikimr_params", [{"allow_concurrent_listings": True}], indirect=True)
def test_top_level_listing_2(self, kikimr, s3, client, runtime_listing, unique_prefix):
    """Query top-level objects by wildcard with concurrent listings allowed.

    Uploads the same three-row CSV under five keys of bucket "fbucket"
    (three top-level keys plus '/a/2024-08-08.csv' and '/b/test.csv'),
    selects from the '/2024-08-*' pattern and checks the schema, that
    exactly six rows (the Banana/Apple/Pear triple twice) come back, and
    that the metering values sum to 10.
    """
    bucket_name = "fbucket"
    s3_access = {
        "endpoint_url": s3.s3_url,
        "aws_access_key_id": "key",
        "aws_secret_access_key": "secret_key",
    }

    # Create the bucket with a public-read ACL and drop anything already in it.
    bucket = boto3.resource("s3", **s3_access).Bucket(bucket_name)
    bucket.create(ACL='public-read')
    bucket.objects.all().delete()

    s3_client = boto3.client("s3", **s3_access)

    fruits = '''Fruit,Price,Weight
Banana,3,100
Apple,2,22
Pear,15,33'''
    object_keys = (
        '2024-08-09.csv',
        '2024-09-08.csv',
        '2024-08-08.csv',
        '/a/2024-08-08.csv',
        '/b/test.csv',
    )
    for object_key in object_keys:
        s3_client.put_object(Body=fruits, Bucket=bucket_name, Key=object_key, ContentType='text/plain')

    kikimr.control_plane.wait_bootstrap(1)
    storage_connection_name = unique_prefix + "test_top_level_listing_2"
    client.create_storage_connection(storage_connection_name, bucket_name)

    sql = f'''
pragma s3.UseRuntimeListing="{runtime_listing}";
SELECT *
FROM `{storage_connection_name}`.`/2024-08-*`
WITH (format=csv_with_names, SCHEMA (
Fruit String NOT NULL,
Price Int NOT NULL,
Weight Int NOT NULL
)
);
'''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    result_set = client.get_result_data(query_id).result.result_set
    logging.debug(str(result_set))

    # Schema: column names and YDB type ids, in declaration order.
    expected_columns = [
        ("Fruit", ydb.Type.STRING),
        ("Price", ydb.Type.INT32),
        ("Weight", ydb.Type.INT32),
    ]
    assert len(result_set.columns) == 3
    for column, (column_name, type_id) in zip(result_set.columns, expected_columns):
        assert column.name == column_name
        assert column.type.type_id == type_id

    # Data: the three CSV rows are expected twice, back to back.
    # NOTE(review): presumably one copy per top-level object matching the pattern — confirm.
    csv_rows = [(b"Banana", 3, 100), (b"Apple", 2, 22), (b"Pear", 15, 33)]
    expected_rows = csv_rows * 2
    assert len(result_set.rows) == 6
    for row, (fruit, price, weight) in zip(result_set.rows, expected_rows):
        assert row.items[0].bytes_value == fruit
        assert row.items[1].int32_value == price
        assert row.items[2].int32_value == weight

    assert sum(kikimr.control_plane.get_metering(1)) == 10
12 changes: 12 additions & 0 deletions ydb/tests/tools/fq_runner/kikimr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ def apply_to_kikimr(self, request, kikimr):
del request.param["inflight"]


class AddAllowConcurrentListingsExtension(ExtensionPoint):
    """Extension driven by an "allow_concurrent_listings" request parameter.

    Applies when ``request.param`` is a dict containing that key; the value
    is stored on the kikimr object and written into the compute plane's
    fq_config under gateways -> s3, after which the key is removed from
    ``request.param``.
    """

    _PARAM_NAME = "allow_concurrent_listings"

    def is_applicable(self, request):
        # Requests without a dict-valued `param` never carry the setting.
        if not hasattr(request, 'param'):
            return False
        return isinstance(request.param, dict) and self._PARAM_NAME in request.param

    def apply_to_kikimr(self, request, kikimr):
        kikimr.allow_concurrent_listings = request.param[self._PARAM_NAME]
        s3_gateway_config = kikimr.compute_plane.fq_config['gateways']['s3']
        s3_gateway_config[self._PARAM_NAME] = kikimr.allow_concurrent_listings
        # Drop the key once it has been applied.
        del request.param[self._PARAM_NAME]


class AddDataInflightExtension(ExtensionPoint):
def is_applicable(self, request):
return (hasattr(request, 'param')
Expand Down

0 comments on commit 041ec57

Please sign in to comment.