diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 1f8a70465871..fd7947080aca 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -184,4 +184,5 @@ message TFeatureFlags { optional bool EnableDataShardInMemoryStateMigration = 159 [default = true]; optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false]; optional bool DisableLocalDBEraseCache = 161 [default = false]; + optional bool EnableExportChecksums = 162 [default = false]; } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 4c52ff262e8d..e412b2b1f63a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1246,6 +1246,7 @@ message TBackupTask { optional uint64 SnapshotStep = 14; optional uint64 SnapshotTxId = 15; + optional bool EnableChecksums = 16; // currently available for s3 } message TRestoreTask { diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 5124e9ed3fa1..25df60794aaa 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -71,6 +71,7 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableParameterizedDecimal) FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForCDC) FEATURE_FLAG_SETTER(EnableFollowerStats) + FEATURE_FLAG_SETTER(EnableExportChecksums) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/tx/datashard/backup_restore_common.cpp b/ydb/core/tx/datashard/backup_restore_common.cpp index dad590c13d56..5daaf26e22f1 100644 --- a/ydb/core/tx/datashard/backup_restore_common.cpp +++ b/ydb/core/tx/datashard/backup_restore_common.cpp @@ -6,9 +6,13 @@ void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) { FullBackups.emplace(fb->SnapshotVts, fb); } +void TMetadata::SetVersion(ui64 version) { + Version = version; +} + TString TMetadata::Serialize() const { NJson::TJsonMap m; - m["version"] = 0; + m["version"] = Version; NJson::TJsonArray fullBackups; for (auto &[tp, _] : FullBackups) { NJson::TJsonMap backupMap; diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h index 61da3f7e5350..d932f35de94d 100644 --- a/ydb/core/tx/datashard/backup_restore_common.h +++ b/ydb/core/tx/datashard/backup_restore_common.h @@ -175,6 +175,7 @@ class TMetadata { void AddFullBackup(TFullBackupMetadata::TPtr fullBackup); void AddLog(TLogMetadata::TPtr log); void SetConsistencyKey(const TString& key); + void SetVersion(ui64 version); TString Serialize() const; static TMetadata Deserialize(const TString& metadata); @@ -182,6 +183,7 @@ class TMetadata { TString ConsistencyKey; TMap FullBackups; TMap Logs; + ui64 Version = 0; }; } // NBackupRestore diff --git a/ydb/core/tx/datashard/backup_restore_traits.cpp b/ydb/core/tx/datashard/backup_restore_traits.cpp index a55f5681d93b..c7c8443c4e43 100644 --- a/ydb/core/tx/datashard/backup_restore_traits.cpp +++ b/ydb/core/tx/datashard/backup_restore_traits.cpp @@ -72,6 +72,27 @@ TString DataFileExtension(EDataFormat format, ECompressionCodec codec) { return Sprintf("%s%s", fit->second.c_str(), cit->second.c_str()); } +TString PermissionsKeySuffix() { + return "permissions.pb"; +} + +TString SchemeKeySuffix() { + return "scheme.pb"; +} + +TString MetadataKeySuffix() { + return "metadata.json"; +} + +TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec) { + const auto ext = DataFileExtension(format, codec); + return Sprintf("data_%02d%s", n, ext.c_str()); +} + +TString ChecksumKey(const TString& objKey) { + return objKey + ".sha256"; +} + } // NBackupRestoreTraits } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h index 036d5a6ea514..539931a33e1e 100644 --- a/ydb/core/tx/datashard/backup_restore_traits.h +++ b/ydb/core/tx/datashard/backup_restore_traits.h @@ -30,22 +30,12 @@ ECompressionCodec NextCompressionCodec(ECompressionCodec cur); TString DataFileExtension(EDataFormat format, ECompressionCodec codec); -inline TString SchemeKey(const TString& objKeyPattern) { - return Sprintf("%s/scheme.pb", objKeyPattern.c_str()); -} - -inline TString PermissionsKey(const TString& objKeyPattern) { - return Sprintf("%s/permissions.pb", objKeyPattern.c_str()); -} +TString PermissionsKeySuffix(); +TString SchemeKeySuffix(); +TString MetadataKeySuffix(); +TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec); -inline TString MetadataKey(const TString& objKeyPattern) { - return Sprintf("%s/metadata.json", objKeyPattern.c_str()); -} - -inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) { - const auto ext = DataFileExtension(format, codec); - return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str()); -} +TString ChecksumKey(const TString& objKey); } // NBackupRestoreTraits } // NDataShard diff --git a/ydb/core/tx/datashard/export_checksum.cpp b/ydb/core/tx/datashard/export_checksum.cpp new file mode 100644 index 000000000000..06f8ac342331 --- /dev/null +++ b/ydb/core/tx/datashard/export_checksum.cpp @@ -0,0 +1,39 @@ +#include "export_checksum.h" + +#include + +#include + +namespace NKikimr::NDataShard { + +class TSHA256 : public IExportChecksum { +public: + TSHA256() { + SHA256_Init(&Context); + } + + void AddData(TStringBuf data) override { + SHA256_Update(&Context, data.data(), data.size()); + } + + TString Serialize() override { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256_Final(hash, &Context); + return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH)); + } + +private: + SHA256_CTX Context; +}; + +TString ComputeExportChecksum(TStringBuf data) { + IExportChecksum::TPtr checksum(CreateExportChecksum()); + checksum->AddData(data); + return checksum->Serialize(); +} + +IExportChecksum* CreateExportChecksum() { + return new TSHA256(); +} + +} // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/export_checksum.h b/ydb/core/tx/datashard/export_checksum.h new file mode 100644 index 000000000000..0944c111f118 --- /dev/null +++ b/ydb/core/tx/datashard/export_checksum.h @@ -0,0 +1,20 @@ +#pragma once + +#include + +namespace NKikimr::NDataShard { + +class IExportChecksum { +public: + using TPtr = std::unique_ptr; + + virtual ~IExportChecksum() = default; + + virtual void AddData(TStringBuf data) = 0; + virtual TString Serialize() = 0; +}; + +IExportChecksum* CreateExportChecksum(); +TString ComputeExportChecksum(TStringBuf data); + +} // NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/export_s3.h b/ydb/core/tx/datashard/export_s3.h index 3e3a651c8f53..cf21cd2e90df 100644 --- a/ydb/core/tx/datashard/export_s3.h +++ b/ydb/core/tx/datashard/export_s3.h @@ -31,9 +31,10 @@ class TS3Export: public IExport { switch (CodecFromTask(Task)) { case ECompressionCodec::None: - return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes); + return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes, Task.GetEnableChecksums()); case ECompressionCodec::Zstd: - return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, maxBytes, minBytes); + return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, + maxBytes, minBytes, Task.GetEnableChecksums()); case ECompressionCodec::Invalid: Y_ABORT("unreachable"); } diff --git a/ydb/core/tx/datashard/export_s3_buffer.h b/ydb/core/tx/datashard/export_s3_buffer.h index 2d332a561d73..130e0ba7c968 100644 --- a/ydb/core/tx/datashard/export_s3_buffer.h +++ b/ydb/core/tx/datashard/export_s3_buffer.h @@ -9,10 +9,10 @@ namespace NKikimr { namespace NDataShard { NExportScan::IBuffer* CreateS3ExportBufferRaw( - const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes); + const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, bool enableChecksums); NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel, - const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes); + const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, bool enableChecksums); } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp index e0647ee70a8c..ad6d03789c23 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp @@ -15,12 +15,13 @@ namespace NKikimr { namespace NDataShard { -TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit) +TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums) : Columns(columns) , RowsLimit(rowsLimit) , BytesLimit(bytesLimit) , Rows(0) , BytesRead(0) + , Checksum(enableChecksums ? CreateExportChecksum() : nullptr) { } @@ -154,7 +155,17 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) { bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) { TBufferOutput out(Buffer); ErrorString.clear(); - return Collect(row, out); + + size_t beforeSize = Buffer.Size(); + if (!Collect(row, out)) { + return false; + } + + if (Checksum) { + TStringBuf data(Buffer.Data(), Buffer.Size()); + Checksum->AddData(data.Tail(beforeSize)); + } + return true; } IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) { @@ -167,7 +178,12 @@ IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& } stats.BytesSent = buffer->Size(); - return new TEvExportScan::TEvBuffer(std::move(*buffer), last); + + if (Checksum && last) { + return new TEvExportScan::TEvBuffer(std::move(*buffer), last, Checksum->Serialize()); + } else { + return new TEvExportScan::TEvBuffer(std::move(*buffer), last); + } } void TS3BufferRaw::Clear() { @@ -189,9 +205,9 @@ TMaybe TS3BufferRaw::Flush(bool) { } NExportScan::IBuffer* CreateS3ExportBufferRaw( - const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) + const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums) { - return new TS3BufferRaw(columns, rowsLimit, bytesLimit); + return new TS3BufferRaw(columns, rowsLimit, bytesLimit, enableChecksums); } } // NDataShard diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.h b/ydb/core/tx/datashard/export_s3_buffer_raw.h index 3573f4b6d597..816a6fa58649 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.h +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.h @@ -14,7 +14,7 @@ class TS3BufferRaw: public NExportScan::IBuffer { using TTagToIndex = THashMap; // index in IScan::TRow public: - explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit); + explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums); void ColumnsOrder(const TVector& tags) override; bool Collect(const NTable::IScan::TRow& row) override; @@ -42,6 +42,8 @@ class TS3BufferRaw: public NExportScan::IBuffer { ui64 BytesRead; TBuffer Buffer; + IExportChecksum::TPtr Checksum; + TString ErrorString; }; // TS3BufferRaw diff --git a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp index 7588d3b7a58b..d1312b398470 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp @@ -49,8 +49,9 @@ class TS3BufferZstd: public TS3BufferRaw { public: explicit TS3BufferZstd(int compressionLevel, - const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes) - : TS3BufferRaw(columns, maxRows, maxBytes) + const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, + bool enableChecksums) + : TS3BufferRaw(columns, maxRows, maxBytes, enableChecksums) , CompressionLevel(compressionLevel) , MinBytes(minBytes) , Context(ZSTD_createCCtx()) @@ -67,6 +68,9 @@ class TS3BufferZstd: public TS3BufferRaw { return false; } + if (Checksum) { + Checksum->AddData(BufferRaw); + } BytesRaw += BufferRaw.size(); auto input = ZSTD_inBuffer{BufferRaw.data(), BufferRaw.size(), 0}; @@ -122,9 +126,10 @@ class TS3BufferZstd: public TS3BufferRaw { }; // TS3BufferZstd NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel, - const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes) + const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, + bool enableChecksums) { - return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes); + return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes, enableChecksums); } } // NDataShard diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index 9acd52bc7b81..cedcec73c70b 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -31,6 +31,8 @@ namespace NKikimr { namespace NDataShard { +using namespace NBackupRestoreTraits; + class TS3Uploader: public TActorBootstrapped { using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig; using THttpResolverConfig = NKikimrConfig::TS3ProxyResolverConfig::THttpResolverConfig; @@ -180,6 +182,9 @@ class TS3Uploader: public TActorBootstrapped { } google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer); + if (EnableChecksums) { + SchemeChecksum = ComputeExportChecksum(Buffer); + } auto request = Aws::S3::Model::PutObjectRequest() .WithKey(Settings.GetSchemeKey()); @@ -196,6 +201,9 @@ class TS3Uploader: public TActorBootstrapped { } google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer); + if (EnableChecksums) { + PermissionsChecksum = ComputeExportChecksum(Buffer); + } auto request = Aws::S3::Model::PutObjectRequest() .WithKey(Settings.GetPermissionsKey()); @@ -208,6 +216,9 @@ class TS3Uploader: public TActorBootstrapped { Y_ABORT_UNLESS(!MetadataUploaded); Buffer = std::move(Metadata); + if (EnableChecksums) { + MetadataChecksum = ComputeExportChecksum(Buffer); + } auto request = Aws::S3::Model::PutObjectRequest() .WithKey(Settings.GetMetadataKey()); @@ -216,6 +227,20 @@ class TS3Uploader: public TActorBootstrapped { this->Become(&TThis::StateUploadMetadata); } + void UploadChecksum(TString&& checksum, const TString& checksumKey, const TString& objectKeySuffix, + std::function checksumUploadedCallback) + { + // make checksum verifiable using sha256sum CLI + checksum += ' ' + objectKeySuffix; + + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(checksumKey); + this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(checksum))); + + ChecksumUploadedCallback = checksumUploadedCallback; + this->Become(&TThis::StateUploadChecksum); + } + void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { const auto& result = ev->Get()->Result; @@ -227,13 +252,21 @@ class TS3Uploader: public TActorBootstrapped { return; } - SchemeUploaded = true; + auto nextStep = [this]() { + SchemeUploaded = true; - if (Scanner) { - this->Send(Scanner, new TEvExportScan::TEvFeed()); - } + if (Scanner) { + this->Send(Scanner, new TEvExportScan::TEvFeed()); + } + this->Become(&TThis::StateUploadData); + }; - this->Become(&TThis::StateUploadData); + if (EnableChecksums) { + TString checksumKey = ChecksumKey(Settings.GetSchemeKey()); + UploadChecksum(std::move(SchemeChecksum), checksumKey, SchemeKeySuffix(), nextStep); + } else { + nextStep(); + } } void HandlePermissions(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { @@ -247,9 +280,17 @@ class TS3Uploader: public TActorBootstrapped { return; } - PermissionsUploaded = true; + auto nextStep = [this]() { + PermissionsUploaded = true; + UploadScheme(); + }; - UploadScheme(); + if (EnableChecksums) { + TString checksumKey = ChecksumKey(Settings.GetPermissionsKey()); + UploadChecksum(std::move(PermissionsChecksum), checksumKey, PermissionsKeySuffix(), nextStep); + } else { + nextStep(); + } } void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { @@ -263,9 +304,31 @@ class TS3Uploader: public TActorBootstrapped { return; } - MetadataUploaded = true; + auto nextStep = [this]() { + MetadataUploaded = true; + UploadPermissions(); + }; - UploadPermissions(); + if (EnableChecksums) { + TString checksumKey = ChecksumKey(Settings.GetMetadataKey()); + UploadChecksum(std::move(MetadataChecksum), checksumKey, MetadataKeySuffix(), nextStep); + } else { + nextStep(); + } + } + + void HandleChecksum(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + EXPORT_LOG_D("HandleChecksum TEvExternalStorage::TEvPutObjectResponse" + << ": self# " << this->SelfId() + << ", result# " << result); + + if (!CheckResult(result, TStringBuf("PutObject (checksum)"))) { + return; + } + + ChecksumUploadedCallback(); } void Handle(TEvExportScan::TEvReady::TPtr& ev) { @@ -301,6 +364,7 @@ class TS3Uploader: public TActorBootstrapped { Last = ev->Get()->Last; MultiPart = MultiPart || !Last; ev->Get()->Buffer.AsString(Buffer); + DataChecksum = std::move(ev->Get()->Checksum); UploadData(); } @@ -335,7 +399,18 @@ class TS3Uploader: public TActorBootstrapped { return; } - Finish(); + auto nextStep = [this]() { + Finish(); + }; + + if (EnableChecksums) { + // checksum is always calculated before compression + TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); + TString dataKeySuffix = DataKeySuffix(ShardNum, DataFormat, ECompressionCodec::None); + UploadChecksum(std::move(DataChecksum), checksumKey, dataKeySuffix, nextStep); + } else { + nextStep(); + } } void Handle(TEvDataShard::TEvS3Upload::TPtr& ev) { @@ -418,7 +493,18 @@ class TS3Uploader: public TActorBootstrapped { Parts.push_back(result.GetResult().GetETag().c_str()); if (Last) { - return Finish(); + auto nextStep = [this]() { + Finish(); + }; + + if (EnableChecksums) { + // checksum is always calculated before compression + TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); + TString dataKeySuffix = DataKeySuffix(ShardNum, DataFormat, ECompressionCodec::None); + return UploadChecksum(std::move(DataChecksum), checksumKey, dataKeySuffix, nextStep); + } else { + return nextStep(); + } } this->Send(Scanner, new TEvExportScan::TEvFeed()); @@ -542,6 +628,7 @@ class TS3Uploader: public TActorBootstrapped { this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId, TS3Upload::EStatus::Abort, *Error)); } + Become(&TThis::StateUploadData); } } @@ -576,8 +663,9 @@ class TS3Uploader: public TActorBootstrapped { TString&& metadata) : ExternalStorageConfig(new TS3ExternalStorageConfig(task.GetS3Settings())) , Settings(TS3Settings::FromBackupTask(task)) - , DataFormat(NBackupRestoreTraits::EDataFormat::Csv) - , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task)) + , DataFormat(EDataFormat::Csv) + , CompressionCodec(CodecFromTask(task)) + , ShardNum(task.GetShardNum()) , HttpResolverConfig(GetHttpResolverConfig(*GetS3StorageConfig())) , DataShard(dataShard) , TxId(txId) @@ -587,9 +675,10 @@ class TS3Uploader: public TActorBootstrapped { , Retries(task.GetNumberOfRetries()) , Attempt(0) , Delay(TDuration::Minutes(1)) - , SchemeUploaded(task.GetShardNum() == 0 ? false : true) - , MetadataUploaded(task.GetShardNum() == 0 ? false : true) - , PermissionsUploaded(task.GetShardNum() == 0 ? false : true) + , SchemeUploaded(ShardNum == 0 ? false : true) + , MetadataUploaded(ShardNum == 0 ? false : true) + , PermissionsUploaded(ShardNum == 0 ? false : true) + , EnableChecksums(task.GetEnableChecksums()) { } @@ -647,6 +736,14 @@ class TS3Uploader: public TActorBootstrapped { } } + STATEFN(StateUploadChecksum) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleChecksum); + default: + return StateBase(ev); + } + } + STATEFN(StateUploadData) { switch (ev->GetTypeRewrite()) { hFunc(TEvBuffer, Handle); @@ -665,8 +762,9 @@ class TS3Uploader: public TActorBootstrapped { private: NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; TS3Settings Settings; - const NBackupRestoreTraits::EDataFormat DataFormat; - const NBackupRestoreTraits::ECompressionCodec CompressionCodec; + const EDataFormat DataFormat; + const ECompressionCodec CompressionCodec; + const ui32 ShardNum; bool ProxyResolved; TMaybe HttpResolverConfig; @@ -698,6 +796,13 @@ class TS3Uploader: public TActorBootstrapped { TVector Parts; TMaybe Error; + bool EnableChecksums; + TString DataChecksum; + TString MetadataChecksum; + TString SchemeChecksum; + TString PermissionsChecksum; + std::function ChecksumUploadedCallback; + }; // TS3Uploader IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const { @@ -710,6 +815,7 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const { : Nothing(); NBackupRestore::TMetadata metadata; + metadata.SetVersion(Task.GetEnableChecksums() ? 1 : 0); NBackupRestore::TFullBackupMetadata::TPtr backup = new NBackupRestore::TFullBackupMetadata{ .SnapshotVts = NBackupRestore::TVirtualTimestamp( diff --git a/ydb/core/tx/datashard/export_scan.h b/ydb/core/tx/datashard/export_scan.h index 1873795255ef..8c723448770d 100644 --- a/ydb/core/tx/datashard/export_scan.h +++ b/ydb/core/tx/datashard/export_scan.h @@ -2,6 +2,8 @@ #include "defs.h" +#include "export_checksum.h" + #include #include @@ -32,18 +34,21 @@ struct TEvExportScan { struct TEvBuffer: public TEventLocal, EvBuffer> { TBuffer Buffer; bool Last; + TString Checksum; TEvBuffer() = default; - explicit TEvBuffer(TBuffer&& buffer, bool last) + explicit TEvBuffer(TBuffer&& buffer, bool last, TString&& checksum = "") : Buffer(std::move(buffer)) , Last(last) + , Checksum(std::move(checksum)) { } TString ToString() const override { return TStringBuilder() << this->ToStringHeader() << " {" << " Last: " << Last + << " Checksum:" << Checksum << " }"; } }; diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h index 873e1b04ac43..a64cc757f3e2 100644 --- a/ydb/core/tx/datashard/extstorage_usage_config.h +++ b/ydb/core/tx/datashard/extstorage_usage_config.h @@ -43,21 +43,27 @@ class TS3Settings { Aws::S3::Model::StorageClass GetStorageClass() const; inline TString GetPermissionsKey() const { - return NBackupRestoreTraits::PermissionsKey(ObjectKeyPattern); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::PermissionsKeySuffix(); } inline TString GetMetadataKey() const { - return NBackupRestoreTraits::MetadataKey(ObjectKeyPattern); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::MetadataKeySuffix(); } inline TString GetSchemeKey() const { - return NBackupRestoreTraits::SchemeKey(ObjectKeyPattern); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::SchemeKeySuffix(); } inline TString GetDataKey( NBackupRestoreTraits::EDataFormat format, NBackupRestoreTraits::ECompressionCodec codec) const { - return NBackupRestoreTraits::DataKey(ObjectKeyPattern, Shard, format, codec); + return ObjectKeyPattern + '/' + NBackupRestoreTraits::DataKeySuffix(Shard, format, codec); + } + + inline TString GetDataFile( + NBackupRestoreTraits::EDataFormat format, + NBackupRestoreTraits::ECompressionCodec codec) const { + return NBackupRestoreTraits::DataKeySuffix(Shard, format, codec); } }; // TS3Settings diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 3ec752d2898c..d73553229a85 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -152,6 +152,7 @@ SRCS( execution_unit.h execution_unit_ctors.h execution_unit_kind.h + export_checksum.cpp export_common.cpp export_iface.cpp export_iface.h diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 10db6d9c49f6..aa07665ed936 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4279,6 +4279,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { exportInfo->StartTime = TInstant::Seconds(rowset.GetValueOrDefault()); exportInfo->EndTime = TInstant::Seconds(rowset.GetValueOrDefault()); + exportInfo->EnableChecksums = rowset.GetValueOrDefault(false); Self->Exports[id] = exportInfo; if (uid) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index a8ae8e67d9a6..ea6f881ee39a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -153,7 +153,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T NIceDb::TUpdate(exportInfo->Settings), NIceDb::TUpdate(exportInfo->DomainPathId.OwnerId), NIceDb::TUpdate(exportInfo->DomainPathId.LocalPathId), - NIceDb::TUpdate(exportInfo->Items.size()) + NIceDb::TUpdate(exportInfo->Items.size()), + NIceDb::TUpdate(exportInfo->EnableChecksums) ); if (exportInfo->UserSID) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index f65eabb43f89..7f63a60a7970 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -118,7 +118,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName()); - + exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableExportChecksums(); TString explain; if (!FillItems(exportInfo, settings, explain)) { return Reply( diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index d6fca52aecaf..ce03d1669a91 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -193,6 +193,8 @@ THolder BackupPropose( if (const auto compression = exportSettings.compression()) { Y_ABORT_UNLESS(FillCompression(*task.MutableCompression(), compression)); } + + task.SetEnableChecksums(exportInfo->EnableChecksums); } break; } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index f265463bc98a..765a40771588 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2709,6 +2709,8 @@ struct TExportInfo: public TSimpleRefCount { TInstant StartTime = TInstant::Zero(); TInstant EndTime = TInstant::Zero(); + bool EnableChecksums = false; + explicit TExportInfo( const ui64 id, const TString& uid, diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 632616f6fc9a..76d29549ea33 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1175,6 +1175,8 @@ struct Schema : NIceDb::Schema { struct EndTime : Column<15, NScheme::NTypeIds::Uint64> {}; struct PeerName : Column<16, NScheme::NTypeIds::Utf8> {}; + struct EnableChecksums : Column<17, NScheme::NTypeIds::Bool> {}; + using TKey = TableKey; using TColumns = TableColumns< Id, @@ -1192,7 +1194,8 @@ struct Schema : NIceDb::Schema { UserSID, StartTime, EndTime, - PeerName + PeerName, + EnableChecksums >; }; diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 47cfca6c2ed0..f09413afcfdf 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -2346,4 +2346,109 @@ partitioning_settings { } )")); } + + Y_UNIT_TEST(Checksums) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", port)); + env.TestWaitNotification(runtime, txId); + + UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8); + + const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256"); + UNIT_ASSERT(dataChecksum); + UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv"); + + const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256"); + UNIT_ASSERT(metadataChecksum); + UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json"); + + const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256"); + UNIT_ASSERT(schemeChecksum); + UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb"); + + const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256"); + UNIT_ASSERT(permissionsChecksum); + UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb"); + } + + Y_UNIT_TEST(ChecksumsWithCompression) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + compression: "zstd" + } + )", port)); + env.TestWaitNotification(runtime, txId); + + UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8); + + const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256"); + UNIT_ASSERT(dataChecksum); + UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv"); + + const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256"); + UNIT_ASSERT(metadataChecksum); + UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json"); + + const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256"); + UNIT_ASSERT(schemeChecksum); + UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb"); + + const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256"); + UNIT_ASSERT(permissionsChecksum); + UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb"); + } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 16077cc065b7..e785ed85e089 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -548,6 +548,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableParameterizedDecimal(opts.EnableParameterizedDecimal_); app.SetEnableTopicAutopartitioningForCDC(opts.EnableTopicAutopartitioningForCDC_); app.SetEnableBackupService(opts.EnableBackupService_); + app.SetEnableExportChecksums(true); app.ColumnShardConfig.SetDisabledOnSchemeShard(false); diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index cd58d3db7563..ed19c6e0a55d 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -3031,6 +3031,11 @@ 1 ], "ColumnsAdded": [ + { + "ColumnId": 17, + "ColumnName": "EnableChecksums", + "ColumnType": "Bool" + }, { "ColumnId": 1, "ColumnName": "Id", @@ -3116,6 +3121,7 @@ "ColumnFamilies": { "0": { "Columns": [ + 17, 1, 2, 3,