Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network-bytes-in and network-bytes-out metric support under CLUSTER SLOT-STATS command (#20) #720

Merged
merged 9 commits into from
Jul 26, 2024
4 changes: 4 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"
#include "cluster_slot_stats.h"
#include "endianconv.h"
#include "connection.h"

Expand Down Expand Up @@ -1042,6 +1043,7 @@ void clusterInit(void) {
clusterUpdateMyselfIp();
clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename();
clusterSlotStatsReset();
}

void clusterInitLast(void) {
Expand Down Expand Up @@ -4943,6 +4945,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
clusterNodeSetSlotBit(n, slot);
server.cluster->slots[slot] = n;
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clusterSlotStatReset(slot);
return C_OK;
}

Expand All @@ -4961,6 +4964,7 @@ int clusterDelSlot(int slot) {
server.cluster->slots[slot] = NULL;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clusterSlotStatReset(slot);
return C_OK;
}

Expand Down
54 changes: 49 additions & 5 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

#define UNASSIGNED_SLOT 0

typedef enum {
KEY_COUNT,
INVALID,
madolson marked this conversation as resolved.
Show resolved Hide resolved
KEY_COUNT,
NETWORK_BYTES_IN,
} slotStatTypes;

/* -----------------------------------------------------------------------------
Expand All @@ -24,6 +24,14 @@ typedef struct {
uint64_t stat;
} slotStatForSort;

/* Per-slot usage counters kept in memory.
 * key-count is not a member: it is computed on demand through countKeysInSlot(). */
typedef struct slotStat {
uint64_t network_bytes_in; /* Sum of net_input_bytes_curr_cmd over the commands attributed to the slot. */
} slotStat;

/* Table of per-slot counters, indexed by slot number (CLUSTER_SLOTS entries).
 * NOTE(review): as a file-scope global this table exists whether or not cluster
 * mode is enabled, and it is not declared static -- confirm both are intended. */
struct slotStat cluster_slot_stats[CLUSTER_SLOTS];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This global is 128KB even in standalone mode. Can we store it inside the cluster struct so it only consumes this memory in cluster mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. The struct has been moved under clusterState.

struct clusterState {
    ...
    slotStat slot_stats[CLUSTER_SLOTS];
}


static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);
Expand All @@ -47,6 +55,8 @@ static uint64_t getSlotStat(int slot, int stat_type) {
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = cluster_slot_stats[slot].network_bytes_in;
}
return slot_stat;
}
Expand Down Expand Up @@ -88,9 +98,11 @@ static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */
addReplyMapLen(c, 2); /* Nested map representing slot usage statistics. */
madolson marked this conversation as resolved.
Show resolved Hide resolved
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, cluster_slot_stats[slot].network_bytes_in);
}

/* Adds reply for the SLOTSRANGE variant.
Expand Down Expand Up @@ -121,6 +133,35 @@ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
addReplySortedSlotStats(c, slot_stats, limit);
}

/* Tells whether the network bytes of the client's current command may be
 * attributed to a slot. Returns 1 when all conditions hold, 0 otherwise. */
static int canAddNetworkBytes(client *c) {
    /* Per-slot statistics only exist in cluster mode. */
    if (!server.cluster_enabled) return 0;
    /* The command must target a specific slot. */
    if (c->slot == -1) return 0;
    /* A blocked client is skipped, to avoid duplicate aggregation upon unblocking. */
    if (c->flag.blocked) return 0;
    return 1;
}

/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
/* key-count is exempt, as it is queried separately through countKeysInSlot(). */
cluster_slot_stats[slot].network_bytes_in = 0;
}

void clusterSlotStatsReset(void) {
memset(cluster_slot_stats, 0, sizeof(cluster_slot_stats));
}

/* Accumulates the ingress byte count of the command currently being executed
 * (c->net_input_bytes_curr_cmd) into the counter of the slot it targets.
 *
 * Note: call this only after c->slot has been parsed. Before that point the
 * canAddNetworkBytes() check fails and the aggregation is silently skipped. */
void clusterSlotStatsAddNetworkBytesIn(client *c) {
    if (canAddNetworkBytes(c)) {
        slotStat *stats = &cluster_slot_stats[c->slot];
        stats->network_bytes_in += c->net_input_bytes_curr_cmd;
    }
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
Expand Down Expand Up @@ -149,8 +190,11 @@ void clusterSlotStatsCommand(client *c) {
int desc = 1, order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in")) {
order_by = NETWORK_BYTES_IN;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported "
"metrics are: key-count and cpu-usec.");
madolson marked this conversation as resolved.
Show resolved Hide resolved
return;
}
int i = 4; /* Next argument index, following ORDERBY */
Expand Down
12 changes: 12 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"

void clusterSlotStatReset(int slot);
void clusterSlotStatsReset(void);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These names are too similar. Can we call the second one clusterSlotStatResetAll?

void clusterSlotStatsAddNetworkBytesIn(client *c);
3 changes: 3 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"properties": {
"key-count": {
"type": "integer"
},
"network-bytes-in": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

network, not memory

"type": "integer"
}
}
}
Expand Down
65 changes: 62 additions & 3 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "script.h"
#include "fpconv_dtoa.h"
#include "fmtargs.h"
Expand Down Expand Up @@ -215,6 +216,7 @@ client *createClient(connection *conn) {
if (conn) linkClient(c);
initClientMultiState(c);
c->net_input_bytes = 0;
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes = 0;
c->commands_processed = 0;
return c;
Expand Down Expand Up @@ -2084,6 +2086,7 @@ void resetClient(client *c) {
c->cur_script = NULL;
c->reqtype = 0;
c->multibulklen = 0;
c->net_input_bytes_curr_cmd = 0;
c->bulklen = -1;
c->slot = -1;
c->flag.executing_command = 0;
Expand Down Expand Up @@ -2268,6 +2271,21 @@ int processInlineBuffer(client *c) {
c->argv_len_sum += sdslen(argv[j]);
}
zfree(argv);

/* Per-slot network bytes-in calculation.
*
* Within networking.c, we calculate and store the current command's ingress bytes
madolson marked this conversation as resolved.
Show resolved Hide resolved
* under c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For inline buffer, every whitespace is of length 1,
* with the exception of the trailing '\r\n' being length 2.
*
* For example;
* Command) SET key value
* Inline) SET key value\r\n
* */
c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2);
return C_OK;
}

Expand Down Expand Up @@ -2341,7 +2359,8 @@ int processMultibulkBuffer(client *c) {
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
serverAssertWithInfo(c, NULL, c->querybuf[c->qb_pos] == '*');
ok = string2ll(c->querybuf + 1 + c->qb_pos, newline - (c->querybuf + 1 + c->qb_pos), &ll);
size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos);
ok = string2ll(c->querybuf + 1 + c->qb_pos, multibulklen_slen, &ll);
if (!ok || ll > INT_MAX) {
addReplyError(c, "Protocol error: invalid multibulk length");
setProtocolError("invalid mbulk count", c);
Expand All @@ -2363,6 +2382,39 @@ int processMultibulkBuffer(client *c) {
c->argv_len = min(c->multibulklen, 1024);
c->argv = zmalloc(sizeof(robj *) * c->argv_len);
c->argv_len_sum = 0;

/* Per-slot network bytes-in calculation.
*
* Within networking.c, we calculate and store the current command's ingress bytes
* under c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For multi bulk buffer, we accumulate four factors, namely;
*
* 1) multibulklen_slen + 1
* Cumulative string length (and not the value of) of multibulklen,
* including +1 from RESP first byte.
* 2) bulklen_slen + c->argc
* Cumulative string length (and not the value of) of bulklen,
* including +1 from RESP first byte per argument count.
* 3) c->argv_len_sum
* Cumulative string length of all argument vectors.
* 4) c->argc * 4 + 2
* Cumulative string length of all white-spaces, for which there exists a total of
* 4 bytes per argument, plus 2 bytes from the '\r\n' that terminates the multibulklen line.
*
* For example;
* Command) SET key value
* RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*
* 1) String length of "*3" is 2, obtained from (multibulklen_slen + 1).
* 2) String length of "$3" "$3" "$5" is 6, obtained from (bulklen_slen + c->argc).
* 3) String length of "SET" "key" "value" is 11, obtained from (c->argv_len_sum).
* 4) String length of all white-spaces "\r\n" is 14, obtained from (c->argc * 4 + 2).
madolson marked this conversation as resolved.
Show resolved Hide resolved
*
* The 1st component is calculated within the below line.
* */
c->net_input_bytes_curr_cmd += (multibulklen_slen + 1);
}

serverAssertWithInfo(c, NULL, c->multibulklen > 0);
Expand All @@ -2388,7 +2440,8 @@ int processMultibulkBuffer(client *c) {
return C_ERR;
}

ok = string2ll(c->querybuf + c->qb_pos + 1, newline - (c->querybuf + c->qb_pos + 1), &ll);
size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1);
ok = string2ll(c->querybuf + c->qb_pos + 1, bulklen_slen, &ll);
if (!ok || ll < 0 || (!c->flag.primary && ll > server.proto_max_bulk_len)) {
addReplyError(c, "Protocol error: invalid bulk length");
setProtocolError("invalid bulk length", c);
Expand Down Expand Up @@ -2430,6 +2483,8 @@ int processMultibulkBuffer(client *c) {
}
}
c->bulklen = ll;
/* Per-slot network bytes-in calculation, 2nd component. */
c->net_input_bytes_curr_cmd += (bulklen_slen + c->argc);
}

/* Read bulk argument */
Expand Down Expand Up @@ -2466,7 +2521,11 @@ int processMultibulkBuffer(client *c) {
}

/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) return C_OK;
if (c->multibulklen == 0) {
/* Per-slot network bytes-in calculation, 3rd and 4th components. */
c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 4 + 2));
return C_OK;
}

/* Still not ready to process the command */
return C_ERR;
Expand Down
5 changes: 5 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "server.h"
#include "monotonic.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "slowlog.h"
#include "bio.h"
#include "latency.h"
Expand Down Expand Up @@ -2499,6 +2500,7 @@ void resetServerStats(void) {
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
lazyfreeResetStats();
clusterSlotStatsReset();
}

/* Make the thread killable at any time, so that kill threads functions
Expand Down Expand Up @@ -3869,6 +3871,9 @@ int processCommand(client *c) {
}
}

/* Now that c->slot has been parsed, accumulate the buffered network bytes-in. */
clusterSlotStatsAddNetworkBytesIn(c);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a server.cluster_enabled condition here? It might make the logic easier to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely could, although this would incur repetitive checks, as the same condition is checked once more down its callstack;

clusterSlotStatsAddNetworkBytesIn();
|
| calls
|
canAddNetworkBytes(); // --> checks for server.cluster_enabled

Personally, I prefer to keep the top-level call as simple as possible, and embed all conditional checks inside the function body. This way, the same function can be called in multiple locations, without having to wrap them with the same conditional checks every time.


if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&
(is_write_command || (is_read_command && !c->flag.readonly))) {
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
Expand Down
8 changes: 5 additions & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1303,9 +1303,11 @@ typedef struct client {
#ifdef LOG_REQ_RES
clientReqResInfo reqres;
#endif
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
unsigned long long commands_processed; /* Total count of commands this client executed. */
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the
* execution of this client's current command. */
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
unsigned long long commands_processed; /* Total count of commands this client executed. */
} client;

/* ACL information */
Expand Down
50 changes: 49 additions & 1 deletion tests/unit/cluster/slot-stats.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ proc assert_empty_slot_stats_with_exception {slot_stats exception_slots} {
set slot_stats [convert_array_into_dict $slot_stats]
dict for {slot stats} $slot_stats {
if {[dict exists $exception_slots $slot]} {
set expected_key_count [dict get $exception_slots $slot]
set expected_key_count [dict get $exception_slots $slot key-count]
set expected_network_bytes_in [dict get $exception_slots $slot network-bytes-in]
assert {[dict get $stats key-count] == $expected_key_count}
assert {[dict get $stats network-bytes-in] == $expected_network_bytes_in}
} else {
assert {[dict get $stats key-count] == 0}
assert {[dict get $stats network-bytes-in] == 0}
}
}
}
Expand Down Expand Up @@ -114,6 +117,51 @@ proc wait_for_replica_key_exists {key key_count} {
}
}

# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS network-bytes-in.
# -----------------------------------------------------------------------------
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action items for integration tests

  • Add additional test cases, namely;
    1. network-bytes-in for blocking commands.
    2. network-bytes-in for transactions (MULTI/EXEC).
    3. network-bytes-in for non-slot specific commands (ex: INFO).
    4. network-bytes-in for pipeline using valkey-cli (valkey-cli --pipe)
    5. network-bytes-in for sharded pub/sub.
  • Fix existing test cases to support the newly introduced network-bytes-in parameter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this done, or is this a followup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done.

Fyi, "network-bytes-in for pipeline using valkey-cli" is not explicitly tested, as it follows the same code-path as multibulk processing, for which there already exists a test case. If an explicit test case for valkey-cli --pipe is deemed necessary, I will follow-up in the next revision.


start_cluster 1 0 {tags {external:skip cluster}} {

# Define shared variables.
set key "key"
set key_slot [R 0 cluster keyslot $key]

test "CLUSTER SLOT-STATS network-bytes-in, multi bulk buffer processing." {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also have total net-in and out metrics here, https://valkey.io/commands/client-list/. We might validate this is the same. For these cases we expect them all to be the same.

# Command) SET key value
# RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
R 0 SET $key value

set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create key-count 1 network-bytes-in 33
]
]

assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS network-bytes-in, in-line buffer processing." {
    # Command) SET key value
    # Inline) SET key value\r\n
    # Expected ingress: 11 bytes of arguments + 2 separating spaces + 2 bytes of "\r\n" = 15.
    set rd [valkey_deferring_client]
    $rd write "SET $key value\r\n"
    $rd flush

    # Wait for the reply before querying the statistics over a different
    # connection; otherwise the query may be served before the SET has been
    # processed, making the test racy. Then release the extra connection.
    assert_equal {OK} [$rd read]
    $rd close

    set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
    set expected_slot_stats [
        dict create $key_slot [
            dict create key-count 1 network-bytes-in 15
        ]
    ]

    assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats
}
}

# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS correctness, without additional arguments.
# -----------------------------------------------------------------------------
Expand Down
Loading