Skip to content

Commit

Permalink
rtpengine: provide failover on errors returned from engine
Browse files Browse the repository at this point in the history
Feature sponsored by Five9 https://www.five9.com
  • Loading branch information
razvancrainea committed Sep 2, 2024
1 parent e2b8eef commit 3d0959a
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 26 deletions.
45 changes: 45 additions & 0 deletions modules/rtpengine/doc/rtpengine_admin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@
If the set was selected using setid_avp, the avp needs to be
set only once before rtpengine_offer() or rtpengine_manage() call.
</para>
<para id="param_failover" xreflabel="Failover">
The module is able to fail over to a new node within a set if the chosen
one has communication issues. Moreover, it will also fail over if the node
returns one of the following errors:
<itemizedlist>
<listitem><para>
Parallel session limit reached
</para></listitem>
<listitem><para>
Ran out of ports
</para></listitem>
</itemizedlist>
You can use the <xref linkend="param_extra_failover_error"/> parameter
to extend the above list.
</para>
</section>

<section id="dependencies" xreflabel="Dependencies">
Expand Down Expand Up @@ -805,6 +820,36 @@ rtpengine_offer("... codec-mask-PCMA codec-strip-opus transcode-opus ...");
</programlisting>
</example>

<section id="param_extra_failover_error" xreflabel="extra_failover_error">
<title><varname>extra_failover_error</varname> (string)</title>
<para>
Contains a POSIX extended regular expression (matched case-insensitively)
that is used to match an error received from an RTPEngine node. If matched,
the module tries to use a new node to handle the affected command.
</para>
<para>
This parameter can be used to extend the list
(see <xref linkend="param_failover"/>) of errors the module
implicitly fails over on.
</para>
<para>
<emphasis>Note</emphasis> each declaration will define a single
expression/matching rule. If you want to define multiple rules, you
need to define the parameter multiple times.
</para>
<para>
Default value is empty, no extra errors are being used.
</para>
<example>
<title>Set <varname>extra_failover_error</varname> parameter</title>
<programlisting format="linespecific">
...
modparam("rtpengine", "extra_failover_error", "Parallel session limit reached")
...
</programlisting>
</example>
</section>

</section>
<section id="func_rtpengine_answer" xreflabel="rtpengine_answer()">
<title>
Expand Down
188 changes: 162 additions & 26 deletions modules/rtpengine/rtpengine.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <regex.h>

#include "../../str.h"
#include "../../flags.h"
Expand Down Expand Up @@ -221,6 +222,11 @@ typedef struct rtpe_set_link {
} v;
} rtpe_set_link_t;

/* Entry of a singly-linked list naming one rtpengine node that node
 * selection must skip. */
struct rtpe_ignore_node {
struct rtpe_node *node; /* the node to be skipped (compared by pointer) */
struct rtpe_ignore_node *next; /* next entry; NULL terminates the list */
};

static const char *command_strings[] = {
[OP_OFFER] = "offer",
[OP_ANSWER] = "answer",
Expand Down Expand Up @@ -321,7 +327,7 @@ static int fixup_set_id(void ** param);
static int fixup_free_set_id(void ** param);
static int set_rtpengine_set_f(struct sip_msg * msg, rtpe_set_link_t *set_param);
static struct rtpe_set * select_rtpe_set(int id_set);
static struct rtpe_node *select_rtpe_node(str, struct rtpe_set *);
static struct rtpe_node *select_rtpe_node(str, struct rtpe_set *, struct rtpe_ignore_node *);
static struct rtpe_node *lookup_rtpe_node(struct rtpe_set * rtpe_list, str *rtpe_url);
static void free_rtpe_set(int);
static void free_rtpe_node(struct rtpe_set *, str *);
Expand All @@ -335,6 +341,7 @@ static int update_rtpengines(int);
static int _add_rtpengine_from_database(void);
static int rtpengine_set_store(modparam_t type, void * val);
static int rtpengine_set_notify(modparam_t type, void * val);
static int rtpengine_extra_failover(modparam_t type, void * val);
static int rtpengine_add_rtpengine_set( char * rtp_proxies, int set_id);

static int mod_init(void);
Expand Down Expand Up @@ -699,6 +706,8 @@ static const param_export_t params[] = {
{"db_table", STR_PARAM, &db_table.s },
{"socket_column", STR_PARAM, &db_rtpe_sock_col.s },
{"set_column", STR_PARAM, &db_rtpe_set_col.s },
{"extra_failover_error", STR_PARAM|USE_FUNC_PARAM,
(void *)rtpengine_extra_failover},
{"notification_sock", STR_PARAM|USE_FUNC_PARAM,
(void *)rtpengine_set_notify},
{"ping_enabled", INT_PARAM, &rtpengine_ping_enabled },
Expand Down Expand Up @@ -770,6 +779,11 @@ struct module_exports exports = {
0 /* reload confirm function */
};

/*
 * Built-in error reasons that always make the module fail over to another
 * node. rtpe_check_ignore_node() compares each entry case-insensitively
 * against the error reason reported by the node.
 *
 * Both the strings and the table slots are read-only: the entries are
 * string literals, so they must never be written through.
 */
static const char *const rtpe_default_failover_errors[] = {
	"Parallel session limit reached",
	"Ran out of ports",
};

static void rtpe_stats_free(struct rtpe_stats *stats)
{
if (stats->json.s)
Expand Down Expand Up @@ -968,7 +982,6 @@ static int rtpengine_set_notify(modparam_t type, void * val)
return 0;
}


static int add_rtpengine_socks(struct rtpe_set * rtpe_list,
char * rtpengine){
/* Make rtp proxies list. */
Expand Down Expand Up @@ -2469,6 +2482,97 @@ static struct rtpe_node *get_rtpe_node(str *node, struct rtpe_set *set)
return NULL;
}

static int rtpe_add_ignore_node(struct rtpe_ignore_node **list, struct rtpe_node *node)
{
struct rtpe_ignore_node *new = pkg_malloc(sizeof *new);
if (!new)
return 0;

new->node = node;
new->next = *list;
*list = new;
LM_INFO("temporary ignoring %.*s node for this attempt\n", node->rn_url.len, node->rn_url.s);
return 1;
}

static int rtpe_is_ignore_node(struct rtpe_ignore_node *list, struct rtpe_node *node)
{
struct rtpe_ignore_node *it;
for (it = list; it; it = it->next)
if (it->node == node)
return 1;
return 0;
}

static void rtpe_free_ignore_node(struct rtpe_ignore_node *list)
{
struct rtpe_ignore_node *next, *it;
for (it = list; it; it = next) {
next = it->next;
pkg_free(it);
}
}

/* List of compiled regular expressions; rtpengine_extra_failover() adds
 * one entry per non-empty value it is called with. */
OSIPS_LIST_HEAD(rtpe_failover_errors);
/* One compiled regular expression, linked into rtpe_failover_errors. */
struct rtpe_failover_regex {
regex_t re; /* expression compiled with regcomp() */
struct list_head list; /* linkage into rtpe_failover_errors */
};

/*
 * Handler for the "extra_failover_error" module parameter.
 *
 * Compiles the given string as an extended, case-insensitive regular
 * expression and adds it to rtpe_failover_errors.
 *
 * val - the parameter value, treated as a NUL-terminated string.
 *
 * Returns 0 on success (a NULL or empty value is accepted and ignored),
 * -1 if memory could not be allocated or the expression does not compile.
 */
static int rtpengine_extra_failover(modparam_t type, void * val)
{
	const int re_flags = REG_EXTENDED | REG_ICASE | REG_NEWLINE;
	char *pattern = (char *)val;
	struct rtpe_failover_regex *entry;

	/* nothing to register for a missing or empty expression */
	if (pattern == NULL || pattern[0] == '\0')
		return 0;

	entry = pkg_malloc(sizeof *entry);
	if (entry == NULL) {
		LM_ERR("no more memory for regular expression!\n");
		return -1;
	}
	memset(entry, 0, sizeof *entry);

	if (regcomp(&entry->re, pattern, re_flags) != 0) {
		LM_ERR("could not compile regex [%s]\n", pattern);
		pkg_free(entry);
		return -1;
	}

	list_add(&entry->list, &rtpe_failover_errors);
	return 0;
}

/*
 * Decide whether an error reason reported by a node should make the
 * module fail over to another node.
 *
 * The reason is first compared (case-insensitively, whole string) against
 * the built-in rtpe_default_failover_errors table, then matched against
 * every regular expression in rtpe_failover_errors.
 *
 * Returns 1 if the error matches, 0 otherwise. A failure to duplicate the
 * error string for the regex matching is logged and reported as "no match".
 */
static int rtpe_check_ignore_node(str *error)
{
	const int n_defaults = sizeof(rtpe_default_failover_errors)/sizeof(rtpe_default_failover_errors[0]);
	struct rtpe_failover_regex *rule;
	struct list_head *pos;
	regmatch_t match;
	str error_z;
	int matched = 0;
	int idx;

	/* built-in errors first: no allocation needed for these */
	for (idx = 0; idx < n_defaults; idx++)
		if (str_casematch_nt(error, rtpe_default_failover_errors[idx]))
			return 1;

	if (list_empty(&rtpe_failover_errors))
		return 0;

	/* regexec() needs a NUL-terminated string, so work on a copy */
	if (pkg_nt_str_dup(&error_z, error) < 0) {
		LM_ERR("could not duplicate error!\n");
		return 0;
	}

	list_for_each(pos, &rtpe_failover_errors) {
		rule = list_entry(pos, struct rtpe_failover_regex, list);
		if (regexec(&rule->re, error_z.s, 1, &match, 0) == 0) {
			matched = 1;
			break;
		}
	}

	pkg_free(error_z.s);
	return matched;
}

static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_msg *msg,
enum rtpe_operation op, str *flags_str, str *body_in, pv_spec_t *spvar,
Expand All @@ -2478,15 +2582,17 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
bencode_item_t *item, *resp;
str viabranch, error;
int ret, flags_exist = 0, callid_exist = 0, from_tag_exist = 0, to_tag_exist = 0;
struct rtpe_node *node;
struct rtpe_node *node, *failed_node;
char *cp, *err = NULL;
pv_value_t val;
str flags_nt = {0,0};
struct rtpe_ignore_node *ignore_list = NULL;

/*** get & init basic stuff needed ***/

memset(&ng_flags, 0, sizeof(ng_flags));
error.len = 0;
error.s = "";

if (!extra_dict) {
if (bencode_buffer_init(bencbuf)) {
Expand Down Expand Up @@ -2658,24 +2764,69 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
if (!set && (set=rtpe_ctx_set_get())==NULL )
set = *default_rtpe_set;

failed_node = NULL;

RTPE_START_READ();
do {
if (snode && snode->s) {
if ((node = get_rtpe_node(snode, set)) == NULL && op == OP_OFFER)
node = select_rtpe_node(ng_flags.call_id, set);
node = select_rtpe_node(ng_flags.call_id, set, ignore_list);
snode = NULL;
} else {
node = select_rtpe_node(ng_flags.call_id, set);
node = select_rtpe_node(ng_flags.call_id, set, ignore_list);
}
if (!node) {
err = "no available proxies";
if (!err && !error.len)
err = "no available proxies";
RTPE_STOP_READ();
goto error;
}

cp = send_rtpe_command(node, ng_flags.dict, &ret);
if (cp) {
/*** process reply ***/
resp = bencode_decode_expect(bencbuf, cp, ret, BENCODE_DICTIONARY);
if (resp) {
if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
error.s = "";
error.len = 0;
} else {
LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
}
if (rtpe_check_ignore_node(&error)) {
cp = NULL;
if (!rtpe_add_ignore_node(&ignore_list, node))
LM_ERR("could not add node to ignore list!\n");
else
continue; /* one more loop */
} else {
break; /* break the loop and exit with error */
}
}
} else {
LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
err = "failed to decode bencoded reply";
cp = NULL;
}
}
if (!cp) {
if (node == failed_node) {
/* this is the 2nd error this server generated at this round,
* so we should ignore it and try another one */
if (!rtpe_add_ignore_node(&ignore_list, node)) {
LM_ERR("could not add node to ignore list!\n");
goto error;
}
failed_node = NULL;
} else {
failed_node = node;
}
}
} while (cp == NULL);
RTPE_STOP_READ();
rtpe_free_ignore_node(ignore_list);
LM_DBG("proxy reply: %.*s\n", ret, cp);

/* store the value of the selected node */
Expand All @@ -2687,28 +2838,13 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
LM_ERR("setting rtpengine pvar failed\n");
}

/*** process reply ***/

resp = bencode_decode_expect(bencbuf, cp, ret, BENCODE_DICTIONARY);
if (!resp) {
LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
err = "failed to decode bencoded reply";
goto error;
}
if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
if (!bencode_dictionary_get_str(resp, "error-reason", &error))
LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
else
LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
goto error;
}

if (flags_nt.s)
pkg_free(flags_nt.s);

return resp;

error:
rtpe_free_ignore_node(ignore_list);
if (flags_nt.s)
pkg_free(flags_nt.s);
if (err) {
Expand Down Expand Up @@ -3063,7 +3199,7 @@ static struct rtpe_set * select_rtpe_set(int id_set )
* too expensive here.
*/
static struct rtpe_node *
select_rtpe_node(str callid, struct rtpe_set *set)
select_rtpe_node(str callid, struct rtpe_set *set, struct rtpe_ignore_node *ignore_list)
{
unsigned sum, weight_sum;
struct rtpe_node* node;
Expand All @@ -3083,7 +3219,7 @@ select_rtpe_node(str callid, struct rtpe_set *set)
/* Most popular case: 1 proxy, nothing to calculate */
if (set->rtpe_node_count == 1) {
node = set->rn_first;
if (node->rn_disabled)
if (node->rn_disabled || rtpe_is_ignore_node(ignore_list, node))
return NULL;
return node;
}
Expand All @@ -3099,7 +3235,7 @@ select_rtpe_node(str callid, struct rtpe_set *set)
found = 0;
for (node=set->rn_first; node!=NULL; node=node->rn_next) {
constant_weight_sum += node->rn_weight;
if (!node->rn_disabled) {
if (!node->rn_disabled && !rtpe_is_ignore_node(ignore_list, node)) {
weight_sum += node->rn_weight;
found = 1;
}
Expand All @@ -3115,7 +3251,7 @@ select_rtpe_node(str callid, struct rtpe_set *set)
was_forced = 0;
for (node=set->rn_first; node!=NULL;) {
if (sumcut < (int)node->rn_weight) {
if (!node->rn_disabled)
if (!node->rn_disabled && !rtpe_is_ignore_node(ignore_list, node))
return node;
if (was_forced == 0) {
/* appropriate proxy is disabled : redistribute on enabled ones */
Expand Down

0 comments on commit 3d0959a

Please sign in to comment.