-
Notifications
You must be signed in to change notification settings - Fork 999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): New auto-journal types/read/write #560
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
// Copyright 2022, DragonflyDB authors. All rights reserved. | ||
// See LICENSE for licensing terms. | ||
// | ||
|
||
#include "server/journal/serializer.h" | ||
|
||
#include "base/io_buf.h" | ||
#include "base/logging.h" | ||
#include "io/io.h" | ||
#include "server/common.h" | ||
#include "server/error.h" | ||
#include "server/journal/types.h" | ||
#include "server/main_service.h" | ||
#include "server/serializer_commons.h" | ||
#include "server/transaction.h" | ||
|
||
using namespace std; | ||
|
||
namespace dfly { | ||
|
||
// Store the sink and the optional start database index. Nothing is written to the sink here. | ||
JournalWriter::JournalWriter(io::Sink* sink, std::optional<DbIndex> dbid) | ||
: sink_{sink}, cur_dbid_{dbid} { | ||
} | ||
|
||
// Encodes `v` with WritePackedUInt into a small stack buffer and forwards exactly the
// encoded bytes to the sink. Returns whatever the sink reports.
error_code JournalWriter::Write(uint64_t v) {
  uint8_t packed[10];
  const unsigned packed_len = WritePackedUInt(v, packed);

  io::Bytes encoded{packed, packed_len};
  return sink_->Write(encoded);
}
|
||
// Writes a length-prefixed string: first the size as a packed unsigned integer,
// then the characters themselves. Stops at the first error.
error_code JournalWriter::Write(std::string_view sv) {
  RETURN_ON_ERR(Write(sv.size()));

  const auto raw = io::Buffer(sv);
  return sink_->Write(raw);
}
|
||
// Writes an argument list: the number of arguments as a packed unsigned integer,
// followed by every argument as a length-prefixed string. Stops at the first error.
error_code JournalWriter::Write(CmdArgList args) {
  RETURN_ON_ERR(Write(args.size()));

  for (auto it = args.begin(); it != args.end(); ++it) {
    auto v = *it;
    RETURN_ON_ERR(Write(facade::ToSV(v)));
  }
  return {};
}
|
||
// Writes a command given as (name, remaining arguments). The output has the same shape as
// an argument list: the count (1 + number of remaining arguments), then the command name,
// then each remaining argument, all as length-prefixed strings. Stops at the first error.
error_code JournalWriter::Write(std::pair<std::string_view, ArgSlice> args) {
  const std::string_view cmd = args.first;
  const ArgSlice tail_args = args.second;

  RETURN_ON_ERR(Write(1 + tail_args.size()));
  RETURN_ON_ERR(Write(cmd));

  for (auto it = tail_args.begin(); it != tail_args.end(); ++it) {
    auto v = *it;
    RETURN_ON_ERR(Write(v));
  }
  return {};
}
|
||
// Overload selected for an empty payload: writes nothing and always reports success. | ||
error_code JournalWriter::Write(std::monostate) { | ||
return std::error_code{}; | ||
} | ||
|
||
// Serializes a single journal entry to the sink.
//
// Bytes produced for one entry: the opcode as a packed unsigned integer, followed by
//   - SELECT: the packed database index;
//   - VAL:    the packed transaction id, then the payload (nothing for an empty payload);
//   - any other opcode: nothing else.
//
// If a non-SELECT entry targets a database other than the tracked one (or no database is
// tracked yet), a SELECT entry is emitted first. Returns the first error encountered.
error_code JournalWriter::Write(const journal::EntryNew& entry) {
  // Check if entry has a new db index and we need to emit a SELECT entry.
  // The nested call records the new index in its SELECT branch below.
  if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) {
    RETURN_ON_ERR(Write(journal::EntryNew{journal::Op::SELECT, entry.dbid}));
  }

  RETURN_ON_ERR(Write(static_cast<uint8_t>(entry.opcode)));

  switch (entry.opcode) {
    case journal::Op::SELECT:
      RETURN_ON_ERR(Write(entry.dbid));
      // Track every SELECT that reaches the stream, including ones passed in explicitly by
      // the caller. Otherwise the tracked index goes stale and a later entry for the old
      // database would be written without the SELECT the reader needs.
      cur_dbid_ = entry.dbid;
      break;
    case journal::Op::VAL:
      RETURN_ON_ERR(Write(entry.txid));
      return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload);
    default:
      break;
  }
  return std::error_code{};
}
|
||
// Store the source and the start database index; the internal buffer is value-initialized. | ||
JournalReader::JournalReader(io::Source* source, DbIndex dbid) | ||
: source_{source}, buf_{}, dbid_{dbid} { | ||
} | ||
|
||
// Reads one packed unsigned integer from `source` and narrows it to UT.
// Propagates the read error, or yields errc::result_out_of_range when the decoded value
// is larger than UT can represent.
template <typename UT> io::Result<UT> ReadPackedUIntTyped(io::Source* source) {
  uint64_t v;
  SET_OR_UNEXPECT(ReadPackedUInt(source), v);

  const bool fits = v <= std::numeric_limits<UT>::max();
  if (fits)
    return static_cast<UT>(v);

  return make_unexpected(make_error_code(errc::result_out_of_range));
}
|
||
// Read a packed unsigned integer from the source and narrow it to 8 bits. | ||
// Yields result_out_of_range (via ReadPackedUIntTyped) if the value does not fit. | ||
io::Result<uint8_t> JournalReader::ReadU8() { | ||
return ReadPackedUIntTyped<uint8_t>(source_); | ||
} | ||
|
||
// Read a packed unsigned integer from the source and narrow it to 16 bits. | ||
// Yields result_out_of_range (via ReadPackedUIntTyped) if the value does not fit. | ||
io::Result<uint16_t> JournalReader::ReadU16() { | ||
return ReadPackedUIntTyped<uint16_t>(source_); | ||
} | ||
|
||
// Read a packed unsigned integer from the source as a full 64-bit value. | ||
// The decoded value is already 64 bits wide, so the range check in ReadPackedUIntTyped cannot fail here. | ||
io::Result<uint64_t> JournalReader::ReadU64() { | ||
return ReadPackedUIntTyped<uint64_t>(source_); | ||
} | ||
|
||
// Reads one length-prefixed string from the source and appends its bytes to buf_.
//
// Returns the size of the string in bytes. Fails with the source's error if reading the
// length or the data fails (in the latter case nothing is committed to buf_), or with
// errc::message_size if the source ends before the announced number of bytes arrived
// (the bytes received so far are committed to buf_).
io::Result<size_t> JournalReader::ReadString() {
  size_t size = 0;
  SET_OR_UNEXPECT(ReadU64(), size);

  buf_.EnsureCapacity(size);
  auto dest = buf_.AppendBuffer().first(size);

  // A stream source may hand back fewer bytes than requested in a single call, so keep
  // reading until the whole string has arrived. A zero-byte read is taken as end of input.
  size_t total = 0;
  while (total < size) {
    size_t chunk = 0;
    SET_OR_UNEXPECT(source_->Read(dest.subspan(total)), chunk);
    if (chunk == 0)
      break;
    total += chunk;
  }

  buf_.CommitWrite(total);
  if (total != size)
    return make_unexpected(std::make_error_code(std::errc::message_size));

  return size;
}
|
||
// Reads an argument array: a packed element count followed by that many length-prefixed
// strings. All string bytes are stored back to back in buf_, and on success every element
// of `vec` is a slice pointing at its own bytes inside buf_. Returns the first read error.
std::error_code JournalReader::Read(CmdArgVec* vec) {
  // Drop everything currently held in the buffer so the new strings start at its front.
  buf_.ConsumeInput(buf_.InputBuffer().size());

  size_t size = 0;
  SET_OR_RETURN(ReadU64(), size);

  // First pass: pull each string into buf_ and remember only its length. Slices are not
  // pointed at the data yet, because references into buf_ cannot be kept while it grows.
  vec->resize(size);
  for (size_t i = 0; i < vec->size(); ++i) {
    size_t len;
    SET_OR_RETURN(ReadString(), len);
    (*vec)[i] = MutableSlice{nullptr, len};
  }

  // Second pass: buf_ no longer changes, so walk it front to back and hand each argument
  // its consecutive range.
  char* cursor = reinterpret_cast<char*>(buf_.InputBuffer().data());
  for (auto& arg : *vec) {
    const size_t len = arg.size();
    arg = MutableSlice{cursor, len};
    cursor += len;
  }

  return {};
}
|
||
// Reads the next entry from the source.
//
// SELECT entries are not returned to the caller: each one updates the tracked database
// index and reading continues with the following entry. For a VAL entry the transaction
// id and the argument vector are read as well; any other opcode yields an entry without
// payload. The returned entry carries the database index tracked at that point.
// Any read error is returned as the unexpected value.
io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
  // Iterate rather than recurse: the number of consecutive SELECT entries is dictated by
  // the input stream, so recursing once per SELECT would let the input grow the stack.
  while (true) {
    uint8_t opcode;
    SET_OR_UNEXPECT(ReadU8(), opcode);

    const auto op = static_cast<journal::Op>(opcode);
    if (op == journal::Op::SELECT) {
      SET_OR_UNEXPECT(ReadU16(), dbid_);
      continue;
    }

    journal::ParsedEntry entry{op, dbid_};
    if (op == journal::Op::VAL) {
      SET_OR_UNEXPECT(ReadU64(), entry.txid);
      entry.payload = CmdArgVec{};
      if (auto ec = Read(&*entry.payload); ec)
        return make_unexpected(ec);
    }
    return entry;
  }
}
|
||
} // namespace dfly |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
// Copyright 2022, DragonflyDB authors. All rights reserved. | ||
// See LICENSE for licensing terms. | ||
// | ||
|
||
#pragma once | ||
|
||
#include <optional> | ||
#include <string> | ||
|
||
#include "base/io_buf.h" | ||
#include "io/io.h" | ||
#include "server/common.h" | ||
#include "server/journal/types.h" | ||
|
||
namespace dfly { | ||
|
||
// JournalWriter serializes journal entries to a sink. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So I know we have an option to write the journal to a file. Is this flow going to be dropped? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In what sense dropped? It's a bit more complicated. For now we can leave it as it is so the PR doesn't grow. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Dropped, i.e., we are going to remove this flow of writing the journal to a file. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why? The source is generic enough. For now it's not part of the PR. It's a split of the original; it's not finished. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't follow. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
// It automatically keeps track of the current database index. | ||
class JournalWriter { | ||
public: | ||
// Initialize with sink and optional start database index. If no start index is set, | ||
// a SELECT will be issued before the first non-SELECT entry. | ||
JournalWriter(io::Sink* sink, std::optional<DbIndex> dbid = std::nullopt); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you use the dbid in any flow? I didnt see this flow in the bigger PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do. In which case must the caller care about the id? The should absorb all select entries |
||
|
||
// Write single entry. A SELECT entry is emitted first when the entry's database index | ||
// differs from the tracked one. Returns the first error encountered while writing. | ||
std::error_code Write(const journal::EntryNew& entry); | ||
|
||
private: | ||
std::error_code Write(uint64_t v); // Write packed unsigned integer. | ||
std::error_code Write(std::string_view sv); // Write string. | ||
// Write the argument count followed by each argument as a string. | ||
std::error_code Write(CmdArgList args); | ||
// Write the count (1 + number of tail arguments), the command name, then each tail argument. | ||
std::error_code Write(std::pair<std::string_view, ArgSlice> args); | ||
|
||
std::error_code Write(std::monostate); // Overload for empty std::variant | ||
|
||
private: | ||
// Sink that all serialized bytes are written to. | ||
io::Sink* sink_; | ||
// Database index the writer currently tracks; empty until a start index was given or a SELECT was emitted. | ||
std::optional<DbIndex> cur_dbid_; | ||
}; | ||
|
||
// JournalReader allows deserializing journal entries from a source. | ||
// Like the writer, it automatically keeps track of the database index. | ||
struct JournalReader { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. struct -> class |
||
public: | ||
// Initialize with source and start database index. | ||
JournalReader(io::Source* source, DbIndex dbid); | ||
|
||
// Try reading entry from source. | ||
// SELECT entries are consumed internally: they update the tracked database index and | ||
// the entry that follows is returned instead. | ||
// The payload of a returned entry points into the reader's internal buffer, which is | ||
// reset when the next argument array is read, so it should not be used after that. | ||
io::Result<journal::ParsedEntry> ReadEntry(); | ||
|
||
private: | ||
// Each of these reads one packed unsigned integer and narrows it to the named width. | ||
// TODO: Templated endian encoding to not repeat...? | ||
io::Result<uint8_t> ReadU8(); | ||
io::Result<uint16_t> ReadU16(); | ||
io::Result<uint64_t> ReadU64(); | ||
|
||
// Read string into internal buffer and return size. | ||
io::Result<size_t> ReadString(); | ||
|
||
// Read argument array into internal buffer and build slice. | ||
// TODO: Inline store span data inside buffer to avoid allocation | ||
std::error_code Read(CmdArgVec* vec); | ||
|
||
private: | ||
// Source that entries are read from. | ||
io::Source* source_; | ||
// Holds the argument bytes of the most recently read argument array. | ||
base::IoBuf buf_; | ||
// Database index assigned to returned entries; updated by SELECT entries. | ||
DbIndex dbid_; | ||
}; | ||
|
||
} // namespace dfly |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,9 @@ | |
// | ||
#pragma once | ||
|
||
#include <optional> | ||
#include <string> | ||
#include <utility> | ||
#include <variant> | ||
|
||
#include "server/common.h" | ||
#include "server/table.h" | ||
|
||
|
@@ -16,6 +19,7 @@ enum class Op : uint8_t { | |
LOCK_SHARD = 3, | ||
UNLOCK_SHARD = 4, | ||
SCHED = 5, | ||
SELECT = 6, | ||
VAL = 10, | ||
DEL, | ||
MSET, | ||
|
@@ -44,6 +48,46 @@ struct Entry { | |
uint64_t expire_ms = 0; // 0 means no expiry. | ||
}; | ||
|
||
// Fields common to EntryNew (entries to be written) and ParsedEntry (entries read back). | ||
// The derived types brace-initialize this base positionally, so member order matters. | ||
struct EntryBase { | ||
// Transaction id; the opcode-only constructors of the derived types set it to 0. | ||
TxId txid; | ||
// Kind of entry (see Op). | ||
Op opcode; | ||
// Index of the database the entry refers to. | ||
DbIndex dbid; | ||
}; | ||
|
||
// This struct represents a single journal entry. | ||
// Those are either control instructions or commands. | ||
struct EntryNew : public EntryBase { // Called this "New" because I can neither delete the old one nor | ||
// replace it partially. | ||
// Payload represents a non-owning view into a command executed on the shard. | ||
using Payload = | ||
std::variant<std::monostate, // No payload. | ||
CmdArgList, // Parts of a full command. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you give example when CmdArgList is set , and why only parts of the full command enter the payload There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CmdArgList is for single shard commands, the pair for a combination of command name shard args. Please look at how it's used in my original PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok I believe you can simplify this by using only std::pair<std::string_view, ArgSlice> and for single shard commands the ShardArgsInShard will return all args of the command, the command name is stored in Transaction class cid_ var I think, this way you dont need full_args_ . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DF has a weird split, where multi shard commands use shard args for all values, and single shard commands use it only for determining the shard to run on. For single shard commands it contains only the key, but not the full data. Changing this internal behavior is possible but requires revisiting transaction code and all the commands There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. If this will simplify the code you could still use full_args_ and only std::pair<std::string_view, ArgSlice> init with full_args_.front(), full_args.Span(1) . If you think this makes the flow simpler... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. 
Learn more. It does, but there is currently no simple way of converting a I referenced this issue in the original PR, emphasising that the differently typed spans are not easily interchangeable. But this is a difficult question on its own because it is used all over the place |
||
std::pair<std::string_view, ArgSlice> // Command and its shard parts. | ||
>; | ||
|
||
// Build a VAL entry for transaction `txid` on database `dbid` carrying payload `pl`. | ||
EntryNew(TxId txid, DbIndex dbid, Payload pl) | ||
: EntryBase{txid, journal::Op::VAL, dbid}, payload{pl} { | ||
} | ||
|
||
// Build an entry with the given opcode and an empty payload; txid is set to 0. | ||
EntryNew(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} { | ||
} | ||
|
||
Payload payload; | ||
}; | ||
|
||
struct ParsedEntry : public EntryBase { | ||
using Payload = std::optional<CmdArgVec>; | ||
|
||
ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} { | ||
} | ||
|
||
ParsedEntry(TxId txid, DbIndex dbid, Payload pl) | ||
: EntryBase{txid, journal::Op::VAL, dbid}, payload{pl} { | ||
} | ||
|
||
Payload payload; | ||
}; | ||
|
||
// Signature of a callable that receives a journal Entry by const reference and returns nothing. | ||
using ChangeCallback = std::function<void(const Entry&)>; | ||
|
||
} // namespace journal | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can change ReadString to return IoBuf::Bytes, and this way you don't need the second loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need it, because I can't keep any references to the buffer while it's growing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, please consider writing the cmd length in the writer so you could use reserve on the buffer at the beginning of the loop