Skip to content

Commit

Permalink
[#11035] DocDB: Replace two calls to Append with one call to AppendSl…
Browse files Browse the repository at this point in the history
…ices in WritableLogSegment::WriteEntryBatch

Summary:
Currently, we call writable_file_->Append twice in WritableLogSegment::WriteEntryBatch.
It could be replaced with writable_file_->AppendSlices to minimize the amount of system calls.

Test Plan: Jenkins

Reviewers: rthallam

Reviewed By: rthallam

Subscribers: kannan, fhoogland, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D14963
  • Loading branch information
spolitov committed Jan 26, 2022
1 parent d4d77e8 commit 9c6202f
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 15 deletions.
4 changes: 2 additions & 2 deletions requirements_frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Deprecated==1.2.13
distro==1.6.0
downloadutil==1.0.2
idna==3.2
importlib-metadata==4.10.0
importlib-metadata==4.8.3
iniconfig==1.1.1
mypy==0.910
mypy-extensions==0.4.3
Expand Down Expand Up @@ -43,4 +43,4 @@ typing-utils==0.1.0
urllib3==1.26.7
wrapt==1.12.1
yugabyte-pycommon==1.9.15
zipp==3.7.0
zipp==3.6.0
6 changes: 3 additions & 3 deletions src/yb/consensus/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -787,9 +787,9 @@ Status Log::DoAppend(LogEntryBatch* entry_batch,

// If the size of this entry overflows the current segment, get a new one.
if (allocation_state() == SegmentAllocationState::kAllocationNotStarted) {
if ((active_segment_->Size() + entry_batch_bytes + 4) > cur_max_segment_size_) {
if (active_segment_->Size() + entry_batch_bytes + kEntryHeaderSize > cur_max_segment_size_) {
LOG_WITH_PREFIX(INFO) << "Max segment size " << cur_max_segment_size_ << " reached. "
<< "Starting new segment allocation. ";
<< "Starting new segment allocation.";
RETURN_NOT_OK(AsyncAllocateSegment());
if (!options_.async_preallocate_segments) {
RETURN_NOT_OK(RollOver());
Expand All @@ -812,7 +812,7 @@ Status Log::DoAppend(LogEntryBatch* entry_batch,
}

if (metrics_) {
metrics_->bytes_logged->IncrementBy(entry_batch_bytes);
metrics_->bytes_logged->IncrementBy(active_segment_->written_offset() - start_offset);
}

// Populate the offset and sequence number for the entry batch if we did a WAL write.
Expand Down
13 changes: 8 additions & 5 deletions src/yb/consensus/log_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "yb/consensus/log_util.h"

#include <algorithm>
#include <array>
#include <limits>
#include <utility>

Expand Down Expand Up @@ -879,12 +880,14 @@ Status WritableLogSegment::WriteEntryBatch(const Slice& data) {
uint32_t header_crc = crc::Crc32c(&header_buf, 8);
InlineEncodeFixed32(&header_buf[8], header_crc);

// Write the header to the file, followed by the batch data itself.
RETURN_NOT_OK(writable_file_->Append(Slice(header_buf, sizeof(header_buf))));
written_offset_ += sizeof(header_buf);
std::array<Slice, 2> slices = {
Slice(header_buf, sizeof(header_buf)),
Slice(data),
};

RETURN_NOT_OK(writable_file_->Append(data));
written_offset_ += data.size();
// Write the header to the file, followed by the batch data itself.
RETURN_NOT_OK(writable_file_->AppendSlices(slices.data(), slices.size()));
written_offset_ += sizeof(header_buf) + data.size();

return Status::OK();
}
Expand Down
33 changes: 28 additions & 5 deletions src/yb/encryption/encrypted_file_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,35 @@ class EncryptedWritableFile : public WritableFileWrapper {
~EncryptedWritableFile() {}

Status Append(const Slice& data) override {
if (data.size() > 0) {
uint8_t* buf = static_cast<uint8_t*>(EncryptionBuffer::Get()->GetBuffer(data.size()));
RETURN_NOT_OK(stream_->Encrypt(Size() - header_size_, data, buf));
RETURN_NOT_OK(WritableFileWrapper::Append(Slice(buf, data.size())));
if (data.empty()) {
return Status::OK();
}
return Status::OK();

uint8_t* buf = static_cast<uint8_t*>(EncryptionBuffer::Get()->GetBuffer(data.size()));
RETURN_NOT_OK(stream_->Encrypt(Size() - header_size_, data, buf));
return WritableFileWrapper::Append(Slice(buf, data.size()));
}

Status AppendSlices(const Slice* slices, size_t num) override {
auto end = slices + num;
size_t total_size = 0;
for (auto it = slices; it != end; ++it) {
total_size += it->size();
}
if (total_size == 0) {
return Status::OK();
}

uint8_t* buf = static_cast<uint8_t*>(EncryptionBuffer::Get()->GetBuffer(total_size));
auto write_pos = buf;
auto offset = Size() - header_size_;
for (auto it = slices; it != end; ++it) {
RETURN_NOT_OK(stream_->Encrypt(offset, *it, write_pos));
write_pos += it->size();
offset += it->size();
}

return WritableFileWrapper::Append(Slice(buf, total_size));
}

private:
Expand Down

0 comments on commit 9c6202f

Please sign in to comment.