Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis read-only replica support #1019

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 164 additions & 60 deletions cachedb/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,38 @@
#include "hiredis/hiredis.h"

struct redis_moddata {
redisContext** ctxs; /* thread-specific redis contexts */
int numctxs; /* number of ctx entries */
const char* server_host; /* server's IP address or host name */
int server_port; /* server's TCP port */
const char* server_path; /* server's unix path, or "", NULL if unused */
const char* server_password; /* server's AUTH password, or "", NULL if unused */
struct timeval command_timeout; /* timeout for commands */
struct timeval connect_timeout; /* timeout for connect */
int logical_db; /* the redis logical database to use */
int setex_available; /* if the SETEX command is supported */
/* thread-specific redis contexts */
redisContext** ctxs;
redisContext** replica_ctxs;
/* number of ctx entries */
int numctxs;
/* server's IP address or host name */
const char* server_host;
const char* replica_server_host;
/* server's TCP port */
int server_port;
int replica_server_port;
/* server's unix path, or "", NULL if unused */
const char* server_path;
const char* replica_server_path;
/* server's AUTH password, or "", NULL if unused */
const char* server_password;
const char* replica_server_password;
/* timeout for commands */
struct timeval command_timeout;
struct timeval replica_command_timeout;
/* timeout for connection setup */
struct timeval connect_timeout;
struct timeval replica_connect_timeout;
/* the redis logical database to use */
int logical_db;
int replica_logical_db;
/* if the SETEX command is supported */
int setex_available;
};

static redisReply* redis_command(struct module_env*, struct cachedb_env*,
const char*, const uint8_t*, size_t);
const char*, const uint8_t*, size_t, int);

static void
moddata_clean(struct redis_moddata** moddata) {
Expand All @@ -79,21 +97,30 @@ moddata_clean(struct redis_moddata** moddata) {
}
free((*moddata)->ctxs);
}
if((*moddata)->replica_ctxs) {
int i;
for(i = 0; i < (*moddata)->numctxs; i++) {
if((*moddata)->replica_ctxs[i])
redisFree((*moddata)->replica_ctxs[i]);
}
free((*moddata)->replica_ctxs);
}
free(*moddata);
*moddata = NULL;
}

static redisContext*
redis_connect(const struct redis_moddata* moddata)
redis_connect(const char* host, int port, const char* path,
const char* password, int logical_db,
const struct timeval connect_timeout,
const struct timeval command_timeout)
{
redisContext* ctx;

if(moddata->server_path && moddata->server_path[0]!=0) {
ctx = redisConnectUnixWithTimeout(moddata->server_path,
moddata->connect_timeout);
if(path && path[0]!=0) {
ctx = redisConnectUnixWithTimeout(path, connect_timeout);
} else {
ctx = redisConnectWithTimeout(moddata->server_host,
moddata->server_port, moddata->connect_timeout);
ctx = redisConnectWithTimeout(host, port, connect_timeout);
}
if(!ctx || ctx->err) {
const char *errstr = "out of memory";
Expand All @@ -102,32 +129,39 @@ redis_connect(const struct redis_moddata* moddata)
log_err("failed to connect to redis server: %s", errstr);
goto fail;
}
if(redisSetTimeout(ctx, moddata->command_timeout) != REDIS_OK) {
if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) {
log_err("failed to set redis timeout, %s", ctx->errstr);
goto fail;
}
if(moddata->server_password && moddata->server_password[0]!=0) {
if(password && password[0]!=0) {
redisReply* rep;
rep = redisCommand(ctx, "AUTH %s", moddata->server_password);
rep = redisCommand(ctx, "AUTH %s", password);
if(!rep || rep->type == REDIS_REPLY_ERROR) {
log_err("failed to authenticate with password");
freeReplyObject(rep);
goto fail;
}
freeReplyObject(rep);
}
if(moddata->logical_db > 0) {
if(logical_db > 0) {
redisReply* rep;
rep = redisCommand(ctx, "SELECT %d", moddata->logical_db);
rep = redisCommand(ctx, "SELECT %d", logical_db);
if(!rep || rep->type == REDIS_REPLY_ERROR) {
log_err("failed to set logical database (%d)",
moddata->logical_db);
logical_db);
freeReplyObject(rep);
goto fail;
}
freeReplyObject(rep);
}
verbose(VERB_OPS, "Connection to Redis established");
if(verbosity >= VERB_OPS) {
char port_str[6+1];
port_str[0] = ' ';
(void)snprintf(port_str+1, sizeof(port_str-1), "%d", port);
verbose(VERB_OPS, "Connection to Redis established (%s%s)",
path&&path[0]!=0?path:host,
path&&path[0]!=0?"":port_str);
}
return ctx;

fail:
Expand All @@ -136,6 +170,14 @@ redis_connect(const struct redis_moddata* moddata)
return NULL;
}

static void
set_timeout(struct timeval* timeout, int value, int explicit_value)
{
int v = explicit_value != 0 ? explicit_value : value;
timeout->tv_sec = v / 1000;
timeout->tv_usec = (v % 1000) * 1000;
}

static int
redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
{
Expand All @@ -150,38 +192,60 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
goto fail;
}
moddata->numctxs = env->cfg->num_threads;
moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
if(!moddata->ctxs) {
log_err("out of memory");
goto fail;
}
/* note: server_host is a shallow reference to configured string.
* we don't have to free it in this module. */
/* note: server_host and similar string configuration options are
* shallow references to configured strings; we don't have to free them
* in this module. */
moddata->server_host = env->cfg->redis_server_host;
moddata->replica_server_host = env->cfg->redis_replica_server_host;

moddata->server_port = env->cfg->redis_server_port;
moddata->replica_server_port = env->cfg->redis_replica_server_port;

moddata->server_path = env->cfg->redis_server_path;
moddata->replica_server_path = env->cfg->redis_replica_server_path;

moddata->server_password = env->cfg->redis_server_password;
moddata->command_timeout.tv_sec = env->cfg->redis_timeout / 1000;
moddata->command_timeout.tv_usec =
(env->cfg->redis_timeout % 1000) * 1000;
moddata->connect_timeout.tv_sec = env->cfg->redis_timeout / 1000;
moddata->connect_timeout.tv_usec =
(env->cfg->redis_timeout % 1000) * 1000;
if(env->cfg->redis_command_timeout != 0) {
moddata->command_timeout.tv_sec =
env->cfg->redis_command_timeout / 1000;
moddata->command_timeout.tv_usec =
(env->cfg->redis_command_timeout % 1000) * 1000;
moddata->replica_server_password = env->cfg->redis_replica_server_password;

set_timeout(&moddata->command_timeout,
env->cfg->redis_timeout,
env->cfg->redis_command_timeout);
set_timeout(&moddata->replica_command_timeout,
env->cfg->redis_replica_timeout,
env->cfg->redis_replica_command_timeout);
set_timeout(&moddata->connect_timeout,
env->cfg->redis_timeout,
env->cfg->redis_connect_timeout);
set_timeout(&moddata->replica_connect_timeout,
env->cfg->redis_replica_timeout,
env->cfg->redis_replica_connect_timeout);

moddata->logical_db = env->cfg->redis_logical_db;
moddata->replica_logical_db = env->cfg->redis_replica_logical_db;

moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
if(!moddata->ctxs) {
log_err("out of memory");
goto fail;
}
if(env->cfg->redis_connect_timeout != 0) {
moddata->connect_timeout.tv_sec =
env->cfg->redis_connect_timeout / 1000;
moddata->connect_timeout.tv_usec =
(env->cfg->redis_connect_timeout % 1000) * 1000;
if((moddata->replica_server_host && moddata->replica_server_host[0]!=0)
|| (moddata->replica_server_path && moddata->replica_server_path[0]!=0)) {
/* There is a replica configured, allocate ctxs */
moddata->replica_ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
if(!moddata->replica_ctxs) {
log_err("out of memory");
goto fail;
}
}
moddata->logical_db = env->cfg->redis_logical_db;
for(i = 0; i < moddata->numctxs; i++) {
redisContext* ctx = redis_connect(moddata);
redisContext* ctx = redis_connect(
moddata->server_host,
moddata->server_port,
moddata->server_path,
moddata->server_password,
moddata->logical_db,
moddata->connect_timeout,
moddata->command_timeout);
if(!ctx) {
log_err("redis_init: failed to init redis "
"(for thread %d)", i);
Expand All @@ -190,14 +254,33 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
}
moddata->ctxs[i] = ctx;
}
if(moddata->replica_ctxs) {
for(i = 0; i < moddata->numctxs; i++) {
redisContext* ctx = redis_connect(
moddata->replica_server_host,
moddata->replica_server_port,
moddata->replica_server_path,
moddata->replica_server_password,
moddata->replica_logical_db,
moddata->replica_connect_timeout,
moddata->replica_command_timeout);
if(!ctx) {
log_err("redis_init: failed to init redis "
"replica (for thread %d)", i);
/* And continue, the context can be established
* later, just like after a disconnect. */
}
moddata->replica_ctxs[i] = ctx;
}
}
cachedb_env->backend_data = moddata;
if(env->cfg->redis_expire_records &&
moddata->ctxs[env->alloc->thread_num] != NULL) {
redisReply* rep = NULL;
int redis_reply_type = 0;
/** check if setex command is supported */
rep = redis_command(env, cachedb_env,
"SETEX __UNBOUND_REDIS_CHECK__ 1 none", NULL, 0);
"SETEX __UNBOUND_REDIS_CHECK__ 1 none", NULL, 0, 1);
if(!rep) {
/** init failed, no response from redis server*/
goto setex_fail;
Expand Down Expand Up @@ -250,9 +333,9 @@ redis_deinit(struct module_env* env, struct cachedb_env* cachedb_env)
*/
static redisReply*
redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
const char* command, const uint8_t* data, size_t data_len)
const char* command, const uint8_t* data, size_t data_len, int write)
{
redisContext* ctx;
redisContext* ctx, **ctx_selector;
redisReply* rep;
struct redis_moddata* d = (struct redis_moddata*)
cachedb_env->backend_data;
Expand All @@ -263,17 +346,38 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
* assumption throughout the unbound architecture, so we simply assert
* it. */
log_assert(env->alloc->thread_num < d->numctxs);
ctx = d->ctxs[env->alloc->thread_num];

ctx_selector = !write && d->replica_ctxs
?d->replica_ctxs
:d->ctxs;
ctx = ctx_selector[env->alloc->thread_num];

/* If we've not established a connection to the server or we've closed
* it on a failure, try to re-establish a new one. Failures will be
* logged in redis_connect(). */
if(!ctx) {
ctx = redis_connect(d);
d->ctxs[env->alloc->thread_num] = ctx;
if(d->replica_ctxs) {
ctx = redis_connect(
d->replica_server_host,
d->replica_server_port,
d->replica_server_path,
d->replica_server_password,
d->replica_logical_db,
d->replica_connect_timeout,
d->replica_command_timeout);
} else {
ctx = redis_connect(
d->server_host,
d->server_port,
d->server_path,
d->server_password,
d->logical_db,
d->connect_timeout,
d->command_timeout);
}
ctx_selector[env->alloc->thread_num] = ctx;
}
if(!ctx)
return NULL;
if(!ctx) return NULL;

/* Send the command and get a reply, synchronously. */
rep = (redisReply*)redisCommand(ctx, command, data, data_len);
Expand All @@ -283,7 +387,7 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
log_err("redis_command: failed to receive a reply, "
"closing connection: %s", ctx->errstr);
redisFree(ctx);
d->ctxs[env->alloc->thread_num] = NULL;
ctx_selector[env->alloc->thread_num] = NULL;
return NULL;
}

Expand Down Expand Up @@ -313,7 +417,7 @@ redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env,
return 0;
}

rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0);
rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0, 0);
if(!rep)
return 0;
switch(rep->type) {
Expand Down Expand Up @@ -381,7 +485,7 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
return;
}

rep = redis_command(env, cachedb_env, cmdbuf, data, data_len);
rep = redis_command(env, cachedb_env, cmdbuf, data, data_len, 1);
if(rep) {
verbose(VERB_ALGO, "redis_store set completed");
if(rep->type != REDIS_REPLY_STATUS &&
Expand Down
21 changes: 19 additions & 2 deletions doc/example.conf.in
Original file line number Diff line number Diff line change
Expand Up @@ -1305,9 +1305,9 @@ remote-control:
# # redis server's TCP port
# redis-server-port: 6379
# # if the server uses a unix socket, set its path, or "" when not used.
# # redis-server-path: "/var/lib/redis/redis-server.sock"
# redis-server-path: "/var/lib/redis/redis-server.sock"
# # if the server uses an AUTH password, specify here, or "" when not used.
# # redis-server-password: ""
# redis-server-password: ""
# # timeout (in ms) for communication with the redis server
# redis-timeout: 100
# # timeout (in ms) for commands, if 0, uses redis-timeout.
Expand All @@ -1318,6 +1318,23 @@ remote-control:
# redis-expire-records: no
# # redis logical database to use, 0 is the default database.
# redis-logical-db: 0
# # redis replica server's IP address or host name
# redis-replica-server-host: 127.0.0.1
# # redis replica server's TCP port
# redis-replica-server-port: 6379
# # if the replica server uses a unix socket, set its path, or "" when not used.
# redis-replica-server-path: "/var/lib/redis/redis-server.sock"
# # if the replica server uses an AUTH password, specify here, or "" when not used.
# redis-replica-server-password: ""
# # timeout (in ms) for communication with the redis replica server
# redis-replica-timeout: 100
# # timeout (in ms) for redis replica commands, if 0, uses redis-replica-timeout.
# redis-command-timeout: 0
# # timeout (in ms) for redis replica connection set up, if 0, uses redis-replica-timeout.
# redis-connect-timeout: 0
# # set timeout on redis records based on DNS response TTL
# # redis logical database to use for the replica server, 0 is the default database.
# redis-replica-logical-db: 0

# IPSet
# Add specify domain into set via ipset.
Expand Down
Loading
Loading