Skip to content

Commit

Permalink
Minor revision.
Browse files Browse the repository at this point in the history
- Added more integration tests.
- Updated canAddCpuDuration() condition to avoid duplicate accounting
  upon exec, eval and fcall commands.

Signed-off-by: Kyle Kim <[email protected]>
  • Loading branch information
kyle-yh-kim committed Jul 13, 2024
1 parent 5ec9293 commit c7f1ae3
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_err
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
/* Populate per-slot statistics for cpu time. */
clusterSlotStatsAddCpuDuration(c->slot, total_cmd_duration);
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
c->lastcmd->calls++;
c->commands_processed++;
server.stat_numcommands++;
Expand Down
2 changes: 0 additions & 2 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -4945,7 +4945,6 @@ int clusterAddSlot(clusterNode *n, int slot) {
clusterNodeSetSlotBit(n, slot);
server.cluster->slots[slot] = n;
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
/* Clear per-slot statistics. */
clusterSlotStatReset(slot);
return C_OK;
}
Expand All @@ -4965,7 +4964,6 @@ int clusterDelSlot(int slot) {
server.cluster->slots[slot] = NULL;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
/* Clear per-slot statistics. */
clusterSlotStatReset(slot);
return C_OK;
}
Expand Down
24 changes: 15 additions & 9 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ typedef struct {

/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t cpu;
uint64_t cpu_usec;
} slotStat;

/* Struct used for storing slot statistics, for all slots owned by the current shard. */
Expand Down Expand Up @@ -56,7 +56,7 @@ static uint64_t getSlotStat(int slot, int stat_type) {
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == CPU_USEC) {
slot_stat = cluster_slot_stats[slot].cpu;
slot_stat = cluster_slot_stats[slot].cpu_usec;
}
return slot_stat;
}
Expand Down Expand Up @@ -94,7 +94,7 @@ static void addReplySlotStat(client *c, int slot) {
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
addReplyBulkCString(c, "cpu-usec");
addReplyLongLong(c, cluster_slot_stats[slot].cpu);
addReplyLongLong(c, cluster_slot_stats[slot].cpu_usec);
}

/* Adds reply for the SLOTSRANGE variant.
Expand Down Expand Up @@ -128,17 +128,23 @@ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
/* key-count is exempt, as it is queried separately through `countKeysInSlot()`. */
cluster_slot_stats[slot].cpu = 0;
memset(&cluster_slot_stats[slot], 0, sizeof(slotStat));
}

void clusterSlotStatsReset(void) {
memset(cluster_slot_stats, 0, sizeof(cluster_slot_stats));
}

void clusterSlotStatsAddCpuDuration(int slot, long duration) {
if (!server.execution_nesting && server.cluster_enabled && slot != -1) {
cluster_slot_stats[slot].cpu += duration;
}
static int canAddCpuDuration(client *c) {
return server.cluster_enabled && c->slot != -1 && c->cmd->proc != execCommand && c->cmd->proc != fcallCommand &&
c->cmd->proc != evalCommand;
}

void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
if (!canAddCpuDuration(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
cluster_slot_stats[c->slot].cpu_usec += duration;
}

void clusterSlotStatsCommand(client *c) {
Expand Down Expand Up @@ -172,7 +178,7 @@ void clusterSlotStatsCommand(client *c) {
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec")) {
order_by = CPU_USEC;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported "
addReplyError(c, "Unrecognized sort metric for ORDERBY. The supported "
"metrics are: key-count and cpu-usec.");
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@

void clusterSlotStatReset(int slot);
void clusterSlotStatsReset(void);
void clusterSlotStatsAddCpuDuration(int slot, long duration);
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
4 changes: 1 addition & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3523,11 +3523,9 @@ void call(client *c, int flags) {
real_cmd->microseconds += c->duration;
if (server.latency_tracking_enabled && !(c->flags & CLIENT_BLOCKED))
updateCommandLatencyHistogram(&(real_cmd->latency_histogram), c->duration * 1000);
clusterSlotStatsAddCpuDuration(c, c->duration);
}

/* Populate per-slot statistics for cpu time. */
clusterSlotStatsAddCpuDuration(c->slot, c->duration);

/* The duration needs to be reset after each call except for a blocked command,
* which is expected to record and reset the duration after unblocking. */
if (!(c->flags & CLIENT_BLOCKED)) {
Expand Down
194 changes: 186 additions & 8 deletions tests/unit/cluster/slot-stats.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ proc convert_array_into_dict {slot_stats} {
return $res
}

proc get_cmdstat_usec {cmd r} {
set cmdstatline [cmdrstat $cmd r]
regexp "usec=(.*?),usec_per_call=(.*?),rejected_calls=0,failed_calls=0" $cmdstatline -> usec _
return $usec
}

proc initialize_expected_slots_dict {} {
set expected_slots [dict create]
for {set i 0} {$i < 16384} {incr i 1} {
Expand All @@ -36,18 +42,24 @@ proc initialize_expected_slots_dict_with_range {start_slot end_slot} {
proc assert_empty_slot_stats {slot_stats} {
set slot_stats [convert_array_into_dict $slot_stats]
dict for {slot stats} $slot_stats {
assert {[dict get $stats key-count] == 0}
dict for {metric value} $stats {
assert {$value == 0}
}
}
}

proc assert_empty_slot_stats_with_exception {slot_stats exception_slots} {
proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_to_assert} {
set slot_stats [convert_array_into_dict $slot_stats]
dict for {slot stats} $slot_stats {
if {[dict exists $exception_slots $slot]} {
set expected_key_count [dict get $exception_slots $slot]
assert {[dict get $stats key-count] == $expected_key_count}
foreach metric_name $metrics_to_assert {
set metric_value [dict get $exception_slots $slot $metric_name]
assert {[dict get $stats $metric_name] == $metric_value}
}
} else {
assert {[dict get $stats key-count] == 0}
dict for {metric value} $stats {
assert {$value == 0}
}
}
}
}
Expand Down Expand Up @@ -114,6 +126,167 @@ proc wait_for_replica_key_exists {key key_count} {
}
}

# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS cpu-usec metric correctness.
# -----------------------------------------------------------------------------

start_cluster 1 0 {tags {external:skip cluster}} {

# Define shared variables.
set key "FOO"
set key_slot [R 0 cluster keyslot $key]
set key_secondary "FOO2"
set key_secondary_slot [R 0 cluster keyslot $key_secondary]
set metrics_to_assert [list cpu-usec]

test "CLUSTER SLOT-STATS cpu-usec reset." {
R 0 SET $key VALUE
R 0 DEL $key
R 0 CONFIG RESETSTAT
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for non-slot specific commands." {
R 0 INFO
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for slot specific commands." {
R 0 SET $key VALUE
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set usec [get_cmdstat_usec set r]
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec $usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on keyspace update." {
# Blocking command with no timeout. Only keyspace update can unblock this client.
set rd [valkey_deferring_client]
$rd BLPOP $key 0
wait_for_blocked_clients_count 1
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
# When the client is blocked, no accumulation is made. This behaviour is identical to INFO COMMANDSTATS.
assert_empty_slot_stats $slot_stats

# Unblocking command.
R 0 LPUSH $key value
wait_for_blocked_clients_count 0

set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set lpush_usec [get_cmdstat_usec lpush r]
set blpop_usec [get_cmdstat_usec blpop r]

# Assert that both blocking and non-blocking command times have been accumulated.
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec [expr $lpush_usec + $blpop_usec]
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on timeout." {
# Blocking command with 1 second timeout.
set rd [valkey_deferring_client]
$rd BLPOP $key 1

# Confirm that the client is blocked, then unblocked after 1 second timeout.
wait_for_blocked_clients_count 1
wait_for_blocked_clients_count 0

# Assert that the blocking command time has been accumulated.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set blpop_usec [get_cmdstat_usec blpop r]
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec $blpop_usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for transactions." {
set r1 [valkey_client]
$r1 MULTI
$r1 SET $key value
$r1 GET $key

# CPU metric is not accumulated until EXEC is reached. This behaviour is identical to INFO COMMANDSTATS.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats

# Execute transaction, and assert that all nested command times have been accumulated.
$r1 EXEC
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set get_usec [get_cmdstat_usec set r]
set set_usec [get_cmdstat_usec get r]
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec [expr $get_usec + $set_usec]
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, without cross-slot keys." {
r eval [format "redis.call('set', '%s', 'bar'); redis.call('get', '%s')" $key $key] 0

set get_usec [get_cmdstat_usec set r]
set set_usec [get_cmdstat_usec get r]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]

set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec [expr $get_usec + $set_usec]
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, with cross-slot keys." {
r eval [format "#!lua flags=allow-cross-slot-keys
redis.call('set', '%s', 'bar'); redis.call('get', '%s');
" $key $key_secondary] 0

set set_usec [get_cmdstat_usec set r]
set get_usec [get_cmdstat_usec get r]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]

set expected_slot_stats [
dict create \
$key_slot [
dict create cpu-usec $set_usec
] \
$key_secondary_slot [
dict create cpu-usec $get_usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
}

# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS correctness, without additional arguments.
# -----------------------------------------------------------------------------
Expand All @@ -123,7 +296,12 @@ start_cluster 1 0 {tags {external:skip cluster}} {
# Define shared variables.
set key "FOO"
set key_slot [R 0 cluster keyslot $key]
set expected_slots_to_key_count [dict create $key_slot 1]
set metrics_to_assert [list key-count]
set expected_slot_stats [
dict create $key_slot [
dict create key-count 1
]
]

test "CLUSTER SLOT-STATS contains default value upon valkey-server startup" {
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
Expand All @@ -133,13 +311,13 @@ start_cluster 1 0 {tags {external:skip cluster}} {
test "CLUSTER SLOT-STATS contains correct metrics upon key introduction" {
R 0 SET $key TEST
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats $expected_slots_to_key_count
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}

test "CLUSTER SLOT-STATS contains correct metrics upon key mutation" {
R 0 SET $key NEW_VALUE
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats $expected_slots_to_key_count
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}

test "CLUSTER SLOT-STATS contains correct metrics upon key deletion" {
Expand Down

0 comments on commit c7f1ae3

Please sign in to comment.