Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

configure tiering on CS via ttl #12095

Merged
merged 9 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/grpc_services/rpc_create_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ class TCreateTableRPC : public TRpcSchemeRequestActor<TCreateTableRPC, TEvCreate
return false;
}
}
tableDesc->MutableTtlSettings()->SetUseTiering(req.tiering());

return true;
}
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/grpc_services/rpc_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,6 @@ class TCreateLogTableRPC : public TRpcSchemeRequestActor<TCreateLogTableRPC, TEv
if (!FillTtlSettings(*create->MutableTtlSettings()->MutableEnabled(), req->ttl_settings(), status, error)) {
return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
}
} else if (req->has_tiering_settings()) {
create->MutableTtlSettings()->SetUseTiering(req->tiering_settings().tiering_id());
}

create->SetColumnShardCount(req->shards_count());
Expand Down Expand Up @@ -600,12 +598,6 @@ class TAlterLogTableRPC : public TRpcSchemeRequestActor<TAlterLogTableRPC, TEvAl
alter->MutableAlterTtlSettings()->MutableDisabled();
}

if (req->has_set_tiering_settings()) {
alter->MutableAlterTtlSettings()->SetUseTiering(req->set_tiering_settings().tiering_id());
} else if (req->has_drop_tiering_settings()) {
alter->MutableAlterTtlSettings()->SetUseTiering("");
}

ctx.Send(MakeTxProxyID(), proposeRequest.release());
}
};
Expand Down
20 changes: 9 additions & 11 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,6 @@ bool ConvertCreateTableSettingsToProto(NYql::TKikimrTableMetadataPtr metadata, Y
}
}

if (const auto& tiering = metadata->TableSettings.Tiering) {
if (tiering.IsSet()) {
proto.set_tiering(tiering.GetValueSet());
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = "Can't reset TIERING";
return false;
}
}

if (metadata->TableSettings.StoreExternalBlobs) {
auto& storageSettings = *proto.mutable_storage_settings();
TString value = to_lower(metadata->TableSettings.StoreExternalBlobs.GetRef());
Expand Down Expand Up @@ -520,7 +510,15 @@ bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
const auto& inputSettings = metadata->TableSettings.TtlSettings.GetValueSet();
auto& resultSettings = *tableDesc.MutableTtlSettings();
resultSettings.MutableEnabled()->SetColumnName(inputSettings.ColumnName);
resultSettings.MutableEnabled()->SetExpireAfterSeconds(inputSettings.ExpireAfter.Seconds());
for (const auto& tier : inputSettings.Tiers) {
auto* tierProto = resultSettings.MutableEnabled()->AddTiers();
tierProto->SetApplyAfterSeconds(tier.ApplyAfter.Seconds());
if (tier.StorageName) {
tierProto->MutableEvictToExternalStorage()->SetStorageName(*tier.StorageName);
} else {
tierProto->MutableDelete();
}
}
if (inputSettings.ColumnUnit) {
resultSettings.MutableEnabled()->SetColumnUnit(static_cast<NKikimrSchemeOp::TTTLSettings::EUnit>(*inputSettings.ColumnUnit));
}
Expand Down
7 changes: 0 additions & 7 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1686,13 +1686,6 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
ConvertTtlSettingsToProto(ttlSettings, *alterTableRequest.mutable_set_ttl_settings());
} else if (name == "resetTtlSettings") {
alterTableRequest.mutable_drop_ttl_settings();
} else if (name == "setTiering") {
const auto tieringName = TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
alterTableRequest.set_set_tiering(tieringName);
} else if (name == "resetTiering") {
alterTableRequest.mutable_drop_tiering();
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "Unknown table profile setting: " << name));
Expand Down
32 changes: 16 additions & 16 deletions ydb/core/kqp/provider/yql_kikimr_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,42 +105,36 @@ bool TTtlSettings::TryParse(const NNodes::TCoNameValueTupleList& node, TTtlSetti
if (name == "columnName") {
YQL_ENSURE(field.Value().Maybe<TCoAtom>());
settings.ColumnName = field.Value().Cast<TCoAtom>().StringValue();
} else if (name == "expireAfter") {
// TODO (yentsovsemyon): remove this clause after extending TTL syntax in YQL
YQL_ENSURE(field.Value().Maybe<TCoInterval>());
auto value = FromString<i64>(field.Value().Cast<TCoInterval>().Literal().Value());
if (value < 0) {
error = "Interval value cannot be negative";
return false;
}

settings.ExpireAfter = TDuration::FromValue(value);
} else if (name == "tiers") {
YQL_ENSURE(field.Value().Maybe<TExprList>());
auto listNode = field.Value().Cast<TExprList>();

for (size_t i = 0; i < listNode.Size(); ++i) {
auto tierNode = listNode.Item(i);

std::optional<TString> storageName;
TDuration evictionDelay;
YQL_ENSURE(tierNode.Maybe<TCoNameValueTupleList>());
for (const auto& tierField : tierNode.Cast<TCoNameValueTupleList>()) {
auto tierFieldName = tierField.Name().Value();
if (tierFieldName == "storageName") {
error = "TTL cannot contain tiered storage: tiering in TTL syntax is not supported";
return false;
YQL_ENSURE(tierField.Value().Maybe<TCoAtom>());
storageName = tierField.Value().Cast<TCoAtom>().StringValue();
} else if (tierFieldName == "evictionDelay") {
YQL_ENSURE(tierField.Value().Maybe<TCoInterval>());
auto value = FromString<i64>(tierField.Value().Cast<TCoInterval>().Literal().Value());
if (value < 0) {
error = "Interval value cannot be negative";
return false;
}
settings.ExpireAfter = TDuration::FromValue(value);
evictionDelay = TDuration::FromValue(value);
} else {
error = TStringBuilder() << "Unknown field: " << tierFieldName;
return false;
}
}

settings.Tiers.emplace_back(evictionDelay, storageName);
}
} else if (name == "columnUnit") {
YQL_ENSURE(field.Value().Maybe<TCoAtom>());
Expand Down Expand Up @@ -322,9 +316,15 @@ void ConvertTtlSettingsToProto(const NYql::TTtlSettings& settings, Ydb::Table::T
opts.set_column_name(settings.ColumnName);
opts.set_column_unit(static_cast<Ydb::Table::ValueSinceUnixEpochModeSettings::Unit>(*settings.ColumnUnit));
}
auto* deleteTier = proto.add_tiers();
deleteTier->set_apply_after_seconds(settings.ExpireAfter.Seconds());
deleteTier->mutable_delete_();
for (const auto& tier : settings.Tiers) {
auto* tierProto = proto.add_tiers();
tierProto->set_apply_after_seconds(tier.ApplyAfter.Seconds());
if (tier.StorageName) {
tierProto->mutable_evict_to_external_storage()->set_storage_name(*tier.StorageName);
} else {
tierProto->mutable_delete_();
}
}
}

Ydb::FeatureFlag::Status GetFlagValue(const TMaybe<bool>& value) {
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,14 @@ struct TTtlSettings {
Nanoseconds = 4,
};

struct TTier {
TDuration ApplyAfter;
std::optional<TString> StorageName;
};

TString ColumnName;
TDuration ExpireAfter;
TMaybe<EUnit> ColumnUnit;
std::vector<TTier> Tiers;

static bool TryParse(const NNodes::TCoNameValueTupleList& node, TTtlSettings& settings, TString& error);
};
Expand All @@ -242,7 +247,6 @@ struct TTableSettings {
TMaybe<TString> KeyBloomFilter;
TMaybe<TString> ReadReplicasSettings;
TResetableSetting<TTtlSettings, void> TtlSettings;
TResetableSetting<TString, void> Tiering;
TMaybe<TString> PartitionByHashFunction;
TMaybe<TString> StoreExternalBlobs;

Expand Down
8 changes: 0 additions & 8 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1251,14 +1251,6 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't reset TTL settings"));
return TStatus::Error;
} else if (name == "setTiering") {
meta->TableSettings.Tiering.Set(TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
));
} else if (name == "resetTiering") {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't reset TIERING"));
return TStatus::Error;
} else if (name == "storeType") {
TMaybe<TString> storeType = TString(setting.Value().Cast<TCoAtom>().Value());
if (storeType && to_lower(storeType.GetRef()) == "column") {
Expand Down
21 changes: 3 additions & 18 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,14 @@ namespace NKqp {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

TString TTestHelper::CreateTieringRule(const TString& tierName, const TString& columnName) {
const TString ruleName = tierName + "_" + columnName;
const TString configTieringStr = TStringBuilder() << R"({
"rules" : [
{
"tierName" : ")" << tierName << R"(",
"durationForEvict" : "10d"
}
]
})";
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
return ruleName;
}

void TTestHelper::SetTiering(const TString& tableName, const TString& ruleName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET (TIERING = '" << ruleName << "')";
void TTestHelper::SetTiering(const TString& tableName, const TString& tierName, const TString& columnName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET TTL Interval(\"P10D\") TO EXTERNAL DATA SOURCE `" << tierName << "` ON `" << columnName << "`;";
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::ResetTiering(const TString& tableName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TIERING)";
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TTL)";
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ class TTestHelper {
NYdb::NTable::TSession& GetSession();
void CreateTable(const TColumnTableBase& table, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
void DropTable(const TString& tableName);
void EnsureSecret(const TString& name, const TString& value);
void CreateTier(const TString& tierName);
TString CreateTieringRule(const TString& tierName, const TString& columnName);
void SetTiering(const TString& tableName, const TString& ruleName);
void SetTiering(const TString& tableName, const TString& tierName, const TString& columnName);
void ResetTiering(const TString& tableName);
void BulkUpsert(
const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
Expand Down
Loading
Loading