Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Update helio, optimize and clean up rdb/snapshot #625

Merged
merged 2 commits into from
Dec 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/server/journal/serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,12 @@ base::IoBuf& JournalWriter::Accumulated() {
void JournalWriter::Write(uint64_t v) {
uint8_t buf[10];
unsigned len = WritePackedUInt(v, buf);
buf_.EnsureCapacity(sizeof(buf));
memcpy(buf_.AppendBuffer().data(), buf, len);
buf_.CommitWrite(len);
buf_.WriteAndCommit(buf, len);
}

void JournalWriter::Write(std::string_view sv) {
Write(sv.size());
buf_.EnsureCapacity(sv.size());
memcpy(buf_.AppendBuffer().data(), sv.data(), sv.size());
buf_.CommitWrite(sv.size());
buf_.WriteAndCommit(sv.data(), sv.size());
}

void JournalWriter::Write(CmdArgList args) {
Expand Down
4 changes: 4 additions & 0 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,10 @@ template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
return base::LE::LoadT<std::make_unsigned_t<T>>(buf);
}

// Reads the next single-byte RDB type/opcode tag from the input stream.
// Thin out-of-line wrapper over FetchInt<uint8_t> (moved here from the
// header in this PR so rdb_load.h no longer needs the template body).
io::Result<uint8_t> RdbLoaderBase::FetchType() {
return FetchInt<uint8_t>();
}

// -------------- RdbLoader ----------------------------

struct RdbLoader::ObjSettings {
Expand Down
4 changes: 1 addition & 3 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ class RdbLoaderBase {

class OpaqueObjLoader;

::io::Result<uint8_t> FetchType() {
return FetchInt<uint8_t>();
}
io::Result<uint8_t> FetchType();

template <typename T> io::Result<T> FetchInt();

Expand Down
32 changes: 21 additions & 11 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,8 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
return error_code{};
}

error_code RdbSerializer::SendFullSyncCut(io::Sink* s) {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
return FlushToSink(s);
error_code RdbSerializer::SendFullSyncCut() {
return WriteOpcode(RDB_OPCODE_FULLSYNC_END);
}

error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
Expand All @@ -656,12 +655,10 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
return error_code{};
}

error_code RdbSerializer::FlushToSink(io::Sink* s) {
io::Bytes RdbSerializer::Flush() {
size_t sz = mem_buf_.InputLen();
if (sz == 0)
return error_code{};

DVLOG(2) << "FlushToSink " << sz << " bytes";
return mem_buf_.InputBuffer();

if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD ||
compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) {
Expand All @@ -670,17 +667,30 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) {
sz = mem_buf_.InputLen();
}

// interrupt point.
RETURN_ON_ERR(s->Write(mem_buf_.InputBuffer()));
mem_buf_.ConsumeInput(sz);
return mem_buf_.InputBuffer();
}

// Writes the serializer's accumulated buffer to `s` and consumes it.
// Flush() returns a view into mem_buf_ (compressed first, if a
// multi-entry compression mode is enabled), so ConsumeInput must use
// bytes.size() — the post-compression length — not the pre-flush length.
error_code RdbSerializer::FlushToSink(io::Sink* s) {
auto bytes = Flush();
if (bytes.empty())
return error_code{};

DVLOG(2) << "FlushToSink " << bytes.size() << " bytes";

// interrupt point.
RETURN_ON_ERR(s->Write(bytes));
mem_buf_.ConsumeInput(bytes.size());
return error_code{};
}

// Number of serialized bytes currently pending in the internal buffer
// (i.e. not yet flushed). Per the header comment, this may shrink after
// a flush when compression is enabled.
size_t RdbSerializer::SerializedLen() const {
return mem_buf_.InputLen();
}

// Discards all accumulated, unflushed bytes from the internal buffer.
// Used by callers (e.g. SliceSnapshot::FlushDefaultBuffer) after they
// have copied the Flush() view out themselves instead of going through
// FlushToSink.
void RdbSerializer::Clear() {
mem_buf_.Clear();
}

error_code RdbSerializer::WriteJournalEntries(absl::Span<const journal::Entry> entries) {
for (const auto& entry : entries) {
journal_writer_.Write(entry);
Expand Down Expand Up @@ -1045,7 +1055,7 @@ error_code RdbSaver::SaveBody(const Cancellation* cll, RdbTypeFreqMap* freq_map)
RETURN_ON_ERR(impl_->serializer()->FlushToSink(impl_->sink()));

if (save_mode_ == SaveMode::SUMMARY) {
impl_->serializer()->SendFullSyncCut(impl_->sink());
impl_->serializer()->SendFullSyncCut();
} else {
VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
error_code io_error = impl_->ConsumeChannel(cll);
Expand Down
30 changes: 19 additions & 11 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,39 +118,47 @@ class RdbSerializer {

~RdbSerializer();

std::error_code WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}
// Get access to internal buffer, compressed, if enabled.
io::Bytes Flush();

// Internal buffer size. Might shrink after flush due to compression.
size_t SerializedLen() const;

// Flush internal buffer to sink.
std::error_code FlushToSink(io::Sink* s);

// Clear internal buffer contents.
void Clear();

std::error_code SelectDb(uint32_t dbid);

// Must be called in the thread to which `it` belongs.
// Returns the serialized rdb_type or the error.
// expire_ms = 0 means no expiry.
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms);
std::error_code WriteRaw(const ::io::Bytes& buf);
std::error_code SaveString(std::string_view val);

std::error_code SaveLen(size_t len);
std::error_code SaveString(std::string_view val);
std::error_code SaveString(const uint8_t* buf, size_t len) {
return SaveString(std::string_view{reinterpret_cast<const char*>(buf), len});
return SaveString(io::View(io::Bytes{buf, len}));
}

std::error_code FlushToSink(io::Sink* s);
std::error_code SaveLen(size_t len);

// This would work for either string or an object.
// The arg pv is taken from it->second if accessing
// this by finding the key. This function is used
// for the dump command - thus it is public function
std::error_code SaveValue(const PrimeValue& pv);

size_t SerializedLen() const;
std::error_code WriteRaw(const ::io::Bytes& buf);
std::error_code WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}

// Write journal entries as an embedded journal blob.
std::error_code WriteJournalEntries(absl::Span<const journal::Entry> entries);

// Send FULL_SYNC_CUT opcode to notify that all static data was sent.
std::error_code SendFullSyncCut(io::Sink* s);
std::error_code SendFullSyncCut();

private:
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
Expand Down
16 changes: 7 additions & 9 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,13 @@ error_code Replica::InitiatePSync() {

// TODO: handle gracefully...
CHECK_EQ(0, memcmp(token->data(), buf, kRdbEofMarkSize));
CHECK(chained.unused_prefix().empty());
CHECK(chained.UnusedPrefix().empty());
} else {
CHECK_EQ(0u, loader.Leftover().size());
CHECK_EQ(snapshot_size, loader.bytes_read());
}

CHECK(ps.unused_prefix().empty());
CHECK(ps.UnusedPrefix().empty());
io_buf.ConsumeInput(io_buf.InputLen());
last_io_time_ = sock_thread->GetMonotonicTimeNs();
}
Expand Down Expand Up @@ -653,9 +653,9 @@ error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* c

parser_.reset(new RedisParser{false}); // client mode

leftover_buf_.reset(new base::IoBuf(128));
leftover_buf_.emplace(128);
unsigned consumed = 0;
RETURN_ON_ERR(ReadRespReply(leftover_buf_.get(), &consumed)); // uses parser_
RETURN_ON_ERR(ReadRespReply(&*leftover_buf_, &consumed)); // uses parser_

if (!CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING})) {
LOG(ERROR) << "Bad FLOW response " << ToSV(leftover_buf_->InputBuffer());
Expand Down Expand Up @@ -728,12 +728,10 @@ void Replica::FullSyncDflyFb(string eof_token, fibers_ext::BlockingCounter bc, C
}

// Keep loader leftover.
io::Bytes unused = chained_tail.unused_prefix();
io::Bytes unused = chained_tail.UnusedPrefix();
if (unused.size() > 0) {
leftover_buf_.reset(new base::IoBuf{unused.size()});
auto mut_bytes = leftover_buf_->AppendBuffer();
memcpy(mut_bytes.data(), unused.data(), unused.size());
leftover_buf_->CommitWrite(unused.size());
leftover_buf_.emplace(unused.size());
leftover_buf_->WriteAndCommit(unused.data(), unused.size());
} else {
leftover_buf_.reset();
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class Replica {
// Guard operations where flows might be in a mixed state (transition/setup)
::boost::fibers::mutex flows_op_mu_;

std::unique_ptr<base::IoBuf> leftover_buf_;
std::optional<base::IoBuf> leftover_buf_;
std::unique_ptr<facade::RedisParser> parser_;
facade::RespVec resp_args_;
facade::CmdArgVec cmd_str_args_;
Expand Down
28 changes: 10 additions & 18 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
journal_cb_id_ = journal->RegisterOnChange(move(journal_cb));
}

default_buffer_.reset(new io::StringFile);
default_serializer_.reset(new RdbSerializer(compression_mode_));

VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
Expand Down Expand Up @@ -158,7 +157,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {

// TODO: investigate why a single byte gets stuck and does not arrive to replica
for (unsigned i = 10; i > 1; i--)
CHECK(!default_serializer_->SendFullSyncCut(default_buffer_.get()));
CHECK(!default_serializer_->SendFullSyncCut());
FlushDefaultBuffer(true);

VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << stats_.serialized << "/"
Expand Down Expand Up @@ -216,7 +215,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
}

if (tmp_serializer) {
FlushTmpSerializer(db_index, &*tmp_serializer);
PushBytesToChannel(db_index, tmp_serializer->Flush());
}

return result;
Expand All @@ -237,22 +236,21 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
++type_freq_map_[*res];
}

void SliceSnapshot::PushFileToChannel(DbIndex db_index, io::StringFile* sfile) {
dest_->Push(GetDbRecord(db_index, std::move(sfile->val)));
void SliceSnapshot::PushBytesToChannel(DbIndex db_index, io::Bytes bytes) {
dest_->Push(GetDbRecord(db_index, std::string{io::View(bytes)}));
}

bool SliceSnapshot::FlushDefaultBuffer(bool force) {
if (!force && default_serializer_->SerializedLen() < 4096)
return false;

CHECK(!default_serializer_->FlushToSink(default_buffer_.get()));

if (default_buffer_->val.empty())
auto bytes = default_serializer_->Flush();
if (bytes.empty())
return false;

VLOG(2) << "FlushDefaultBuffer " << default_buffer_->val.size() << " bytes";

PushFileToChannel(current_db_, default_buffer_.get());
VLOG(2) << "FlushDefaultBuffer " << bytes.size() << " bytes";
PushBytesToChannel(current_db_, bytes);
default_serializer_->Clear();
return true;
}

Expand Down Expand Up @@ -291,7 +289,7 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
serializer_ptr->WriteJournalEntries(absl::Span{&entry, 1});

if (tmp_serializer) {
FlushTmpSerializer(entry.dbid, &*tmp_serializer);
PushBytesToChannel(entry.dbid, tmp_serializer->Flush());
} else {
// This is the only place that flushes in streaming mode
// once the iterate buckets fiber finished.
Expand Down Expand Up @@ -320,10 +318,4 @@ SliceSnapshot::DbRecord SliceSnapshot::GetDbRecord(DbIndex db_index, std::string
return DbRecord{.db_index = db_index, .id = id, .value = std::move(value)};
}

void SliceSnapshot::FlushTmpSerializer(DbIndex db_index, RdbSerializer* serializer) {
io::StringFile sfile{};
error_code ec = serializer->FlushToSink(&sfile);
CHECK(!ec && !sfile.val.empty());
PushFileToChannel(db_index, &sfile);
}
} // namespace dfly
47 changes: 21 additions & 26 deletions src/server/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,27 @@ struct Entry;

class RdbSerializer;

//┌────────────────┐ ┌─────────────┐
//│IterateBucketsFb│ │ OnDbChange │
//└──────┬─────────┘ └─┬───────────┘
// │ │ OnDbChange forces whole bucket to be
// ▼ ▼ serialized if iterate didn't reach it yet
//┌──────────────────────────┐
//│ SerializeBucket │ Both might fall back to a temporary serializer
//└────────────┬─────────────┘ if default is used on another db index
// │
// | Channel is left open in journal streaming mode
// ▼
//┌──────────────────────────┐ ┌─────────────────────────┐
//│ SerializeEntry │ ◄────────┤ OnJournalEntry │
//└─────────────┬────────────┘ └─────────────────────────┘
// ┌────────────────┐ ┌─────────────┐
// │IterateBucketsFb│ │ OnDbChange │
// └──────┬─────────┘ └─┬───────────┘
// │ │ OnDbChange forces whole bucket to be
// ▼ ▼ serialized if iterate didn't reach it yet
// ┌──────────────────────────┐
// │ SerializeBucket │ Both might fall back to a temporary serializer
// └────────────┬─────────────┘ if default is used on another db index
// │
// PushFileToChannel Default buffer gets flushed on iteration,
// │ temporary on destruction
// | Channel is left open in journal streaming mode
// ▼
//┌──────────────────────────────┐
//│ dest->Push(buffer) │
//└──────────────────────────────┘
// ┌──────────────────────────┐ ┌─────────────────────────┐
// │ SerializeEntry │ ◄────────┤ OnJournalEntry │
// └─────────────┬────────────┘ └─────────────────────────┘
// │
// PushBytesToChannel Default buffer gets flushed on iteration,
// │ temporary on destruction
// ▼
// ┌──────────────────────────────┐
// │ dest->Push(buffer) │
// └──────────────────────────────┘

// SliceSnapshot is used for iterating over a shard at a specified point-in-time
// and submitting all values to an output channel.
Expand Down Expand Up @@ -95,8 +95,8 @@ class SliceSnapshot {
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
std::optional<uint64_t> expire, RdbSerializer* serializer);

// Push StringFile buffer to channel.
void PushFileToChannel(DbIndex db_index, io::StringFile* sfile);
// Push byte slice to channel.
void PushBytesToChannel(DbIndex db_index, io::Bytes bytes);
Comment on lines +98 to +99
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea:

If I end up implementing an async writer with the IoBuf-swapping technique, then I can use it for single-shard snapshots as well, instead of a channel carrying small portions

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it important? Take into account that iobuf is not optimized for being a queue. It may realloc heavily for variable size workloads. I think it's too soon to start optimizing the code towards a winning approach.


// DbChange listener
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
Expand All @@ -115,9 +115,6 @@ class SliceSnapshot {
// Convert value into DbRecord.
DbRecord GetDbRecord(DbIndex db_index, std::string value);

// Flush internals of a temporary serializer.
void FlushTmpSerializer(DbIndex db_index, RdbSerializer* serializer);

public:
uint64_t snapshot_version() const {
return snapshot_version_;
Expand All @@ -144,8 +141,6 @@ class SliceSnapshot {

DbIndex current_db_;

// TODO : drop default_buffer from this class, we dont realy need it.
std::unique_ptr<io::StringFile> default_buffer_; // filled by default_serializer_
Comment on lines -147 to -148
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way, removing this is intended

std::unique_ptr<RdbSerializer> default_serializer_;

::boost::fibers::mutex mu_;
Expand Down