Skip to content

Commit

Permalink
fix(generic_family): Update indexes in the RESTORE and RENAME commands (
Browse files Browse the repository at this point in the history
  • Loading branch information
BagritsevichStepan authored Sep 26, 2024
1 parent cd279af commit c2da601
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
16 changes: 11 additions & 5 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extern "C" {
#include "server/rdb_extensions.h"
#include "server/rdb_load.h"
#include "server/rdb_save.h"
#include "server/search/doc_index.h"
#include "server/set_family.h"
#include "server/transaction.h"
#include "util/varz.h"
Expand Down Expand Up @@ -151,7 +152,7 @@ class RdbRestoreValue : protected RdbLoaderBase {

std::optional<DbSlice::ItAndUpdater> Add(std::string_view payload, std::string_view key,
DbSlice& db_slice, const DbContext& cntx,
const RestoreArgs& args);
const RestoreArgs& args, EngineShard* shard);

private:
std::optional<OpaqueObj> Parse(std::string_view payload);
Expand All @@ -178,7 +179,8 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view
std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
std::string_view key, DbSlice& db_slice,
const DbContext& cntx,
const RestoreArgs& args) {
const RestoreArgs& args,
EngineShard* shard) {
auto opaque_res = Parse(data);
if (!opaque_res) {
return std::nullopt;
Expand All @@ -194,6 +196,7 @@ std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime());
res->it->first.SetSticky(args.Sticky());
if (res) {
shard->search_indices()->AddDoc(key, cntx, res->it->second);
return std::move(res.value());
}
return std::nullopt;
Expand Down Expand Up @@ -444,8 +447,8 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) {
}

RdbRestoreValue loader(serialized_value_.version.value());
auto restored_dest_it =
loader.Add(serialized_value_.value, dest_key_, db_slice, op_args.db_cntx, restore_args);
auto restored_dest_it = loader.Add(serialized_value_.value, dest_key_, db_slice, op_args.db_cntx,
restore_args, shard);

if (restored_dest_it) {
auto& dest_it = restored_dest_it->it;
Expand Down Expand Up @@ -535,7 +538,7 @@ OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::strin
}

RdbRestoreValue loader(rdb_version);
auto res = loader.Add(payload, key, db_slice, op_args.db_cntx, restore_args);
auto res = loader.Add(payload, key, db_slice, op_args.db_cntx, restore_args, op_args.shard);
return res.has_value();
}

Expand Down Expand Up @@ -1579,6 +1582,7 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
if (destination_should_not_exist)
return OpStatus::KEY_EXISTS;

op_args.shard->search_indices()->RemoveDoc(to_key, op_args.db_cntx, to_res.it->second);
is_prior_list = (to_res.it->second.ObjType() == OBJ_LIST);
}

Expand Down Expand Up @@ -1616,6 +1620,8 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
to_res.it->first.SetSticky(sticky);
}

op_args.shard->search_indices()->AddDoc(to_key, op_args.db_cntx, to_res.it->second);

auto bc = op_args.db_cntx.ns->GetBlockingController(es->shard_id());
if (!is_prior_list && to_res.it->second.ObjType() == OBJ_LIST && bc) {
bc->AwakeWatched(op_args.db_cntx.db_index, to_key);
Expand Down
35 changes: 35 additions & 0 deletions src/server/search/search_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,41 @@ TEST_F(SearchFamilyTest, SimpleExpiry) {
Run({"flushall"});
}

TEST_F(SearchFamilyTest, DocsEditing) {
auto resp = Run({"JSON.SET", "k1", ".", R"({"a":"1"})"});
EXPECT_EQ(resp, "OK");

resp = Run({"FT.CREATE", "index", "ON", "JSON", "SCHEMA", "$.a", "AS", "a", "TEXT"});
EXPECT_EQ(resp, "OK");

resp = Run({"FT.SEARCH", "index", "*"});
EXPECT_THAT(resp, IsArray(IntArg(1), "k1", IsArray("$", R"({"a":"1"})")));

// Test dump and restore
resp = Run({"DUMP", "k1"});
auto dump = resp.GetBuf();

resp = Run({"DEL", "k1"});
EXPECT_THAT(resp, IntArg(1));

resp = Run({"RESTORE", "k1", "0", ToSV(dump)});
EXPECT_EQ(resp, "OK");

resp = Run({"FT.SEARCH", "index", "*"});
EXPECT_THAT(resp, IsArray(IntArg(1), "k1", IsArray("$", R"({"a":"1"})")));

// Test renaming a key
EXPECT_EQ(Run({"RENAME", "k1", "new_k1"}), "OK");

resp = Run({"FT.SEARCH", "index", "*"});
EXPECT_THAT(resp, IsArray(IntArg(1), "new_k1", IsArray("$", R"({"a":"1"})")));

EXPECT_EQ(Run({"RENAME", "new_k1", "k1"}), "OK");

resp = Run({"FT.SEARCH", "index", "*"});
EXPECT_THAT(resp, IsArray(IntArg(1), "k1", IsArray("$", R"({"a":"1"})")));
}

TEST_F(SearchFamilyTest, AggregateGroupBy) {
Run({"hset", "key:1", "word", "item1", "foo", "10", "text", "\"first key\"", "non_indexed_value",
"1"});
Expand Down

0 comments on commit c2da601

Please sign in to comment.