Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add export checksums #12728

Merged
merged 11 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,5 @@ message TFeatureFlags {
optional bool EnableDataShardInMemoryStateMigration = 159 [default = true];
optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false];
optional bool DisableLocalDBEraseCache = 161 [default = false];
optional bool EnableExportChecksums = 162 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,7 @@ message TBackupTask {

optional uint64 SnapshotStep = 14;
optional uint64 SnapshotTxId = 15;
optional bool EnableChecksums = 16; // currently available for s3
}

message TRestoreTask {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableParameterizedDecimal)
FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForCDC)
FEATURE_FLAG_SETTER(EnableFollowerStats)
FEATURE_FLAG_SETTER(EnableExportChecksums)

#undef FEATURE_FLAG_SETTER
};
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/datashard/backup_restore_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) {
FullBackups.emplace(fb->SnapshotVts, fb);
}

// Stores the metadata version number; Serialize() writes it under the "version" key.
void TMetadata::SetVersion(ui64 version) {
    this->Version = version;
}

TString TMetadata::Serialize() const {
NJson::TJsonMap m;
m["version"] = 0;
m["version"] = Version;
NJson::TJsonArray fullBackups;
for (auto &[tp, _] : FullBackups) {
NJson::TJsonMap backupMap;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/backup_restore_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,15 @@ class TMetadata {
void AddFullBackup(TFullBackupMetadata::TPtr fullBackup);
void AddLog(TLogMetadata::TPtr log);
void SetConsistencyKey(const TString& key);
void SetVersion(ui64 version);

TString Serialize() const;
static TMetadata Deserialize(const TString& metadata);
private:
TString ConsistencyKey;
TMap<TVirtualTimestamp, TFullBackupMetadata::TPtr> FullBackups;
TMap<TVirtualTimestamp, TLogMetadata::TPtr> Logs;
ui64 Version = 0;
};

} // NBackupRestore
Expand Down
21 changes: 21 additions & 0 deletions ydb/core/tx/datashard/backup_restore_traits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ TString DataFileExtension(EDataFormat format, ECompressionCodec codec) {
return Sprintf("%s%s", fit->second.c_str(), cit->second.c_str());
}

// File name of the exported permissions object, without any key prefix.
TString PermissionsKeySuffix() {
    static const char suffix[] = "permissions.pb";
    return suffix;
}

// File name of the exported scheme object, without any key prefix.
TString SchemeKeySuffix() {
    static const char suffix[] = "scheme.pb";
    return suffix;
}

// File name of the exported metadata object, without any key prefix.
TString MetadataKeySuffix() {
    static const char suffix[] = "metadata.json";
    return suffix;
}

// Builds the file name of the n-th exported data chunk: "data_NN<ext>", where
// NN is n zero-padded to at least two digits and <ext> is the extension
// returned by DataFileExtension(format, codec).
TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec) {
    const auto ext = DataFileExtension(format, codec);
    // n is unsigned, so it is formatted with %u; %d would print values above
    // INT_MAX as negative numbers.
    return Sprintf("data_%02u%s", n, ext.c_str());
}

// Derives the key of the checksum object for objKey by appending ".sha256".
TString ChecksumKey(const TString& objKey) {
    TString checksumKey = objKey;
    checksumKey += ".sha256";
    return checksumKey;
}

} // NBackupRestoreTraits
} // NDataShard
} // NKikimr
20 changes: 5 additions & 15 deletions ydb/core/tx/datashard/backup_restore_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,12 @@ ECompressionCodec NextCompressionCodec(ECompressionCodec cur);

TString DataFileExtension(EDataFormat format, ECompressionCodec codec);

// Key of the scheme object: "<objKeyPattern>/scheme.pb".
inline TString SchemeKey(const TString& objKeyPattern) {
    const char* const prefix = objKeyPattern.c_str();
    return Sprintf("%s/scheme.pb", prefix);
}

// Key of the permissions object: "<objKeyPattern>/permissions.pb".
inline TString PermissionsKey(const TString& objKeyPattern) {
    const char* const prefix = objKeyPattern.c_str();
    return Sprintf("%s/permissions.pb", prefix);
}
TString PermissionsKeySuffix();
TString SchemeKeySuffix();
TString MetadataKeySuffix();
TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec);

// Key of the metadata object: "<objKeyPattern>/metadata.json".
inline TString MetadataKey(const TString& objKeyPattern) {
    const char* const prefix = objKeyPattern.c_str();
    return Sprintf("%s/metadata.json", prefix);
}

// Key of the n-th data chunk: "<objKeyPattern>/data_NN<ext>", where NN is n
// zero-padded to at least two digits and <ext> is the extension returned by
// DataFileExtension(format, codec).
inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) {
    const auto ext = DataFileExtension(format, codec);
    // n is unsigned, so it is formatted with %u; %d would print values above
    // INT_MAX as negative numbers.
    return Sprintf("%s/data_%02u%s", objKeyPattern.c_str(), n, ext.c_str());
}
TString ChecksumKey(const TString& objKey);

} // NBackupRestoreTraits
} // NDataShard
Expand Down
39 changes: 39 additions & 0 deletions ydb/core/tx/datashard/export_checksum.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "export_checksum.h"

#include <openssl/sha.h>

#include <util/string/hex.h>

namespace NKikimr::NDataShard {

// SHA-256 implementation of IExportChecksum built on the OpenSSL SHA256_* API.
class TSHA256 : public IExportChecksum {
public:
    TSHA256() {
        SHA256_Init(&Context);
    }

    // Feeds the next portion of data into the running hash.
    void AddData(TStringBuf data) override {
        SHA256_Update(&Context, data.data(), data.size());
    }

    // Returns the lower-case hex digest of all data added so far.
    // The digest is finalized on a copy of the context: SHA256_Final() erases
    // the context it is given, so finalizing Context itself would leave a
    // repeated Serialize() or a later AddData() working on an invalid state.
    TString Serialize() override {
        SHA256_CTX snapshot = Context;
        unsigned char hash[SHA256_DIGEST_LENGTH];
        SHA256_Final(hash, &snapshot);
        return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
    }

private:
    SHA256_CTX Context;
};

// One-shot helper: hashes a single buffer and returns its serialized checksum.
TString ComputeExportChecksum(TStringBuf data) {
    const IExportChecksum::TPtr hasher(CreateExportChecksum());
    hasher->AddData(data);
    return hasher->Serialize();
}

// Creates a new checksum calculator backed by TSHA256. The object is
// heap-allocated and returned as a raw pointer, so the caller must take
// ownership of it, e.g. by wrapping it in IExportChecksum::TPtr.
IExportChecksum* CreateExportChecksum() {
    IExportChecksum* const checksum = new TSHA256();
    return checksum;
}

} // NKikimr::NDataShard
20 changes: 20 additions & 0 deletions ydb/core/tx/datashard/export_checksum.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <util/generic/string.h>

namespace NKikimr::NDataShard {

// Interface of an incremental checksum calculator used for exported data:
// data is supplied piece by piece via AddData() and the result is obtained
// from Serialize().
class IExportChecksum {
public:
// Owning smart pointer for checksum objects.
using TPtr = std::unique_ptr<IExportChecksum>;

// Virtual destructor so implementations can be destroyed through TPtr.
virtual ~IExportChecksum() = default;

// Adds the next portion of data to the checksum being computed.
virtual void AddData(TStringBuf data) = 0;
// Returns the checksum of the data added via AddData(), encoded as a string.
virtual TString Serialize() = 0;
};

IExportChecksum* CreateExportChecksum();
TString ComputeExportChecksum(TStringBuf data);

} // NKikimr::NDataShard
5 changes: 3 additions & 2 deletions ydb/core/tx/datashard/export_s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ class TS3Export: public IExport {

switch (CodecFromTask(Task)) {
case ECompressionCodec::None:
return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes);
return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes, Task.GetEnableChecksums());
case ECompressionCodec::Zstd:
return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, maxBytes, minBytes);
return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows,
maxBytes, minBytes, Task.GetEnableChecksums());
case ECompressionCodec::Invalid:
Y_ABORT("unreachable");
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/export_s3_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ namespace NKikimr {
namespace NDataShard {

NExportScan::IBuffer* CreateS3ExportBufferRaw(
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes);
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, bool enableChecksums);

NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel,
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes);
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, bool enableChecksums);

} // NDataShard
} // NKikimr
Expand Down
26 changes: 21 additions & 5 deletions ydb/core/tx/datashard/export_s3_buffer_raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
namespace NKikimr {
namespace NDataShard {

TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit)
TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
: Columns(columns)
, RowsLimit(rowsLimit)
, BytesLimit(bytesLimit)
, Rows(0)
, BytesRead(0)
, Checksum(enableChecksums ? CreateExportChecksum() : nullptr)
{
}

Expand Down Expand Up @@ -154,7 +155,17 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) {
TBufferOutput out(Buffer);
ErrorString.clear();
return Collect(row, out);

size_t beforeSize = Buffer.Size();
if (!Collect(row, out)) {
return false;
}

if (Checksum) {
TStringBuf data(Buffer.Data(), Buffer.Size());
Checksum->AddData(data.Tail(beforeSize));
}
return true;
}

IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) {
Expand All @@ -167,7 +178,12 @@ IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats&
}

stats.BytesSent = buffer->Size();
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);

if (Checksum && last) {
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last, Checksum->Serialize());
} else {
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);
}
}

void TS3BufferRaw::Clear() {
Expand All @@ -189,9 +205,9 @@ TMaybe<TBuffer> TS3BufferRaw::Flush(bool) {
}

NExportScan::IBuffer* CreateS3ExportBufferRaw(
const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit)
const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
{
return new TS3BufferRaw(columns, rowsLimit, bytesLimit);
return new TS3BufferRaw(columns, rowsLimit, bytesLimit, enableChecksums);
}

} // NDataShard
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/export_s3_buffer_raw.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TS3BufferRaw: public NExportScan::IBuffer {
using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow

public:
explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit);
explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums);

void ColumnsOrder(const TVector<ui32>& tags) override;
bool Collect(const NTable::IScan::TRow& row) override;
Expand Down Expand Up @@ -42,6 +42,8 @@ class TS3BufferRaw: public NExportScan::IBuffer {
ui64 BytesRead;
TBuffer Buffer;

IExportChecksum::TPtr Checksum;

TString ErrorString;
}; // TS3BufferRaw

Expand Down
13 changes: 9 additions & 4 deletions ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ class TS3BufferZstd: public TS3BufferRaw {

public:
explicit TS3BufferZstd(int compressionLevel,
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes)
: TS3BufferRaw(columns, maxRows, maxBytes)
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes,
bool enableChecksums)
: TS3BufferRaw(columns, maxRows, maxBytes, enableChecksums)
, CompressionLevel(compressionLevel)
, MinBytes(minBytes)
, Context(ZSTD_createCCtx())
Expand All @@ -67,6 +68,9 @@ class TS3BufferZstd: public TS3BufferRaw {
return false;
}

if (Checksum) {
Checksum->AddData(BufferRaw);
}
BytesRaw += BufferRaw.size();

auto input = ZSTD_inBuffer{BufferRaw.data(), BufferRaw.size(), 0};
Expand Down Expand Up @@ -122,9 +126,10 @@ class TS3BufferZstd: public TS3BufferRaw {
}; // TS3BufferZstd

NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel,
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes)
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes,
bool enableChecksums)
{
return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes);
return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes, enableChecksums);
}

} // NDataShard
Expand Down
Loading
Loading