Skip to content

Commit

Permalink
fix: BLPOP BZPOP(MIN|MAX) moved error
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Jul 18, 2024
1 parent e3eb851 commit d7c0082
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 16 deletions.
16 changes: 16 additions & 0 deletions src/server/cluster/cluster_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "cluster_defs.h"
#include "facade/error.h"
#include "slot_set.h"
#include "src/server/common.h"

// TODO remove when tl_cluster_config will be moved out from it
#include "server/cluster/cluster_family.h"

using namespace std;

ABSL_FLAG(string, cluster_mode, "",
Expand Down Expand Up @@ -92,4 +96,16 @@ bool IsClusterShardedByTag() {
return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled;
}

std::optional<std::string> SlotOwnershipErrorStr(SlotId slot_id) {
const cluster::ClusterConfig* cluster_config = ClusterFamily::cluster_config();
if (!cluster_config)
return facade::kClusterNotConfigured;

if (!cluster_config->IsMySlot(slot_id)) {
// See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection
cluster::ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(slot_id);
return absl::StrCat("-MOVED ", slot_id, " ", master.ip, ":", master.port);
}
return std::nullopt;
}
} // namespace dfly::cluster
4 changes: 4 additions & 0 deletions src/server/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
Expand Down Expand Up @@ -122,6 +123,9 @@ enum class MigrationState : uint8_t {

SlotId KeySlot(std::string_view key);

// return error message if slot doesn't belong to this node
std::optional<std::string> SlotOwnershipErrorStr(SlotId slot_id);

void InitializeCluster();
bool IsClusterEnabled();
bool IsClusterEmulated();
Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ClusterFamily {
void Register(CommandRegistry* registry);

// Returns a thread-local pointer.
ClusterConfig* cluster_config();
static ClusterConfig* cluster_config();

void ApplyMigrationSlotRangeToConfig(std::string_view node_id, const SlotRanges& slots,
bool is_outgoing);
Expand Down
1 change: 0 additions & 1 deletion src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "core/compact_object.h"
#include "server/cluster/cluster_defs.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
Expand Down
4 changes: 2 additions & 2 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1226,8 +1226,8 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
case OpStatus::TIMED_OUT:
return rb->SendNullArray();
case OpStatus::KEY_MOVED:
// TODO: proper error for moved
return cntx->SendError("-MOVED");
// if key is moved the cluster is definitely exist and we must get an error
return cntx->SendError(*cluster::SlotOwnershipErrorStr(*transaction->GetUniqueSlotId()));
default:
LOG(ERROR) << "Unexpected error " << popped_key.status();
}
Expand Down
18 changes: 6 additions & 12 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,10 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send,
}

if (cluster::IsClusterEnabled()) {
if (cluster_family->cluster_config() == nullptr) {
if (cluster::ClusterFamily::cluster_config() == nullptr) {
resp.body() += "<h2>Not yet configured.</h2>\n";
} else {
auto config = cluster_family->cluster_config()->GetConfig();
auto config = cluster::ClusterFamily::cluster_config()->GetConfig();
for (const auto& shard : config) {
resp.body() += "<div class='master'>\n";
resp.body() += "<h3>Master</h3>\n";
Expand Down Expand Up @@ -958,16 +958,10 @@ optional<ErrorReply> Service::CheckKeysOwnership(const CommandId* cid, CmdArgLis
return ErrorReply{"-CROSSSLOT Keys in request don't hash to the same slot"};
}

// Check keys slot is in my ownership
const cluster::ClusterConfig* cluster_config = cluster_family_.cluster_config();
if (cluster_config == nullptr) {
return ErrorReply{kClusterNotConfigured};
}

if (keys_slot.has_value() && !cluster_config->IsMySlot(*keys_slot)) {
// See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection
cluster::ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(*keys_slot);
return ErrorReply{absl::StrCat("-MOVED ", *keys_slot, " ", master.ip, ":", master.port)};
if (keys_slot.has_value()) {
if (auto error_str = cluster::SlotOwnershipErrorStr(*keys_slot); error_str) {
return ErrorReply{*error_str};
}
}

return nullopt;
Expand Down
3 changes: 3 additions & 0 deletions src/server/zset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,9 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
case OpStatus::CANCELLED:
case OpStatus::TIMED_OUT:
return rb->SendNullArray();
case OpStatus::KEY_MOVED:
// if key is moved the cluster is definitely exist and we must get an error
return cntx->SendError(*cluster::SlotOwnershipErrorStr(*transaction->GetUniqueSlotId()));
default:
LOG(ERROR) << "Unexpected error " << popped_key.status();
}
Expand Down
41 changes: 41 additions & 0 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,47 @@ async def test_cluster_blocking_command(df_server):
await close_clients(c_master, c_master_admin)


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_blocking_comands_cancel(df_factory, df_seeder_factory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
]

df_factory.start_all(instances)

nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []

await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

set_task = asyncio.create_task(nodes[0].client.execute_command("BZPOPMIN set1 0"))
list_task = asyncio.create_task(nodes[0].client.execute_command("BLPOP list1 0"))

nodes[0].migrations.append(
MigrationInfo("127.0.0.1", nodes[1].instance.port, [(0, 16383)], nodes[1].id)
)
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")

nodes[0].migrations = []
nodes[0].slots = []
nodes[1].slots = [(0, 16383)]
logging.debug("remove finished migrations")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

with pytest.raises(aioredis.ResponseError) as set_e_info:
await set_task
assert "MOVED 3037 127.0.0.1:30002" == str(set_e_info.value)

with pytest.raises(aioredis.ResponseError) as list_e_info:
await list_task
assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)

await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])


@pytest.mark.parametrize("set_cluster_node_id", [True, False])
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_native_client(
Expand Down

0 comments on commit d7c0082

Please sign in to comment.