Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Switch to stable state replication #473

Merged
merged 9 commits into from
Nov 17, 2022

Conversation

dranikpg
Copy link
Contributor

@dranikpg dranikpg commented Nov 9, 2022

Migrating replication branch on main, part 6

Add basic stable state replication.

Mixed implementation. The general concept it taken from my Replication MVP.

src/redis/rdb.h Outdated Show resolved Hide resolved
Comment on lines 175 to 176
std::function<void()> fullsyncb;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should the loader be extended to react to events? Callback?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which events? what are you trying to do? (I have not reviewed the code yet).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to fullsync cut opcodes. The loader only finishes and unblocks once it has received the EOF code, but we need to record the FULLSYNC_CUT code earlier

Comment on lines -576 to +633
// pass leftover data from the loader.
io::PrefixSource chained(loader.Leftover(), &ps);
VLOG(1) << "Before reading from chained stream";
io::Result<size_t> eof_res = chained.Read(io::MutableBytes{buf.get(), eof_token.size()});
if (!eof_res || *eof_res != eof_token.size()) {
unique_ptr<uint8_t[]> buf{new uint8_t[eof_token.size()]};

io::Result<size_t> res =
chained_tail.ReadAtLeast(io::MutableBytes{buf.get(), eof_token.size()}, eof_token.size());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You had a bug here: We need to use ReadAtLeast because the source might return less than n in a single call.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure? based on the comments in io.h ReadAtLeast(buf, buf.size()) is equivalent to Read(buf).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read can return less than buf.size() in a single call (see the PrefixSource read, it returns only the prefix even if its smaller than buf.size())

src/server/replica.cc Outdated Show resolved Hide resolved
src/server/replica.cc Outdated Show resolved Hide resolved
src/server/rdb_extensions.h Outdated Show resolved Hide resolved
Comment on lines 1825 to 1829
auto [fit, _] = db_slice.FindExt(db_cntx, item.key);
if (IsValid(fit))
db_slice.Del(db_cntx.db_index, fit);

auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's ineffective but its just a temporary stub.

The issue with AddEntry is that I give up my primevalue by moving it. In theory it should:

  • insert or fail
  • update if failed

Because looking it up separately would make the hot path a lot slower.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added AddOrUpdate to DbSlice as a possible solution

src/server/replica.cc Outdated Show resolved Hide resolved
Comment on lines 815 to 817
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
sd.local_mask |= KEYLOCK_ACQUIRED;
DCHECK(!lock_acquired); // Because CheckLock above failed.
// DCHECK(!lock_acquired); // Because CheckLock above failed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a careful check: If we failed to run quickly due to the shard being locked, then this lock acquisition doesn't fail. So I commented the DCHECK out... But does the logic change in this case?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's all good. you can remove this DCHECK.

@dranikpg dranikpg marked this pull request as ready for review November 14, 2022 22:18
@dranikpg
Copy link
Contributor Author

I added a line to the rdb.h file and now the ci-linter complains about formatting. Shouldn't this part of code be an exclusion?

Copy link
Collaborator

@romange romange left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the commit message and the pr description accordingly.

src/server/db_slice.cc Outdated Show resolved Hide resolved
src/server/dflycmd.cc Show resolved Hide resolved
src/server/dflycmd.cc Show resolved Hide resolved
src/server/dflycmd.cc Show resolved Hide resolved
src/server/rdb_load.h Show resolved Hide resolved
VLOG(1) << "io error " << io_error;
return io_error;
if (save_mode_ == SaveMode::SUMMARY) {
impl_->serializer()->SendFullSyncCut();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to send it here as well? it's not for a flow channel, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The io thread runs the rdb saver in SUMMARY mode to transfer only the header and lua scripts. I just decided it'll be more consistent if all threads use this opcode without any corner cases. So I need the summary mode to write is as well.

@@ -581,6 +582,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
return error_code{};
}

error_code RdbSerializer::SendFullSyncCut() {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
return FlushMem();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need FlushMem here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I want it to be sent immediately to the replica. Can't it be stuck inside the buffer if I don't flush it? It seems like it can

Copy link
Collaborator

@romange romange Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not really matter. but you call this twice: once in summary flow - it does not matter there, and the second place calls flushmem right after as far as i remember

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I call it twice for the SUMMARY flow, right. But don't forget about the snapshot that sends a FS cut as well. It can be stuck there

Comment on lines 815 to 817
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
sd.local_mask |= KEYLOCK_ACQUIRED;
DCHECK(!lock_acquired); // Because CheckLock above failed.
// DCHECK(!lock_acquired); // Because CheckLock above failed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's all good. you can remove this DCHECK.

@dranikpg dranikpg requested a review from romange November 17, 2022 05:16
romange
romange previously approved these changes Nov 17, 2022
@@ -126,6 +126,9 @@ void SliceSnapshot::SerializeEntriesFb() {
mu_.lock();
mu_.unlock();

CHECK(!rdb_serializer_->SendFullSyncCut());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I have no error propagation... I can just remove it

@dranikpg
Copy link
Contributor Author

dranikpg commented Nov 17, 2022

Can I get a reply on this? 🤨 😅 #473 (comment)

@romange
Copy link
Collaborator

romange commented Nov 17, 2022

You've been just volunteered to check whether it's possible to exclude dirs in pre-commit checks. Инициатива наказуема!

@dranikpg dranikpg requested a review from romange November 17, 2022 16:12
@romange romange merged commit 96c9332 into dragonflydb:main Nov 17, 2022
@dranikpg dranikpg deleted the startstable branch December 25, 2022 12:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants