Skip to content

Commit

Permalink
RabbitMQ-Event - Add heartbeat option and create logic to reconnect t…
Browse files Browse the repository at this point in the history
…o rabbitmq (#2267)
  • Loading branch information
david-goncalves authored Jul 13, 2020
1 parent c2f25a2 commit 08ce704
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 70 deletions.
1 change: 1 addition & 0 deletions conf/janus.eventhandler.rabbitmqevh.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ general: {
#exchange = "janus-exchange"
route_key = "janus-events" # Name of the queue for event messages
#exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection has unreachable.

#ssl_enable = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
Expand Down
206 changes: 136 additions & 70 deletions events/janus_rabbitmqevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ janus_eventhandler *create(void) {
/* Useful stuff */
static volatile gint initialized = 0, stopping = 0;
static GThread *handler_thread;
static void *janus_rabbitmqevh_handler(void *data);
static GThread *in_thread;
static void *jns_rmqevh_hdlr(void *data);
static void *jns_rmqevh_hrtbt(void *data);
int janus_rabbitmqevh_connect(void);

/* Queue of events to handle */
static GAsyncQueue *events = NULL;
Expand All @@ -99,6 +102,17 @@ static amqp_channel_t rmq_channel = 0;
static amqp_bytes_t rmq_exchange;
static amqp_bytes_t rmq_route_key;

static char *rmqhost = NULL;
static const char *vhost = NULL, *username = NULL, *password = NULL;
static const char *ssl_cacert_file = NULL;
static const char *ssl_cert_file = NULL;
static const char *ssl_key_file = NULL;
static gboolean ssl_enable = FALSE;
static gboolean ssl_verify_peer = FALSE;
static gboolean ssl_verify_hostname = FALSE;
static const char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ;
static uint16_t heartbeat = 0;
static uint16_t rmqport = AMQP_PROTOCOL_PORT;

/* Parameter validation (for tweaking via Admin API) */
static struct janus_json_parameter request_parameters[] = {
Expand Down Expand Up @@ -141,16 +155,6 @@ int janus_rabbitmqevh_init(const char *config_path) {
janus_config_print(config);
janus_config_category *config_general = janus_config_get_create(config, NULL, janus_config_type_category, "general");

char *rmqhost = NULL;
const char *vhost = NULL, *username = NULL, *password = NULL;
const char *ssl_cacert_file = NULL;
const char *ssl_cert_file = NULL;
const char *ssl_key_file = NULL;
gboolean ssl_enable = FALSE;
gboolean ssl_verify_peer = FALSE;
gboolean ssl_verify_hostname = FALSE;
const char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ;

/* Setup the event handler, if required */
janus_config_item *item = janus_config_get(config, config_general, janus_config_type_item, "enabled");
if(!item || !item->value || !janus_is_true(item->value)) {
Expand Down Expand Up @@ -192,7 +196,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
rmqhost = g_strdup(item->value);
else
rmqhost = g_strdup("localhost");
uint16_t rmqport = AMQP_PROTOCOL_PORT;

item = janus_config_get(config, config_general, janus_config_type_item, "port");
if(item && item->value && janus_string_to_uint16(item->value, &rmqport) < 0) {
JANUS_LOG(LOG_ERR, "Invalid port (%s), falling back to default\n", item->value);
Expand All @@ -216,6 +220,12 @@ int janus_rabbitmqevh_init(const char *config_path) {
else
password = g_strdup("guest");

item = janus_config_get(config, config_general, janus_config_type_item, "heartbeat");
if(item && item->value && janus_string_to_uint16(item->value, &heartbeat) < 0) {
JANUS_LOG(LOG_ERR, "Invalid heartbeat timeout (%s), falling back to default\n", item->value);
heartbeat = 0;
}

/* SSL config*/
item = janus_config_get(config, config_general, janus_config_type_item, "ssl_enable");
if(!item || !item->value || !janus_is_true(item->value)) {
Expand Down Expand Up @@ -267,6 +277,59 @@ int janus_rabbitmqevh_init(const char *config_path) {
}

/* Connect */
int result = janus_rabbitmqevh_connect();
if(result < 0) {
goto error;
}

/* Initialize the events queue */
events = g_async_queue_new_full((GDestroyNotify) janus_rabbitmqevh_event_free);
g_atomic_int_set(&initialized, 1);

GError *error = NULL;
handler_thread = g_thread_try_new("janus rabbitmqevh handler", jns_rmqevh_hdlr, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
goto error;
}
if(heartbeat > 0) {
in_thread = g_thread_try_new("janus rabbitmqevh heartbeat handler", jns_rmqevh_hrtbt, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler heartbeat thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
goto error;
}
}

/* Done */
JANUS_LOG(LOG_INFO, "Setup of RabbitMQ event handler completed\n");
goto done;

error:
/* If we got here, something went wrong */
success = FALSE;
if(route_key)
g_free((char *)route_key);
if(exchange)
g_free((char *)exchange);
/* Fall through */
done:
if(config)
janus_config_destroy(config);

if(!success) {
return -1;
}
JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_RABBITMQEVH_NAME);
return 0;
}

int janus_rabbitmqevh_connect(void) {
rmq_conn = amqp_new_connection();
amqp_socket_t *socket = NULL;
int status = AMQP_STATUS_OK;
Expand All @@ -275,7 +338,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
socket = amqp_ssl_socket_new(rmq_conn);
if(socket == NULL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error creating socket...\n");
goto error;
return -1;
}
if(ssl_verify_peer) {
amqp_ssl_socket_set_verify_peer(socket, 1);
Expand All @@ -291,43 +354,44 @@ int janus_rabbitmqevh_init(const char *config_path) {
status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file);
if(status != AMQP_STATUS_OK) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status));
goto error;
return -1;
}
}
if(ssl_cert_file && ssl_key_file) {
amqp_ssl_socket_set_key(socket, ssl_cert_file, ssl_key_file);
if(status != AMQP_STATUS_OK) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status));
goto error;
return -1;
}
}
} else {
socket = amqp_tcp_socket_new(rmq_conn);
if(socket == NULL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error creating socket...\n");
goto error;
return -1;
}
}

JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Connecting to RabbitMQ server...\n");
status = amqp_socket_open(socket, rmqhost, rmqport);
if(status != AMQP_STATUS_OK) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
goto error;
return -1;
}
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Logging in...\n");
amqp_rpc_reply_t result = amqp_login(rmq_conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password);
amqp_rpc_reply_t result = amqp_login(rmq_conn, vhost, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, username, password);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
return -1;
}

rmq_channel = 1;
JANUS_LOG(LOG_VERB, "Opening channel...\n");
amqp_channel_open(rmq_conn, rmq_channel);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
return -1;
}
rmq_exchange = amqp_empty_bytes;
if(exchange != NULL) {
Expand All @@ -337,7 +401,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
return -1;
}
}
JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key);
Expand All @@ -346,56 +410,8 @@ int janus_rabbitmqevh_init(const char *config_path) {
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}

/* Initialize the events queue */
events = g_async_queue_new_full((GDestroyNotify) janus_rabbitmqevh_event_free);
g_atomic_int_set(&initialized, 1);

GError *error = NULL;
handler_thread = g_thread_try_new("janus rabbitmqevh handler", janus_rabbitmqevh_handler, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n",
error->code, error->message ? error->message : "??");
g_error_free(error);
goto error;
}

/* Done */
JANUS_LOG(LOG_INFO, "Setup of RabbitMQ event handler completed\n");
goto done;

error:
/* If we got here, something went wrong */
success = FALSE;
if(route_key)
g_free((char *)route_key);
if(exchange)
g_free((char *)exchange);
/* Fall through */
done:
if(rmqhost)
g_free((char *)rmqhost);
if(vhost)
g_free((char *)vhost);
if(username)
g_free((char *)username);
if(password)
g_free((char *)password);
if(ssl_cacert_file)
g_free((char *)ssl_cacert_file);
if(ssl_cert_file)
g_free((char *)ssl_cert_file);
if(ssl_key_file)
g_free((char *)ssl_key_file);
if(config)
janus_config_destroy(config);
if(!success) {
return -1;
}
JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_RABBITMQEVH_NAME);
return 0;
}

Expand All @@ -422,6 +438,20 @@ void janus_rabbitmqevh_destroy(void) {
g_free((char *)rmq_exchange.bytes);
if(rmq_route_key.bytes)
g_free((char *)rmq_route_key.bytes);
if(rmqhost)
g_free((char *)rmqhost);
if(vhost)
g_free((char *)vhost);
if(username)
g_free((char *)username);
if(password)
g_free((char *)password);
if(ssl_cacert_file)
g_free((char *)ssl_cacert_file);
if(ssl_cert_file)
g_free((char *)ssl_cert_file);
if(ssl_key_file)
g_free((char *)ssl_key_file);

g_atomic_int_set(&initialized, 0);
g_atomic_int_set(&stopping, 0);
Expand Down Expand Up @@ -523,7 +553,7 @@ json_t *janus_rabbitmqevh_handle_request(json_t *request) {
}

/* Thread to handle incoming events */
static void *janus_rabbitmqevh_handler(void *data) {
static void *jns_rmqevh_hdlr(void *data) {
JANUS_LOG(LOG_VERB, "Joining RabbitMQEventHandler handler thread\n");
json_t *event = NULL, *output = NULL;
char *event_text = NULL;
Expand Down Expand Up @@ -586,3 +616,39 @@ static void *janus_rabbitmqevh_handler(void *data) {
JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler handler thread\n");
return NULL;
}


/* Thread to handle heartbeats */
static void *jns_rmqevh_hrtbt(void *data) {
JANUS_LOG(LOG_VERB, "Monitoring RabbitMQ HeartBeat\n");
struct timeval timeout;
timeout.tv_sec = heartbeat/2;
timeout.tv_usec = 20000;
amqp_frame_t frame;

while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
amqp_maybe_release_buffers(rmq_conn);
/* Wait for a frame */
int res = amqp_simple_wait_frame_noblock(rmq_conn, &frame, &timeout);
if(res != AMQP_STATUS_OK) {
/* No data */
if(res == AMQP_STATUS_TIMEOUT || res == AMQP_STATUS_SSL_ERROR)
continue;
JANUS_LOG(LOG_VERB, "Error on amqp_simple_wait_frame_noblock: %d (%s)\n", res, amqp_error_string2(res));

if(rmq_conn && rmq_channel) {
amqp_channel_close(rmq_conn, rmq_channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(rmq_conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(rmq_conn);
}
JANUS_LOG(LOG_VERB, "Trying to reconnect with RabbitMQ Server\n");
int result = janus_rabbitmqevh_connect();
if(result < 0) {
g_usleep(5000000);
}
}
}

JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler HeartBeat thread\n");
return NULL;
}

0 comments on commit 08ce704

Please sign in to comment.