Skip to content

Commit

Permalink
Backup changefeed configurations (tools dump) (#12283)
Browse files Browse the repository at this point in the history
Co-authored-by: Stanislav Shchetinin <[email protected]>
  • Loading branch information
stanislav-shchetinin and Stanislav Shchetinin authored Dec 12, 2024
1 parent d779ea8 commit 9c831b2
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 5 deletions.
36 changes: 35 additions & 1 deletion ydb/library/backup/backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
#include <ydb/public/lib/ydb_cli/common/retry_func.h>
#include <ydb/public/lib/ydb_cli/dump/files/files.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>

#include <library/cpp/containers/stack_vector/stack_vec.h>
Expand Down Expand Up @@ -475,6 +478,36 @@ void BackupPermissions(TDriver driver, const TString& dbPrefix, const TString& p
WriteProtoToFile(proto, folderPath, NDump::NFiles::Permissions());
}

Ydb::Table::ChangefeedDescription ProtoFromChangefeedDesc(const NTable::TChangefeedDescription& changefeedDesc) {
Ydb::Table::ChangefeedDescription protoChangeFeedDesc;
changefeedDesc.SerializeTo(protoChangeFeedDesc);
return protoChangeFeedDesc;
}

NTopic::TDescribeTopicResult DescribeTopic(TDriver driver, const TString& path) {
NYdb::NTopic::TTopicClient client(driver);
return NConsoleClient::RetryFunction([&]() {
return client.DescribeTopic(path).GetValueSync();
});
}

void BackupChangefeeds(TDriver driver, const TString& tablePath, const TFsPath& folderPath) {
auto desc = DescribeTable(driver, tablePath);

for (const auto& changefeedDesc : desc.GetChangefeedDescriptions()) {
TFsPath changefeedDirPath = CreateDirectory(folderPath, changefeedDesc.GetName());

auto protoChangeFeedDesc = ProtoFromChangefeedDesc(changefeedDesc);
const auto descTopicResult = DescribeTopic(driver, JoinDatabasePath(tablePath, changefeedDesc.GetName()));
VerifyStatus(descTopicResult);
const auto& topicDescription = descTopicResult.GetTopicDescription();
const auto protoTopicDescription = NYdb::TProtoAccessor::GetProto(topicDescription);

WriteProtoToFile(protoChangeFeedDesc, changefeedDirPath, NDump::NFiles::Changefeed());
WriteProtoToFile(protoTopicDescription, changefeedDirPath, NDump::NFiles::Topic());
}
}

void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupPrefix, const TString& path,
const TFsPath& folderPath, bool schemaOnly, bool preservePoolKinds, bool ordered) {
Y_ENSURE(!path.empty());
Expand All @@ -486,8 +519,9 @@ void BackupTable(TDriver driver, const TString& dbPrefix, const TString& backupP

auto desc = DescribeTable(driver, fullPath);
auto proto = ProtoFromTableDescription(desc, preservePoolKinds);

WriteProtoToFile(proto, folderPath, NDump::NFiles::TableScheme());

BackupChangefeeds(driver, JoinDatabasePath(dbPrefix, path), folderPath);
BackupPermissions(driver, dbPrefix, path, folderPath);

if (!schemaOnly) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/backup/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ PEERDIR(
ydb/public/lib/yson_value
ydb/public/lib/ydb_cli/dump/files
ydb/public/sdk/cpp/client/ydb_driver
ydb/public/sdk/cpp/client/ydb_proto
ydb/public/sdk/cpp/client/ydb_result
ydb/public/sdk/cpp/client/ydb_table
ydb/public/sdk/cpp/client/ydb_topic
ydb/public/sdk/cpp/client/ydb_value
)

Expand Down
31 changes: 27 additions & 4 deletions ydb/public/sdk/cpp/client/ydb_table/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2816,10 +2816,10 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
return ret;
}

void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
template <typename TProto>
void TChangefeedDescription::SerializeCommonFields(TProto& proto) const {
proto.set_name(Name_);
proto.set_virtual_timestamps(VirtualTimestamps_);
proto.set_initial_scan(InitialScan_);
proto.set_aws_region(AwsRegion_);

switch (Mode_) {
Expand Down Expand Up @@ -2860,12 +2860,35 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
SetDuration(*ResolvedTimestamps_, *proto.mutable_resolved_timestamps_interval());
}

for (const auto& [key, value] : Attributes_) {
(*proto.mutable_attributes())[key] = value;
}
}

void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
SerializeCommonFields(proto);
proto.set_initial_scan(InitialScan_);

if (RetentionPeriod_) {
SetDuration(*RetentionPeriod_, *proto.mutable_retention_period());
}
}

for (const auto& [key, value] : Attributes_) {
(*proto.mutable_attributes())[key] = value;
void TChangefeedDescription::SerializeTo(Ydb::Table::ChangefeedDescription& proto) const {
SerializeCommonFields(proto);

switch (State_) {
case EChangefeedState::Enabled:
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_ENABLED);
break;
case EChangefeedState::Disabled:
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_DISABLED);
break;
case EChangefeedState::InitialScan:
proto.set_state(Ydb::Table::ChangefeedDescription_State::ChangefeedDescription_State_STATE_INITIAL_SCAN);
break;
case EChangefeedState::Unknown:
break;
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ class TChangefeedDescription {
const std::optional<TInitialScanProgress>& GetInitialScanProgress() const;

void SerializeTo(Ydb::Table::Changefeed& proto) const;
void SerializeTo(Ydb::Table::ChangefeedDescription& proto) const;
TString ToString() const;
void Out(IOutputStream& o) const;

Expand All @@ -399,6 +400,9 @@ class TChangefeedDescription {
template <typename TProto>
static TChangefeedDescription FromProto(const TProto& proto);

template <typename TProto>
void SerializeCommonFields(TProto& proto) const;

private:
TString Name_;
EChangefeedMode Mode_;
Expand Down

0 comments on commit 9c831b2

Please sign in to comment.