Skip to content

Commit

Permalink
fix: support loading of 7.x streams correctly
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Dec 9, 2024
1 parent bf4f45a commit 9ccce99
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 58 deletions.
4 changes: 4 additions & 0 deletions src/facade/facade_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ inline bool operator==(const RespExpr& left, std::string_view s) {
return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
}

// Compares a RESP reply against an integer: true only when the reply holds
// an INT64 whose payload equals `val`.
inline bool operator==(const RespExpr& left, int64_t val) {
  if (left.type != RespExpr::INT64)
    return false;
  return left.GetInt() == val;
}

// Inequality against a string view, defined as the negation of the
// corresponding equality operator.
inline bool operator!=(const RespExpr& left, std::string_view s) {
  const bool equal = (left == s);
  return !equal;
}
Expand Down
1 change: 0 additions & 1 deletion src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ extern "C" {
}

// Custom types: Range 30-35 is used by DF RDB types.
// NOTE: id 20 is the deprecated JSON type id. It was bumped to 30 because 20
// overlaps with redis 7's RDB_TYPE_SET_LISTPACK; loaders with rdb_version < 10
// still interpret 20 as JSON (see ReadObj in rdb_load.cc).
constexpr uint8_t RDB_TYPE_JSON_OLD = 20;
constexpr uint8_t RDB_TYPE_JSON = 30;
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
Expand Down
62 changes: 31 additions & 31 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr
CreateZSet(ptr.get());
break;
case RDB_TYPE_STREAM_LISTPACKS:
case RDB_TYPE_STREAM_LISTPACKS_2:
case RDB_TYPE_STREAM_LISTPACKS_3:
CreateStream(ptr.get());
break;
default:
Expand Down Expand Up @@ -955,16 +957,29 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
}

s->length = ltrace->stream_trace->stream_len;
s->last_id.ms = ltrace->stream_trace->ms;
s->last_id.seq = ltrace->stream_trace->seq;
CopyStreamId(ltrace->stream_trace->last_id, &s->last_id);
CopyStreamId(ltrace->stream_trace->first_id, &s->first_id);
CopyStreamId(ltrace->stream_trace->max_deleted_entry_id, &s->max_deleted_entry_id);
s->entries_added = ltrace->stream_trace->entries_added;

if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
/* Since the rax is already loaded, we can find the first entry's
* ID. */
streamGetEdgeID(s, 1, 1, &s->first_id);
}

for (const auto& cg : ltrace->stream_trace->cgroup) {
string_view cgname = ToSV(cg.name);
streamID cg_id;
cg_id.ms = cg.ms;
cg_id.seq = cg.seq;

streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, 0);
uint64_t entries_read = cg.entries_read;
if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
entries_read = streamEstimateDistanceFromFirstEverEntry(s, &cg_id);
}

streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, entries_read);
if (cgroup == NULL) {
LOG(ERROR) << "Duplicated consumer group name " << cgname;
ec_ = RdbError(errc::duplicate_key);
Expand Down Expand Up @@ -1512,7 +1527,7 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
// RDB_TYPE_JSON == 20. On newer versions > 9 we bumped up RDB_TYPE_JSON to 30
// because it overlapped with the new type RDB_TYPE_SET_LISTPACK
if (rdb_version_ < 10) {
// consider it RDB_TYPE_JSON_OLD
// consider it RDB_TYPE_JSON_OLD (20)
iores = ReadJson();
} else {
iores = ReadGeneric(rdbtype);
Expand Down Expand Up @@ -1876,21 +1891,20 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
// so if there are still unread elements return the partial stream.
if (listpacks > n) {
pending_read_.remaining = listpacks - n;
return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS};
} else if (pending_read_.remaining > 0) {
pending_read_.remaining = 0;
return OpaqueObj{std::move(load_trace), rdbtype};
}

// Load stream metadata.
pending_read_.remaining = 0;

// Load stream metadata.
load_trace->stream_trace.reset(new StreamTrace);

/* Load total number of items inside the stream. */
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->stream_len);

/* Load the last entry ID. */
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->ms);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->seq);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.seq);

if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
/* Load the first entry ID. */
Expand All @@ -1907,13 +1921,7 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
/* During migration the offset can be initialized to the stream's
* length. At this point, we also don't care about tombstones
* because CG offsets will be later initialized as well. */
load_trace->stream_trace->max_deleted_entry_id.ms = 0;
load_trace->stream_trace->max_deleted_entry_id.seq = 0;
load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len;

// TODO add implementation, we need to find the first entry's ID.
// The redis code is next
// streamGetEdgeID(s,1,1,&s->first_id);
}

/* Consumer groups loading */
Expand All @@ -1937,24 +1945,11 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);

uint64_t cg_offset;
cgroup.entries_read = 0;
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
SET_OR_UNEXPECT(LoadLen(nullptr), cg_offset);
(void)cg_offset;
} else {
// TODO implement
// cg_offset = should be calculated like streamEstimateDistanceFromFirstEverEntry();
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.entries_read);
}

// TODO add our implementation for the next Redis logic
// streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, cg_offset);
// if (cgroup == NULL) {
// rdbReportCorruptRDB("Duplicated consumer group name %s", cgname);
// decrRefCount(o);
// sdsfree(cgname);
// return NULL;
// }

/* Load the global PEL for this consumer group, however we'll
* not yet populate the NACK structures with the message
* owner, since consumers for this group and their messages will
Expand Down Expand Up @@ -2724,6 +2719,11 @@ std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig co
return visitor.ec();
}

// Copies the (ms, seq) pair from the loader's internal StreamID into the
// redis-side streamID struct.
void RdbLoaderBase::CopyStreamId(const StreamID& src, struct streamID* dest) {
  dest->seq = src.seq;
  dest->ms = src.ms;
}

void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
EngineShard* es = EngineShard::tlocal();
DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};
Expand Down
14 changes: 9 additions & 5 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ extern "C" {
#include "server/common.h"
#include "server/journal/serializer.h"

struct streamID;

namespace dfly {

class EngineShardSet;
Expand Down Expand Up @@ -84,26 +86,26 @@ class RdbLoaderBase {
};

// Stream entry id (milliseconds + sequence) as read from the RDB payload.
// Members are zero-initialized so that ids absent from older
// RDB_TYPE_STREAM_LISTPACKS payloads (e.g. first_id, max_deleted_entry_id)
// default to 0-0. The rendered diff had left both the old uninitialized and
// the new initialized members in place; only the initialized pair is kept.
struct StreamID {
  uint64_t ms = 0;
  uint64_t seq = 0;
};

// Load-time snapshot of one stream consumer group.
struct StreamCGTrace {
  RdbVariant name;  // Group name.
  uint64_t ms = 0;  // Group id (ms/seq), passed to streamCreateCG on materialization.
  uint64_t seq = 0;

  // Group read counter. Present in the payload only for
  // RDB_TYPE_STREAM_LISTPACKS_2 and newer, hence defaulted to 0 so older
  // formats do not leave it uninitialized.
  uint64_t entries_read = 0;
  std::vector<StreamPelTrace> pel_arr;
  std::vector<StreamConsumerTrace> cons_arr;
};

// Load-time metadata of a whole stream, mirroring the redis `stream` header.
// The rendered diff had left the pre-commit members (`uint64_t ms, seq;` and
// an uninitialized duplicate of `entries_added`) alongside their
// replacements; only the post-commit members are kept, and the size fields
// get explicit zero defaults for consistency with StreamID.
struct StreamTrace {
  size_t lp_len = 0;
  size_t stream_len = 0;
  StreamID last_id;              /* Id of the last inserted entry. */
  StreamID first_id;             /* The first non-tombstone entry, zero if empty. */
  StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */
  uint64_t entries_added = 0;    /* All time count of elements added. */
  std::vector<StreamCGTrace> cgroup;
};

Expand Down Expand Up @@ -192,6 +194,8 @@ class RdbLoaderBase {

std::error_code EnsureReadInternal(size_t min_to_read);

static void CopyStreamId(const StreamID& src, struct streamID* dest);

base::IoBuf* mem_buf_ = nullptr;
base::IoBuf origin_mem_buf_;
::io::Source* src_ = nullptr;
Expand Down
44 changes: 38 additions & 6 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,16 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
"set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");

// TODO: to retire both flags in v1.27 (Jan 2025)
ABSL_FLAG(bool, list_rdb_encode_v2, true,
"V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
"enconding of list uses ziplist encoding compatible with redis 6");

ABSL_FLAG(bool, stream_rdb_encode_v2, false,
"V2 uses format, compatible with redis 7.2, while v1 format "
"is compatible with redis 6 and Dragonfly v1.25 or lower");

namespace dfly {

using namespace std;
Expand Down Expand Up @@ -209,12 +215,12 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
}
break;
case OBJ_STREAM:
return RDB_TYPE_STREAM_LISTPACKS;
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS
: RDB_TYPE_STREAM_LISTPACKS_2;
case OBJ_MODULE:
return RDB_TYPE_MODULE_2;
case OBJ_JSON:
return RDB_TYPE_JSON; // save with RDB_TYPE_JSON, deprecate RDB_TYPE_JSON_OLD after July
// 2024.
return RDB_TYPE_JSON;
case OBJ_SBF:
return RDB_TYPE_SBF;
}
Expand Down Expand Up @@ -657,6 +663,22 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
RETURN_ON_ERR(SaveLen(s->last_id.ms));
RETURN_ON_ERR(SaveLen(s->last_id.seq));

uint8_t rdb_type = RdbObjectType(pv);

// 'first_id', 'max_deleted_entry_id' and 'entries_added' are added
// in RDB_TYPE_STREAM_LISTPACKS_2
if (rdb_type >= RDB_TYPE_STREAM_LISTPACKS_2) {
/* Save the first entry ID. */
RETURN_ON_ERR(SaveLen(s->first_id.ms));
RETURN_ON_ERR(SaveLen(s->first_id.seq));

/* Save the maximal tombstone ID. */
RETURN_ON_ERR(SaveLen(s->max_deleted_entry_id.ms));
RETURN_ON_ERR(SaveLen(s->max_deleted_entry_id.seq));

/* Save the offset. */
RETURN_ON_ERR(SaveLen(s->entries_added));
}
/* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */

Expand All @@ -678,9 +700,14 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len));

/* Last ID. */
RETURN_ON_ERR(SaveLen(s->last_id.ms));
RETURN_ON_ERR(SaveLen(cg->last_id.ms));

RETURN_ON_ERR(SaveLen(s->last_id.seq));
RETURN_ON_ERR(SaveLen(cg->last_id.seq));

if (rdb_type >= RDB_TYPE_STREAM_LISTPACKS_2) {
/* Save the group's logical reads counter. */
RETURN_ON_ERR(SaveLen(cg->entries_read));
}

/* Save the global PEL. */
RETURN_ON_ERR(SaveStreamPEL(cg->pel, true));
Expand Down Expand Up @@ -836,10 +863,15 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
/* Consumer name. */
RETURN_ON_ERR(SaveString(ri.key, ri.key_len));

/* Last seen time. */
/* seen time. */
absl::little_endian::Store64(buf, consumer->seen_time);
RETURN_ON_ERR(WriteRaw(buf));

// TODO: enable this when we switch to RDB_TYPE_STREAM_LISTPACKS_3
/* Active time. */
// absl::little_endian::Store64(buf, consumer->active_time);
// RETURN_ON_ERR(WriteRaw(buf));

/* Consumer PEL, without the ACKs (see last parameter of the function
* passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the
Expand Down
Loading

0 comments on commit 9ccce99

Please sign in to comment.