Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blocking commands moved error #3334

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/server/cluster/cluster_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "cluster_defs.h"
#include "facade/error.h"
#include "slot_set.h"
#include "src/server/common.h"

// TODO remove when tl_cluster_config will be moved out from it
#include "server/cluster/cluster_family.h"

using namespace std;

ABSL_FLAG(string, cluster_mode, "",
Expand Down Expand Up @@ -92,4 +96,16 @@ bool IsClusterShardedByTag() {
return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled;
}

std::optional<std::string> SlotOwnershipErrorStr(SlotId slot_id) {
const cluster::ClusterConfig* cluster_config = ClusterFamily::cluster_config();
if (!cluster_config)
return facade::kClusterNotConfigured;

if (!cluster_config->IsMySlot(slot_id)) {
// See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection
cluster::ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(slot_id);
return absl::StrCat("-MOVED ", slot_id, " ", master.ip, ":", master.port);
}
return std::nullopt;
}
} // namespace dfly::cluster
4 changes: 4 additions & 0 deletions src/server/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
Expand Down Expand Up @@ -122,6 +123,9 @@ enum class MigrationState : uint8_t {

SlotId KeySlot(std::string_view key);

// return error message if slot doesn't belong to this node
std::optional<std::string> SlotOwnershipErrorStr(SlotId slot_id);

void InitializeCluster();
bool IsClusterEnabled();
bool IsClusterEmulated();
Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ClusterFamily {
void Register(CommandRegistry* registry);

// Returns a thread-local pointer.
ClusterConfig* cluster_config();
static ClusterConfig* cluster_config();

void ApplyMigrationSlotRangeToConfig(std::string_view node_id, const SlotRanges& slots,
bool is_outgoing);
Expand Down
1 change: 0 additions & 1 deletion src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "core/compact_object.h"
#include "server/cluster/cluster_defs.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
Expand Down
8 changes: 5 additions & 3 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1225,9 +1225,11 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
case OpStatus::CANCELLED:
case OpStatus::TIMED_OUT:
return rb->SendNullArray();
case OpStatus::KEY_MOVED:
// TODO: proper error for moved
return cntx->SendError("-MOVED");
case OpStatus::KEY_MOVED: {
auto error = cluster::SlotOwnershipErrorStr(*transaction->GetUniqueSlotId());
CHECK(error.has_value());
return cntx->SendError(std::move(*error));
}
default:
LOG(ERROR) << "Unexpected error " << popped_key.status();
}
Expand Down
18 changes: 6 additions & 12 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,10 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send,
}

if (cluster::IsClusterEnabled()) {
if (cluster_family->cluster_config() == nullptr) {
if (cluster::ClusterFamily::cluster_config() == nullptr) {
resp.body() += "<h2>Not yet configured.</h2>\n";
} else {
auto config = cluster_family->cluster_config()->GetConfig();
auto config = cluster::ClusterFamily::cluster_config()->GetConfig();
for (const auto& shard : config) {
resp.body() += "<div class='master'>\n";
resp.body() += "<h3>Master</h3>\n";
Expand Down Expand Up @@ -958,16 +958,10 @@ optional<ErrorReply> Service::CheckKeysOwnership(const CommandId* cid, CmdArgLis
return ErrorReply{"-CROSSSLOT Keys in request don't hash to the same slot"};
}

// Check keys slot is in my ownership
const cluster::ClusterConfig* cluster_config = cluster_family_.cluster_config();
if (cluster_config == nullptr) {
return ErrorReply{kClusterNotConfigured};
}

if (keys_slot.has_value() && !cluster_config->IsMySlot(*keys_slot)) {
// See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection
cluster::ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(*keys_slot);
return ErrorReply{absl::StrCat("-MOVED ", *keys_slot, " ", master.ip, ":", master.port)};
if (keys_slot.has_value()) {
if (auto error_str = cluster::SlotOwnershipErrorStr(*keys_slot); error_str) {
return ErrorReply{std::move(*error_str)};
}
Comment on lines +961 to +964
Copy link
Contributor

@dranikpg dranikpg Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can just return an optional directly, will simplify the code around 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need it in other places

}

return nullopt;
Expand Down
5 changes: 5 additions & 0 deletions src/server/zset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,11 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
case OpStatus::CANCELLED:
case OpStatus::TIMED_OUT:
return rb->SendNullArray();
case OpStatus::KEY_MOVED: {
auto error = cluster::SlotOwnershipErrorStr(*transaction->GetUniqueSlotId());
CHECK(error.has_value());
return cntx->SendError(std::move(*error));
}
default:
LOG(ERROR) << "Unexpected error " << popped_key.status();
}
Expand Down
41 changes: 41 additions & 0 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,47 @@ async def test_cluster_blocking_command(df_server):
await close_clients(c_master, c_master_admin)


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
]

df_factory.start_all(instances)

nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []

await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

set_task = asyncio.create_task(nodes[0].client.execute_command("BZPOPMIN set1 0"))
list_task = asyncio.create_task(nodes[0].client.execute_command("BLPOP list1 0"))

nodes[0].migrations.append(
MigrationInfo("127.0.0.1", nodes[1].instance.port, [(0, 16383)], nodes[1].id)
)
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")

nodes[0].migrations = []
nodes[0].slots = []
nodes[1].slots = [(0, 16383)]
logging.debug("remove finished migrations")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

with pytest.raises(aioredis.ResponseError) as set_e_info:
await set_task
assert "MOVED 3037 127.0.0.1:30002" == str(set_e_info.value)

with pytest.raises(aioredis.ResponseError) as list_e_info:
await list_task
assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)

await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])


@pytest.mark.parametrize("set_cluster_node_id", [True, False])
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_native_client(
Expand Down
Loading