-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RabbitMQ-Event - Add heartbeat option and create logic to reconnect to rabbitmq #2267
RabbitMQ-Event - Add heartbeat option and create logic to reconnect to rabbitmq #2267
Conversation
Thanks for your contribution, @david-goncalves! Please make sure you sign our CLA, as it's a required step before we can merge this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Just did a superficial review for now, so I added some notes. Any chance you can do something similar for the RMQ transport as well?
static gboolean ssl_verify_peer = FALSE; | ||
static gboolean ssl_verify_hostname = FALSE; | ||
static const char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ; | ||
static uint16_t heartbeat = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't there be a reasonable default? I assume 0
disables it, meaning by default it's not used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the heartbeat equals to 0 the heartbeat monitoring process is disabled. Was decided to default to 0 to maintain the behavior previously existent on the event handler.
events/janus_rabbitmqevh.c
Outdated
@@ -267,6 +277,59 @@ int janus_rabbitmqevh_init(const char *config_path) { | |||
} | |||
|
|||
/* Connect */ | |||
int result=janus_rabbitmqevh_connect(); | |||
if(result < 0){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of code style things to fix:
- please insert spaces where the equal sign is;
- please insert a space between the closing curve bracket and the opening curly bracket too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style updated
events/janus_rabbitmqevh.c
Outdated
g_atomic_int_set(&initialized, 1); | ||
|
||
GError *error = NULL; | ||
handler_thread = g_thread_try_new("janus rabbitmqevh handler", janus_rabbitmqevh_handler, NULL, &error); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thread names are 16 characters maximum, please keep it shorter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style updated
events/janus_rabbitmqevh.c
Outdated
goto error; | ||
} | ||
if(heartbeat > 0){ | ||
in_thread = g_thread_try_new("janus rabbitmqevh heartbeat handler", janus_rabbitmqevh_heartbeat_handler, NULL, &error); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above on thread name length.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style updated
events/janus_rabbitmqevh.c
Outdated
g_error_free(error); | ||
goto error; | ||
} | ||
if(heartbeat > 0){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See code style nit above (space between brackets).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style updated
events/janus_rabbitmqevh.c
Outdated
/* Thread to handle heartbeats */ | ||
static void *janus_rabbitmqevh_heartbeat_handler(void *data) { | ||
JANUS_LOG(LOG_VERB, "Monitoring RabbitMQ HeartBeat\n"); | ||
struct timeval timeout; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indentation broken here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style updated
events/janus_rabbitmqevh.c
Outdated
if(result < 0){ | ||
g_usleep(5000000); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove empty line here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the same note on space between brackets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style updated
JANUS_LOG(LOG_VERB, "Monitoring RabbitMQ HeartBeat\n"); | ||
struct timeval timeout; | ||
timeout.tv_sec = 0; | ||
timeout.tv_usec = 20000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be set to whatever the heartbeat was configured to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially, I thought that timeout.tv values should be shorter to identify connection issues with RabbitMQ immediately. However, after more tests, were noticed that even with larger timeout.tv values defined when an error occurs the module tries to reconnect.
Since the heartbeat frames are sent about half of the time defined on the heartbeat I decided to change the value to a bit more than half of the heartbeat.
I can try to update the RMQ transport to have the heartbeat. I never used it, so I would like to test it as is first. |
Thanks for the quick fixes! I think this is good, so I'll merge 👍 |
As discussed on post RabbitMQ Event Handler - socket error occurred , we notice that, if the TCP connection with RabbitMQ was closed because connection idle for too long or restart of the RabbitMQ service. The socket wasn't established again.
To address this issue, we implemented the heartbeat option on rabbitmqevh configs to monitoring the connection and try to reestablish it.