Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Json rdb support #674

Merged
merged 1 commit into from
Jan 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/redis/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */

/* Test if a type is an object type. */
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
#define __rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

/* Range 200-240 is used by Dragonfly specific opcodes */

Expand Down
1 change: 1 addition & 0 deletions src/server/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum errc {
invalid_encoding = 9,
empty_key = 10,
out_of_memory = 11,
bad_json_string = 12,
};

} // namespace rdb
Expand Down
3 changes: 2 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ extern "C" {
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/rdb_extensions.h"
#include "server/rdb_load.h"
#include "server/rdb_save.h"
#include "server/transaction.h"
Expand Down Expand Up @@ -134,7 +135,7 @@ class RdbRestoreValue : protected RdbLoaderBase {
std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view payload) {
InMemSource source(payload);
src_ = &source;
if (io::Result<uint8_t> type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) {
if (io::Result<uint8_t> type_id = FetchType(); type_id && rdbIsObjectTypeDF(type_id.value())) {
OpaqueObj obj;
error_code ec = ReadObj(type_id.value(), &obj); // load the type from the input stream
if (ec) {
Expand Down
15 changes: 14 additions & 1 deletion src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,26 @@

#pragma once

// Range 200-240 is used by DF extensions.
extern "C" {
#include "redis/rdb.h"
}

// Custom types: Range 20-25 is used by DF RDB types.
const uint8_t RDB_TYPE_JSON = 20;

constexpr bool rdbIsObjectTypeDF(uint8_t type) {
return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON);
}

// Opcodes: Range 200-240 is used by DF extensions.

// This opcode is sent by the master Dragonfly instance to a replica
// to notify that it finished streaming static data and is ready
// to switch to the stable state replication phase.
const uint8_t RDB_OPCODE_FULLSYNC_END = 200;

const uint8_t RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START = 201;
const uint8_t RDB_OPCODE_COMPRESSED_LZ4_BLOB_START = 202;
const uint8_t RDB_OPCODE_COMPRESSED_BLOB_END = 203;

const uint8_t RDB_OPCODE_JOURNAL_BLOB = 210;
24 changes: 22 additions & 2 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ extern "C" {
#include <lz4frame.h>
#include <zstd.h>

#include <jsoncons/json.hpp>

#include "base/endian.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/json_object.h"
#include "core/string_map.h"
#include "core/string_set.h"
#include "server/engine_shard_set.h"
Expand Down Expand Up @@ -353,9 +356,9 @@ class RdbLoaderBase::OpaqueObjLoader {
}

void operator()(const base::PODArray<char>& str);

void operator()(const LzfString& lzfstr);
void operator()(const unique_ptr<LoadTrace>& ptr);
void operator()(const JsonType& jt);

std::error_code ec() const {
return ec_;
Expand Down Expand Up @@ -426,6 +429,10 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr
}
}

void RdbLoaderBase::OpaqueObjLoader::operator()(const JsonType& json) {
pv_->SetJson(JsonType{json});
}

void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->arr.size();

Expand Down Expand Up @@ -1209,6 +1216,8 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
case RDB_TYPE_STREAM_LISTPACKS:
iores = ReadStreams();
break;
case RDB_TYPE_JSON:
iores = ReadJson();
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
Expand Down Expand Up @@ -1597,6 +1606,17 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS};
}

auto RdbLoaderBase::ReadJson() -> io::Result<OpaqueObj> {
string json_str;
SET_OR_UNEXPECT(FetchGenericString(), json_str);

auto json = JsonFromString(json_str);
if (!json)
return Unexpected(errc::bad_json_string);

return OpaqueObj{std::move(*json), RDB_TYPE_JSON};
}

template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
auto ec = EnsureRead(sizeof(T));
if (ec)
Expand Down Expand Up @@ -1804,7 +1824,7 @@ error_code RdbLoader::Load(io::Source* src) {
continue;
}

if (!rdbIsObjectType(type)) {
if (!rdbIsObjectTypeDF(type)) {
return RdbError(errc::invalid_rdb_type);
}

Expand Down
7 changes: 5 additions & 2 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <boost/fiber/mutex.hpp>
#include <jsoncons/json.hpp>
#include <system_error>

extern "C" {
Expand All @@ -12,6 +13,7 @@ extern "C" {

#include "base/io_buf.h"
#include "base/pod_array.h"
#include "core/json_object.h"
#include "core/mpsc_intrusive_queue.h"
#include "io/io.h"
#include "server/common.h"
Expand Down Expand Up @@ -39,8 +41,8 @@ class RdbLoaderBase {
uint64_t uncompressed_len;
};

using RdbVariant =
std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>>;
using RdbVariant = std::variant<long long, base::PODArray<char>, LzfString,
std::unique_ptr<LoadTrace>, JsonType>;

struct OpaqueObj {
RdbVariant obj;
Expand Down Expand Up @@ -121,6 +123,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadZSetZL();
::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
::io::Result<OpaqueObj> ReadStreams();
::io::Result<OpaqueObj> ReadJson();

std::error_code HandleCompressedBlob(int op_type);
std::error_code HandleCompressedBlobFinish();
Expand Down
16 changes: 16 additions & 0 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <lz4frame.h>
#include <zstd.h>

#include <jsoncons/json.hpp>

extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
Expand All @@ -23,6 +25,7 @@ extern "C" {

#include "base/flags.h"
#include "base/logging.h"
#include "core/json_object.h"
#include "core/string_map.h"
#include "core/string_set.h"
#include "server/engine_shard_set.h"
Expand Down Expand Up @@ -132,6 +135,8 @@ uint8_t RdbObjectType(unsigned type, unsigned compact_enc) {
return RDB_TYPE_STREAM_LISTPACKS;
case OBJ_MODULE:
return RDB_TYPE_MODULE_2;
case OBJ_JSON:
return RDB_TYPE_JSON;
}
LOG(FATAL) << "Unknown encoding " << compact_enc << " for type " << type;
return 0; /* avoid warning */
Expand Down Expand Up @@ -284,9 +289,11 @@ io::Result<uint8_t> RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValu
ec = SaveString(key);
if (ec)
return make_unexpected(ec);

ec = SaveValue(pv);
if (ec)
return make_unexpected(ec);

return rdb_type;
}

Expand Down Expand Up @@ -314,6 +321,10 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
return SaveStreamObject(pv.AsRObj());
}

if (obj_type == OBJ_JSON) {
return SaveJsonObject(pv);
}

LOG(ERROR) << "Not implemented " << obj_type;
return make_error_code(errc::function_not_supported);
}
Expand Down Expand Up @@ -529,6 +540,11 @@ error_code RdbSerializer::SaveStreamObject(const robj* obj) {
return error_code{};
}

error_code RdbSerializer::SaveJsonObject(const PrimeValue& pv) {
auto json_string = pv.GetJson()->to_string();
return SaveString(json_string);
}
Comment on lines +543 to +546
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future we should maybe enable one of jsoncons extensions and use a binary format like bson


/* Save a long long value as either an encoded string or a string. */
error_code RdbSerializer::SaveLongLongAsString(int64_t value) {
uint8_t buf[32];
Expand Down
2 changes: 2 additions & 0 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class RdbSerializer {
std::error_code SaveHSetObject(const PrimeValue& pv);
std::error_code SaveZSetObject(const robj* obj);
std::error_code SaveStreamObject(const robj* obj);
std::error_code SaveJsonObject(const PrimeValue& pv);

std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp);
Expand Down
Loading