Skip to content

Commit

Permalink
RDMA: Use valkey.conf style instead of module parameters
Browse files Browse the repository at this point in the history
Move 4 parameters from valkey-rdma.so to valkey-server, keep RDMA
listener similar to TCP/TLS. Also prepare to build Valkey Over RDMA
into builtin.

Signed-off-by: zhenwei pi <[email protected]>
  • Loading branch information
pizhenwei committed Nov 21, 2024
1 parent feeb589 commit b8378df
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 156 deletions.
64 changes: 64 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2958,6 +2958,66 @@ static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc
return setConfigBindOption(config, argv, argc, err, server.bindaddr, &server.bindaddr_count);
}

static int setConfigRdmaBindOption(standardConfig *config, sds *argv, int argc, const char **err) {
UNUSED(config);

return setConfigBindOption(config, argv, argc, err, server.rdma_ctx_config.bindaddr, &server.rdma_ctx_config.bindaddr_count);
}

static sds getConfigRdmaBindOption(standardConfig *config) {
UNUSED(config);
return sdsjoin(server.rdma_ctx_config.bindaddr, server.rdma_ctx_config.bindaddr_count, " ");
}

static void rewriteConfigRdmaBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) {
UNUSED(config);

if (server.rdma_ctx_config.bindaddr_count) {
rewriteConfigBindOption(config, name, state, server.rdma_ctx_config.bindaddr,
server.rdma_ctx_config.bindaddr_count);
}
}

static int applyRdmaBind(const char **err) {
connListener *rdma_listener = listenerByType(CONN_TYPE_RDMA);

if (!rdma_listener) {
*err = "No RDMA building support.";
return 0;
}

rdma_listener->bindaddr = server.rdma_ctx_config.bindaddr;
rdma_listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count;
rdma_listener->port = server.rdma_ctx_config.port;
rdma_listener->ct = connectionByType(CONN_TYPE_RDMA);
if (changeListener(rdma_listener) == C_ERR) {
*err = "Failed to bind to specified addresses for RDMA.";
return 0;
}

return 1;
}

static int updateRdmaPort(const char **err) {
connListener *listener = listenerByType(CONN_TYPE_RDMA);

if (listener != NULL) {
*err = "No RDMA building support.";
return 0;
}

listener->bindaddr = server.rdma_ctx_config.bindaddr;
listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count;
listener->port = server.rdma_ctx_config.port;
listener->ct = connectionByType(CONN_TYPE_RDMA);
if (changeListener(listener) == C_ERR) {
*err = "Unable to listen on this port for RDMA. Check server logs.";
return 0;
}

return 1;
}

static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, const char **err) {
UNUSED(config);

Expand Down Expand Up @@ -3251,6 +3311,9 @@ standardConfig static_configs[] = {
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort),
createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-comp-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.comp_vector, -1, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down Expand Up @@ -3331,6 +3394,7 @@ standardConfig static_configs[] = {
createSpecialConfig("oom-score-adj-values", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigOOMScoreAdjValuesOption, getConfigOOMScoreAdjValuesOption, rewriteConfigOOMScoreAdjValuesOption, updateOOMScoreAdj),
createSpecialConfig("notify-keyspace-events", NULL, MODIFIABLE_CONFIG, setConfigNotifyKeyspaceEventsOption, getConfigNotifyKeyspaceEventsOption, rewriteConfigNotifyKeyspaceEventsOption, NULL),
createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSocketBindOption, getConfigBindOption, rewriteConfigSocketBindOption, applyBind),
createSpecialConfig("rdma-bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigRdmaBindOption, getConfigRdmaBindOption, rewriteConfigRdmaBindOption, applyRdmaBind),
createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL),
createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL),

Expand Down
1 change: 1 addition & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ typedef enum {
#define CONN_TYPE_SOCKET "tcp"
#define CONN_TYPE_UNIX "unix"
#define CONN_TYPE_TLS "tls"
#define CONN_TYPE_RDMA "rdma"
#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */

typedef void (*ConnectionCallbackFunc)(struct connection *conn);
Expand Down
169 changes: 14 additions & 155 deletions src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,10 @@ typedef struct rdma_listener {
static list *pending_list;

static rdma_listener *rdma_listeners;
static serverRdmaContextConfig *rdma_config;

static ConnectionType CT_RDMA;

static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE;
static int valkey_rdma_comp_vector = -1; /* -1 means a random one */

static void serverRdmaError(char *err, const char *fmt, ...) {
va_list ap;

Expand Down Expand Up @@ -272,7 +270,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) {

/* setup recv buf & MR */
access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
length = valkey_rdma_rx_size;
length = rdma_config->rx_size;
ctx->rx.addr = page_aligned_zalloc(length);
ctx->rx.length = length;
ctx->rx.mr = ibv_reg_mr(ctx->pd, ctx->rx.addr, length, access);
Expand All @@ -295,6 +293,7 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
struct ibv_comp_channel *comp_channel = NULL;
struct ibv_cq *cq = NULL;
struct ibv_pd *pd = NULL;
int comp_vector = rdma_config->comp_vector;

if (ibv_query_device(cm_id->verbs, &device_attr)) {
serverLog(LL_WARNING, "RDMA: ibv ibv query device failed");
Expand All @@ -317,8 +316,13 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) {

ctx->comp_channel = comp_channel;

/* negative number means a random one */
if (comp_vector < 0) {
comp_vector = abs((int)random());
}

cq = ibv_create_cq(cm_id->verbs, VALKEY_RDMA_MAX_WQE * 2, NULL, comp_channel,
valkey_rdma_comp_vector % cm_id->verbs->num_comp_vectors);
comp_vector % cm_id->verbs->num_comp_vectors);
if (!cq) {
serverLog(LL_WARNING, "RDMA: ibv create cq failed");
return C_ERR;
Expand Down Expand Up @@ -1610,6 +1614,7 @@ int connRdmaListen(connListener *listener) {
rdma_listener++;
}

rdma_config = listener->priv;
return C_OK;
}

Expand All @@ -1628,6 +1633,7 @@ static void connRdmaCloseListener(connListener *listener) {
listener->count = 0;
zfree(rdma_listeners);
rdma_listeners = NULL;
rdma_config = NULL;
}

static int connRdmaAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) {
Expand Down Expand Up @@ -1787,17 +1793,6 @@ static ConnectionType CT_RDMA = {
.process_pending_data = rdmaProcessPendingData,
};

static struct connListener *rdmaListener(void) {
static struct connListener *listener = NULL;

if (listener) return listener;

listener = listenerByType(CONN_TYPE_RDMA);
serverAssert(listener != NULL);

return listener;
}

ConnectionType *connectionTypeRdma(void) {
static ConnectionType *ct_rdma = NULL;

Expand All @@ -1809,117 +1804,15 @@ ConnectionType *connectionTypeRdma(void) {
return ct_rdma;
}

/* rdma listener has different create/close logic from TCP, we can't re-use 'int changeListener(connListener *listener)'
* directly */
static int rdmaChangeListener(void) {
struct connListener *listener = rdmaListener();

connRdmaCloseListener(listener);
/* Just close the server if port disabled */
if (listener->port == 0) {
if (server.set_proc_title) serverSetProcTitle(NULL);
return VALKEYMODULE_OK;
}

/* Re-create listener */
if (connListen(listener) != C_OK) {
return VALKEYMODULE_ERR;
}

/* Create event handlers */
if (createSocketAcceptHandler(listener, listener->ct->accept_handler) != C_OK) {
serverPanic("Unrecoverable error creating %s accept handler.", listener->ct->get_type(NULL));
}

if (server.set_proc_title) serverSetProcTitle(NULL);

return VALKEYMODULE_OK;
}

#ifdef BUILD_RDMA_MODULE

#include "release.h"

static long long rdmaGetPort(const char *name, void *privdata) {
UNUSED(name);
UNUSED(privdata);
struct connListener *listener = rdmaListener();

return listener->port;
}

static int rdmaSetPort(const char *name, long long val, void *privdata, ValkeyModuleString **err) {
UNUSED(name);
UNUSED(privdata);
UNUSED(err);
struct connListener *listener = rdmaListener();
listener->port = val;

return VALKEYMODULE_OK;
}

static ValkeyModuleString *rdma_bind;

static void rdmaBuildBind(void *ctx) {
struct connListener *listener = rdmaListener();

if (rdma_bind) ValkeyModule_FreeString(NULL, rdma_bind);

sds rdma_bind_str = sdsjoin(listener->bindaddr, listener->bindaddr_count, " ");
rdma_bind = ValkeyModule_CreateString(ctx, rdma_bind_str, sdslen(rdma_bind_str));
}

static ValkeyModuleString *rdmaGetBind(const char *name, void *privdata) {
UNUSED(name);
UNUSED(privdata);

return rdma_bind;
}

static int rdmaSetBind(const char *name, ValkeyModuleString *val, void *privdata, ValkeyModuleString **err) {
UNUSED(name);
UNUSED(err);
struct connListener *listener = rdmaListener();
const char *bind = ValkeyModule_StringPtrLen(val, NULL);
int nexts;
sds *exts = sdssplitlen(bind, strlen(bind), " ", 1, &nexts);

if (nexts > CONFIG_BINDADDR_MAX) {
serverLog(LL_WARNING, "RDMA: Unsupported bind ( > %d)", CONFIG_BINDADDR_MAX);
return VALKEYMODULE_ERR;
}

/* Free old bind addresses */
for (int j = 0; j < listener->bindaddr_count; j++) {
zfree(listener->bindaddr[j]);
}

for (int j = 0; j < nexts; j++) listener->bindaddr[j] = zstrdup(exts[j]);
listener->bindaddr_count = nexts;

sdsfreesplitres(exts, nexts);
rdmaBuildBind(privdata);

return VALKEYMODULE_OK;
}

static int rdmaApplyListener(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err) {
UNUSED(ctx);
UNUSED(privdata);
UNUSED(err);

return rdmaChangeListener();
}

static void rdmaListenerAddConfig(void *ctx) {
serverAssert(ValkeyModule_RegisterNumericConfig(ctx, "port", 0, VALKEYMODULE_CONFIG_DEFAULT, 0, 65535, rdmaGetPort,
rdmaSetPort, rdmaApplyListener, NULL) == VALKEYMODULE_OK);
serverAssert(ValkeyModule_RegisterStringConfig(ctx, "bind", "", VALKEYMODULE_CONFIG_DEFAULT, rdmaGetBind,
rdmaSetBind, rdmaApplyListener, ctx) == VALKEYMODULE_OK);
serverAssert(ValkeyModule_LoadConfigs(ctx) == VALKEYMODULE_OK);
}

int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);

/* Connection modules MUST be part of the same build as valkey. */
if (strcmp(REDIS_BUILD_ID_RAW, serverBuildIdRaw())) {
serverLog(LL_NOTICE, "Connection type %s was not built together with the valkey-server used.", CONN_TYPE_RDMA);
Expand All @@ -1938,40 +1831,6 @@ int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) {

if (connTypeRegister(&CT_RDMA) != C_OK) return VALKEYMODULE_ERR;

rdmaListenerAddConfig(ctx);

struct connListener *listener = rdmaListener();
listener->ct = connectionTypeRdma();
listener->bindaddr = zcalloc_num(CONFIG_BINDADDR_MAX, sizeof(listener->bindaddr[0]));

for (int i = 0; i < argc; i++) {
robj *str = (robj *)argv[i];
int nexts;
sds *exts = sdssplitlen(str->ptr, strlen(str->ptr), "=", 1, &nexts);
if (nexts != 2) {
serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr);
return VALKEYMODULE_ERR;
}

if (!strcasecmp(exts[0], "bind")) {
listener->bindaddr[listener->bindaddr_count++] = zstrdup(exts[1]);
} else if (!strcasecmp(exts[0], "port")) {
listener->port = atoi(exts[1]);
} else if (!strcasecmp(exts[0], "rx-size")) {
valkey_rdma_rx_size = atoi(exts[1]);
} else if (!strcasecmp(exts[0], "comp-vector")) {
valkey_rdma_comp_vector = atoi(exts[1]);
} else {
serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr);
return VALKEYMODULE_ERR;
}

sdsfreesplitres(exts, nexts);
}

rdmaBuildBind(ctx);
if (valkey_rdma_comp_vector == -1) valkey_rdma_comp_vector = abs((int)random());

return VALKEYMODULE_OK;
}

Expand Down
11 changes: 11 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2884,6 +2884,17 @@ void initListeners(void) {
listener->priv = &server.unix_ctx_config; /* Unix socket specified */
}

if (server.rdma_ctx_config.port != 0) {
conn_index = connectionIndexByType(CONN_TYPE_RDMA);
if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_RDMA);
listener = &server.listeners[conn_index];
listener->bindaddr = server.rdma_ctx_config.bindaddr;
listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count;
listener->port = server.rdma_ctx_config.port;
listener->ct = connectionByType(CONN_TYPE_RDMA);
listener->priv = &server.rdma_ctx_config;
}

/* create all the configured listener, and add handler to start to accept */
int listen_fds = 0;
for (int j = 0; j < CONN_TYPE_MAX; j++) {
Expand Down
12 changes: 12 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1612,6 +1612,17 @@ typedef struct serverUnixContextConfig {
unsigned int perm; /* UNIX socket permission (see mode_t) */
} serverUnixContextConfig;

/*-----------------------------------------------------------------------------
* RDMA Context Configuration
*----------------------------------------------------------------------------*/
typedef struct serverRdmaContextConfig {
char *bindaddr[CONFIG_BINDADDR_MAX];
int bindaddr_count;
int port;
int rx_size;
int comp_vector;
} serverRdmaContextConfig;

/*-----------------------------------------------------------------------------
* AOF manifest definition
*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -2225,6 +2236,7 @@ struct valkeyServer {
int tls_auth_clients;
serverTLSContextConfig tls_ctx_config;
serverUnixContextConfig unix_ctx_config;
serverRdmaContextConfig rdma_ctx_config;
/* cpu affinity */
char *server_cpulist; /* cpu affinity list of server main/io thread. */
char *bio_cpulist; /* cpu affinity list of bio thread. */
Expand Down
2 changes: 1 addition & 1 deletion tests/rdma/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_rdma(ipaddr):
rdmapath = valkeydir + "/src/valkey-rdma.so"
svrcmd = [svrpath, "--port", "0", "--loglevel", "verbose", "--protected-mode", "yes",
"--appendonly", "no", "--daemonize", "no", "--dir", valkeydir + "/tests/rdma/tmp",
"--loadmodule", rdmapath, "port=6379", "bind=" + ipaddr]
"--loadmodule", rdmapath, "--rdma-port", "6379", "--rdma-bind", ipaddr]

svr = subprocess.Popen(svrcmd, shell=False, stdout=subprocess.PIPE)
try:
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,10 @@ start_server {tags {"introspection"}} {
req-res-logfile
client-default-resp
dual-channel-replication-enabled
rdma-comp-vector
rdma-rx-size
rdma-bind
rdma-port
}

if {!$::tls} {
Expand Down
Loading

0 comments on commit b8378df

Please sign in to comment.