diff --git a/requirements_frozen.txt b/requirements_frozen.txt index 86fd996af865..d9a5872dbf24 100644 --- a/requirements_frozen.txt +++ b/requirements_frozen.txt @@ -13,7 +13,7 @@ Deprecated==1.2.13 distro==1.6.0 downloadutil==1.0.2 idna==3.2 -importlib-metadata==4.10.0 +importlib-metadata==4.8.3 iniconfig==1.1.1 mypy==0.910 mypy-extensions==0.4.3 @@ -43,4 +43,4 @@ typing-utils==0.1.0 urllib3==1.26.7 wrapt==1.12.1 yugabyte-pycommon==1.9.15 -zipp==3.7.0 +zipp==3.6.0 diff --git a/src/yb/consensus/log.cc b/src/yb/consensus/log.cc index 20fd3d0452a0..acae19d9b8e5 100644 --- a/src/yb/consensus/log.cc +++ b/src/yb/consensus/log.cc @@ -787,9 +787,9 @@ Status Log::DoAppend(LogEntryBatch* entry_batch, // If the size of this entry overflows the current segment, get a new one. if (allocation_state() == SegmentAllocationState::kAllocationNotStarted) { - if ((active_segment_->Size() + entry_batch_bytes + 4) > cur_max_segment_size_) { + if (active_segment_->Size() + entry_batch_bytes + kEntryHeaderSize > cur_max_segment_size_) { LOG_WITH_PREFIX(INFO) << "Max segment size " << cur_max_segment_size_ << " reached. " - << "Starting new segment allocation. "; + << "Starting new segment allocation."; RETURN_NOT_OK(AsyncAllocateSegment()); if (!options_.async_preallocate_segments) { RETURN_NOT_OK(RollOver()); @@ -812,7 +812,7 @@ Status Log::DoAppend(LogEntryBatch* entry_batch, } if (metrics_) { - metrics_->bytes_logged->IncrementBy(entry_batch_bytes); + metrics_->bytes_logged->IncrementBy(active_segment_->written_offset() - start_offset); } // Populate the offset and sequence number for the entry batch if we did a WAL write. diff --git a/src/yb/consensus/log_util.cc b/src/yb/consensus/log_util.cc index 6a97d353fd91..a053d8fc31fb 100644 --- a/src/yb/consensus/log_util.cc +++ b/src/yb/consensus/log_util.cc @@ -33,6 +33,7 @@ #include "yb/consensus/log_util.h" #include +#include #include #include @@ -879,12 +880,14 @@ Status WritableLogSegment::WriteEntryBatch(const Slice& data) { uint32_t header_crc = crc::Crc32c(&header_buf, 8); InlineEncodeFixed32(&header_buf[8], header_crc); - // Write the header to the file, followed by the batch data itself. - RETURN_NOT_OK(writable_file_->Append(Slice(header_buf, sizeof(header_buf)))); - written_offset_ += sizeof(header_buf); + std::array slices = { + Slice(header_buf, sizeof(header_buf)), + Slice(data), + }; - RETURN_NOT_OK(writable_file_->Append(data)); - written_offset_ += data.size(); + // Write the header to the file, followed by the batch data itself. + RETURN_NOT_OK(writable_file_->AppendSlices(slices.data(), slices.size())); + written_offset_ += sizeof(header_buf) + data.size(); return Status::OK(); } diff --git a/src/yb/encryption/encrypted_file_factory.cc b/src/yb/encryption/encrypted_file_factory.cc index 4832d5c73c8d..78eae833072f 100644 --- a/src/yb/encryption/encrypted_file_factory.cc +++ b/src/yb/encryption/encrypted_file_factory.cc @@ -42,12 +42,35 @@ class EncryptedWritableFile : public WritableFileWrapper { ~EncryptedWritableFile() {} Status Append(const Slice& data) override { - if (data.size() > 0) { - uint8_t* buf = static_cast(EncryptionBuffer::Get()->GetBuffer(data.size())); - RETURN_NOT_OK(stream_->Encrypt(Size() - header_size_, data, buf)); - RETURN_NOT_OK(WritableFileWrapper::Append(Slice(buf, data.size()))); + if (data.empty()) { + return Status::OK(); } - return Status::OK(); + + uint8_t* buf = static_cast(EncryptionBuffer::Get()->GetBuffer(data.size())); + RETURN_NOT_OK(stream_->Encrypt(Size() - header_size_, data, buf)); + return WritableFileWrapper::Append(Slice(buf, data.size())); + } + + Status AppendSlices(const Slice* slices, size_t num) override { + auto end = slices + num; + size_t total_size = 0; + for (auto it = slices; it != end; ++it) { + total_size += it->size(); + } + if (total_size == 0) { + return Status::OK(); + } + + uint8_t* buf = static_cast(EncryptionBuffer::Get()->GetBuffer(total_size)); + auto write_pos = buf; + auto offset = Size() - header_size_; + for (auto it = slices; it != end; ++it) { + RETURN_NOT_OK(stream_->Encrypt(offset, *it, write_pos)); + write_pos += it->size(); + offset += it->size(); + } + + return WritableFileWrapper::Append(Slice(buf, total_size)); } private: