Skip to content

Commit

Permalink
configure tiering on CS via ttl (#12095)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 4, 2024
1 parent abb79c8 commit b2da93a
Show file tree
Hide file tree
Showing 76 changed files with 495 additions and 1,260 deletions.
1 change: 0 additions & 1 deletion ydb/core/grpc_services/rpc_create_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ class TCreateTableRPC : public TRpcSchemeRequestActor<TCreateTableRPC, TEvCreate
return false;
}
}
tableDesc->MutableTtlSettings()->SetUseTiering(req.tiering());

return true;
}
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/grpc_services/rpc_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,6 @@ class TCreateLogTableRPC : public TRpcSchemeRequestActor<TCreateLogTableRPC, TEv
if (!FillTtlSettings(*create->MutableTtlSettings()->MutableEnabled(), req->ttl_settings(), status, error)) {
return Reply(status, error, NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ctx);
}
} else if (req->has_tiering_settings()) {
create->MutableTtlSettings()->SetUseTiering(req->tiering_settings().tiering_id());
}

create->SetColumnShardCount(req->shards_count());
Expand Down Expand Up @@ -600,12 +598,6 @@ class TAlterLogTableRPC : public TRpcSchemeRequestActor<TAlterLogTableRPC, TEvAl
alter->MutableAlterTtlSettings()->MutableDisabled();
}

if (req->has_set_tiering_settings()) {
alter->MutableAlterTtlSettings()->SetUseTiering(req->set_tiering_settings().tiering_id());
} else if (req->has_drop_tiering_settings()) {
alter->MutableAlterTtlSettings()->SetUseTiering("");
}

ctx.Send(MakeTxProxyID(), proposeRequest.release());
}
};
Expand Down
20 changes: 9 additions & 11 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,6 @@ bool ConvertCreateTableSettingsToProto(NYql::TKikimrTableMetadataPtr metadata, Y
}
}

if (const auto& tiering = metadata->TableSettings.Tiering) {
if (tiering.IsSet()) {
proto.set_tiering(tiering.GetValueSet());
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = "Can't reset TIERING";
return false;
}
}

if (metadata->TableSettings.StoreExternalBlobs) {
auto& storageSettings = *proto.mutable_storage_settings();
TString value = to_lower(metadata->TableSettings.StoreExternalBlobs.GetRef());
Expand Down Expand Up @@ -520,7 +510,15 @@ bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
const auto& inputSettings = metadata->TableSettings.TtlSettings.GetValueSet();
auto& resultSettings = *tableDesc.MutableTtlSettings();
resultSettings.MutableEnabled()->SetColumnName(inputSettings.ColumnName);
resultSettings.MutableEnabled()->SetExpireAfterSeconds(inputSettings.ExpireAfter.Seconds());
for (const auto& tier : inputSettings.Tiers) {
auto* tierProto = resultSettings.MutableEnabled()->AddTiers();
tierProto->SetApplyAfterSeconds(tier.ApplyAfter.Seconds());
if (tier.StorageName) {
tierProto->MutableEvictToExternalStorage()->SetStorageName(*tier.StorageName);
} else {
tierProto->MutableDelete();
}
}
if (inputSettings.ColumnUnit) {
resultSettings.MutableEnabled()->SetColumnUnit(static_cast<NKikimrSchemeOp::TTTLSettings::EUnit>(*inputSettings.ColumnUnit));
}
Expand Down
7 changes: 0 additions & 7 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1686,13 +1686,6 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
ConvertTtlSettingsToProto(ttlSettings, *alterTableRequest.mutable_set_ttl_settings());
} else if (name == "resetTtlSettings") {
alterTableRequest.mutable_drop_ttl_settings();
} else if (name == "setTiering") {
const auto tieringName = TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
alterTableRequest.set_set_tiering(tieringName);
} else if (name == "resetTiering") {
alterTableRequest.mutable_drop_tiering();
} else {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "Unknown table profile setting: " << name));
Expand Down
32 changes: 16 additions & 16 deletions ydb/core/kqp/provider/yql_kikimr_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,42 +105,36 @@ bool TTtlSettings::TryParse(const NNodes::TCoNameValueTupleList& node, TTtlSetti
if (name == "columnName") {
YQL_ENSURE(field.Value().Maybe<TCoAtom>());
settings.ColumnName = field.Value().Cast<TCoAtom>().StringValue();
} else if (name == "expireAfter") {
// TODO (yentsovsemyon): remove this clause after extending TTL syntax in YQL
YQL_ENSURE(field.Value().Maybe<TCoInterval>());
auto value = FromString<i64>(field.Value().Cast<TCoInterval>().Literal().Value());
if (value < 0) {
error = "Interval value cannot be negative";
return false;
}

settings.ExpireAfter = TDuration::FromValue(value);
} else if (name == "tiers") {
YQL_ENSURE(field.Value().Maybe<TExprList>());
auto listNode = field.Value().Cast<TExprList>();

for (size_t i = 0; i < listNode.Size(); ++i) {
auto tierNode = listNode.Item(i);

std::optional<TString> storageName;
TDuration evictionDelay;
YQL_ENSURE(tierNode.Maybe<TCoNameValueTupleList>());
for (const auto& tierField : tierNode.Cast<TCoNameValueTupleList>()) {
auto tierFieldName = tierField.Name().Value();
if (tierFieldName == "storageName") {
error = "TTL cannot contain tiered storage: tiering in TTL syntax is not supported";
return false;
YQL_ENSURE(tierField.Value().Maybe<TCoAtom>());
storageName = tierField.Value().Cast<TCoAtom>().StringValue();
} else if (tierFieldName == "evictionDelay") {
YQL_ENSURE(tierField.Value().Maybe<TCoInterval>());
auto value = FromString<i64>(tierField.Value().Cast<TCoInterval>().Literal().Value());
if (value < 0) {
error = "Interval value cannot be negative";
return false;
}
settings.ExpireAfter = TDuration::FromValue(value);
evictionDelay = TDuration::FromValue(value);
} else {
error = TStringBuilder() << "Unknown field: " << tierFieldName;
return false;
}
}

settings.Tiers.emplace_back(evictionDelay, storageName);
}
} else if (name == "columnUnit") {
YQL_ENSURE(field.Value().Maybe<TCoAtom>());
Expand Down Expand Up @@ -322,9 +316,15 @@ void ConvertTtlSettingsToProto(const NYql::TTtlSettings& settings, Ydb::Table::T
opts.set_column_name(settings.ColumnName);
opts.set_column_unit(static_cast<Ydb::Table::ValueSinceUnixEpochModeSettings::Unit>(*settings.ColumnUnit));
}
auto* deleteTier = proto.add_tiers();
deleteTier->set_apply_after_seconds(settings.ExpireAfter.Seconds());
deleteTier->mutable_delete_();
for (const auto& tier : settings.Tiers) {
auto* tierProto = proto.add_tiers();
tierProto->set_apply_after_seconds(tier.ApplyAfter.Seconds());
if (tier.StorageName) {
tierProto->mutable_evict_to_external_storage()->set_storage_name(*tier.StorageName);
} else {
tierProto->mutable_delete_();
}
}
}

Ydb::FeatureFlag::Status GetFlagValue(const TMaybe<bool>& value) {
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,14 @@ struct TTtlSettings {
Nanoseconds = 4,
};

struct TTier {
TDuration ApplyAfter;
std::optional<TString> StorageName;
};

TString ColumnName;
TDuration ExpireAfter;
TMaybe<EUnit> ColumnUnit;
std::vector<TTier> Tiers;

static bool TryParse(const NNodes::TCoNameValueTupleList& node, TTtlSettings& settings, TString& error);
};
Expand All @@ -242,7 +247,6 @@ struct TTableSettings {
TMaybe<TString> KeyBloomFilter;
TMaybe<TString> ReadReplicasSettings;
TResetableSetting<TTtlSettings, void> TtlSettings;
TResetableSetting<TString, void> Tiering;
TMaybe<TString> PartitionByHashFunction;
TMaybe<TString> StoreExternalBlobs;

Expand Down
8 changes: 0 additions & 8 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1251,14 +1251,6 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't reset TTL settings"));
return TStatus::Error;
} else if (name == "setTiering") {
meta->TableSettings.Tiering.Set(TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
));
} else if (name == "resetTiering") {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
"Can't reset TIERING"));
return TStatus::Error;
} else if (name == "storeType") {
TMaybe<TString> storeType = TString(setting.Value().Cast<TCoAtom>().Value());
if (storeType && to_lower(storeType.GetRef()) == "column") {
Expand Down
21 changes: 3 additions & 18 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,14 @@ namespace NKqp {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

TString TTestHelper::CreateTieringRule(const TString& tierName, const TString& columnName) {
const TString ruleName = tierName + "_" + columnName;
const TString configTieringStr = TStringBuilder() << R"({
"rules" : [
{
"tierName" : ")" << tierName << R"(",
"durationForEvict" : "10d"
}
]
})";
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT IF NOT EXISTS " + ruleName + " (TYPE TIERING_RULE) WITH (defaultColumn = " + columnName + ", description = `" + configTieringStr + "`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
return ruleName;
}

void TTestHelper::SetTiering(const TString& tableName, const TString& ruleName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET (TIERING = '" << ruleName << "')";
void TTestHelper::SetTiering(const TString& tableName, const TString& tierName, const TString& columnName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` SET TTL Interval(\"P10D\") TO EXTERNAL DATA SOURCE `" << tierName << "` ON `" << columnName << "`;";
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::ResetTiering(const TString& tableName) {
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TIERING)";
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << tableName << "` RESET (TTL)";
auto result = GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ class TTestHelper {
NYdb::NTable::TSession& GetSession();
void CreateTable(const TColumnTableBase& table, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS);
void DropTable(const TString& tableName);
void EnsureSecret(const TString& name, const TString& value);
void CreateTier(const TString& tierName);
TString CreateTieringRule(const TString& tierName, const TString& columnName);
void SetTiering(const TString& tableName, const TString& ruleName);
void SetTiering(const TString& tableName, const TString& tierName, const TString& columnName);
void ResetTiering(const TString& tableName);
void BulkUpsert(
const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
Expand Down
Loading

0 comments on commit b2da93a

Please sign in to comment.