From 70cbc40a28eef8dbb83500722908cf08efa7f2c1 Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Wed, 18 Dec 2024 13:17:39 +0000 Subject: [PATCH 01/11] Add export checksums --- ydb/core/protos/feature_flags.proto | 1 + ydb/core/protos/flat_scheme_op.proto | 1 + ydb/core/testlib/basics/feature_flags.h | 1 + .../tx/datashard/backup_restore_common.cpp | 6 +- ydb/core/tx/datashard/backup_restore_common.h | 2 + ydb/core/tx/datashard/backup_restore_traits.h | 4 + ydb/core/tx/datashard/export_checksum.cpp | 27 ++++ ydb/core/tx/datashard/export_checksum.h | 21 +++ ydb/core/tx/datashard/export_s3.h | 5 +- ydb/core/tx/datashard/export_s3_buffer.h | 4 +- .../tx/datashard/export_s3_buffer_raw.cpp | 25 +++- ydb/core/tx/datashard/export_s3_buffer_raw.h | 5 +- .../tx/datashard/export_s3_buffer_zstd.cpp | 11 +- ydb/core/tx/datashard/export_s3_uploader.cpp | 138 ++++++++++++++++-- ydb/core/tx/datashard/export_scan.h | 7 +- .../tx/datashard/extstorage_usage_config.h | 6 + ydb/core/tx/datashard/ya.make | 2 + ydb/core/tx/schemeshard/schemeshard__init.cpp | 1 + .../tx/schemeshard/schemeshard_export.cpp | 3 +- .../schemeshard_export__create.cpp | 2 +- .../schemeshard_export_flow_proposals.cpp | 2 + .../tx/schemeshard/schemeshard_info_types.h | 2 + ydb/core/tx/schemeshard/schemeshard_schema.h | 5 +- .../tx/schemeshard/ut_export/ut_export.cpp | 50 +++++++ .../tx/schemeshard/ut_helpers/test_env.cpp | 1 + 25 files changed, 298 insertions(+), 34 deletions(-) create mode 100644 ydb/core/tx/datashard/export_checksum.cpp create mode 100644 ydb/core/tx/datashard/export_checksum.h diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 1f8a70465871..fd7947080aca 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -184,4 +184,5 @@ message TFeatureFlags { optional bool EnableDataShardInMemoryStateMigration = 159 [default = true]; optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false]; optional bool DisableLocalDBEraseCache = 161 [default = false]; + optional bool EnableExportChecksums = 162 [default = false]; } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 4c52ff262e8d..e412b2b1f63a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1246,6 +1246,7 @@ message TBackupTask { optional uint64 SnapshotStep = 14; optional uint64 SnapshotTxId = 15; + optional bool EnableChecksums = 16; // currently available for s3 } message TRestoreTask { diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 5124e9ed3fa1..25df60794aaa 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -71,6 +71,7 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableParameterizedDecimal) FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForCDC) FEATURE_FLAG_SETTER(EnableFollowerStats) + FEATURE_FLAG_SETTER(EnableExportChecksums) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/tx/datashard/backup_restore_common.cpp b/ydb/core/tx/datashard/backup_restore_common.cpp index dad590c13d56..5daaf26e22f1 100644 --- a/ydb/core/tx/datashard/backup_restore_common.cpp +++ b/ydb/core/tx/datashard/backup_restore_common.cpp @@ -6,9 +6,13 @@ void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) { FullBackups.emplace(fb->SnapshotVts, fb); } +void TMetadata::SetVersion(ui64 version) { + Version = version; +} + TString TMetadata::Serialize() const { NJson::TJsonMap m; - m["version"] = 0; + m["version"] = Version; NJson::TJsonArray fullBackups; for (auto &[tp, _] : FullBackups) { NJson::TJsonMap backupMap; diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h index 61da3f7e5350..254a6ed21004 100644 --- a/ydb/core/tx/datashard/backup_restore_common.h +++ b/ydb/core/tx/datashard/backup_restore_common.h @@ -175,6 +175,7 @@ class TMetadata { void AddFullBackup(TFullBackupMetadata::TPtr fullBackup); void AddLog(TLogMetadata::TPtr log); void SetConsistencyKey(const TString& key); + void SetVersion(ui64 version); TString Serialize() const; static TMetadata Deserialize(const TString& metadata); @@ -182,6 +183,7 @@ class TMetadata { TString ConsistencyKey; TMap FullBackups; TMap Logs; + ui64 Version; }; } // NBackupRestore diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h index 036d5a6ea514..69c27454d613 100644 --- a/ydb/core/tx/datashard/backup_restore_traits.h +++ b/ydb/core/tx/datashard/backup_restore_traits.h @@ -47,6 +47,10 @@ inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str()); } +inline TString ChecksumKey(const TString& objKey) { + return Sprintf("%s.sha256", objKey.c_str()); +} + } // NBackupRestoreTraits } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/export_checksum.cpp b/ydb/core/tx/datashard/export_checksum.cpp new file mode 100644 index 000000000000..676033de5526 --- /dev/null +++ b/ydb/core/tx/datashard/export_checksum.cpp @@ -0,0 +1,27 @@ +#include "export_checksum.h" + +#include + +namespace NKikimr::NDataShard { + +TExportChecksum::TExportChecksum() { + SHA256_Init(&Context); +} + +void TExportChecksum::AddData(const char* data, size_t dataSize) { + SHA256_Update(&Context, data, dataSize); +} + +TString TExportChecksum::Serialize() { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256_Final(hash, &Context); + return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH)); +} + +TString TExportChecksum::Compute(const char* data, size_t dataSize) { + TExportChecksum checksum; + checksum.AddData(data, dataSize); + return checksum.Serialize(); +} + +} // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/export_checksum.h b/ydb/core/tx/datashard/export_checksum.h new file mode 100644 index 000000000000..7b4f4dd1b2f6 --- /dev/null +++ b/ydb/core/tx/datashard/export_checksum.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +#include + +namespace NKikimr::NDataShard { + +class TExportChecksum { +public: + TExportChecksum(); + + void AddData(const char* data, size_t dataSize); + TString Serialize(); + + static TString Compute(const char* data, size_t dataSize); +private: + SHA256_CTX Context; +}; + +} // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/export_s3.h b/ydb/core/tx/datashard/export_s3.h index 3e3a651c8f53..cf21cd2e90df 100644 --- a/ydb/core/tx/datashard/export_s3.h +++ b/ydb/core/tx/datashard/export_s3.h @@ -31,9 +31,10 @@ class TS3Export: public IExport { switch (CodecFromTask(Task)) { case ECompressionCodec::None: - return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes); + return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes, Task.GetEnableChecksums()); case ECompressionCodec::Zstd: - return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, maxBytes, minBytes); + return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, + maxBytes, minBytes, Task.GetEnableChecksums()); case ECompressionCodec::Invalid: Y_ABORT("unreachable"); } diff --git a/ydb/core/tx/datashard/export_s3_buffer.h b/ydb/core/tx/datashard/export_s3_buffer.h index 2d332a561d73..130e0ba7c968 100644 --- a/ydb/core/tx/datashard/export_s3_buffer.h +++ b/ydb/core/tx/datashard/export_s3_buffer.h @@ -9,10 +9,10 @@ namespace NKikimr { namespace NDataShard { NExportScan::IBuffer* CreateS3ExportBufferRaw( - const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes); + const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, bool enableChecksums); NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel, - const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes); + const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, bool enableChecksums); } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp index e0647ee70a8c..dcb8ad2ac122 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp @@ -15,12 +15,13 @@ namespace NKikimr { namespace NDataShard { -TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit) +TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums) : Columns(columns) , RowsLimit(rowsLimit) , BytesLimit(bytesLimit) , Rows(0) , BytesRead(0) + , EnableChecksums(enableChecksums) { } @@ -154,7 +155,16 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) { bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) { TBufferOutput out(Buffer); ErrorString.clear(); - return Collect(row, out); + + size_t startSize = Buffer.Size(); + if (!Collect(row, out)) { + return false; + } + + if (EnableChecksums) { + Checksum.AddData(Buffer.Data() + startSize, Buffer.Size() - startSize); + } + return true; } IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) { @@ -167,7 +177,12 @@ IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& } stats.BytesSent = buffer->Size(); - return new TEvExportScan::TEvBuffer(std::move(*buffer), last); + + if (EnableChecksums && last) { + return new TEvExportScan::TEvBuffer(std::move(*buffer), last, Checksum.Serialize()); + } else { + return new TEvExportScan::TEvBuffer(std::move(*buffer), last); + } } void TS3BufferRaw::Clear() { @@ -189,9 +204,9 @@ TMaybe TS3BufferRaw::Flush(bool) { } NExportScan::IBuffer* CreateS3ExportBufferRaw( - const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) + const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums) { - return new TS3BufferRaw(columns, rowsLimit, bytesLimit); + return new TS3BufferRaw(columns, rowsLimit, bytesLimit, enableChecksums); } } // NDataShard diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.h b/ydb/core/tx/datashard/export_s3_buffer_raw.h index 3573f4b6d597..230a5f2fc300 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.h +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.h @@ -14,7 +14,7 @@ class TS3BufferRaw: public NExportScan::IBuffer { using TTagToIndex = THashMap; // index in IScan::TRow public: - explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit); + explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums); void ColumnsOrder(const TVector& tags) override; bool Collect(const NTable::IScan::TRow& row) override; @@ -42,6 +42,9 @@ class TS3BufferRaw: public NExportScan::IBuffer { ui64 BytesRead; TBuffer Buffer; + bool EnableChecksums; + TExportChecksum Checksum; + TString ErrorString; }; // TS3BufferRaw diff --git a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp index 7588d3b7a58b..17d6b0e59606 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp @@ -49,8 +49,9 @@ class TS3BufferZstd: public TS3BufferRaw { public: explicit TS3BufferZstd(int compressionLevel, - const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes) - : TS3BufferRaw(columns, maxRows, maxBytes) + const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, + bool enableChecksums) + : TS3BufferRaw(columns, maxRows, maxBytes, enableChecksums) , CompressionLevel(compressionLevel) , MinBytes(minBytes) , Context(ZSTD_createCCtx()) @@ -67,6 +68,7 @@ class TS3BufferZstd: public TS3BufferRaw { return false; } + Checksum.AddData(BufferRaw.data(), BufferRaw.size()); BytesRaw += BufferRaw.size(); auto input = ZSTD_inBuffer{BufferRaw.data(), BufferRaw.size(), 0}; @@ -122,9 +124,10 @@ class TS3BufferZstd: public TS3BufferRaw { }; // TS3BufferZstd NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel, - const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes) + const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, + bool enableChecksums) { - return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes); + return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes, enableChecksums); } } // NDataShard diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index 9acd52bc7b81..52f6ab3c1846 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -31,6 +31,8 @@ namespace NKikimr { namespace NDataShard { +using namespace NBackupRestoreTraits; + class TS3Uploader: public TActorBootstrapped { using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig; using THttpResolverConfig = NKikimrConfig::TS3ProxyResolverConfig::THttpResolverConfig; @@ -180,6 +182,9 @@ class TS3Uploader: public TActorBootstrapped { } google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); + if (EnableChecksums) { + SchemeChecksum = TExportChecksum::Compute(Buffer.data(), Buffer.size()); + } auto request = Aws::S3::Model::PutObjectRequest() .WithKey(Settings.GetSchemeKey()); @@ -196,6 +201,9 @@ class TS3Uploader: public TActorBootstrapped { } google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer); + if (EnableChecksums) { + PermissionsChecksum = TExportChecksum::Compute(Buffer.data(), Buffer.size()); + } auto request = Aws::S3::Model::PutObjectRequest() .WithKey(Settings.GetPermissionsKey()); @@ -208,6 +216,9 @@ class TS3Uploader: public TActorBootstrapped { Y_ABORT_UNLESS(!MetadataUploaded); Buffer = std::move(Metadata); + if (EnableChecksums) { + MetadataChecksum = TExportChecksum::Compute(Buffer.data(), Buffer.size()); + } auto request = Aws::S3::Model::PutObjectRequest() .WithKey(Settings.GetMetadataKey()); @@ -216,6 +227,21 @@ class TS3Uploader: public TActorBootstrapped { this->Become(&TThis::StateUploadMetadata); } + void UploadChecksum(TString&& checksum, const TString& key, const TString& relativeObjKey, + std::function checksumUploadedCallback) + { + // make it verifiable from sha256sum CLI + checksum += " "; + checksum += relativeObjKey; + + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(key); + this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(checksum))); + + ChecksumUploadedCallback = checksumUploadedCallback; + this->Become(&TThis::StateUploadChecksum); + } + void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { const auto& result = ev->Get()->Result; @@ -227,13 +253,21 @@ class TS3Uploader: public TActorBootstrapped { return; } - SchemeUploaded = true; + auto nextStep = [this](){ + SchemeUploaded = true; - if (Scanner) { - this->Send(Scanner, new TEvExportScan::TEvFeed()); - } + if (Scanner) { + this->Send(Scanner, new TEvExportScan::TEvFeed()); + } + this->Become(&TThis::StateUploadData); + }; - this->Become(&TThis::StateUploadData); + if (EnableChecksums) { + TString relativeObjKey = SchemeKey(""); + UploadChecksum(std::move(SchemeChecksum), ChecksumKey(Settings.GetSchemeKey()), relativeObjKey, nextStep); + } else { + nextStep(); + } } void HandlePermissions(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { @@ -247,9 +281,21 @@ class TS3Uploader: public TActorBootstrapped { return; } + auto nextStep = [this](){ + PermissionsUploaded = true; PermissionsUploaded = true; - UploadScheme(); + PermissionsUploaded = true; + + UploadScheme(); + }; + + if (EnableChecksums) { + TString relativeObjKey = PermissionsKey(""); + UploadChecksum(std::move(PermissionsChecksum), ChecksumKey(Settings.GetPermissionsKey()), relativeObjKey, nextStep); + } else { + nextStep(); + } } void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { @@ -263,9 +309,31 @@ class TS3Uploader: public TActorBootstrapped { return; } - MetadataUploaded = true; + auto nextStep = [this](){ + MetadataUploaded = true; + UploadPermissions(); + }; + + if (EnableChecksums) { + TString relativeObjKey = MetadataKey(""); + UploadChecksum(std::move(MetadataChecksum), ChecksumKey(Settings.GetMetadataKey()), relativeObjKey, nextStep); + } else { + nextStep(); + } + } + + void HandleChecksum(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("HandleChecksum TEvExternalStorage::TEvPutObjectResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (!CheckResult(result, TStringBuf("PutObject (checksum)"))) { + return; + } - UploadPermissions(); + ChecksumUploadedCallback(); } void Handle(TEvExportScan::TEvReady::TPtr& ev) { @@ -301,6 +369,7 @@ class TS3Uploader: public TActorBootstrapped { Last = ev->Get()->Last; MultiPart = MultiPart || !Last; ev->Get()->Buffer.AsString(Buffer); + DataChecksum = std::move(ev->Get()->Checksum); UploadData(); } @@ -335,7 +404,18 @@ class TS3Uploader: public TActorBootstrapped { return; } - Finish(); + auto nextStep = [this](){ + Finish(); + }; + + if (EnableChecksums) { + // checksum is always calculated before compression + TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); + TString relativeObjKey = Settings.GetRelativeDataKey(DataFormat, ECompressionCodec::None); + UploadChecksum(std::move(DataChecksum), checksumKey, relativeObjKey, nextStep); + } else { + nextStep(); + } } void Handle(TEvDataShard::TEvS3Upload::TPtr& ev) { @@ -418,7 +498,18 @@ class TS3Uploader: public TActorBootstrapped { Parts.push_back(result.GetResult().GetETag().c_str()); if (Last) { - return Finish(); + auto nextStep = [this](){ + Finish(); + }; + + if (EnableChecksums) { + // checksum is always calculated before compression + TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); + TString relativeObjKey = Settings.GetRelativeDataKey(DataFormat, ECompressionCodec::None); + return UploadChecksum(std::move(DataChecksum), checksumKey, relativeObjKey, nextStep); + } else { + return nextStep(); + } } this->Send(Scanner, new TEvExportScan::TEvFeed()); @@ -576,8 +667,8 @@ class TS3Uploader: public TActorBootstrapped { TString&& metadata) : ExternalStorageConfig(new TS3ExternalStorageConfig(task.GetS3Settings())) , Settings(TS3Settings::FromBackupTask(task)) - , DataFormat(NBackupRestoreTraits::EDataFormat::Csv) - , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task)) + , DataFormat(EDataFormat::Csv) + , CompressionCodec(CodecFromTask(task)) , HttpResolverConfig(GetHttpResolverConfig(*GetS3StorageConfig())) , DataShard(dataShard) , TxId(txId) @@ -590,6 +681,7 @@ class TS3Uploader: public TActorBootstrapped { , SchemeUploaded(task.GetShardNum() == 0 ? false : true) , MetadataUploaded(task.GetShardNum() == 0 ? false : true) , PermissionsUploaded(task.GetShardNum() == 0 ? false : true) + , EnableChecksums(task.GetEnableChecksums()) { } @@ -647,6 +739,14 @@ class TS3Uploader: public TActorBootstrapped { } } + STATEFN(StateUploadChecksum) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleChecksum); + default: + return StateBase(ev); + } + } + STATEFN(StateUploadData) { switch (ev->GetTypeRewrite()) { hFunc(TEvBuffer, Handle); @@ -665,8 +765,8 @@ class TS3Uploader: public TActorBootstrapped { private: NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; TS3Settings Settings; - const NBackupRestoreTraits::EDataFormat DataFormat; - const NBackupRestoreTraits::ECompressionCodec CompressionCodec; + const EDataFormat DataFormat; + const ECompressionCodec CompressionCodec; bool ProxyResolved; TMaybe HttpResolverConfig; @@ -698,6 +798,13 @@ class TS3Uploader: public TActorBootstrapped { TVector Parts; TMaybe Error; + bool EnableChecksums; + TString DataChecksum; + TString MetadataChecksum; + TString SchemeChecksum; + TString PermissionsChecksum; + std::function ChecksumUploadedCallback; + }; // TS3Uploader IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const { @@ -710,9 +817,10 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const { : Nothing(); NBackupRestore::TMetadata metadata; + metadata.SetVersion(Task.GetEnableChecksums() ? 1 : 0); NBackupRestore::TFullBackupMetadata::TPtr backup = new NBackupRestore::TFullBackupMetadata{ - .SnapshotVts = NBackupRestore::TVirtualTimestamp( + .SnapshotVts = NBackup::TVirtualTimestamp( Task.GetSnapshotStep(), Task.GetSnapshotTxId()) }; diff --git a/ydb/core/tx/datashard/export_scan.h b/ydb/core/tx/datashard/export_scan.h index 1873795255ef..8c723448770d 100644 --- a/ydb/core/tx/datashard/export_scan.h +++ b/ydb/core/tx/datashard/export_scan.h @@ -2,6 +2,8 @@ #include "defs.h" +#include "export_checksum.h" + #include #include @@ -32,18 +34,21 @@ struct TEvExportScan { struct TEvBuffer: public TEventLocal, EvBuffer> { TBuffer Buffer; bool Last; + TString Checksum; TEvBuffer() = default; - explicit TEvBuffer(TBuffer&& buffer, bool last) + explicit TEvBuffer(TBuffer&& buffer, bool last, TString&& checksum = "") : Buffer(std::move(buffer)) , Last(last) + , Checksum(std::move(checksum)) { } TString ToString() const override { return TStringBuilder() << this->ToStringHeader() << " {" << " Last: " << Last + << " Checksum:" << Checksum << " }"; } }; diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h index 873e1b04ac43..c4f261184e73 100644 --- a/ydb/core/tx/datashard/extstorage_usage_config.h +++ b/ydb/core/tx/datashard/extstorage_usage_config.h @@ -60,6 +60,12 @@ class TS3Settings { return NBackupRestoreTraits::DataKey(ObjectKeyPattern, Shard, format, codec); } + inline TString GetRelativeDataKey( + NBackupRestoreTraits::EDataFormat format, + NBackupRestoreTraits::ECompressionCodec codec) const { + return NBackupRestoreTraits::DataKey("", Shard, format, codec); + } + }; // TS3Settings } diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 3ec752d2898c..9dfa6bc84a75 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -152,6 +152,7 @@ SRCS( execution_unit.h execution_unit_ctors.h execution_unit_kind.h + export_checksum.cpp export_common.cpp export_iface.cpp export_iface.h @@ -254,6 +255,7 @@ PEERDIR( library/cpp/l1_distance library/cpp/l2_distance ydb/core/actorlib_impl + ydb/core/backup/common ydb/core/base ydb/core/change_exchange ydb/core/engine diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 10db6d9c49f6..aa07665ed936 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4279,6 +4279,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { exportInfo->StartTime = TInstant::Seconds(rowset.GetValueOrDefault()); exportInfo->EndTime = TInstant::Seconds(rowset.GetValueOrDefault()); + exportInfo->EnableChecksums = rowset.GetValueOrDefault(false); Self->Exports[id] = exportInfo; if (uid) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index a8ae8e67d9a6..ea6f881ee39a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -153,7 +153,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T NIceDb::TUpdate(exportInfo->Settings), NIceDb::TUpdate(exportInfo->DomainPathId.OwnerId), NIceDb::TUpdate(exportInfo->DomainPathId.LocalPathId), - NIceDb::TUpdate(exportInfo->Items.size()) + NIceDb::TUpdate(exportInfo->Items.size()), + NIceDb::TUpdate(exportInfo->EnableChecksums) ); if (exportInfo->UserSID) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index f65eabb43f89..7f63a60a7970 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -118,7 +118,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName()); - + exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableExportChecksums(); TString explain; if (!FillItems(exportInfo, settings, explain)) { return Reply( diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index d6fca52aecaf..ce03d1669a91 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -193,6 +193,8 @@ THolder BackupPropose( if (const auto compression = exportSettings.compression()) { Y_ABORT_UNLESS(FillCompression(*task.MutableCompression(), compression)); } + + task.SetEnableChecksums(exportInfo->EnableChecksums); } break; } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index f265463bc98a..765a40771588 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2709,6 +2709,8 @@ struct TExportInfo: public TSimpleRefCount { TInstant StartTime = TInstant::Zero(); TInstant EndTime = TInstant::Zero(); + bool EnableChecksums = false; + explicit TExportInfo( const ui64 id, const TString& uid, diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 632616f6fc9a..76d29549ea33 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1175,6 +1175,8 @@ struct Schema : NIceDb::Schema { struct EndTime : Column<15, NScheme::NTypeIds::Uint64> {}; struct PeerName : Column<16, NScheme::NTypeIds::Utf8> {}; + struct EnableChecksums : Column<17, NScheme::NTypeIds::Bool> {}; + using TKey = TableKey; using TColumns = TableColumns< Id, @@ -1192,7 +1194,8 @@ struct Schema : NIceDb::Schema { UserSID, StartTime, EndTime, - PeerName + PeerName, + EnableChecksums >; }; diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 47cfca6c2ed0..72f167836bf5 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -2346,4 +2346,54 @@ partitioning_settings { } )")); } + + Y_UNIT_TEST(Checksums) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", port)); + env.TestWaitNotification(runtime, txId); + + const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256"); + UNIT_ASSERT(dataChecksum); + UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "eec941c7 data_00.csv"); + + const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256"); + UNIT_ASSERT(metadataChecksum); + UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "f71ad64a metadata.json"); + + const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256"); + UNIT_ASSERT(schemeChecksum); + UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "bbc132af scheme.pb"); + + const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256"); + UNIT_ASSERT(permissionsChecksum); + UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "41935b38 permissions.pb"); + } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 16077cc065b7..e785ed85e089 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -548,6 +548,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableParameterizedDecimal(opts.EnableParameterizedDecimal_); app.SetEnableTopicAutopartitioningForCDC(opts.EnableTopicAutopartitioningForCDC_); app.SetEnableBackupService(opts.EnableBackupService_); + app.SetEnableExportChecksums(true); app.ColumnShardConfig.SetDisabledOnSchemeShard(false); From 63a89aa7699fcbca285c3d92f5c2f7d178cc0250 Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Tue, 24 Dec 2024 05:01:51 +0000 Subject: [PATCH 02/11] Add checksum interface --- .../tx/datashard/backup_restore_traits.cpp | 21 +++++++ ydb/core/tx/datashard/backup_restore_traits.h | 24 ++----- ydb/core/tx/datashard/export_checksum.cpp | 38 ++++++----- ydb/core/tx/datashard/export_checksum.h | 16 ++--- .../tx/datashard/export_s3_buffer_raw.cpp | 8 ++- ydb/core/tx/datashard/export_s3_buffer_raw.h | 2 +- .../tx/datashard/export_s3_buffer_zstd.cpp | 4 +- ydb/core/tx/datashard/export_s3_uploader.cpp | 35 +++++------ .../tx/datashard/extstorage_usage_config.h | 12 ++-- .../tx/schemeshard/ut_export/ut_export.cpp | 63 +++++++++++++++++-- 10 files changed, 150 insertions(+), 73 deletions(-) diff --git a/ydb/core/tx/datashard/backup_restore_traits.cpp b/ydb/core/tx/datashard/backup_restore_traits.cpp index a55f5681d93b..9d4be0bcd5a4 100644 --- a/ydb/core/tx/datashard/backup_restore_traits.cpp +++ b/ydb/core/tx/datashard/backup_restore_traits.cpp @@ -72,6 +72,27 @@ TString DataFileExtension(EDataFormat format, ECompressionCodec codec) { return Sprintf("%s%s", fit->second.c_str(), cit->second.c_str()); } +TString PermissionsFile() { + return "permissions.pb"; +} + +TString SchemeFile() { + return "scheme.pb"; +} + +TString MetadataFile() { + return "metadata.json"; +} + +TString DataFile(ui32 n, EDataFormat format, ECompressionCodec codec) { + const auto ext = DataFileExtension(format, codec); + return Sprintf("data_%02d%s", n, ext.c_str()); +} + +TString ChecksumKey(const TString& objKey) { + return objKey + ".sha256"; +} + } // NBackupRestoreTraits } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h index 69c27454d613..6f709b925a9d 100644 --- a/ydb/core/tx/datashard/backup_restore_traits.h +++ b/ydb/core/tx/datashard/backup_restore_traits.h @@ -30,26 +30,12 @@ ECompressionCodec NextCompressionCodec(ECompressionCodec cur); TString DataFileExtension(EDataFormat format, ECompressionCodec codec); -inline TString SchemeKey(const TString& objKeyPattern) { - return Sprintf("%s/scheme.pb", objKeyPattern.c_str()); -} - -inline TString PermissionsKey(const TString& objKeyPattern) { - return Sprintf("%s/permissions.pb", objKeyPattern.c_str()); -} - -inline TString MetadataKey(const TString& objKeyPattern) { - return Sprintf("%s/metadata.json", objKeyPattern.c_str()); -} - -inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) { - const auto ext = DataFileExtension(format, codec); - return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str()); -} +TString PermissionsFile(); +TString SchemeFile(); +TString MetadataFile(); +TString DataFile(ui32 n, EDataFormat format, ECompressionCodec codec); -inline TString ChecksumKey(const TString& objKey) { - return Sprintf("%s.sha256", objKey.c_str()); -} +TString ChecksumKey(const TString& objKey); } // NBackupRestoreTraits } // NDataShard diff --git a/ydb/core/tx/datashard/export_checksum.cpp b/ydb/core/tx/datashard/export_checksum.cpp index 676033de5526..2a6dbc8ed0d5 100644 --- a/ydb/core/tx/datashard/export_checksum.cpp +++ b/ydb/core/tx/datashard/export_checksum.cpp @@ -4,24 +4,34 @@ namespace NKikimr::NDataShard { -TExportChecksum::TExportChecksum() { - SHA256_Init(&Context); -} +class TSHA256 : public IExportChecksum { +public: + TSHA256() { + SHA256_Init(&Context); + } -void TExportChecksum::AddData(const char* data, size_t dataSize) { - SHA256_Update(&Context, data, dataSize); -} + void AddData(TStringBuf data) override { + SHA256_Update(&Context, data.data(), data.size()); + } + + TString Serialize() override { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256_Final(hash, &Context); + return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH)); + } + +private: + SHA256_CTX Context; +}; -TString TExportChecksum::Serialize() { - unsigned char hash[SHA256_DIGEST_LENGTH]; - SHA256_Final(hash, &Context); - return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH)); +TString IExportChecksum::Compute(TStringBuf data) { + IExportChecksum::TPtr checksum(CreateExportChecksum()); + checksum->AddData(data); + return checksum->Serialize(); } -TString TExportChecksum::Compute(const char* data, size_t dataSize) { - TExportChecksum checksum; - checksum.AddData(data, dataSize); - return checksum.Serialize(); +IExportChecksum* CreateExportChecksum() { + return new TSHA256(); } } // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/export_checksum.h b/ydb/core/tx/datashard/export_checksum.h index 7b4f4dd1b2f6..c1c7ed2eee8e 100644 --- a/ydb/core/tx/datashard/export_checksum.h +++ b/ydb/core/tx/datashard/export_checksum.h @@ -6,16 +6,18 @@ namespace NKikimr::NDataShard { -class TExportChecksum { +class IExportChecksum { public: - TExportChecksum(); + using TPtr = std::unique_ptr; - void AddData(const char* data, size_t dataSize); - TString Serialize(); + virtual ~IExportChecksum() = default; - static TString Compute(const char* data, size_t dataSize); -private: - SHA256_CTX Context; + virtual void AddData(TStringBuf data) = 0; + virtual TString Serialize() = 0; + + static TString Compute(TStringBuf data); }; +IExportChecksum* CreateExportChecksum(); + } // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp index dcb8ad2ac122..14343195da44 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp @@ -22,6 +22,7 @@ TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 byt , Rows(0) , BytesRead(0) , EnableChecksums(enableChecksums) + , Checksum(EnableChecksums ? CreateExportChecksum() : nullptr) { } @@ -156,13 +157,14 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) { TBufferOutput out(Buffer); ErrorString.clear(); - size_t startSize = Buffer.Size(); + size_t beforeSize = Buffer.Size(); if (!Collect(row, out)) { return false; } if (EnableChecksums) { - Checksum.AddData(Buffer.Data() + startSize, Buffer.Size() - startSize); + TStringBuf data(Buffer.Data(), Buffer.Size()); + Checksum->AddData(data.Tail(beforeSize)); } return true; } @@ -179,7 +181,7 @@ IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats.BytesSent = buffer->Size(); if (EnableChecksums && last) { - return new TEvExportScan::TEvBuffer(std::move(*buffer), last, Checksum.Serialize()); + return new TEvExportScan::TEvBuffer(std::move(*buffer), last, Checksum->Serialize()); } else { return new TEvExportScan::TEvBuffer(std::move(*buffer), last); } diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.h b/ydb/core/tx/datashard/export_s3_buffer_raw.h index 230a5f2fc300..7c9c7c1fddcb 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.h +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.h @@ -43,7 +43,7 @@ class TS3BufferRaw: public NExportScan::IBuffer { TBuffer Buffer; bool EnableChecksums; - TExportChecksum Checksum; + IExportChecksum::TPtr Checksum; TString ErrorString; }; // TS3BufferRaw diff --git a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp index 17d6b0e59606..4691e933c88d 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp @@ -68,7 +68,9 @@ class TS3BufferZstd: public TS3BufferRaw { return false; } - Checksum.AddData(BufferRaw.data(), BufferRaw.size()); + if (EnableChecksums) { + Checksum->AddData(BufferRaw); + } BytesRaw += BufferRaw.size(); auto input = ZSTD_inBuffer{BufferRaw.data(), BufferRaw.size(), 0}; diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index 52f6ab3c1846..a5764937fff1 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -183,7 +183,7 @@ class TS3Uploader: public TActorBootstrapped { google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); if (EnableChecksums) { - SchemeChecksum = TExportChecksum::Compute(Buffer.data(), Buffer.size()); + SchemeChecksum = IExportChecksum::Compute(Buffer); } auto request = Aws::S3::Model::PutObjectRequest() @@ -202,7 +202,7 @@ class TS3Uploader: public TActorBootstrapped { google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer); if (EnableChecksums) { - PermissionsChecksum = TExportChecksum::Compute(Buffer.data(), Buffer.size()); + PermissionsChecksum = IExportChecksum::Compute(Buffer); } auto request = Aws::S3::Model::PutObjectRequest() @@ -217,7 +217,7 @@ class TS3Uploader: public TActorBootstrapped { Buffer = std::move(Metadata); if (EnableChecksums) { - MetadataChecksum = TExportChecksum::Compute(Buffer.data(), Buffer.size()); + MetadataChecksum = IExportChecksum::Compute(Buffer); } auto request = Aws::S3::Model::PutObjectRequest() @@ -227,12 +227,11 @@ class TS3Uploader: public TActorBootstrapped { this->Become(&TThis::StateUploadMetadata); } - void UploadChecksum(TString&& checksum, const TString& key, const TString& relativeObjKey, + void UploadChecksum(TString&& checksum, const TString& key, const TString& checksumFile, std::function checksumUploadedCallback) { - // make it verifiable from sha256sum CLI - checksum += " "; - checksum += relativeObjKey; + // make checksum verifiable using sha256sum CLI + checksum += ' ' + checksumFile; auto request = Aws::S3::Model::PutObjectRequest() .WithKey(key); @@ -263,8 +262,8 @@ class TS3Uploader: public TActorBootstrapped { }; if (EnableChecksums) { - TString relativeObjKey = SchemeKey(""); - UploadChecksum(std::move(SchemeChecksum), ChecksumKey(Settings.GetSchemeKey()), relativeObjKey, nextStep); + TString checksumKey = ChecksumKey(Settings.GetSchemeKey()); + UploadChecksum(std::move(SchemeChecksum), checksumKey, SchemeFile(), nextStep); } else { nextStep(); } @@ -291,8 +290,8 @@ class TS3Uploader: public TActorBootstrapped { }; if (EnableChecksums) { - TString relativeObjKey = PermissionsKey(""); - UploadChecksum(std::move(PermissionsChecksum), ChecksumKey(Settings.GetPermissionsKey()), relativeObjKey, nextStep); + TString checksumKey = ChecksumKey(Settings.GetPermissionsKey()); + UploadChecksum(std::move(PermissionsChecksum), checksumKey, PermissionsFile(), nextStep); } else { nextStep(); } @@ -315,8 +314,8 @@ class TS3Uploader: public TActorBootstrapped { }; if (EnableChecksums) { - TString relativeObjKey = MetadataKey(""); - UploadChecksum(std::move(MetadataChecksum), ChecksumKey(Settings.GetMetadataKey()), relativeObjKey, nextStep); + TString checksumKey = ChecksumKey(Settings.GetMetadataKey()); + UploadChecksum(std::move(MetadataChecksum), checksumKey, MetadataFile(), nextStep); } else { nextStep(); } @@ -411,8 +410,8 @@ class TS3Uploader: public TActorBootstrapped { if (EnableChecksums) { // checksum is always calculated before compression TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); - TString relativeObjKey = Settings.GetRelativeDataKey(DataFormat, ECompressionCodec::None); - UploadChecksum(std::move(DataChecksum), checksumKey, relativeObjKey, nextStep); + TString dataFile = Settings.GetDataFile(DataFormat, ECompressionCodec::None); + UploadChecksum(std::move(DataChecksum), checksumKey, dataFile, nextStep); } else { nextStep(); } @@ -505,8 +504,8 @@ class TS3Uploader: public TActorBootstrapped { if (EnableChecksums) { // checksum is always calculated before compression TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); - TString relativeObjKey = Settings.GetRelativeDataKey(DataFormat, ECompressionCodec::None); - return UploadChecksum(std::move(DataChecksum), checksumKey, relativeObjKey, nextStep); + TString dataFile = Settings.GetDataFile(DataFormat, ECompressionCodec::None); + return UploadChecksum(std::move(DataChecksum), checksumKey, dataFile, nextStep); } else { return nextStep(); } @@ -820,7 +819,7 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const { metadata.SetVersion(Task.GetEnableChecksums() ? 1 : 0); NBackupRestore::TFullBackupMetadata::TPtr backup = new NBackupRestore::TFullBackupMetadata{ - .SnapshotVts = NBackup::TVirtualTimestamp( + .SnapshotVts = NBackupRestore::TVirtualTimestamp( Task.GetSnapshotStep(), Task.GetSnapshotTxId()) }; diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h index c4f261184e73..b4fda7d9fb5e 100644 --- a/ydb/core/tx/datashard/extstorage_usage_config.h +++ b/ydb/core/tx/datashard/extstorage_usage_config.h @@ -43,27 +43,27 @@ class TS3Settings { Aws::S3::Model::StorageClass GetStorageClass() const; inline TString GetPermissionsKey() const { - return NBackupRestoreTraits::PermissionsKey(ObjectKeyPattern); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::PermissionsFile(); } inline TString GetMetadataKey() const { - return NBackupRestoreTraits::MetadataKey(ObjectKeyPattern); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::MetadataFile(); } inline TString GetSchemeKey() const { - return NBackupRestoreTraits::SchemeKey(ObjectKeyPattern); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::SchemeFile(); } inline TString GetDataKey( NBackupRestoreTraits::EDataFormat format, NBackupRestoreTraits::ECompressionCodec codec) const { - return NBackupRestoreTraits::DataKey(ObjectKeyPattern, Shard, format, codec); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::DataFile(Shard, format, codec); } - inline TString GetRelativeDataKey( + inline TString GetDataFile( NBackupRestoreTraits::EDataFormat format, NBackupRestoreTraits::ECompressionCodec codec) const { - return NBackupRestoreTraits::DataKey("", Shard, format, codec); + return NBackupRestoreTraits::DataFile(Shard, format, codec); } }; // TS3Settings diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 72f167836bf5..f09413afcfdf 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -2380,20 +2380,75 @@ partitioning_settings { )", port)); env.TestWaitNotification(runtime, txId); + UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8); + + const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256"); + UNIT_ASSERT(dataChecksum); + UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv"); + + const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256"); + UNIT_ASSERT(metadataChecksum); + UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json"); + + const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256"); + UNIT_ASSERT(schemeChecksum); + UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb"); + + const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256"); + UNIT_ASSERT(permissionsChecksum); + UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb"); + } + + Y_UNIT_TEST(ChecksumsWithCompression) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + compression: "zstd" + } + )", port)); + env.TestWaitNotification(runtime, txId); + + UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8); + const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256"); UNIT_ASSERT(dataChecksum); - UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "eec941c7 data_00.csv"); + UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv"); const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256"); UNIT_ASSERT(metadataChecksum); - UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "f71ad64a metadata.json"); + UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json"); const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256"); UNIT_ASSERT(schemeChecksum); - UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "bbc132af scheme.pb"); + UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb"); const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256"); UNIT_ASSERT(permissionsChecksum); - UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "41935b38 permissions.pb"); + UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb"); } } From 8cfc5ebcda4565d12a4f527f22c33a27230635fb Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Tue, 24 Dec 2024 05:08:01 +0000 Subject: [PATCH 03/11] Init metadata version --- ydb/core/tx/datashard/backup_restore_common.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h index 254a6ed21004..d932f35de94d 100644 --- a/ydb/core/tx/datashard/backup_restore_common.h +++ b/ydb/core/tx/datashard/backup_restore_common.h @@ -183,7 +183,7 @@ class TMetadata { TString ConsistencyKey; TMap FullBackups; TMap Logs; - ui64 Version; + ui64 Version = 0; }; } // NBackupRestore From ffaaed8a8824f71d4bc8608977f88a757e02f8ec Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Tue, 24 Dec 2024 06:22:00 +0000 Subject: [PATCH 04/11] Remove peerdir --- ydb/core/tx/datashard/ya.make | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 9dfa6bc84a75..d73553229a85 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -255,7 +255,6 @@ PEERDIR( library/cpp/l1_distance library/cpp/l2_distance ydb/core/actorlib_impl - ydb/core/backup/common ydb/core/base ydb/core/change_exchange ydb/core/engine From 25773130387314b20aecc6bca37ea014a7cfcc23 Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Tue, 24 Dec 2024 08:37:05 +0000 Subject: [PATCH 05/11] Fix canonization --- .../flat_schemeshard.schema | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index cd58d3db7563..ed19c6e0a55d 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -3031,6 +3031,11 @@ 1 ], "ColumnsAdded": [ + { + "ColumnId": 17, + "ColumnName": "EnableChecksums", + "ColumnType": "Bool" + }, { "ColumnId": 1, "ColumnName": "Id", @@ -3116,6 +3121,7 @@ "ColumnFamilies": { "0": { "Columns": [ + 17, 1, 2, 3, From 592910d5e86aaad241ba585f7f63c63b3db6bee7 Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Tue, 24 Dec 2024 08:58:37 +0000 Subject: [PATCH 06/11] Fix multipart --- ydb/core/tx/datashard/export_s3_uploader.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index a5764937fff1..313b144d9018 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -632,6 +632,7 @@ class TS3Uploader: public TActorBootstrapped { this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId, TS3Upload::EStatus::Abort, *Error)); } + Become(&TThis::StateUploadData); } } From 9ab3139579982a65474c280b952a66b51b423aee Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Tue, 24 Dec 2024 14:05:36 +0000 Subject: [PATCH 07/11] Rename file to suffix --- .../tx/datashard/backup_restore_traits.cpp | 8 +++--- ydb/core/tx/datashard/backup_restore_traits.h | 8 +++--- ydb/core/tx/datashard/export_s3_uploader.cpp | 28 ++++++++++--------- .../tx/datashard/extstorage_usage_config.h | 10 +++---- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/ydb/core/tx/datashard/backup_restore_traits.cpp b/ydb/core/tx/datashard/backup_restore_traits.cpp index 9d4be0bcd5a4..c7c8443c4e43 100644 --- a/ydb/core/tx/datashard/backup_restore_traits.cpp +++ b/ydb/core/tx/datashard/backup_restore_traits.cpp @@ -72,19 +72,19 @@ TString DataFileExtension(EDataFormat format, ECompressionCodec codec) { return Sprintf("%s%s", fit->second.c_str(), cit->second.c_str()); } -TString PermissionsFile() { +TString PermissionsKeySuffix() { return "permissions.pb"; } -TString SchemeFile() { +TString SchemeKeySuffix() { return "scheme.pb"; } -TString MetadataFile() { +TString MetadataKeySuffix() { return "metadata.json"; } -TString DataFile(ui32 n, EDataFormat format, ECompressionCodec codec) { +TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec) { const auto ext = DataFileExtension(format, codec); return Sprintf("data_%02d%s", n, ext.c_str()); } diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h index 6f709b925a9d..539931a33e1e 100644 --- a/ydb/core/tx/datashard/backup_restore_traits.h +++ b/ydb/core/tx/datashard/backup_restore_traits.h @@ -30,10 +30,10 @@ ECompressionCodec NextCompressionCodec(ECompressionCodec cur); TString DataFileExtension(EDataFormat format, ECompressionCodec codec); -TString PermissionsFile(); -TString SchemeFile(); -TString MetadataFile(); -TString DataFile(ui32 n, EDataFormat format, ECompressionCodec codec); +TString PermissionsKeySuffix(); +TString SchemeKeySuffix(); +TString MetadataKeySuffix(); +TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec); TString ChecksumKey(const TString& objKey); diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index 313b144d9018..cfad41d943c7 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -227,14 +227,14 @@ class TS3Uploader: public TActorBootstrapped { this->Become(&TThis::StateUploadMetadata); } - void UploadChecksum(TString&& checksum, const TString& key, const TString& checksumFile, + void UploadChecksum(TString&& checksum, const TString& checksumKey, const TString& objectKeySuffix, std::function checksumUploadedCallback) { // make checksum verifiable using sha256sum CLI - checksum += ' ' + checksumFile; + checksum += ' ' + objectKeySuffix; auto request = Aws::S3::Model::PutObjectRequest() - .WithKey(key); + .WithKey(checksumKey); this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(checksum))); ChecksumUploadedCallback = checksumUploadedCallback; @@ -263,7 +263,7 @@ class TS3Uploader: public TActorBootstrapped { if (EnableChecksums) { TString checksumKey = ChecksumKey(Settings.GetSchemeKey()); - UploadChecksum(std::move(SchemeChecksum), checksumKey, SchemeFile(), nextStep); + UploadChecksum(std::move(SchemeChecksum), checksumKey, SchemeKeySuffix(), nextStep); } else { nextStep(); } @@ -291,7 +291,7 @@ class TS3Uploader: public TActorBootstrapped { if (EnableChecksums) { TString checksumKey = ChecksumKey(Settings.GetPermissionsKey()); - UploadChecksum(std::move(PermissionsChecksum), checksumKey, PermissionsFile(), nextStep); + UploadChecksum(std::move(PermissionsChecksum), checksumKey, PermissionsKeySuffix(), nextStep); } else { nextStep(); } @@ -315,7 +315,7 @@ class TS3Uploader: public TActorBootstrapped { if (EnableChecksums) { TString checksumKey = ChecksumKey(Settings.GetMetadataKey()); - UploadChecksum(std::move(MetadataChecksum), checksumKey, MetadataFile(), nextStep); + UploadChecksum(std::move(MetadataChecksum), checksumKey, MetadataKeySuffix(), nextStep); } else { nextStep(); } @@ -410,8 +410,8 @@ class TS3Uploader: public TActorBootstrapped { if (EnableChecksums) { // checksum is always calculated before compression TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); - TString dataFile = Settings.GetDataFile(DataFormat, ECompressionCodec::None); - UploadChecksum(std::move(DataChecksum), checksumKey, dataFile, nextStep); + TString dataKeySuffix = DataKeySuffix(ShardNum, DataFormat, ECompressionCodec::None); + UploadChecksum(std::move(DataChecksum), checksumKey, dataKeySuffix, nextStep); } else { nextStep(); } @@ -504,8 +504,8 @@ class TS3Uploader: public TActorBootstrapped { if (EnableChecksums) { // checksum is always calculated before compression TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); - TString dataFile = Settings.GetDataFile(DataFormat, ECompressionCodec::None); - return UploadChecksum(std::move(DataChecksum), checksumKey, dataFile, nextStep); + TString dataKeySuffix = DataKeySuffix(ShardNum, DataFormat, ECompressionCodec::None); + UploadChecksum(std::move(DataChecksum), checksumKey, dataKeySuffix, nextStep); } else { return nextStep(); } @@ -669,6 +669,7 @@ class TS3Uploader: public TActorBootstrapped { , Settings(TS3Settings::FromBackupTask(task)) , DataFormat(EDataFormat::Csv) , CompressionCodec(CodecFromTask(task)) + , ShardNum(task.GetShardNum()) , HttpResolverConfig(GetHttpResolverConfig(*GetS3StorageConfig())) , DataShard(dataShard) , TxId(txId) @@ -678,9 +679,9 @@ class TS3Uploader: public TActorBootstrapped { , Retries(task.GetNumberOfRetries()) , Attempt(0) , Delay(TDuration::Minutes(1)) - , SchemeUploaded(task.GetShardNum() == 0 ? false : true) - , MetadataUploaded(task.GetShardNum() == 0 ? false : true) - , PermissionsUploaded(task.GetShardNum() == 0 ? false : true) + , SchemeUploaded(ShardNum == 0 ? false : true) + , MetadataUploaded(ShardNum == 0 ? false : true) + , PermissionsUploaded(ShardNum == 0 ? false : true) , EnableChecksums(task.GetEnableChecksums()) { } @@ -767,6 +768,7 @@ class TS3Uploader: public TActorBootstrapped { TS3Settings Settings; const EDataFormat DataFormat; const ECompressionCodec CompressionCodec; + const ui32 ShardNum; bool ProxyResolved; TMaybe HttpResolverConfig; diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h index b4fda7d9fb5e..a64cc757f3e2 100644 --- a/ydb/core/tx/datashard/extstorage_usage_config.h +++ b/ydb/core/tx/datashard/extstorage_usage_config.h @@ -43,27 +43,27 @@ class TS3Settings { Aws::S3::Model::StorageClass GetStorageClass() const; inline TString GetPermissionsKey() const { - return ObjectKeyPattern + '/' + NBackupRestoreTraits::PermissionsFile(); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::PermissionsKeySuffix(); } inline TString GetMetadataKey() const { - return ObjectKeyPattern + '/' + NBackupRestoreTraits::MetadataFile(); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::MetadataKeySuffix(); } inline TString GetSchemeKey() const { - return ObjectKeyPattern + '/' + NBackupRestoreTraits::SchemeFile(); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::SchemeKeySuffix(); } inline TString GetDataKey( NBackupRestoreTraits::EDataFormat format, NBackupRestoreTraits::ECompressionCodec codec) const { - return ObjectKeyPattern + '/' + NBackupRestoreTraits::DataFile(Shard, format, codec); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::DataKeySuffix(Shard, format, codec); } inline TString GetDataFile( NBackupRestoreTraits::EDataFormat format, NBackupRestoreTraits::ECompressionCodec codec) const { - return NBackupRestoreTraits::DataFile(Shard, format, codec); + return NBackupRestoreTraits::DataKeySuffix(Shard, format, codec); } }; // TS3Settings From 0db204b4a021ea17ff6e7b220bb10649600d2c92 Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Tue, 24 Dec 2024 14:09:15 +0000 Subject: [PATCH 08/11] Fix export_checksum --- ydb/core/tx/datashard/export_checksum.cpp | 2 +- ydb/core/tx/datashard/export_checksum.h | 5 +---- ydb/core/tx/datashard/export_s3_uploader.cpp | 6 +++--- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/datashard/export_checksum.cpp b/ydb/core/tx/datashard/export_checksum.cpp index 2a6dbc8ed0d5..6ba36642f11c 100644 --- a/ydb/core/tx/datashard/export_checksum.cpp +++ b/ydb/core/tx/datashard/export_checksum.cpp @@ -24,7 +24,7 @@ class TSHA256 : public IExportChecksum { SHA256_CTX Context; }; -TString IExportChecksum::Compute(TStringBuf data) { +TString ComputeExportChecksum(TStringBuf data) { IExportChecksum::TPtr checksum(CreateExportChecksum()); checksum->AddData(data); return checksum->Serialize(); diff --git a/ydb/core/tx/datashard/export_checksum.h b/ydb/core/tx/datashard/export_checksum.h index c1c7ed2eee8e..0944c111f118 100644 --- a/ydb/core/tx/datashard/export_checksum.h +++ b/ydb/core/tx/datashard/export_checksum.h @@ -1,7 +1,5 @@ #pragma once -#include - #include namespace NKikimr::NDataShard { @@ -14,10 +12,9 @@ class IExportChecksum { virtual void AddData(TStringBuf data) = 0; virtual TString Serialize() = 0; - - static TString Compute(TStringBuf data); }; IExportChecksum* CreateExportChecksum(); +TString ComputeExportChecksum(TStringBuf data); } // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index cfad41d943c7..c3df25d0c63c 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -183,7 +183,7 @@ class TS3Uploader: public TActorBootstrapped { google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); if (EnableChecksums) { - SchemeChecksum = IExportChecksum::Compute(Buffer); + SchemeChecksum = ComputeExportChecksum(Buffer); } auto request = Aws::S3::Model::PutObjectRequest() @@ -202,7 +202,7 @@ class TS3Uploader: public TActorBootstrapped { google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer); if (EnableChecksums) { - PermissionsChecksum = IExportChecksum::Compute(Buffer); + PermissionsChecksum = ComputeExportChecksum(Buffer); } auto request = Aws::S3::Model::PutObjectRequest() @@ -217,7 +217,7 @@ class TS3Uploader: public TActorBootstrapped { Buffer = std::move(Metadata); if (EnableChecksums) { - MetadataChecksum = IExportChecksum::Compute(Buffer); + MetadataChecksum = ComputeExportChecksum(Buffer); } auto request = Aws::S3::Model::PutObjectRequest() From 829a393fabb75a7ff17dce660a813584985b80f7 Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Tue, 24 Dec 2024 14:17:15 +0000 Subject: [PATCH 09/11] Fix build --- ydb/core/tx/datashard/export_checksum.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/core/tx/datashard/export_checksum.cpp b/ydb/core/tx/datashard/export_checksum.cpp index 6ba36642f11c..06f8ac342331 100644 --- a/ydb/core/tx/datashard/export_checksum.cpp +++ b/ydb/core/tx/datashard/export_checksum.cpp @@ -1,5 +1,7 @@ #include "export_checksum.h" +#include + #include namespace NKikimr::NDataShard { From 7a09062b075ca72469a0711356cf0cc15ae2cdc0 Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Wed, 25 Dec 2024 10:59:02 +0000 Subject: [PATCH 10/11] Fix after review --- ydb/core/tx/datashard/export_s3_buffer_raw.cpp | 7 +++---- ydb/core/tx/datashard/export_s3_buffer_raw.h | 1 - ydb/core/tx/datashard/export_s3_buffer_zstd.cpp | 2 +- ydb/core/tx/datashard/export_s3_uploader.cpp | 14 +++++--------- 4 files changed, 9 insertions(+), 15 deletions(-) diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp index 14343195da44..ad6d03789c23 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp @@ -21,8 +21,7 @@ TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 byt , BytesLimit(bytesLimit) , Rows(0) , BytesRead(0) - , EnableChecksums(enableChecksums) - , Checksum(EnableChecksums ? CreateExportChecksum() : nullptr) + , Checksum(enableChecksums ? CreateExportChecksum() : nullptr) { } @@ -162,7 +161,7 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) { return false; } - if (EnableChecksums) { + if (Checksum) { TStringBuf data(Buffer.Data(), Buffer.Size()); Checksum->AddData(data.Tail(beforeSize)); } @@ -180,7 +179,7 @@ IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats.BytesSent = buffer->Size(); - if (EnableChecksums && last) { + if (Checksum && last) { return new TEvExportScan::TEvBuffer(std::move(*buffer), last, Checksum->Serialize()); } else { return new TEvExportScan::TEvBuffer(std::move(*buffer), last); diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.h b/ydb/core/tx/datashard/export_s3_buffer_raw.h index 7c9c7c1fddcb..816a6fa58649 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.h +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.h @@ -42,7 +42,6 @@ class TS3BufferRaw: public NExportScan::IBuffer { ui64 BytesRead; TBuffer Buffer; - bool EnableChecksums; IExportChecksum::TPtr Checksum; TString ErrorString; diff --git a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp index 4691e933c88d..d1312b398470 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp @@ -68,7 +68,7 @@ class TS3BufferZstd: public TS3BufferRaw { return false; } - if (EnableChecksums) { + if (Checksum) { Checksum->AddData(BufferRaw); } BytesRaw += BufferRaw.size(); diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index c3df25d0c63c..b571f0995a94 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -252,7 +252,7 @@ class TS3Uploader: public TActorBootstrapped { return; } - auto nextStep = [this](){ + auto nextStep = [this]() { SchemeUploaded = true; if (Scanner) { @@ -280,12 +280,8 @@ class TS3Uploader: public TActorBootstrapped { return; } - auto nextStep = [this](){ - PermissionsUploaded = true; - PermissionsUploaded = true; - + auto nextStep = [this]() { PermissionsUploaded = true; - UploadScheme(); }; @@ -308,7 +304,7 @@ class TS3Uploader: public TActorBootstrapped { return; } - auto nextStep = [this](){ + auto nextStep = [this]() { MetadataUploaded = true; UploadPermissions(); }; @@ -403,7 +399,7 @@ class TS3Uploader: public TActorBootstrapped { return; } - auto nextStep = [this](){ + auto nextStep = [this]() { Finish(); }; @@ -497,7 +493,7 @@ class TS3Uploader: public TActorBootstrapped { Parts.push_back(result.GetResult().GetETag().c_str()); if (Last) { - auto nextStep = [this](){ + auto nextStep = [this]() { Finish(); }; From 30cd3296efa125f31f7246f942615996abcf1eb9 Mon Sep 17 00:00:00 2001 From: Ilia Shakhov Date: Wed, 25 Dec 2024 11:44:09 +0000 Subject: [PATCH 11/11] Fix after review --- ydb/core/tx/datashard/export_s3_uploader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index b571f0995a94..cedcec73c70b 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -501,7 +501,7 @@ class TS3Uploader: public TActorBootstrapped { // checksum is always calculated before compression TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); TString dataKeySuffix = DataKeySuffix(ShardNum, DataFormat, ECompressionCodec::None); - UploadChecksum(std::move(DataChecksum), checksumKey, dataKeySuffix, nextStep); + return UploadChecksum(std::move(DataChecksum), checksumKey, dataKeySuffix, nextStep); } else { return nextStep(); }