diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index ef4d7dd8821b..525f89d132d5 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -431,7 +431,7 @@ OpResult OpDump(const OpArgs& op_args, string_view key) { DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it"; std::unique_ptr<::io::StringSink> sink = std::make_unique<::io::StringSink>(); int compression_mode = absl::GetFlag(FLAGS_compression_mode); - RdbSerializer serializer(sink.get(), compression_mode != 0); + RdbSerializer serializer(compression_mode != 0); // According to Redis code we need to // 1. Save the value itself - without the key @@ -444,7 +444,7 @@ OpResult OpDump(const OpArgs& op_args, string_view key) { CHECK(!ec); ec = serializer.SaveValue(it->second); CHECK(!ec); // make sure that fully was successful - ec = serializer.FlushMem(); + ec = serializer.FlushToSink(sink.get()); CHECK(!ec); // make sure that fully was successful std::string dump_payload(sink->str()); AppendFooter(&dump_payload); // version and crc diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 864549720943..3b2e6a19f2d6 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -166,8 +166,8 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) { return 0; /* avoid warning */ } -RdbSerializer::RdbSerializer(io::Sink* s, bool do_compression) - : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr), do_entry_level_compression_(do_compression) { +RdbSerializer::RdbSerializer(bool do_compression) + : mem_buf_{4_KB}, tmp_buf_(nullptr), do_entry_level_compression_(do_compression) { } RdbSerializer::~RdbSerializer() { @@ -591,49 +591,37 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { return error_code{}; } -error_code RdbSerializer::SendFullSyncCut() { +error_code RdbSerializer::SendFullSyncCut(io::Sink* s) { RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END)); - return FlushMem(); + return FlushToSink(s); } -// TODO: if buf is large enough, it makes sense to write both mem_buf and buf -// directly to sink_. error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { + mem_buf_.Reserve(mem_buf_.InputLen() + buf.size()); IoBuf::Bytes dest = mem_buf_.AppendBuffer(); - if (dest.size() >= buf.size()) { - memcpy(dest.data(), buf.data(), buf.size()); - mem_buf_.CommitWrite(buf.size()); - return error_code{}; - } - - io::Bytes ib = mem_buf_.InputBuffer(); - - if (ib.empty()) { - return sink_->Write(buf); - } - // else - iovec v[2] = {{.iov_base = const_cast(ib.data()), .iov_len = ib.size()}, - {.iov_base = const_cast(buf.data()), .iov_len = buf.size()}}; - RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v))); - mem_buf_.ConsumeInput(ib.size()); - + memcpy(dest.data(), buf.data(), buf.size()); + mem_buf_.CommitWrite(buf.size()); return error_code{}; } -error_code RdbSerializer::FlushMem() { +error_code RdbSerializer::FlushToSink(io::Sink* s) { size_t sz = mem_buf_.InputLen(); if (sz == 0) return error_code{}; - DVLOG(2) << "FlushMem " << sz << " bytes"; + DVLOG(2) << "FlushToSink " << sz << " bytes"; // interrupt point. - RETURN_ON_ERR(sink_->Write(mem_buf_.InputBuffer())); + RETURN_ON_ERR(s->Write(mem_buf_.InputBuffer())); mem_buf_.ConsumeInput(sz); return error_code{}; } +size_t RdbSerializer::SerializedLen() const { + return mem_buf_.InputLen(); +} + error_code RdbSerializer::SaveString(string_view val) { /* Try integer encoding */ if (val.size() <= 11) { @@ -772,6 +760,9 @@ class RdbSaver::Impl { RdbSerializer* serializer() { return &meta_serializer_; } + io::Sink* sink() { + return sink_; + } void Cancel(); @@ -795,12 +786,11 @@ class RdbSaver::Impl { RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode, io::Sink* sink) : sink_(sink), shard_snapshots_(producers_len), - meta_serializer_(sink, compression_mode != CompressionMode::NONE), channel_{128, - producers_len}, + meta_serializer_(compression_mode != CompressionMode::NONE), channel_{128, producers_len}, compression_mode_(compression_mode) { if (align_writes) { aligned_buf_.emplace(kBufLen, sink); - meta_serializer_.set_sink(&aligned_buf_.value()); + sink_ = &aligned_buf_.value(); } DCHECK(producers_len > 0 || channel_.IsClosing()); @@ -841,11 +831,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { unsigned enclen = SerializeLen(record.db_index, buf + 1); string_view str{(char*)buf, enclen + 1}; - if (aligned_buf_) { - io_error = aligned_buf_->Write(str); - } else { - io_error = sink_->Write(io::Buffer(str)); - } + io_error = sink_->Write(io::Buffer(str)); if (io_error) break; last_db_index = record.db_index; @@ -854,11 +840,8 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { DVLOG(2) << "Pulled " << record.id; channel_bytes += record.value.size(); - if (aligned_buf_) { - io_error = aligned_buf_->Write(record.value); - } else { - io_error = sink_->Write(io::Buffer(record.value)); - } + io_error = sink_->Write(io::Buffer(record.value)); + record.value.clear(); } while (!io_error && channel.TryPop(record)); } // while (channel.pop) @@ -981,10 +964,10 @@ error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) { } error_code RdbSaver::SaveBody(const Cancellation* cll, RdbTypeFreqMap* freq_map) { - RETURN_ON_ERR(impl_->serializer()->FlushMem()); + RETURN_ON_ERR(impl_->serializer()->FlushToSink(impl_->sink())); if (save_mode_ == SaveMode::SUMMARY) { - impl_->serializer()->SendFullSyncCut(); + impl_->serializer()->SendFullSyncCut(impl_->sink()); } else { VLOG(1) << "SaveBody , snapshots count: " << impl_->Size(); error_code io_error = impl_->ConsumeChannel(cll); @@ -1046,7 +1029,7 @@ error_code RdbSaver::SaveEpilog() { absl::little_endian::Store64(buf, chksum); RETURN_ON_ERR(ser.WriteRaw(buf)); - RETURN_ON_ERR(ser.FlushMem()); + RETURN_ON_ERR(ser.FlushToSink(impl_->sink())); return impl_->Flush(); } diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index f54420660f10..7d6ecbba660d 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -137,15 +137,10 @@ class RdbSerializer { // TODO: for aligned cased, it does not make sense that RdbSerializer buffers into unaligned // mem_buf_ and then flush it into the next level. We should probably use AlignedBuffer // directly. - RdbSerializer(::io::Sink* s, bool do_entry_level_compression); + RdbSerializer(bool do_entry_level_compression); ~RdbSerializer(); - // The ownership stays with the caller. - void set_sink(::io::Sink* s) { - sink_ = s; - } - std::error_code WriteOpcode(uint8_t opcode) { return WriteRaw(::io::Bytes{&opcode, 1}); } @@ -163,17 +158,18 @@ class RdbSerializer { return SaveString(std::string_view{reinterpret_cast(buf), len}); } + // TODO(Adi) : add flag to flush compressed blob to sink, move zstd serializer under RdbSerializer + std::error_code FlushToSink(io::Sink* s); std::error_code SaveLen(size_t len); - std::error_code FlushMem(); - // This would work for either string or an object. // The arg pv is taken from it->second if accessing // this by finding the key. This function is used // for the dump command - thus it is public function std::error_code SaveValue(const PrimeValue& pv); - std::error_code SendFullSyncCut(); + std::error_code SendFullSyncCut(io::Sink* s); + size_t SerializedLen() const; private: std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); @@ -189,8 +185,6 @@ class RdbSerializer { std::error_code SaveStreamPEL(rax* pel, bool nacks); std::error_code SaveStreamConsumers(streamCG* cg); - ::io::Sink* sink_; - std::unique_ptr lzf_; base::IoBuf mem_buf_; base::PODArray tmp_buf_; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index f6f58721c1b2..4f65916a2d24 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -56,7 +56,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) { sfile_.reset(new io::StringFile); bool do_compression = (compression_mode_ == CompressionMode::SINGLE_ENTRY); - rdb_serializer_.reset(new RdbSerializer(sfile_.get(), do_compression)); + rdb_serializer_.reset(new RdbSerializer(do_compression)); snapshot_fb_ = fiber([this, stream_journal, cll] { SerializeEntriesFb(cll); @@ -147,7 +147,7 @@ void SliceSnapshot::SerializeEntriesFb(const Cancellation* cll) { mu_.unlock(); for (unsigned i = 10; i > 1; i--) - CHECK(!rdb_serializer_->SendFullSyncCut()); + CHECK(!rdb_serializer_->SendFullSyncCut(sfile_.get())); FlushSfile(true); VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/" @@ -184,21 +184,16 @@ void SliceSnapshot::SerializeSingleEntry(DbIndex db_indx, const PrimeKey& pk, co } bool SliceSnapshot::FlushSfile(bool force) { - if (force) { - auto ec = rdb_serializer_->FlushMem(); - CHECK(!ec); - if (sfile_->val.empty()) - return false; - } else { - if (sfile_->val.size() < 4096) { - return false; - } - - // Make sure we flush everything from membuffer in order to preserve the atomicity of keyvalue - // serializations. - auto ec = rdb_serializer_->FlushMem(); - CHECK(!ec); // stringfile always succeeds. + if ((!force) && (rdb_serializer_->SerializedLen() < 4096)) { + return false; } + + auto ec = rdb_serializer_->FlushToSink(sfile_.get()); + CHECK(!ec); + + if (sfile_->val.empty()) + return false; + VLOG(2) << "FlushSfile " << sfile_->val.size() << " bytes"; uint32_t record_num = num_records_in_blob_; @@ -270,14 +265,14 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { io::Result res = rdb_serializer_->SaveEntry(pkey, *entry.pval_ptr, entry.expire_ms); CHECK(res); // we write to StringFile. } else { - io::StringFile sfile; bool serializer_compression = (compression_mode_ != CompressionMode::NONE); - RdbSerializer tmp_serializer(&sfile, serializer_compression); + RdbSerializer tmp_serializer(serializer_compression); io::Result res = tmp_serializer.SaveEntry(pkey, *entry.pval_ptr, entry.expire_ms); CHECK(res); // we write to StringFile. - error_code ec = tmp_serializer.FlushMem(); + io::StringFile sfile; + error_code ec = tmp_serializer.FlushToSink(&sfile); CHECK(!ec && !sfile.val.empty()); PushFileToChannel(&sfile, entry.db_ind, 1, false); } @@ -300,16 +295,16 @@ unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bu } num_records_in_blob_ += result; } else { - io::StringFile sfile; bool serializer_compression = (compression_mode_ != CompressionMode::NONE); - RdbSerializer tmp_serializer(&sfile, serializer_compression); + RdbSerializer tmp_serializer(serializer_compression); while (!it.is_done()) { ++result; SerializeSingleEntry(db_index, it->first, it->second, &tmp_serializer); ++it; } - error_code ec = tmp_serializer.FlushMem(); + io::StringFile sfile; + error_code ec = tmp_serializer.FlushToSink(&sfile); CHECK(!ec && !sfile.val.empty()); PushFileToChannel(&sfile, db_index, result, false); }