Skip to content

Commit

Permalink
Add export checksums (#12728)
Browse files Browse the repository at this point in the history
  • Loading branch information
pixcc authored Dec 25, 2024
1 parent c19b139 commit 2edc9b5
Show file tree
Hide file tree
Showing 27 changed files with 398 additions and 56 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,5 @@ message TFeatureFlags {
optional bool EnableDataShardInMemoryStateMigration = 159 [default = true];
optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false];
optional bool DisableLocalDBEraseCache = 161 [default = false];
optional bool EnableExportChecksums = 162 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,7 @@ message TBackupTask {

optional uint64 SnapshotStep = 14;
optional uint64 SnapshotTxId = 15;
optional bool EnableChecksums = 16; // currently supported only for S3 exports
}

message TRestoreTask {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableParameterizedDecimal)
FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForCDC)
FEATURE_FLAG_SETTER(EnableFollowerStats)
FEATURE_FLAG_SETTER(EnableExportChecksums)

#undef FEATURE_FLAG_SETTER
};
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/datashard/backup_restore_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) {
FullBackups.emplace(fb->SnapshotVts, fb);
}

// Stores the metadata format version; Serialize() writes this value under
// the "version" key.
void TMetadata::SetVersion(ui64 version) {
    this->Version = version;
}

TString TMetadata::Serialize() const {
NJson::TJsonMap m;
m["version"] = 0;
m["version"] = Version;
NJson::TJsonArray fullBackups;
for (auto &[tp, _] : FullBackups) {
NJson::TJsonMap backupMap;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/backup_restore_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,15 @@ class TMetadata {
void AddFullBackup(TFullBackupMetadata::TPtr fullBackup);
void AddLog(TLogMetadata::TPtr log);
void SetConsistencyKey(const TString& key);
void SetVersion(ui64 version);

TString Serialize() const;
static TMetadata Deserialize(const TString& metadata);
private:
TString ConsistencyKey;
TMap<TVirtualTimestamp, TFullBackupMetadata::TPtr> FullBackups;
TMap<TVirtualTimestamp, TLogMetadata::TPtr> Logs;
ui64 Version = 0;
};

} // NBackupRestore
Expand Down
21 changes: 21 additions & 0 deletions ydb/core/tx/datashard/backup_restore_traits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ TString DataFileExtension(EDataFormat format, ECompressionCodec codec) {
return Sprintf("%s%s", fit->second.c_str(), cit->second.c_str());
}

// Returns the fixed key suffix "permissions.pb".
TString PermissionsKeySuffix() {
    TString suffix("permissions.pb");
    return suffix;
}

// Returns the fixed key suffix "scheme.pb".
TString SchemeKeySuffix() {
    TString suffix("scheme.pb");
    return suffix;
}

// Returns the fixed key suffix "metadata.json".
TString MetadataKeySuffix() {
    TString suffix("metadata.json");
    return suffix;
}

// Builds the key suffix of the n-th data file: "data_NN<ext>", where NN is
// `n` zero-padded to at least two digits and <ext> is whatever
// DataFileExtension(format, codec) returns.
TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec) {
    const auto ext = DataFileExtension(format, codec);
    // `n` is unsigned, so it is printed with %u. Passing a ui32 to %d is a
    // format/argument mismatch and would render values above INT_MAX as
    // negative numbers.
    return Sprintf("data_%02u%s", n, ext.c_str());
}

// Derives the checksum object key from `objKey` by appending ".sha256".
TString ChecksumKey(const TString& objKey) {
    TString checksumKey(objKey);
    checksumKey += ".sha256";
    return checksumKey;
}

} // NBackupRestoreTraits
} // NDataShard
} // NKikimr
20 changes: 5 additions & 15 deletions ydb/core/tx/datashard/backup_restore_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,12 @@ ECompressionCodec NextCompressionCodec(ECompressionCodec cur);

TString DataFileExtension(EDataFormat format, ECompressionCodec codec);

inline TString SchemeKey(const TString& objKeyPattern) {
return Sprintf("%s/scheme.pb", objKeyPattern.c_str());
}

inline TString PermissionsKey(const TString& objKeyPattern) {
return Sprintf("%s/permissions.pb", objKeyPattern.c_str());
}
TString PermissionsKeySuffix();
TString SchemeKeySuffix();
TString MetadataKeySuffix();
TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec);

inline TString MetadataKey(const TString& objKeyPattern) {
return Sprintf("%s/metadata.json", objKeyPattern.c_str());
}

inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) {
const auto ext = DataFileExtension(format, codec);
return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str());
}
TString ChecksumKey(const TString& objKey);

} // NBackupRestoreTraits
} // NDataShard
Expand Down
39 changes: 39 additions & 0 deletions ydb/core/tx/datashard/export_checksum.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "export_checksum.h"

#include <openssl/sha.h>

#include <util/string/hex.h>

namespace NKikimr::NDataShard {

// SHA-256 implementation of IExportChecksum built on OpenSSL's SHA256_* API.
class TSHA256 final : public IExportChecksum {
public:
    TSHA256() {
        SHA256_Init(&Context);
    }

    // Feeds the next chunk of bytes into the running hash.
    void AddData(TStringBuf data) override {
        SHA256_Update(&Context, data.data(), data.size());
    }

    // Returns the digest of all data added so far as a lowercase hex string.
    TString Serialize() override {
        // SHA256_Final consumes the context it is given. Finalizing a copy
        // keeps the running state intact, so Serialize() may be called more
        // than once and AddData() may still follow it.
        SHA256_CTX snapshot = Context;
        unsigned char hash[SHA256_DIGEST_LENGTH];
        SHA256_Final(hash, &snapshot);
        return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
    }

private:
    SHA256_CTX Context;
};

// One-shot helper: creates a checksum object, feeds it `data` in a single
// call and returns the serialized result.
TString ComputeExportChecksum(TStringBuf data) {
    const IExportChecksum::TPtr hasher{CreateExportChecksum()};
    hasher->AddData(data);
    return hasher->Serialize();
}

// Allocates a new SHA-256 checksum object with `new`. The raw pointer is
// handed to the caller, who is responsible for deleting it (for example by
// wrapping it in IExportChecksum::TPtr).
IExportChecksum* CreateExportChecksum() {
    IExportChecksum* const checksum = new TSHA256();
    return checksum;
}

} // NKikimr::NDataShard
20 changes: 20 additions & 0 deletions ydb/core/tx/datashard/export_checksum.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <util/generic/string.h>

namespace NKikimr::NDataShard {

// Interface of an incremental checksum over exported data: bytes are fed in
// chunks via AddData() and the result is obtained as a string via Serialize().
class IExportChecksum {
public:
// Owning pointer type for checksum objects.
using TPtr = std::unique_ptr<IExportChecksum>;

// Virtual so that implementations can be destroyed through a base pointer.
virtual ~IExportChecksum() = default;

// Adds the next chunk of bytes to the checksum.
virtual void AddData(TStringBuf data) = 0;
// Returns the checksum of the data added so far in its string form.
virtual TString Serialize() = 0;
};

IExportChecksum* CreateExportChecksum();
TString ComputeExportChecksum(TStringBuf data);

} // NKikimr::NDataShard
5 changes: 3 additions & 2 deletions ydb/core/tx/datashard/export_s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ class TS3Export: public IExport {

switch (CodecFromTask(Task)) {
case ECompressionCodec::None:
return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes);
return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes, Task.GetEnableChecksums());
case ECompressionCodec::Zstd:
return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, maxBytes, minBytes);
return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows,
maxBytes, minBytes, Task.GetEnableChecksums());
case ECompressionCodec::Invalid:
Y_ABORT("unreachable");
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/export_s3_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ namespace NKikimr {
namespace NDataShard {

NExportScan::IBuffer* CreateS3ExportBufferRaw(
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes);
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, bool enableChecksums);

NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel,
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes);
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, bool enableChecksums);

} // NDataShard
} // NKikimr
Expand Down
26 changes: 21 additions & 5 deletions ydb/core/tx/datashard/export_s3_buffer_raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
namespace NKikimr {
namespace NDataShard {

TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit)
TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
: Columns(columns)
, RowsLimit(rowsLimit)
, BytesLimit(bytesLimit)
, Rows(0)
, BytesRead(0)
, Checksum(enableChecksums ? CreateExportChecksum() : nullptr)
{
}

Expand Down Expand Up @@ -154,7 +155,17 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) {
TBufferOutput out(Buffer);
ErrorString.clear();
return Collect(row, out);

size_t beforeSize = Buffer.Size();
if (!Collect(row, out)) {
return false;
}

if (Checksum) {
TStringBuf data(Buffer.Data(), Buffer.Size());
Checksum->AddData(data.Tail(beforeSize));
}
return true;
}

IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) {
Expand All @@ -167,7 +178,12 @@ IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats&
}

stats.BytesSent = buffer->Size();
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);

if (Checksum && last) {
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last, Checksum->Serialize());
} else {
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);
}
}

void TS3BufferRaw::Clear() {
Expand All @@ -189,9 +205,9 @@ TMaybe<TBuffer> TS3BufferRaw::Flush(bool) {
}

NExportScan::IBuffer* CreateS3ExportBufferRaw(
const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit)
const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
{
return new TS3BufferRaw(columns, rowsLimit, bytesLimit);
return new TS3BufferRaw(columns, rowsLimit, bytesLimit, enableChecksums);
}

} // NDataShard
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/export_s3_buffer_raw.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TS3BufferRaw: public NExportScan::IBuffer {
using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow

public:
explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit);
explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums);

void ColumnsOrder(const TVector<ui32>& tags) override;
bool Collect(const NTable::IScan::TRow& row) override;
Expand Down Expand Up @@ -42,6 +42,8 @@ class TS3BufferRaw: public NExportScan::IBuffer {
ui64 BytesRead;
TBuffer Buffer;

IExportChecksum::TPtr Checksum;

TString ErrorString;
}; // TS3BufferRaw

Expand Down
13 changes: 9 additions & 4 deletions ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ class TS3BufferZstd: public TS3BufferRaw {

public:
explicit TS3BufferZstd(int compressionLevel,
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes)
: TS3BufferRaw(columns, maxRows, maxBytes)
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes,
bool enableChecksums)
: TS3BufferRaw(columns, maxRows, maxBytes, enableChecksums)
, CompressionLevel(compressionLevel)
, MinBytes(minBytes)
, Context(ZSTD_createCCtx())
Expand All @@ -67,6 +68,9 @@ class TS3BufferZstd: public TS3BufferRaw {
return false;
}

if (Checksum) {
Checksum->AddData(BufferRaw);
}
BytesRaw += BufferRaw.size();

auto input = ZSTD_inBuffer{BufferRaw.data(), BufferRaw.size(), 0};
Expand Down Expand Up @@ -122,9 +126,10 @@ class TS3BufferZstd: public TS3BufferRaw {
}; // TS3BufferZstd

NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel,
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes)
const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes,
bool enableChecksums)
{
return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes);
return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes, enableChecksums);
}

} // NDataShard
Expand Down
Loading

0 comments on commit 2edc9b5

Please sign in to comment.