Skip to content

Commit

Permalink
Add BSC/Console protocol (#12662)
Browse files Browse the repository at this point in the history
  • Loading branch information
mregrock authored Dec 27, 2024
1 parent f30662e commit 4081b1f
Show file tree
Hide file tree
Showing 27 changed files with 1,043 additions and 171 deletions.
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWriteOltp
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWriteOltpNoSink
ydb/core/kqp/ut/yql KqpScripting.StreamExecuteYqlScriptScanOperationTmeoutBruteForce
ydb/core/mind/hive/ut THiveTest.TestHiveBalancerNodeRestarts
ydb/core/mind/hive/ut THiveTest.TestReassignUseRelativeSpace
ydb/core/persqueue/ut TPQTest.TestReadAndDeleteConsumer
ydb/core/persqueue/ut [*/*] chunk chunk
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,14 @@ struct TEvBlobStorage {
EvControllerGroupDecommittedNotify = 0x1003161e,
EvControllerGroupDecommittedResponse = 0x1003161f,
EvControllerGroupMetricsExchange = 0x10031620,
EvControllerProposeConfigRequest = 0x10031621,
EvControllerProposeConfigResponse = 0x10031622,
EvControllerConsoleCommitRequest = 0x10031623,
EvControllerConsoleCommitResponse = 0x10031624,
EvControllerValidateConfigRequest = 0x10031625,
EvControllerValidateConfigResponse = 0x10031626,
EvControllerReplaceConfigRequest = 0x10031627,
EvControllerReplaceConfigResponse = 0x10031628,

// BSC interface result section
EvControllerNodeServiceSetUpdate = 0x10031802,
Expand Down Expand Up @@ -2478,6 +2486,14 @@ struct TEvBlobStorage {
struct TEvControllerGroupDecommittedResponse;
struct TEvControllerGroupMetricsExchange;
struct TEvPutVDiskToReadOnly;
struct TEvControllerProposeConfigRequest;
struct TEvControllerProposeConfigResponse;
struct TEvControllerConsoleCommitRequest;
struct TEvControllerConsoleCommitResponse;
struct TEvControllerValidateConfigRequest;
struct TEvControllerValidateConfigResponse;
struct TEvControllerReplaceConfigRequest;
struct TEvControllerReplaceConfigResponse;

struct TEvMonStreamQuery;
struct TEvMonStreamActorDeathNote;
Expand Down
94 changes: 94 additions & 0 deletions ydb/core/blobstorage/base/blobstorage_console_events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once
#include "defs.h"

#include <ydb/core/base/blobstorage.h>
#include <ydb/core/protos/blobstorage.pb.h>

namespace NKikimr {

struct TEvBlobStorage::TEvControllerProposeConfigRequest : TEventPB<TEvBlobStorage::TEvControllerProposeConfigRequest,
NKikimrBlobStorage::TEvControllerProposeConfigRequest, TEvBlobStorage::EvControllerProposeConfigRequest> {
TEvControllerProposeConfigRequest() = default;

TEvControllerProposeConfigRequest(const ui32 configHash, const ui32 configVersion) {
Record.SetConfigHash(configHash);
Record.SetConfigVersion(configVersion);
}

TString ToString() const override {
TStringStream str;
str << "{TEvControllerProposeConfigRequest Record# " << Record.DebugString();
str << "}";
return str.Str();
}
};

struct TEvBlobStorage::TEvControllerProposeConfigResponse : TEventPB<TEvBlobStorage::TEvControllerProposeConfigResponse,
NKikimrBlobStorage::TEvControllerProposeConfigResponse, TEvBlobStorage::EvControllerProposeConfigResponse> {
TEvControllerProposeConfigResponse() = default;
};

struct TEvBlobStorage::TEvControllerConsoleCommitRequest : TEventPB<TEvBlobStorage::TEvControllerConsoleCommitRequest,
NKikimrBlobStorage::TEvControllerConsoleCommitRequest, TEvBlobStorage::EvControllerConsoleCommitRequest> {
TEvControllerConsoleCommitRequest() = default;

TEvControllerConsoleCommitRequest(const TString& yamlConfig) {
Record.SetYAML(yamlConfig);
}

TString ToString() const override {
TStringStream str;
str << "{TEvControllerConsoleCommitRequest Record# " << Record.DebugString();
str << "}";
return str.Str();
}
};

struct TEvBlobStorage::TEvControllerConsoleCommitResponse : TEventPB<TEvBlobStorage::TEvControllerConsoleCommitResponse,
NKikimrBlobStorage::TEvControllerConsoleCommitResponse, TEvBlobStorage::EvControllerConsoleCommitResponse> {
TEvControllerConsoleCommitResponse() = default;
};

struct TEvBlobStorage::TEvControllerValidateConfigRequest : TEventPB<TEvBlobStorage::TEvControllerValidateConfigRequest,
NKikimrBlobStorage::TEvControllerValidateConfigRequest, TEvBlobStorage::EvControllerValidateConfigRequest> {
TEvControllerValidateConfigRequest() = default;

TEvControllerValidateConfigRequest(const TString& yamlConfig) {
Record.SetYAML(yamlConfig);
}

TString ToString() const override {
TStringStream str;
str << "{TEvControllerValidateConfigRequest Record# " << Record.DebugString();
str << "}";
return str.Str();
}
};

struct TEvBlobStorage::TEvControllerValidateConfigResponse : TEventPB<TEvBlobStorage::TEvControllerValidateConfigResponse,
NKikimrBlobStorage::TEvControllerValidateConfigResponse, TEvBlobStorage::EvControllerValidateConfigResponse> {
TEvControllerValidateConfigResponse() = default;
};

struct TEvBlobStorage::TEvControllerReplaceConfigRequest : TEventPB<TEvBlobStorage::TEvControllerReplaceConfigRequest,
NKikimrBlobStorage::TEvControllerReplaceConfigRequest, TEvBlobStorage::EvControllerReplaceConfigRequest> {
TEvControllerReplaceConfigRequest() = default;

TEvControllerReplaceConfigRequest(const TString& yamlConfig) {
Record.SetYAML(yamlConfig);
}

TString ToString() const override {
TStringStream str;
str << "{TEvControllerReplaceConfigRequest Record# " << Record.DebugString();
str << "}";
return str.Str();
}
};

struct TEvBlobStorage::TEvControllerReplaceConfigResponse : TEventPB<TEvBlobStorage::TEvControllerReplaceConfigResponse,
NKikimrBlobStorage::TEvControllerReplaceConfigResponse, TEvBlobStorage::EvControllerReplaceConfigResponse> {
TEvControllerReplaceConfigResponse() = default;
};

}
5 changes: 5 additions & 0 deletions ydb/core/cms/console/console.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ void TConsole::ClearState()
Counters->ResetCounters();
}

void TConsole::ForwardFromPipe(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx) {
ev->Rewrite(ev->GetTypeRewrite(), ConfigsManager->SelfId());
ctx.Send(ev.Release());
}

void TConsole::ForwardToConfigsManager(TAutoPtr<IEventHandle> &ev, const TActorContext &ctx)
{
ctx.Forward(ev, ConfigsManager->SelfId());
Expand Down
103 changes: 28 additions & 75 deletions ydb/core/cms/console/console__replace_yaml_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,69 +64,20 @@ class TConfigsManager::TTxReplaceYamlConfig : public TTransactionBase<TConfigsMa
bool Execute(TTransactionContext &txc, const TActorContext &ctx) override
{
NIceDb::TNiceDb db(txc.DB);

TValidateConfigResult result = Self->ValidateConfigAndReplaceMetadata(Config, Force, AllowUnknownFields);
if (result.ErrorReason) {
HandleError(result.ErrorReason.value(), ctx);
return true;
}
try {
if (!Force) {
auto metadata = NYamlConfig::GetMetadata(Config);
Cluster = metadata.Cluster.value_or(TString("unknown"));
Version = metadata.Version.value_or(0);
} else {
Cluster = Self->ClusterName;
Version = Self->YamlVersion;
}

UpdatedConfig = NYamlConfig::ReplaceMetadata(Config, NYamlConfig::TMetadata{
.Version = Version + 1,
.Cluster = Cluster,
});

bool hasForbiddenUnknown = false;

TMap<TString, std::pair<TString, TString>> deprecatedFields;
TMap<TString, std::pair<TString, TString>> unknownFields;

if (UpdatedConfig != Self->YamlConfig || Self->YamlDropped) {
Modify = true;

auto tree = NFyaml::TDocument::Parse(UpdatedConfig);
auto resolved = NYamlConfig::ResolveAll(tree);

if (Self->ClusterName != Cluster) {
ythrow yexception() << "ClusterName mismatch";
}

if (Version != Self->YamlVersion) {
ythrow yexception() << "Version mismatch";
}

UnknownFieldsCollector = new NYamlConfig::TBasicUnknownFieldsCollector;
Version = result.Version;
UpdatedConfig = result.UpdatedConfig;
Cluster = result.Cluster;
Modify = result.Modify;

std::vector<TString> errors;
for (auto& [_, config] : resolved.Configs) {
auto cfg = NYamlConfig::YamlToProto(
config.second,
true,
true,
UnknownFieldsCollector);
NKikimr::NConfig::EValidationResult result = NKikimr::NConfig::ValidateConfig(cfg, errors);
if (result == NKikimr::NConfig::EValidationResult::Error) {
ythrow yexception() << errors.front();
}
}

const auto& deprecatedPaths = NKikimrConfig::TAppConfig::GetReservedChildrenPaths();

for (const auto& [path, info] : UnknownFieldsCollector->GetUnknownKeys()) {
if (deprecatedPaths.contains(path)) {
deprecatedFields[path] = info;
} else {
unknownFields[path] = info;
}
}

hasForbiddenUnknown = !unknownFields.empty() && !AllowUnknownFields;

if (!DryRun && !hasForbiddenUnknown) {
if (result.ValidationFinished) {
if (!DryRun && !result.HasForbiddenUnknown) {
DoInternalAudit(txc, ctx);

db.Table<Schema::YamlConfig>().Key(Version + 1)
Expand All @@ -143,13 +94,13 @@ class TConfigsManager::TTxReplaceYamlConfig : public TTransactionBase<TConfigsMa
}

auto fillResponse = [&](auto& ev, auto errorLevel){
for (auto& [path, info] : unknownFields) {
for (auto& [path, info] : result.UnknownFields) {
auto *issue = ev->Record.AddIssues();
issue->set_severity(errorLevel);
issue->set_message(TStringBuilder{} << "Unknown key# " << info.first << " in proto# " << info.second << " found in path# " << path);
}

for (auto& [path, info] : deprecatedFields) {
for (auto& [path, info] : result.DeprecatedFields) {
auto *issue = ev->Record.AddIssues();
issue->set_severity(NYql::TSeverityIds::S_WARNING);
issue->set_message(TStringBuilder{} << "Deprecated key# " << info.first << " in proto# " << info.second << " found in path# " << path);
Expand All @@ -158,8 +109,7 @@ class TConfigsManager::TTxReplaceYamlConfig : public TTransactionBase<TConfigsMa
Response = MakeHolder<NActors::IEventHandle>(Sender, ctx.SelfID, ev.Release());
};


if (hasForbiddenUnknown) {
if (result.HasForbiddenUnknown) {
Error = true;
auto ev = MakeHolder<TEvConsole::TEvGenericError>();
ev->Record.SetYdbStatus(Ydb::StatusIds::BAD_REQUEST);
Expand All @@ -172,18 +122,10 @@ class TConfigsManager::TTxReplaceYamlConfig : public TTransactionBase<TConfigsMa
auto ev = MakeHolder<TEvConsole::TEvSetYamlConfigResponse>();
fillResponse(ev, NYql::TSeverityIds::S_WARNING);
}
} catch (const yexception& ex) {
Error = true;

auto ev = MakeHolder<TEvConsole::TEvGenericError>();
ev->Record.SetYdbStatus(Ydb::StatusIds::BAD_REQUEST);
auto *issue = ev->Record.AddIssues();
issue->set_severity(NYql::TSeverityIds::S_ERROR);
issue->set_message(ex.what());
ErrorReason = ex.what();
Response = MakeHolder<NActors::IEventHandle>(Sender, ctx.SelfID, ev.Release());
}

catch (const yexception& ex) {
HandleError(ex.what(), ctx);
}
return true;
}

Expand Down Expand Up @@ -225,6 +167,17 @@ class TConfigsManager::TTxReplaceYamlConfig : public TTransactionBase<TConfigsMa
Self->TxProcessor->TxCompleted(this, ctx);
}

void HandleError(const TString& error, const TActorContext& ctx) {
Error = true;
auto ev = MakeHolder<TEvConsole::TEvGenericError>();
ev->Record.SetYdbStatus(Ydb::StatusIds::BAD_REQUEST);
auto *issue = ev->Record.AddIssues();
issue->set_severity(NYql::TSeverityIds::S_ERROR);
issue->set_message(error);
ErrorReason = error;
Response = MakeHolder<NActors::IEventHandle>(Sender, ctx.SelfID, ev.Release());
}

private:
const TString Config;
const TString Peer;
Expand Down
76 changes: 76 additions & 0 deletions ydb/core/cms/console/console_configs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "http.h"

#include <ydb/core/cms/console/validators/registry.h>
#include <ydb/core/config/validation/validators.h>
#include <ydb/core/base/feature_flags.h>

#include <ydb/library/yaml_config/yaml_config.h>
Expand Down Expand Up @@ -67,6 +68,72 @@ bool TConfigsManager::CheckConfig(const NKikimrConsole::TConfigsConfig &config,
return true;
}

TConfigsManager::TValidateConfigResult TConfigsManager::ValidateConfigAndReplaceMetadata(const TString &config, bool force, bool allowUnknownFields) {
TValidateConfigResult result;
try {
if (!force) {
auto metadata = NYamlConfig::GetMetadata(config);
result.Cluster = metadata.Cluster.value_or(TString("unknown"));
result.Version = metadata.Version.value_or(0);
} else {
result.Cluster = ClusterName;
result.Version = YamlVersion;
}

result.UpdatedConfig = NYamlConfig::ReplaceMetadata(config, NYamlConfig::TMetadata{
.Version = result.Version + 1,
.Cluster = result.Cluster,
});

result.HasForbiddenUnknown = false;
if (result.UpdatedConfig != YamlConfig || YamlDropped) {
result.Modify = true;

auto tree = NFyaml::TDocument::Parse(result.UpdatedConfig);
auto resolved = NYamlConfig::ResolveAll(tree);

if (ClusterName != result.Cluster) {
ythrow yexception() << "ClusterName mismatch";
}

if (result.Version != YamlVersion) {
ythrow yexception() << "Version mismatch";
}

TSimpleSharedPtr<NYamlConfig::TBasicUnknownFieldsCollector> unknownFieldsCollector = new NYamlConfig::TBasicUnknownFieldsCollector;

std::vector<TString> errors;
for (auto& [_, config] : resolved.Configs) {
auto cfg = NYamlConfig::YamlToProto(
config.second,
true,
true,
unknownFieldsCollector);
NKikimr::NConfig::EValidationResult result = NKikimr::NConfig::ValidateConfig(cfg, errors);
if (result == NKikimr::NConfig::EValidationResult::Error) {
ythrow yexception() << errors.front();
}
}

const auto& deprecatedPaths = NKikimrConfig::TAppConfig::GetReservedChildrenPaths();

for (const auto& [path, info] : unknownFieldsCollector->GetUnknownKeys()) {
if (deprecatedPaths.contains(path)) {
result.DeprecatedFields[path] = info;
} else {
result.UnknownFields[path] = info;
}
}

result.HasForbiddenUnknown = !result.UnknownFields.empty() && !allowUnknownFields;
result.ValidationFinished = true;
}
} catch (const yexception &e) {
result.ErrorReason = e.what();
}
return result;
}

void TConfigsManager::Bootstrap(const TActorContext &ctx)
{
LOG_DEBUG(ctx, NKikimrServices::CMS_CONFIGS, "TConfigsManager::Bootstrap");
Expand Down Expand Up @@ -99,6 +166,7 @@ void TConfigsManager::Handle(TEvConsole::TEvConfigNotificationRequest::TPtr &ev,
void TConfigsManager::Detach()
{
Send(ConfigsProvider, new TEvents::TEvPoisonPill);
Send(CommitActor, new TEvents::TEvPoisonPill);
PassAway();
}

Expand Down Expand Up @@ -998,4 +1066,12 @@ void TConfigsManager::HandleUnauthorized(TEvConsole::TEvSetYamlConfigRequest::TP
/* success = */ false);
}

void TConfigsManager::SendInReply(const TActorId& sender, const TActorId& icSession, std::unique_ptr<IEventBase> ev, ui64 cookie) {
auto h = std::make_unique<IEventHandle>(sender, SelfId(), ev.release(), 0, cookie);
if (icSession) {
h->Rewrite(TEvInterconnect::EvForward, icSession);
}
TActivationContext::Send(h.release());
}

} // namespace NKikimr::NConsole
Loading

0 comments on commit 4081b1f

Please sign in to comment.