Skip to content

Commit

Permalink
Optimized batch processing (#10131)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Oct 6, 2024
1 parent 54b866e commit 791558d
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 87 deletions.
90 changes: 72 additions & 18 deletions ydb/core/persqueue/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ void TBatch::Unpack() {
PackedData.Clear();
}

void TBatch::UnpackTo(TVector<TClientBlob> *blobs)
void TBatch::UnpackTo(TVector<TClientBlob> *blobs) const
{
Y_ABORT_UNLESS(PackedData.size());
auto type = Header.GetFormat();
Expand All @@ -446,7 +446,7 @@ NScheme::TDataRef GetChunk(const char*& data, const char *end)
return NScheme::TDataRef(data - size, size);
}

void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) {
void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
Y_ABORT_UNLESS(Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed);
Y_ABORT_UNLESS(PackedData.size());
ui32 totalBlobs = Header.GetCount() + Header.GetInternalPartsCount();
Expand Down Expand Up @@ -606,7 +606,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) {
}
}

void TBatch::UnpackToType0(TVector<TClientBlob> *blobs) {
void TBatch::UnpackToType0(TVector<TClientBlob> *blobs) const {
Y_ABORT_UNLESS(Header.GetFormat() == NKikimrPQ::TBatchHeader::EUncompressed);
Y_ABORT_UNLESS(PackedData.size());
ui32 shift = 0;
Expand Down Expand Up @@ -640,7 +640,7 @@ ui32 TBatch::FindPos(const ui64 offset, const ui16 partNo) const {
void THead::Clear()
{
Offset = PartNo = PackedSize = 0;
Batches.clear();
ClearBatches();
}

ui64 THead::GetNextOffset() const
Expand All @@ -650,11 +650,7 @@ ui64 THead::GetNextOffset() const

ui16 THead::GetInternalPartsCount() const
{
ui16 res = 0;
for (auto& b : Batches) {
res += b.GetInternalPartsCount();
}
return res;
return InternalPartsCount;
}

ui32 THead::GetCount() const
Expand All @@ -675,15 +671,73 @@ IOutputStream& operator <<(IOutputStream& out, const THead& value)
}

ui32 THead::FindPos(const ui64 offset, const ui16 partNo) const {
ui32 i = 0;
for (; i < Batches.size(); ++i) {
//this batch contains blobs with position bigger than requested
if (Batches[i].GetOffset() > offset || Batches[i].GetOffset() == offset && Batches[i].GetPartNo() > partNo)
break;
}
if (i == 0)
if (Batches.empty()) {
return Max<ui32>();
return i - 1;
}

ui32 i = Batches.size() - 1;
while (i > 0 && Batches[i].IsGreaterThan(offset, partNo)) {
--i;
}

if (i == 0) {
if (Batches[i].IsGreaterThan(offset, partNo)) {
return Max<ui32>();
} else {
return 0;
}
}

return i;
}

void THead::AddBatch(const TBatch& batch) {
auto& b = Batches.emplace_back(batch);
InternalPartsCount += b.GetInternalPartsCount();
}

void THead::ClearBatches() {
Batches.clear();
InternalPartsCount = 0;
}

const std::deque<TBatch>& THead::GetBatches() const {
return Batches;
}

const TBatch& THead::GetBatch(ui32 idx) const {
return Batches.at(idx);
}

const TBatch& THead::GetLastBatch() const {
Y_ABORT_UNLESS(!Batches.empty());
return Batches.back();
}

TBatch THead::ExtractFirstBatch() {
Y_ABORT_UNLESS(!Batches.empty());
auto batch = std::move(Batches.front());
InternalPartsCount -= batch.GetInternalPartsCount();
Batches.pop_front();
return batch;
}

THead::TBatchAccessor THead::MutableBatch(ui32 idx) {
Y_ABORT_UNLESS(idx < Batches.size());
return TBatchAccessor(Batches[idx]);
}

THead::TBatchAccessor THead::MutableLastBatch() {
Y_ABORT_UNLESS(!Batches.empty());
return TBatchAccessor(Batches.back());
}

void THead::AddBlob(const TClientBlob& blob) {
Y_ABORT_UNLESS(!Batches.empty());
auto& batch = Batches.back();
InternalPartsCount -= batch.GetInternalPartsCount();
batch.AddBlob(blob);
InternalPartsCount += batch.GetInternalPartsCount();
}

TPartitionedBlob::TRenameFormedBlobInfo::TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size) :
Expand Down Expand Up @@ -832,7 +886,7 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio

GlueHead = GlueNewHead = false;
if (!Blobs.empty()) {
TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)};
auto batch = TBatch::FromBlobs(Offset, std::move(Blobs));
Blobs.clear();
batch.Pack();
Y_ABORT_UNLESS(batch.Packed);
Expand Down
72 changes: 52 additions & 20 deletions ydb/core/persqueue/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,38 +121,30 @@ struct TBatch {
TVector<ui32> InternalPartsPos;
NKikimrPQ::TBatchHeader Header;
TBuffer PackedData;

TBatch()
: Packed(false)
{
PackedData.Reserve(8_MB);
}

TBatch(const ui64 offset, const ui16 partNo, const TVector<TClientBlob>& blobs)
: Packed(false)
TBatch(const ui64 offset, const ui16 partNo)
: TBatch()
{
PackedData.Reserve(8_MB);
Header.SetOffset(offset);
Header.SetPartNo(partNo);
Header.SetUnpackedSize(0);
Header.SetCount(0);
Header.SetInternalPartsCount(0);
for (auto& b : blobs) {
AddBlob(b);
}
}

TBatch(const ui64 offset, const ui16 partNo, const std::deque<TClientBlob>& blobs)
: Packed(false)
{
PackedData.Reserve(8_MB);
Header.SetOffset(offset);
Header.SetPartNo(partNo);
Header.SetUnpackedSize(0);
Header.SetCount(0);
Header.SetInternalPartsCount(0);
static TBatch FromBlobs(const ui64 offset, std::deque<TClientBlob>&& blobs) {
Y_ABORT_UNLESS(!blobs.empty());
TBatch batch(offset, blobs.front().GetPartNo());
for (auto& b : blobs) {
AddBlob(b);
batch.AddBlob(b);
}
return batch;
}

void AddBlob(const TClientBlob &b) {
Expand Down Expand Up @@ -187,6 +179,9 @@ struct TBatch {
ui16 GetInternalPartsCount() const {
return Header.GetInternalPartsCount();
}
bool IsGreaterThan(ui64 offset, ui16 partNo) const {
return GetOffset() > offset || GetOffset() == offset && GetPartNo() > partNo;
}

TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
: Packed(true)
Expand All @@ -198,9 +193,9 @@ struct TBatch {
ui32 GetPackedSize() const { Y_ABORT_UNLESS(Packed); return sizeof(ui16) + PackedData.size() + Header.ByteSize(); }
void Pack();
void Unpack();
void UnpackTo(TVector<TClientBlob> *result);
void UnpackToType0(TVector<TClientBlob> *result);
void UnpackToType1(TVector<TClientBlob> *result);
void UnpackTo(TVector<TClientBlob> *result) const;
void UnpackToType0(TVector<TClientBlob> *result) const;
void UnpackToType1(TVector<TClientBlob> *result) const;

void SerializeTo(TString& res) const;

Expand Down Expand Up @@ -232,14 +227,39 @@ class TBlobIterator {
ui16 InternalPartsCount;
};

class TPartitionedBlob;

//THead represents bathes, stored in head(at most 8 Mb)
struct THead {
std::deque<TBatch> Batches;
//all batches except last must be packed
// BlobsSize <= 512Kb
// size of Blobs after packing must be <= BlobsSize
//otherwise head will be compacted not in total, some blobs will still remain in head
//PackedSize + BlobsSize must be <= 8Mb
private:
std::deque<TBatch> Batches;
ui16 InternalPartsCount = 0;

friend class TPartitionedBlob;

class TBatchAccessor {
TBatch& Batch;

public:
explicit TBatchAccessor(TBatch& batch)
: Batch(batch)
{}

void Pack() {
Batch.Pack();
}

void Unpack() {
Batch.Unpack();
}
};

public:
ui64 Offset;
ui16 PartNo;
ui32 PackedSize;
Expand All @@ -261,6 +281,18 @@ struct THead {
//return Max<ui32> if not such pos in head
//returns batch with such position
ui32 FindPos(const ui64 offset, const ui16 partNo) const;

void AddBatch(const TBatch& batch);
void ClearBatches();
const std::deque<TBatch>& GetBatches() const;
const TBatch& GetBatch(ui32 idx) const;
const TBatch& GetLastBatch() const;
TBatchAccessor MutableBatch(ui32 idx);
TBatchAccessor MutableLastBatch();
TBatch ExtractFirstBatch();
void AddBlob(const TClientBlob& blob);

friend IOutputStream& operator <<(IOutputStream& out, const THead& value);
};

IOutputStream& operator <<(IOutputStream& out, const THead& value);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
Y_ABORT_UNLESS(size == read.GetValue().size());

for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
head.Batches.emplace_back(it.GetBatch());
head.AddBatch(it.GetBatch());
}
head.PackedSize += size;

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,13 +626,13 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(
Y_ABORT_UNLESS(pos != Max<ui32>());
}
ui32 lastBlobSize = 0;
for (;pos < Head.Batches.size(); ++pos) {
for (;pos < Head.GetBatches().size(); ++pos) {

TVector<TClientBlob> blobs;
Head.Batches[pos].UnpackTo(&blobs);
Head.GetBatch(pos).UnpackTo(&blobs);
ui32 i = 0;
ui64 offset = Head.Batches[pos].GetOffset();
ui16 pno = Head.Batches[pos].GetPartNo();
ui64 offset = Head.GetBatch(pos).GetOffset();
ui16 pno = Head.GetBatch(pos).GetPartNo();
for (; i < blobs.size(); ++i) {

ui64 curOffset = offset;
Expand Down
47 changes: 23 additions & 24 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
Head.PackedSize = 0;
Head.Offset = NewHead.Offset;
Head.PartNo = NewHead.PartNo; //no partNo at this point
Head.Batches.clear();
Head.ClearBatches();
}

while (!CompactedKeys.empty()) {
Expand All @@ -430,9 +430,8 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
} // head cleared, all data moved to body

//append Head with newHead
while (!NewHead.Batches.empty()) {
Head.Batches.push_back(NewHead.Batches.front());
NewHead.Batches.pop_front();
while (!NewHead.GetBatches().empty()) {
Head.AddBatch(NewHead.ExtractFirstBatch());
}
Head.PackedSize += NewHead.PackedSize;

Expand Down Expand Up @@ -1326,22 +1325,22 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
ctx);
ui32 countOfLastParts = 0;
for (auto& x : PartitionedBlob.GetClientBlobs()) {
if (NewHead.Batches.empty() || NewHead.Batches.back().Packed) {
NewHead.Batches.emplace_back(curOffset, x.GetPartNo(), TVector<TClientBlob>());
if (NewHead.GetBatches().empty() || NewHead.GetLastBatch().Packed) {
NewHead.AddBatch(TBatch(curOffset, x.GetPartNo()));
NewHead.PackedSize += GetMaxHeaderSize(); //upper bound for packed size
}
if (x.IsLastPart()) {
++countOfLastParts;
}
Y_ABORT_UNLESS(!NewHead.Batches.back().Packed);
NewHead.Batches.back().AddBlob(x);
Y_ABORT_UNLESS(!NewHead.GetLastBatch().Packed);
NewHead.AddBlob(x);
NewHead.PackedSize += x.GetBlobSize();
if (NewHead.Batches.back().GetUnpackedSize() >= BATCH_UNPACK_SIZE_BORDER) {
NewHead.Batches.back().Pack();
NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob
if (NewHead.GetLastBatch().GetUnpackedSize() >= BATCH_UNPACK_SIZE_BORDER) {
NewHead.MutableLastBatch().Pack();
NewHead.PackedSize += NewHead.GetLastBatch().GetPackedSize(); //add real packed size for this blob

NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound
NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize();
NewHead.PackedSize -= NewHead.GetLastBatch().GetUnpackedSize();
}
}

Expand Down Expand Up @@ -1418,15 +1417,15 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
valueD.reserve(res.second);
ui32 pp = Head.FindPos(key.GetOffset(), key.GetPartNo());
if (pp < Max<ui32>() && key.GetOffset() < EndOffset) { //this batch trully contains this offset
Y_ABORT_UNLESS(pp < Head.Batches.size());
Y_ABORT_UNLESS(Head.Batches[pp].GetOffset() == key.GetOffset());
Y_ABORT_UNLESS(Head.Batches[pp].GetPartNo() == key.GetPartNo());
for (; pp < Head.Batches.size(); ++pp) { //TODO - merge small batches here
Y_ABORT_UNLESS(Head.Batches[pp].Packed);
Head.Batches[pp].SerializeTo(valueD);
Y_ABORT_UNLESS(pp < Head.GetBatches().size());
Y_ABORT_UNLESS(Head.GetBatch(pp).GetOffset() == key.GetOffset());
Y_ABORT_UNLESS(Head.GetBatch(pp).GetPartNo() == key.GetPartNo());
for (; pp < Head.GetBatches().size(); ++pp) { //TODO - merge small batches here
Y_ABORT_UNLESS(Head.GetBatch(pp).Packed);
Head.GetBatch(pp).SerializeTo(valueD);
}
}
for (auto& b : NewHead.Batches) {
for (auto& b : NewHead.GetBatches()) {
Y_ABORT_UNLESS(b.Packed);
b.SerializeTo(valueD);
}
Expand Down Expand Up @@ -1703,7 +1702,7 @@ void TPartition::BeginAppendHeadWithNewWrites(const TActorContext& ctx)
NewHead.PartNo = 0;
NewHead.PackedSize = 0;

Y_ABORT_UNLESS(NewHead.Batches.empty());
Y_ABORT_UNLESS(NewHead.GetBatches().empty());

Parameters->OldPartsCleared = false;
Parameters->HeadCleared = (Head.PackedSize == 0);
Expand Down Expand Up @@ -1748,12 +1747,12 @@ void TPartition::EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, co

UpdateWriteBufferIsFullState(ctx.Now());

if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) {
NewHead.Batches.back().Pack();
NewHead.PackedSize += NewHead.Batches.back().GetPackedSize(); //add real packed size for this blob
if (!NewHead.GetBatches().empty() && !NewHead.GetLastBatch().Packed) {
NewHead.MutableLastBatch().Pack();
NewHead.PackedSize += NewHead.GetLastBatch().GetPackedSize(); //add real packed size for this blob

NewHead.PackedSize -= GetMaxHeaderSize(); //instead of upper bound
NewHead.PackedSize -= NewHead.Batches.back().GetUnpackedSize();
NewHead.PackedSize -= NewHead.GetLastBatch().GetUnpackedSize();
}

Y_ABORT_UNLESS((Parameters->HeadCleared ? 0 : Head.PackedSize) + NewHead.PackedSize <= MaxBlobSize); //otherwise last PartitionedBlob.Add must compact all except last cl
Expand Down
Loading

0 comments on commit 791558d

Please sign in to comment.