Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug(snapshot) : Do not preempt inside OnDbChange issue #829 #868

Merged
merged 3 commits into from
Feb 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/server/journal/serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ JournalReader::JournalReader(io::Source* source, DbIndex dbid)
: source_{source}, buf_{4_KB}, dbid_{dbid} {
}

void JournalReader::SetDb(DbIndex dbid) {
dbid_ = dbid;
}

void JournalReader::SetSource(io::Source* source) {
CHECK_EQ(buf_.InputLen(), 0ULL);
source_ = source;
Expand Down
3 changes: 0 additions & 3 deletions src/server/journal/serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ struct JournalReader {
// Initialize start database index.
JournalReader(io::Source* source, DbIndex dbid);

// Overwrite current db index.
void SetDb(DbIndex dbid);

// Overwrite current source and ensure there is no leftover from previous.
void SetSource(io::Source* source);

Expand Down
5 changes: 2 additions & 3 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1820,7 +1820,7 @@ error_code RdbLoader::Load(io::Source* src) {
for (unsigned i = 0; i < shard_set->size(); ++i) {
FlushShardAsync(i);
}
RETURN_ON_ERR(HandleJournalBlob(service_, cur_db_index_));
RETURN_ON_ERR(HandleJournalBlob(service_));
continue;
}

Expand Down Expand Up @@ -1961,7 +1961,7 @@ error_code RdbLoaderBase::HandleCompressedBlobFinish() {
return kOk;
}

error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) {
error_code RdbLoaderBase::HandleJournalBlob(Service* service) {
// Read the number of entries in the journal blob.
size_t num_entries;
bool _encoded;
Expand All @@ -1972,7 +1972,6 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) {
SET_OR_RETURN(FetchGenericString(), journal_blob);

io::BytesSource bs{io::Buffer(journal_blob)};
journal_reader_.SetDb(dbid);
journal_reader_.SetSource(&bs);

// Parse and execute in loop.
Expand Down
2 changes: 1 addition & 1 deletion src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class RdbLoaderBase {
std::error_code HandleCompressedBlobFinish();
void AllocateDecompressOnce(int op_type);

std::error_code HandleJournalBlob(Service* service, DbIndex dbid);
std::error_code HandleJournalBlob(Service* service);

static size_t StrLen(const RdbVariant& tset);

Expand Down
35 changes: 14 additions & 21 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ std::error_code RdbSerializer::SaveValue(const PrimeValue& pv) {
}

error_code RdbSerializer::SelectDb(uint32_t dbid) {
if (dbid == last_entry_db_index_) {
return error_code{};
}
last_entry_db_index_ = dbid;
uint8_t buf[16];
buf[0] = RDB_OPCODE_SELECTDB;
unsigned enclen = WritePackedUInt(dbid, io::MutableBytes{buf}.subspan(1));
Expand All @@ -263,7 +267,8 @@ error_code RdbSerializer::SelectDb(uint32_t dbid) {

// Called by snapshot
io::Result<uint8_t> RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv,
uint64_t expire_ms) {
uint64_t expire_ms, DbIndex dbid) {
SelectDb(dbid);
uint8_t buf[16];
error_code ec;
/* Save the expire time */
Expand Down Expand Up @@ -681,6 +686,11 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) {
// interrupt point.
RETURN_ON_ERR(s->Write(bytes));
mem_buf_.ConsumeInput(bytes.size());
// After every flush we should write the DB index again because the blobs in the channel are
// interleaved and multiple savers can correspond to a single writer (in case of single file rdb
// snapshot)
last_entry_db_index_ = kInvalidDbId;

return error_code{};
}

Expand All @@ -701,17 +711,14 @@ io::Bytes RdbSerializer::PrepareFlush() {
return mem_buf_.InputBuffer();
}

error_code RdbSerializer::WriteJournalEntries(absl::Span<const journal::Entry> entries) {
error_code RdbSerializer::WriteJournalEntry(const journal::Entry& entry) {
io::BufSink buf_sink{&journal_mem_buf_};
JournalWriter writer{&buf_sink};
for (const auto& entry : entries) {
writer.Write(entry);
}
writer.Write(entry);

RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB));
RETURN_ON_ERR(SaveLen(entries.size()));
RETURN_ON_ERR(SaveLen(1));
RETURN_ON_ERR(SaveString(io::View(journal_mem_buf_.InputBuffer())));

journal_mem_buf_.Clear();
return error_code{};
}
Expand Down Expand Up @@ -903,12 +910,8 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val)
error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
error_code io_error;

uint8_t buf[16];
size_t channel_bytes = 0;
SliceSnapshot::DbRecord record;
DbIndex last_db_index = kInvalidDbId;

buf[0] = RDB_OPCODE_SELECTDB;

// we can not exit on io-error since we spawn fibers that push data.
// TODO: we may signal them to stop processing and exit asap in case of the error.
Expand All @@ -922,16 +925,6 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
if (cll->IsCancelled())
continue;

if (record.db_index != last_db_index) {
unsigned enclen = WritePackedUInt(record.db_index, io::MutableBytes{buf}.subspan(1));
string_view str{(char*)buf, enclen + 1};

io_error = sink_->Write(io::Buffer(str));
if (io_error)
break;
last_db_index = record.db_index;
}

DVLOG(2) << "Pulled " << record.id;
channel_bytes += record.value.size();

Expand Down
8 changes: 5 additions & 3 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ class RdbSerializer {
// Must be called in the thread to which `it` belongs.
// Returns the serialized rdb_type or the error.
// expire_ms = 0 means no expiry.
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms);
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms,
DbIndex dbid);

std::error_code SaveLen(size_t len);
std::error_code SaveString(std::string_view val);
Expand All @@ -148,8 +149,8 @@ class RdbSerializer {
return WriteRaw(::io::Bytes{&opcode, 1});
}

// Write journal entries as an embedded journal blob.
std::error_code WriteJournalEntries(absl::Span<const journal::Entry> entries);
// Write journal entry as an embedded journal blob.
std::error_code WriteJournalEntry(const journal::Entry& entry);

// Send FULL_SYNC_CUT opcode to notify that all static data was sent.
std::error_code SendFullSyncCut();
Expand Down Expand Up @@ -180,6 +181,7 @@ class RdbSerializer {
base::IoBuf journal_mem_buf_;
std::string tmp_str_;
base::PODArray<uint8_t> tmp_buf_;
DbIndex last_entry_db_index_ = kInvalidDbId;

std::unique_ptr<LZF_HSLOT[]> lzf_;

Expand Down
82 changes: 23 additions & 59 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
journal_cb_id_ = journal->RegisterOnChange(move(journal_cb));
}

default_serializer_.reset(new RdbSerializer(compression_mode_));
serializer_.reset(new RdbSerializer(compression_mode_));

VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;

Expand All @@ -74,7 +74,7 @@ void SliceSnapshot::Stop() {
db_slice_->shard_owner()->journal()->UnregisterOnChange(journal_cb_id_);
}

FlushDefaultBuffer(true);
PushSerializedToChannel(true);
CloseRecordChannel();
}

Expand Down Expand Up @@ -135,22 +135,22 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
PrimeTable::Cursor next =
pt->Traverse(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
cursor = next;
FlushDefaultBuffer(false);
PushSerializedToChannel(false);

if (stats_.loop_serialized >= last_yield + 100) {
DVLOG(2) << "Before sleep " << this_fiber::properties<FiberProps>().name();
fibers_ext::Yield();
DVLOG(2) << "After sleep";

last_yield = stats_.loop_serialized;
// flush in case other fibers (writes commands that pushed previous values)
// Push in case other fibers (writes commands that pushed previous values)
// filled the buffer.
FlushDefaultBuffer(false);
PushSerializedToChannel(false);
}
} while (cursor);

DVLOG(2) << "after loop " << this_fiber::properties<FiberProps>().name();
FlushDefaultBuffer(true);
PushSerializedToChannel(true);
} // for (dbindex)

// Wait for SerializePhysicalBucket to finish.
Expand All @@ -159,8 +159,8 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {

// TODO: investigate why a single byte gets stuck and does not arrive to replica
for (unsigned i = 10; i > 1; i--)
CHECK(!default_serializer_->SendFullSyncCut());
FlushDefaultBuffer(true);
CHECK(!serializer_->SendFullSyncCut());
PushSerializedToChannel(true);
Comment on lines 160 to +163
Copy link
Contributor

@dranikpg dranikpg Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that reminded me we still have this issue 😅

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMG, maybe we do not? I suggest removing this loop and see if the issue remains.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still have bug here


// serialized + side_saved must be equal to the total saved.
VLOG(1) << "Exit SnapshotSerializer (loop_serialized/side_saved/cbcalls): "
Expand Down Expand Up @@ -192,33 +192,16 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
unsigned result = 0;

lock_guard lk(mu_);

optional<RdbSerializer> tmp_serializer;
RdbSerializer* serializer_ptr = default_serializer_.get();
if (db_index != current_db_) {
CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE
? CompressionMode::NONE
: CompressionMode::SINGLE_ENTRY;
tmp_serializer.emplace(compression_mode);
serializer_ptr = &*tmp_serializer;
}

while (!it.is_done()) {
++result;
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_ptr);
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
++it;
}

if (tmp_serializer) {
PushBytesToChannel(db_index, &*tmp_serializer);
VLOG(1) << "Pushed " << result << " entries via tmp_serializer";
}

return result;
}

// This function should not block and should not preempt because it's called
// from SerializePhysicalBucket which should execute atomically.
// from SerializeBucket which should execute atomically.
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
optional<uint64_t> expire, RdbSerializer* serializer) {
time_t expire_time = expire.value_or(0);
Expand All @@ -227,35 +210,30 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
expire_time = db_slice_->ExpireTime(eit);
}

io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time);
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
CHECK(res);
++type_freq_map_[*res];
}

size_t SliceSnapshot::PushBytesToChannel(DbIndex db_index, RdbSerializer* serializer) {
auto id = rec_id_++;
DVLOG(2) << "Pushed " << id;
bool SliceSnapshot::PushSerializedToChannel(bool force) {
if (!force && serializer_->SerializedLen() < 4096)
return false;

io::StringFile sfile;
serializer->FlushToSink(&sfile);
serializer_->FlushToSink(&sfile);

size_t serialized = sfile.val.size();
if (serialized == 0)
return 0;
stats_.channel_bytes += serialized;

DbRecord db_rec{.db_index = db_index, .id = id, .value = std::move(sfile.val)};
auto id = rec_id_++;
DVLOG(2) << "Pushed " << id;
DbRecord db_rec{.id = id, .value = std::move(sfile.val)};

dest_->Push(std::move(db_rec));
return serialized;
}

bool SliceSnapshot::FlushDefaultBuffer(bool force) {
if (!force && default_serializer_->SerializedLen() < 4096)
return false;

size_t written = PushBytesToChannel(current_db_, default_serializer_.get());
VLOG(2) << "FlushDefaultBuffer " << written << " bytes";
VLOG(2) << "PushSerializedToChannel " << serialized << " bytes";
return true;
}

Expand Down Expand Up @@ -288,25 +266,11 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry, bool unused_awai
return;
}

optional<RdbSerializer> tmp_serializer;
RdbSerializer* serializer_ptr = default_serializer_.get();
if (entry.dbid != current_db_) {
CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE
? CompressionMode::NONE
: CompressionMode::SINGLE_ENTRY;
tmp_serializer.emplace(compression_mode);
serializer_ptr = &*tmp_serializer;
}

serializer_ptr->WriteJournalEntries(absl::Span{&entry, 1});
serializer_->WriteJournalEntry(entry);

if (tmp_serializer) {
PushBytesToChannel(entry.dbid, &*tmp_serializer);
} else {
// This is the only place that flushes in streaming mode
// once the iterate buckets fiber finished.
FlushDefaultBuffer(false);
}
// This is the only place that flushes in streaming mode
// once the iterate buckets fiber finished.
PushSerializedToChannel(false);
}

void SliceSnapshot::CloseRecordChannel() {
Expand Down
20 changes: 5 additions & 15 deletions src/server/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ class RdbSerializer;
// over the channel until explicitly stopped.
class SliceSnapshot {
public:
// Each dbrecord should belong to exactly one db.
// RdbSaver adds "select" opcodes when necessary in order to maintain consistency.
struct DbRecord {
DbIndex db_index;
uint64_t id;
std::string value;
};
Expand Down Expand Up @@ -95,9 +92,6 @@ class SliceSnapshot {
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
std::optional<uint64_t> expire, RdbSerializer* serializer);

// Push rdb serializer's internal buffer to channel. Return now many bytes were written.
size_t PushBytesToChannel(DbIndex db_index, RdbSerializer* serializer);

// DbChange listener
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);

Expand All @@ -107,20 +101,16 @@ class SliceSnapshot {
// Close dest channel if not closed yet.
void CloseRecordChannel();

// Call PushFileToChannel on default buffer if needed.
// Flush regradless of size if force is true.
// Return if flushed.
bool FlushDefaultBuffer(bool force);
// Push serializer's internal buffer to channel.
// Push regardless of buffer size if force is true.
// Return if pushed.
bool PushSerializedToChannel(bool force);

public:
uint64_t snapshot_version() const {
return snapshot_version_;
}

RdbSerializer* serializer() {
return default_serializer_.get();
}

size_t channel_bytes() const {
return stats_.channel_bytes;
}
Expand All @@ -138,7 +128,7 @@ class SliceSnapshot {

DbIndex current_db_;

std::unique_ptr<RdbSerializer> default_serializer_;
std::unique_ptr<RdbSerializer> serializer_;

::boost::fibers::mutex mu_;
::boost::fibers::fiber snapshot_fb_;  // IterateBucketsFb
Expand Down