Skip to content

Commit

Permalink
Additional validation for queries with "json_list" format and datetime types (#10208)
Browse files Browse the repository at this point in the history
  • Loading branch information
evanevanevanevannnn authored Oct 17, 2024
1 parent fd93fab commit a71cc84
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 0 deletions.
69 changes: 69 additions & 0 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
issues.AddIssues(ValidateJsonListFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
if (hasPartitioning) {
if (NYql::NS3::HasWildcards(location)) {
Expand Down Expand Up @@ -263,6 +264,30 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

template<typename TScheme>
static NYql::TIssues ValidateJsonListFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
    // Rejects date/time/interval-typed columns when the external table uses the
    // "json_list" format; any other format is accepted without inspection.
    NYql::TIssues issues;
    if (format != "json_list"sv) {
        return issues;
    }

    // Partitioning columns are exempt: their values come from the object path,
    // not from the json payload itself.
    const TSet<TString> partitionColumns{partitionedBy.begin(), partitionedBy.end()};

    for (const auto& column : schema.column()) {
        if (partitionColumns.contains(column.name())) {
            continue;
        }
        if (!ValidateDateOrTimeType(column.type())) {
            continue;
        }
        // Report every offending column so the user can fix the schema in one pass.
        issues.AddIssue(MakeErrorIssue(
            Ydb::StatusIds::BAD_REQUEST,
            TStringBuilder{} << "Date, Timestamp and Interval types are not allowed in json_list format (you have '"
                << column.name() << " " << NYdb::TType(column.type()).ToString() << "' field)"));
    }

    return issues;
}

template<typename TScheme>
static NYql::TIssues ValidateRawFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
NYql::TIssues issues;
Expand Down Expand Up @@ -800,6 +825,50 @@ struct TObjectStorageExternalSource : public IExternalSource {
return FindIf(availableTypes, [&columnType](const auto& availableType) { return NYdb::TypesEqual(availableType, columnType); }) != availableTypes.end();
}

static std::vector<NYdb::TType> GetDateOrTimeTypes() {
NYdb::TType dateType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Date).Build();
NYdb::TType datetimeType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Datetime).Build();
NYdb::TType timestampType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Timestamp).Build();
NYdb::TType intervalType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Interval).Build();
NYdb::TType date32Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Date32).Build();
NYdb::TType datetime64Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Datetime64).Build();
NYdb::TType timestamp64Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Timestamp64).Build();
NYdb::TType interval64Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Interval64).Build();
NYdb::TType tzdateType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::TzDate).Build();
NYdb::TType tzdatetimeType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::TzDatetime).Build();
NYdb::TType tztimestampType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::TzTimestamp).Build();
const std::vector<NYdb::TType> result {
dateType,
datetimeType,
timestampType,
intervalType,
date32Type,
datetime64Type,
timestamp64Type,
interval64Type,
tzdateType,
tzdatetimeType,
tztimestampType,
NYdb::TTypeBuilder{}.Optional(dateType).Build(),
NYdb::TTypeBuilder{}.Optional(datetimeType).Build(),
NYdb::TTypeBuilder{}.Optional(timestampType).Build(),
NYdb::TTypeBuilder{}.Optional(intervalType).Build(),
NYdb::TTypeBuilder{}.Optional(date32Type).Build(),
NYdb::TTypeBuilder{}.Optional(datetime64Type).Build(),
NYdb::TTypeBuilder{}.Optional(timestamp64Type).Build(),
NYdb::TTypeBuilder{}.Optional(interval64Type).Build(),
NYdb::TTypeBuilder{}.Optional(tzdateType).Build(),
NYdb::TTypeBuilder{}.Optional(tzdatetimeType).Build(),
NYdb::TTypeBuilder{}.Optional(tztimestampType).Build()
};
return result;
}

// Returns true when columnType is one of the date/time/interval types (bare or
// Optional) produced by GetDateOrTimeTypes(). The list is computed once and
// cached for the lifetime of the process.
static bool ValidateDateOrTimeType(const NYdb::TType& columnType) {
    static const std::vector<NYdb::TType> dateOrTimeTypes = GetDateOrTimeTypes();
    for (const auto& candidate : dateOrTimeTypes) {
        if (NYdb::TypesEqual(candidate, columnType)) {
            return true;
        }
    }
    return false;
}

private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
Expand Down
25 changes: 25 additions & 0 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,31 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Partition by must always be specified");
}

// Verifies that Pack() rejects a "json_list" external source whose schema
// contains any date/time/interval column (bare primitive type ids).
Y_UNIT_TEST(FailedJsonListValidation) {
    // Every primitive type id that ValidateJsonListFormat must reject.
    static auto invalidTypes = {
        Ydb::Type::DATE,
        Ydb::Type::DATETIME,
        Ydb::Type::TIMESTAMP,
        Ydb::Type::INTERVAL,
        Ydb::Type::DATE32,
        Ydb::Type::DATETIME64,
        Ydb::Type::TIMESTAMP64,
        Ydb::Type::INTERVAL64,
        Ydb::Type::TZ_DATE,
        Ydb::Type::TZ_DATETIME,
        Ydb::Type::TZ_TIMESTAMP,
    };
    auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
    // Build a schema with one (unnamed) column per forbidden type.
    NKikimrExternalSources::TSchema schema;
    for (const auto typeId : invalidTypes) {
        auto newColumn = schema.add_column();
        newColumn->mutable_type()->set_type_id(typeId);
    }
    // Selecting the "json_list" format is what triggers the validation path.
    NKikimrExternalSources::TGeneral general;
    general.mutable_attributes()->insert({"format", "json_list"});
    UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Date, Timestamp and Interval types are not allowed in json_list format");
}

Y_UNIT_TEST(WildcardsValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
Expand Down
24 changes: 24 additions & 0 deletions ydb/library/yql/providers/common/provider/yql_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,30 @@ bool ValidateFormatForInput(
return false;
}
}
else if (schemaStructRowType && format == TStringBuf("json_list")) {
bool failedSchemaColumns = false;

for (const TItemExprType* item : schemaStructRowType->GetItems()) {
if (excludeFields && excludeFields(item->GetName())) {
continue;
}
const TTypeAnnotationNode* rowType = item->GetItemType();
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
}

if (rowType->GetKind() == ETypeAnnotationKind::Data
&& IsDataTypeDateOrTzDateOrInterval(rowType->Cast<TDataExprType>()->GetSlot())) {
ctx.AddError(TIssue(TStringBuilder() << "Date, Timestamp and Interval types are not allowed in json_list format (you have '"
<< item->GetName() << " " << FormatType(rowType) << "' field)"));
failedSchemaColumns = true;
}
}

if (failedSchemaColumns) {
return false;
}
}
return true;
}

Expand Down
63 changes: 63 additions & 0 deletions ydb/tests/fq/s3/test_s3_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,69 @@ def test_inference_unsupported_types(self, kikimr, s3, client, unique_prefix):
assert result_set.rows[2].items[0].int64_value == 30
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_v2
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_json_list_formats(self, kikimr, s3, client, unique_prefix):
    """A json_list query whose schema declares date/time/interval columns must
    fail validation, and the error must name every offending column type."""
    resource = boto3.resource(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    bucket = resource.Bucket("fbucket")
    bucket.create(ACL='public-read')
    bucket.objects.all().delete()

    s3_client = boto3.client(
        "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
    )

    # The payload values are irrelevant (empty strings) — the query is expected
    # to fail during schema validation, before any row is ever parsed.
    fruits = '''[
        { "date" : "", "datetime" : "", "timestamp" : "", "interval" : "", "date32" : "", "datetime64" : "", "timestamp64" : "", "interval64" : "", "tzDate" : "", "tzDateTime" : "", "tzTimestamp" : "" },
        { "date" : "", "datetime" : "", "timestamp" : "", "interval" : "", "date32" : "", "datetime64" : "", "timestamp64" : "", "interval64" : "", "tzDate" : "", "tzDateTime" : "", "tzTimestamp" : "" },
        { "date" : "", "datetime" : "", "timestamp" : "", "interval" : "", "date32" : "", "datetime64" : "", "timestamp64" : "", "interval64" : "", "tzDate" : "", "tzDateTime" : "", "tzTimestamp" : "" }
        ]'''
    s3_client.put_object(Body=fruits, Bucket='fbucket', Key='timestamp.json', ContentType='text/plain')

    kikimr.control_plane.wait_bootstrap(1)
    storage_connection_name = unique_prefix + "fruitbucket"
    client.create_storage_connection(storage_connection_name, "fbucket")

    # One column of each forbidden type, so a single failed query exercises all
    # of the validation branches at once.
    sql = f'''
        SELECT *
        FROM `{storage_connection_name}`.`/timestamp.json`
        WITH (
            format="json_list",
            schema=(
                `date` date,
                `datetime` datetime,
                `timestamp` timestamp,
                `interval` interval,
                `date32` date32,
                `datetime64` datetime64,
                `timestamp64` timestamp64,
                `interval64` interval64,
                `tzDate` tzDate,
                `tzDateTime` tzDateTime,
                `tzTimestamp` tzTimestamp
            ));
        '''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.FAILED)

    # The validator emits one issue per offending column; check that each type
    # name is mentioned somewhere in the aggregated error text.
    # NOTE(review): the bare "Date"/"Datetime"/"Timestamp"/"Interval" checks are
    # subsumed by their 32/64-bit counterparts ("Date32" contains "Date", etc.),
    # so they cannot fail independently — kept for readability.
    error_message = str(client.describe_query(query_id).result)
    assert "Date, Timestamp and Interval types are not allowed in json_list format" in error_message
    assert "Date" in error_message
    assert "Datetime" in error_message
    assert "Timestamp" in error_message
    assert "Interval" in error_message
    assert "Date32" in error_message
    assert "Datetime64" in error_message
    assert "Timestamp64" in error_message
    assert "Interval64" in error_message
    assert "TzDate" in error_message
    assert "TzDatetime" in error_message
    assert "TzTimestamp" in error_message

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):
Expand Down

0 comments on commit a71cc84

Please sign in to comment.