Skip to content

Commit

Permalink
feat(rdb save): serializer flush to sink only on flush call (#519)
Browse files Browse the repository at this point in the history
Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Nov 30, 2022
1 parent cf2ba8e commit 574afe0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 77 deletions.
4 changes: 2 additions & 2 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it";
std::unique_ptr<::io::StringSink> sink = std::make_unique<::io::StringSink>();
int compression_mode = absl::GetFlag(FLAGS_compression_mode);
RdbSerializer serializer(sink.get(), compression_mode != 0);
RdbSerializer serializer(compression_mode != 0);

// According to Redis code we need to
// 1. Save the value itself - without the key
Expand All @@ -444,7 +444,7 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
CHECK(!ec);
ec = serializer.SaveValue(it->second);
CHECK(!ec); // make sure that fully was successful
ec = serializer.FlushMem();
ec = serializer.FlushToSink(sink.get());
CHECK(!ec); // make sure that fully was successful
std::string dump_payload(sink->str());
AppendFooter(&dump_payload); // version and crc
Expand Down
67 changes: 25 additions & 42 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) {
return 0; /* avoid warning */
}

RdbSerializer::RdbSerializer(io::Sink* s, bool do_compression)
: sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr), do_entry_level_compression_(do_compression) {
RdbSerializer::RdbSerializer(bool do_compression)
: mem_buf_{4_KB}, tmp_buf_(nullptr), do_entry_level_compression_(do_compression) {
}

RdbSerializer::~RdbSerializer() {
Expand Down Expand Up @@ -591,49 +591,37 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
return error_code{};
}

error_code RdbSerializer::SendFullSyncCut() {
error_code RdbSerializer::SendFullSyncCut(io::Sink* s) {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
return FlushMem();
return FlushToSink(s);
}

// TODO: if buf is large enough, it makes sense to write both mem_buf and buf
// directly to sink_.
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
mem_buf_.Reserve(mem_buf_.InputLen() + buf.size());
IoBuf::Bytes dest = mem_buf_.AppendBuffer();
if (dest.size() >= buf.size()) {
memcpy(dest.data(), buf.data(), buf.size());
mem_buf_.CommitWrite(buf.size());
return error_code{};
}

io::Bytes ib = mem_buf_.InputBuffer();

if (ib.empty()) {
return sink_->Write(buf);
}
// else
iovec v[2] = {{.iov_base = const_cast<uint8_t*>(ib.data()), .iov_len = ib.size()},
{.iov_base = const_cast<uint8_t*>(buf.data()), .iov_len = buf.size()}};
RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v)));
mem_buf_.ConsumeInput(ib.size());

memcpy(dest.data(), buf.data(), buf.size());
mem_buf_.CommitWrite(buf.size());
return error_code{};
}

error_code RdbSerializer::FlushMem() {
error_code RdbSerializer::FlushToSink(io::Sink* s) {
size_t sz = mem_buf_.InputLen();
if (sz == 0)
return error_code{};

DVLOG(2) << "FlushMem " << sz << " bytes";
DVLOG(2) << "FlushToSink " << sz << " bytes";

// interrupt point.
RETURN_ON_ERR(sink_->Write(mem_buf_.InputBuffer()));
RETURN_ON_ERR(s->Write(mem_buf_.InputBuffer()));
mem_buf_.ConsumeInput(sz);

return error_code{};
}

size_t RdbSerializer::SerializedLen() const {
return mem_buf_.InputLen();
}

error_code RdbSerializer::SaveString(string_view val) {
/* Try integer encoding */
if (val.size() <= 11) {
Expand Down Expand Up @@ -772,6 +760,9 @@ class RdbSaver::Impl {
RdbSerializer* serializer() {
return &meta_serializer_;
}
io::Sink* sink() {
return sink_;
}

void Cancel();

Expand All @@ -795,12 +786,11 @@ class RdbSaver::Impl {
RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode,
io::Sink* sink)
: sink_(sink), shard_snapshots_(producers_len),
meta_serializer_(sink, compression_mode != CompressionMode::NONE), channel_{128,
producers_len},
meta_serializer_(compression_mode != CompressionMode::NONE), channel_{128, producers_len},
compression_mode_(compression_mode) {
if (align_writes) {
aligned_buf_.emplace(kBufLen, sink);
meta_serializer_.set_sink(&aligned_buf_.value());
sink_ = &aligned_buf_.value();
}

DCHECK(producers_len > 0 || channel_.IsClosing());
Expand Down Expand Up @@ -841,11 +831,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
unsigned enclen = SerializeLen(record.db_index, buf + 1);
string_view str{(char*)buf, enclen + 1};

if (aligned_buf_) {
io_error = aligned_buf_->Write(str);
} else {
io_error = sink_->Write(io::Buffer(str));
}
io_error = sink_->Write(io::Buffer(str));
if (io_error)
break;
last_db_index = record.db_index;
Expand All @@ -854,11 +840,8 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
DVLOG(2) << "Pulled " << record.id;
channel_bytes += record.value.size();

if (aligned_buf_) {
io_error = aligned_buf_->Write(record.value);
} else {
io_error = sink_->Write(io::Buffer(record.value));
}
io_error = sink_->Write(io::Buffer(record.value));

record.value.clear();
} while (!io_error && channel.TryPop(record));
} // while (channel.pop)
Expand Down Expand Up @@ -981,10 +964,10 @@ error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
}

error_code RdbSaver::SaveBody(const Cancellation* cll, RdbTypeFreqMap* freq_map) {
RETURN_ON_ERR(impl_->serializer()->FlushMem());
RETURN_ON_ERR(impl_->serializer()->FlushToSink(impl_->sink()));

if (save_mode_ == SaveMode::SUMMARY) {
impl_->serializer()->SendFullSyncCut();
impl_->serializer()->SendFullSyncCut(impl_->sink());
} else {
VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
error_code io_error = impl_->ConsumeChannel(cll);
Expand Down Expand Up @@ -1046,7 +1029,7 @@ error_code RdbSaver::SaveEpilog() {
absl::little_endian::Store64(buf, chksum);
RETURN_ON_ERR(ser.WriteRaw(buf));

RETURN_ON_ERR(ser.FlushMem());
RETURN_ON_ERR(ser.FlushToSink(impl_->sink()));

return impl_->Flush();
}
Expand Down
16 changes: 5 additions & 11 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,10 @@ class RdbSerializer {
// TODO: for aligned cased, it does not make sense that RdbSerializer buffers into unaligned
// mem_buf_ and then flush it into the next level. We should probably use AlignedBuffer
// directly.
RdbSerializer(::io::Sink* s, bool do_entry_level_compression);
RdbSerializer(bool do_entry_level_compression);

~RdbSerializer();

// The ownership stays with the caller.
void set_sink(::io::Sink* s) {
sink_ = s;
}

std::error_code WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}
Expand All @@ -163,17 +158,18 @@ class RdbSerializer {
return SaveString(std::string_view{reinterpret_cast<const char*>(buf), len});
}

// TODO(Adi) : add flag to flush compressed blob to sink, move zstd serializer under RdbSerializer
std::error_code FlushToSink(io::Sink* s);
std::error_code SaveLen(size_t len);

std::error_code FlushMem();

// This would work for either string or an object.
// The arg pv is taken from it->second if accessing
// this by finding the key. This function is used
// for the dump command - thus it is public function
std::error_code SaveValue(const PrimeValue& pv);

std::error_code SendFullSyncCut();
std::error_code SendFullSyncCut(io::Sink* s);
size_t SerializedLen() const;

private:
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
Expand All @@ -189,8 +185,6 @@ class RdbSerializer {
std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(streamCG* cg);

::io::Sink* sink_;

std::unique_ptr<LZF_HSLOT[]> lzf_;
base::IoBuf mem_buf_;
base::PODArray<uint8_t> tmp_buf_;
Expand Down
39 changes: 17 additions & 22 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
sfile_.reset(new io::StringFile);

bool do_compression = (compression_mode_ == CompressionMode::SINGLE_ENTRY);
rdb_serializer_.reset(new RdbSerializer(sfile_.get(), do_compression));
rdb_serializer_.reset(new RdbSerializer(do_compression));

snapshot_fb_ = fiber([this, stream_journal, cll] {
SerializeEntriesFb(cll);
Expand Down Expand Up @@ -147,7 +147,7 @@ void SliceSnapshot::SerializeEntriesFb(const Cancellation* cll) {
mu_.unlock();

for (unsigned i = 10; i > 1; i--)
CHECK(!rdb_serializer_->SendFullSyncCut());
CHECK(!rdb_serializer_->SendFullSyncCut(sfile_.get()));
FlushSfile(true);

VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/"
Expand Down Expand Up @@ -184,21 +184,16 @@ void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk, co
}

bool SliceSnapshot::FlushSfile(bool force) {
if (force) {
auto ec = rdb_serializer_->FlushMem();
CHECK(!ec);
if (sfile_->val.empty())
return false;
} else {
if (sfile_->val.size() < 4096) {
return false;
}

// Make sure we flush everything from membuffer in order to preserve the atomicity of keyvalue
// serializations.
auto ec = rdb_serializer_->FlushMem();
CHECK(!ec); // stringfile always succeeds.
if ((!force) && (rdb_serializer_->SerializedLen() < 4096)) {
return false;
}

auto ec = rdb_serializer_->FlushToSink(sfile_.get());
CHECK(!ec);

if (sfile_->val.empty())
return false;

VLOG(2) << "FlushSfile " << sfile_->val.size() << " bytes";

uint32_t record_num = num_records_in_blob_;
Expand Down Expand Up @@ -270,14 +265,14 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
io::Result<uint8_t> res = rdb_serializer_->SaveEntry(pkey, *entry.pval_ptr, entry.expire_ms);
CHECK(res); // we write to StringFile.
} else {
io::StringFile sfile;
bool serializer_compression = (compression_mode_ != CompressionMode::NONE);
RdbSerializer tmp_serializer(&sfile, serializer_compression);
RdbSerializer tmp_serializer(serializer_compression);

io::Result<uint8_t> res = tmp_serializer.SaveEntry(pkey, *entry.pval_ptr, entry.expire_ms);
CHECK(res); // we write to StringFile.

error_code ec = tmp_serializer.FlushMem();
io::StringFile sfile;
error_code ec = tmp_serializer.FlushToSink(&sfile);
CHECK(!ec && !sfile.val.empty());
PushFileToChannel(&sfile, entry.db_ind, 1, false);
}
Expand All @@ -300,16 +295,16 @@ unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bu
}
num_records_in_blob_ += result;
} else {
io::StringFile sfile;
bool serializer_compression = (compression_mode_ != CompressionMode::NONE);
RdbSerializer tmp_serializer(&sfile, serializer_compression);
RdbSerializer tmp_serializer(serializer_compression);

while (!it.is_done()) {
++result;
SerializeSingleEntry(db_index, it->first, it->second, &tmp_serializer);
++it;
}
error_code ec = tmp_serializer.FlushMem();
io::StringFile sfile;
error_code ec = tmp_serializer.FlushToSink(&sfile);
CHECK(!ec && !sfile.val.empty());
PushFileToChannel(&sfile, db_index, result, false);
}
Expand Down

0 comments on commit 574afe0

Please sign in to comment.