Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): New auto-journal types/read/write #560

Merged
merged 3 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/facade/facade_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@

#pragma once

#include <absl/types/span.h>
#include <absl/container/flat_hash_map.h>
#include <absl/types/span.h>

#include <string>

namespace facade {

enum class Protocol : uint8_t {
MEMCACHE = 1,
REDIS = 2
};
enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 };

using MutableSlice = absl::Span<char>;
using CmdArgList = absl::Span<MutableSlice>;
using CmdArgVec = std::vector<MutableSlice>;

inline std::string_view ToSV(MutableSlice slice) {
return std::string_view{slice.data(), slice.size()};
}

struct CmdArgListFormatter {
void operator()(std::string* out, MutableSlice arg) const {
out->append(absl::StrCat("`", std::string_view(arg.data(), arg.size()), "`"));
Expand Down Expand Up @@ -66,7 +69,6 @@ constexpr inline unsigned long long operator""_KB(unsigned long long x) {

} // namespace facade


namespace std {
ostream& operator<<(ostream& os, facade::CmdArgList args);

Expand Down
6 changes: 4 additions & 2 deletions src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc)
zset_family.cc version.cc bitops_family.cc container_utils.cc
serializer_commons.cc journal/serializer.cc)

cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib
absl::random_random TRDP::jsoncons zstd TRDP::lz4)
Expand All @@ -40,9 +41,10 @@ cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
cxx_test(snapshot_test dragonfly_lib LABELS DFLY)
cxx_test(json_family_test dfly_test_lib LABELS DFLY)
cxx_test(journal_test dfly_test_lib LABELS DFLY)


add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test json_family_test list_family_test
generic_family_test memcache_parser_test rdb_test
generic_family_test memcache_parser_test rdb_test journal_test
redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test set_family_test zset_family_test)
166 changes: 166 additions & 0 deletions src/server/journal/serializer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/journal/serializer.h"

#include "base/io_buf.h"
#include "base/logging.h"
#include "io/io.h"
#include "server/common.h"
#include "server/error.h"
#include "server/journal/types.h"
#include "server/main_service.h"
#include "server/serializer_commons.h"
#include "server/transaction.h"

using namespace std;

namespace dfly {

JournalWriter::JournalWriter(io::Sink* sink, std::optional<DbIndex> dbid)
: sink_{sink}, cur_dbid_{dbid} {
}

error_code JournalWriter::Write(uint64_t v) {
uint8_t buf[10];
unsigned len = WritePackedUInt(v, buf);
return sink_->Write(io::Bytes{buf, len});
}

error_code JournalWriter::Write(std::string_view sv) {
RETURN_ON_ERR(Write(sv.size()));
return sink_->Write(io::Buffer(sv));
}

error_code JournalWriter::Write(CmdArgList args) {
RETURN_ON_ERR(Write(args.size()));
for (auto v : args)
RETURN_ON_ERR(Write(facade::ToSV(v)));

return std::error_code{};
}

error_code JournalWriter::Write(std::pair<std::string_view, ArgSlice> args) {
auto [cmd, tail_args] = args;

RETURN_ON_ERR(Write(1 + tail_args.size()));
RETURN_ON_ERR(Write(cmd));
for (auto v : tail_args)
RETURN_ON_ERR(Write(v));

return std::error_code{};
}

error_code JournalWriter::Write(std::monostate) {
return std::error_code{};
}

error_code JournalWriter::Write(const journal::EntryNew& entry) {
// Check if entry has a new db index and we need to emit a SELECT entry.
if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) {
RETURN_ON_ERR(Write(journal::EntryNew{journal::Op::SELECT, entry.dbid}));
cur_dbid_ = entry.dbid;
}

RETURN_ON_ERR(Write(uint8_t(entry.opcode)));

switch (entry.opcode) {
case journal::Op::SELECT:
return Write(entry.dbid);
case journal::Op::VAL:
RETURN_ON_ERR(Write(entry.txid));
return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload);
default:
break;
};
return std::error_code{};
}

JournalReader::JournalReader(io::Source* source, DbIndex dbid)
: source_{source}, buf_{}, dbid_{dbid} {
}

template <typename UT> io::Result<UT> ReadPackedUIntTyped(io::Source* source) {
uint64_t v;
SET_OR_UNEXPECT(ReadPackedUInt(source), v);
if (v > std::numeric_limits<UT>::max())
return make_unexpected(make_error_code(errc::result_out_of_range));
return static_cast<UT>(v);
}

io::Result<uint8_t> JournalReader::ReadU8() {
return ReadPackedUIntTyped<uint8_t>(source_);
}

io::Result<uint16_t> JournalReader::ReadU16() {
return ReadPackedUIntTyped<uint16_t>(source_);
}

io::Result<uint64_t> JournalReader::ReadU64() {
return ReadPackedUIntTyped<uint64_t>(source_);
}

io::Result<size_t> JournalReader::ReadString() {
size_t size = 0;
SET_OR_UNEXPECT(ReadU64(), size);

buf_.EnsureCapacity(size);
auto dest = buf_.AppendBuffer().first(size);
uint64_t read = 0;
SET_OR_UNEXPECT(source_->Read(dest), read);

buf_.CommitWrite(read);
if (read != size)
return make_unexpected(std::make_error_code(std::errc::message_size));

return size;
}

std::error_code JournalReader::Read(CmdArgVec* vec) {
buf_.ConsumeInput(buf_.InputBuffer().size());

size_t size = 0;
SET_OR_RETURN(ReadU64(), size);

vec->resize(size);
for (auto& span : *vec) {
size_t len;
SET_OR_RETURN(ReadString(), len);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can change ReadString to return IoBuf::Bytes and this way you dont need the second loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need it, because I can't keep any references to it while its growing

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, please consider writing the cmd length in the writer so you could use reserve on the buffer at the beginning of the loop

span = MutableSlice{nullptr, len};
}

size_t offset = 0;
for (auto& span : *vec) {
size_t len = span.size();
auto ptr = buf_.InputBuffer().subspan(offset).data();
span = MutableSlice{reinterpret_cast<char*>(ptr), len};
offset += len;
}

return std::error_code{};
}

io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
uint8_t opcode;
SET_OR_UNEXPECT(ReadU8(), opcode);

journal::ParsedEntry entry{static_cast<journal::Op>(opcode), dbid_};

switch (entry.opcode) {
case journal::Op::VAL:
SET_OR_UNEXPECT(ReadU64(), entry.txid);
entry.payload = CmdArgVec{};
if (auto ec = Read(&*entry.payload); ec)
return make_unexpected(ec);
break;
case journal::Op::SELECT:
SET_OR_UNEXPECT(ReadU16(), dbid_);
return ReadEntry();
default:
break;
};
return entry;
}

} // namespace dfly
70 changes: 70 additions & 0 deletions src/server/journal/serializer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <optional>
#include <string>

#include "base/io_buf.h"
#include "io/io.h"
#include "server/common.h"
#include "server/journal/types.h"

namespace dfly {

// JournalWriter serializes journal entries to a sink.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I know know we have an option to write journal to file. Is this flow going to be dropped?
If not and you will use the JournalWriter, it is writing directly to sink without any buffering

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what sense dropped?

Its a bit more complicated. For now we can leave it as it is so the PR doesn't grow

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dropped i.e we are going to remove this flow of writing to the journal to file.
If you want to write the buffering code later maybe add TODO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? The source is generic enough

For now its not part of the PR. Its a split of the original, its not finished

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont follow..
I dont see in the original PR buffering in the JournalWriter class
If you remember in the RDB serializer we had some buffering code which I removed cause it was redundant as we use it with StringSink and buffering is not needed
If this class is used with a sink which is a file (not a socket as in the replica flow) you will have multiple writes of 1/2/8 bytes and in this case we will need buffering in the JournalWriter or JournalWriter will write to a StringSink and there should be some wrapping code to write to the journal file.
Anyway I can wait to the next PRs and see how you use JournalWriter with File Sink

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • There is no journal file yet, I didn't cover this part at all
  • In my original PR we write to a string sink in the rdb saver
  • In stable state we write directly to the sink simply because I didn't yet develop a mechanism for doing it efficiently. Once we have it working and have full test coverage, we can start benchmarking it

// It automatically keeps track of the current database index.
class JournalWriter {
public:
// Initialize with sink and optional start database index. If no start index is set,
// a SELECT will be issued before the first entry.
JournalWriter(io::Sink* sink, std::optional<DbIndex> dbid = std::nullopt);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you use the dbid in any flow? I didnt see this flow in the bigger PR.
If you do initialize with db id than caller mast make sure to call the select this is a bit confusing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do. In which case must the caller care about the id? The should absorb all select entries


// Write single entry.
std::error_code Write(const journal::EntryNew& entry);

private:
std::error_code Write(uint64_t v); // Write packed unsigned integer.
std::error_code Write(std::string_view sv); // Write string.
std::error_code Write(CmdArgList args);
std::error_code Write(std::pair<std::string_view, ArgSlice> args);

std::error_code Write(std::monostate); // Overload for empty std::variant

private:
io::Sink* sink_;
std::optional<DbIndex> cur_dbid_;
};

// JournalReader allows deserializing journal entries from a source.
// Like the writer, it automatically keeps track of the database index.
struct JournalReader {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct -> class

public:
// Initialize with source and start database index.
JournalReader(io::Source* source, DbIndex dbid);

// Try reading entry from source.
io::Result<journal::ParsedEntry> ReadEntry();

private:
// TODO: Templated endian encoding to not repeat...?
io::Result<uint8_t> ReadU8();
io::Result<uint16_t> ReadU16();
io::Result<uint64_t> ReadU64();

// Read string into internal buffer and return size.
io::Result<size_t> ReadString();

// Read argument array into internal buffer and build slice.
// TODO: Inline store span data inside buffer to avoid alloaction
std::error_code Read(CmdArgVec* vec);

private:
io::Source* source_;
base::IoBuf buf_;
DbIndex dbid_;
};

} // namespace dfly
44 changes: 44 additions & 0 deletions src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//
#pragma once

#include <string>
#include <variant>

#include "server/common.h"
#include "server/table.h"

Expand All @@ -16,6 +19,7 @@ enum class Op : uint8_t {
LOCK_SHARD = 3,
UNLOCK_SHARD = 4,
SCHED = 5,
SELECT = 6,
VAL = 10,
DEL,
MSET,
Expand Down Expand Up @@ -44,6 +48,46 @@ struct Entry {
uint64_t expire_ms = 0; // 0 means no expiry.
};

struct EntryBase {
TxId txid;
Op opcode;
DbIndex dbid;
};

// This struct represents a single journal entry.
// Those are either control instructions or commands.
struct EntryNew : public EntryBase { // Called this "New" because I can't delete the old neither
// replace it partially
// Payload represents a non-owning view into a command executed on the shard.
using Payload =
std::variant<std::monostate, // No payload.
CmdArgList, // Parts of a full command.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you give example when CmdArgList is set , and why only parts of the full command enter the payload
and when the std::pair<std::string_view, ArgSlice> is set
Why do we need this 2 options?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CmdArgList is for single shard commands, the pair for a combination of command name shard args.

Please look at how it's used in my original PR

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I believe you can simplify this by using only std::pair<std::string_view, ArgSlice> and for single shard commands the ShardArgsInShard will return all args of the command, the command name is stored in Transaction class cid_ var I think, this way you dont need full_args_ .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DF has a weird split, where multi shard commands use shard args for all values, and single shard commands use it only for determining the shard to run on.

For single shard commands it contains only the key, but not the full data. Changing this internal behavior is possible but requires revisiting transaction code and all the commands

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. If this will simplify the code you could still use full_args_ and only std::pair<std::string_view, ArgSlice> init with full_args_.front(), full_args.Span(1) . If you think this makes the flow simpler...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, but there is currently no simple way of converting a Span<Span<char>> (aka a CmdArgList) into a Span<string_view> (aka an ArgSlice) without allocating the backing container for storing the string views.

I referenced this issue in the original PR, emphasising that the differently typed spans are not easily interchangeable. But this is a difficult question on its own because it is used all over the place

std::pair<std::string_view, ArgSlice> // Command and its shard parts.
>;

EntryNew(TxId txid, DbIndex dbid, Payload pl)
: EntryBase{txid, journal::Op::VAL, dbid}, payload{pl} {
}

EntryNew(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} {
}

Payload payload;
};

struct ParsedEntry : public EntryBase {
using Payload = std::optional<CmdArgVec>;

ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} {
}

ParsedEntry(TxId txid, DbIndex dbid, Payload pl)
: EntryBase{txid, journal::Op::VAL, dbid}, payload{pl} {
}

Payload payload;
};

using ChangeCallback = std::function<void(const Entry&)>;

} // namespace journal
Expand Down
Loading