diff --git a/modules/rtpengine/doc/rtpengine_admin.xml b/modules/rtpengine/doc/rtpengine_admin.xml
index d60d534b69..006a5fcf03 100644
--- a/modules/rtpengine/doc/rtpengine_admin.xml
+++ b/modules/rtpengine/doc/rtpengine_admin.xml
@@ -1349,6 +1349,72 @@ rtpengine_play_dtmf("0"); # send the 0 code upstream
+
+ Exported Asyncronous Functions
+
+ rtpengine_offer([flags[, sock_pvar[, sdp_pvar[, body]]]])
+
+ The asynchronous flavor of the
+ function. It receives the same parameters, with the same meanings.
+
+
+ Example of async rtpengine_offer() usage
+
+...
+if (is_method("ACK") && has_totag() && has_body_part("application/sdp")) {
+ async(rtpengine_offer(), resume_invite);
+}
+...
+route[resume_invite] {
+ t_relay();
+}
+...
+
+
+
+
+ rtpengine_answer([flags[, sock_pvar[, sdp_pvar[, body]]]])
+
+ The asynchronous flavor of the
+ function. It receives the same parameters, with the same meanings.
+
+
+ Example of async rtpengine_answer() usage
+
+...
+if (is_method("ACK") && has_body_part("application/sdp")) {
+ # late negotiation
+ async(rtpengine_answer(), resume_ack);
+}
+...
+route[resume_ack] {
+ t_relay();
+}
+...
+
+
+
+
+
+ rtpengine_delete([flags[, sock_var]])
+
+
+ The asynchronous flavor of the
+ function. It receives the same parameters, with the same meanings.
+
+
+ Example of async rtpengine_delete() usage
+
+...
+if (is_method("BYE")) {
+ launch(rtpengine_delete());
+}
+...
+
+
+
+
+
Exported Pseudo-Variables
diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c
index 99b094adbf..23f9432a1d 100644
--- a/modules/rtpengine/rtpengine.c
+++ b/modules/rtpengine/rtpengine.c
@@ -228,6 +228,15 @@ struct rtpe_ignore_node {
struct rtpe_ignore_node *next;
};
+typedef struct rtpe_async_param_ {
+ bencode_buffer_t *bencbuf;
+ enum rtpe_operation op;
+ struct rtpe_node *node;
+ char* cookie;
+ pv_spec_t *spvar;
+ pv_spec_t *bpvar;
+} rtpe_async_param;
+
static const char *command_strings[] = {
[OP_OFFER] = "offer",
[OP_ANSWER] = "answer",
@@ -273,6 +282,12 @@ static const str stat_maps[] = {
[STAT_PACKETLOSS_MAX_AT] = str_init("packetloss-max-at")
};
+static int rtpengine_offer_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar,
+ pv_spec_t *bpvar, str *body);
+static int rtpengine_answer_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar,
+ pv_spec_t *bpvar, str *body);
+static int rtpengine_delete_af(struct sip_msg* msg, async_ctx *ctx, str *flags, pv_spec_t *spvar);
+
static char *gencookie();
static int rtpe_test(struct rtpe_node*, int, int);
static int stop_recording_f(struct sip_msg* msg, str *flags, pv_spec_t *spvar);
@@ -436,6 +451,23 @@ static struct rtp_relay_hooks rtp_relay;
static pv_elem_t *extra_id_pv = NULL;
+static acmd_export_t acmds[] = {
+ {"rtpengine_offer", (acmd_function)rtpengine_offer_af, {
+ {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0},
+ {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0},
+ {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0},
+ {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0}, {0,0,0}}},
+ {"rtpengine_answer", (acmd_function)rtpengine_answer_af, {
+ {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0},
+ {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0},
+ {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0},
+ {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0}, {0,0,0}}},
+ {"rtpengine_delete", (acmd_function)rtpengine_delete_af, {
+ {CMD_PARAM_STR | CMD_PARAM_OPT, 0, 0},
+ {CMD_PARAM_VAR | CMD_PARAM_OPT, 0, 0}, {0,0,0}}},
+ {0, 0, {{0, 0, 0}}}
+};
+
static const cmd_export_t cmds[] = {
{"rtpengine_use_set", (cmd_function)set_rtpengine_set_f, {
{CMD_PARAM_INT, fixup_set_id, fixup_free_set_id}, {0,0,0}},
@@ -773,7 +805,7 @@ struct module_exports exports = {
0, /* load function */
&deps, /* OpenSIPS module dependencies */
cmds,
- 0,
+ acmds,
params,
0, /* exported statistics */
mi_cmds, /* exported MI functions */
@@ -1814,6 +1846,10 @@ static inline int rtpengine_connect_node(struct rtpe_node *pnode)
freeaddrinfo(res);
return 0;
}
+
+ pnode->ai_addrlen = res->ai_addrlen;
+ memcpy(&(pnode->ai_addr), res->ai_addr, res->ai_addrlen);
+
freeaddrinfo(res);
return 1;
}
@@ -2589,129 +2625,117 @@ static int rtpe_check_ignore_node(str *error)
return ret;
}
-static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_msg *msg,
- enum rtpe_operation op, str *flags_str, str *body_in, pv_spec_t *spvar,
- struct rtpe_set *set, str *snode, bencode_item_t *extra_dict)
+static int rtpe_function_call_prepare(bencode_buffer_t *bencbuf, struct sip_msg *msg, enum rtpe_operation op,
+ struct ng_flags_parse *ng_flags, str *flags_str, str *body_in, bencode_item_t *extra_dict, char **err)
{
- struct ng_flags_parse ng_flags;
- bencode_item_t *item, *resp;
- str viabranch, error;
+ bencode_item_t *item;
+ str viabranch;
int ret, flags_exist = 0, callid_exist = 0, from_tag_exist = 0, to_tag_exist = 0;
- struct rtpe_node *node, *failed_node;
- char *cp, *err = NULL;
- pv_value_t val;
str flags_nt = {0,0};
- struct rtpe_ignore_node *ignore_list = NULL;
-
- /*** get & init basic stuff needed ***/
- memset(&ng_flags, 0, sizeof(ng_flags));
- error.len = 0;
- error.s = "";
+ if (bencode_buffer_init(bencbuf)) {
+ LM_ERR("could not initialize bencode_buffer_t\n");
+ return -1;
+ }
if (!extra_dict) {
- if (bencode_buffer_init(bencbuf)) {
- err = "could not initialize bencode_buffer_t";
- bencbuf = NULL;
- goto error;
- }
- ng_flags.dict = bencode_dictionary(bencbuf);
+ ng_flags->dict = bencode_dictionary(bencbuf);
} else {
- ng_flags.dict = extra_dict;
+ ng_flags->dict = extra_dict;
- ng_flags.flags = bencode_dictionary_get(ng_flags.dict, "flags");
- if (ng_flags.flags)
+ ng_flags->flags = bencode_dictionary_get(ng_flags->dict, "flags");
+ if (ng_flags->flags)
flags_exist = 1;
- bencode_dictionary_get_str(ng_flags.dict, "call-id", &ng_flags.call_id);
- bencode_dictionary_get_str(ng_flags.dict, "from-tag", &ng_flags.from_tag);
- bencode_dictionary_get_str(ng_flags.dict, "to-tag", &ng_flags.to_tag);
- if (ng_flags.call_id.len)
+ bencode_dictionary_get_str(ng_flags->dict, "call-id", &ng_flags->call_id);
+ bencode_dictionary_get_str(ng_flags->dict, "from-tag", &ng_flags->from_tag);
+ bencode_dictionary_get_str(ng_flags->dict, "to-tag", &ng_flags->to_tag);
+ if (ng_flags->call_id.len)
callid_exist = 1;
- if (ng_flags.from_tag.len)
+ if (ng_flags->from_tag.len)
from_tag_exist = 1;
- if (ng_flags.to_tag.len)
+ if (ng_flags->to_tag.len)
to_tag_exist = 1;
}
if (!flags_exist)
- ng_flags.flags = bencode_list(bencbuf);
+ ng_flags->flags = bencode_list(bencbuf);
if (op == OP_OFFER || op == OP_ANSWER) {
- ng_flags.direction = bencode_list(bencbuf);
- ng_flags.replace = bencode_list(bencbuf);
- ng_flags.rtcp_mux = bencode_list(bencbuf);
+ ng_flags->direction = bencode_list(bencbuf);
+ ng_flags->replace = bencode_list(bencbuf);
+ ng_flags->rtcp_mux = bencode_list(bencbuf);
- bencode_dictionary_add_str(ng_flags.dict, "sdp", body_in);
+ bencode_dictionary_add_str(ng_flags->dict, "sdp", body_in);
} else if (op == OP_SUBSCRIBE_ANSWER) {
- bencode_dictionary_add_str(ng_flags.dict, "sdp", body_in);
+ bencode_dictionary_add_str(ng_flags->dict, "sdp", body_in);
}
/*** parse flags & build dictionary ***/
- ng_flags.to = (op == OP_DELETE) ? 0 : 1;
+ ng_flags->to = (op == OP_DELETE) ? 0 : 1;
if (flags_str && pkg_nt_str_dup(&flags_nt, flags_str) < 0) {
- err = "No more pkg mem";
+ *err = "No more pkg mem";
goto error;
}
- if (parse_flags(&ng_flags, msg, &op, flags_nt.s)) {
- err = "could not parse flags";
+ if (parse_flags(ng_flags, msg, &op, flags_nt.s)) {
+ *err = "could not parse flags";
goto error;
}
- if (!ng_flags.call_id.len &&
- (get_callid(msg, &ng_flags.call_id) == -1 || ng_flags.call_id.len == 0)) {
- err = "can't get Call-Id field";
+ if (!ng_flags->call_id.len &&
+ (get_callid(msg, &ng_flags->call_id) == -1 || ng_flags->call_id.len == 0)) {
+ *err = "can't get Call-Id field";
goto error;
}
- if (!ng_flags.to_tag.len &&
- get_to_tag(msg, &ng_flags.to_tag) == -1) {
- err = "can't get To tag";
+ if (!ng_flags->to_tag.len &&
+ get_to_tag(msg, &ng_flags->to_tag) == -1) {
+ *err = "can't get To tag";
goto error;
}
- if (!ng_flags.from_tag.len &&
- (get_from_tag(msg, &ng_flags.from_tag) == -1 || ng_flags.from_tag.len == 0)) {
- err = "can't get From tag";
+ if (!ng_flags->from_tag.len &&
+ (get_from_tag(msg, &ng_flags->from_tag) == -1 || ng_flags->from_tag.len == 0)) {
+ *err = "can't get From tag";
goto error;
}
/* only add those if any flags were given at all */
- if (ng_flags.direction && ng_flags.direction->child)
- bencode_dictionary_add(ng_flags.dict, "direction", ng_flags.direction);
- if (!flags_exist && ng_flags.flags && ng_flags.flags->child)
- bencode_dictionary_add(ng_flags.dict, "flags", ng_flags.flags);
- if (ng_flags.replace && ng_flags.replace->child)
- bencode_dictionary_add(ng_flags.dict, "replace", ng_flags.replace);
- if ((ng_flags.transport & 0x100))
- bencode_dictionary_add_string(ng_flags.dict, "transport-protocol",
- transports[ng_flags.transport & 0x007]);
- if (ng_flags.rtcp_mux && ng_flags.rtcp_mux->child)
- bencode_dictionary_add(ng_flags.dict, "rtcp-mux", ng_flags.rtcp_mux);
+ if (ng_flags->direction && ng_flags->direction->child)
+ bencode_dictionary_add(ng_flags->dict, "direction", ng_flags->direction);
+ if (!flags_exist && ng_flags->flags && ng_flags->flags->child)
+ bencode_dictionary_add(ng_flags->dict, "flags", ng_flags->flags);
+ if (ng_flags->replace && ng_flags->replace->child)
+ bencode_dictionary_add(ng_flags->dict, "replace", ng_flags->replace);
+ if ((ng_flags->transport & 0x100))
+ bencode_dictionary_add_string(ng_flags->dict, "transport-protocol",
+ transports[ng_flags->transport & 0x007]);
+ if (ng_flags->rtcp_mux && ng_flags->rtcp_mux->child)
+ bencode_dictionary_add(ng_flags->dict, "rtcp-mux", ng_flags->rtcp_mux);
if (!callid_exist)
- bencode_dictionary_add_str(ng_flags.dict, "call-id", &ng_flags.call_id);
+ bencode_dictionary_add_str(ng_flags->dict, "call-id", &ng_flags->call_id);
- if (ng_flags.via) {
- if (ng_flags.via == 1 || ng_flags.via == 2)
- ret = get_via_branch(msg, ng_flags.via, &viabranch);
- else if (ng_flags.via == -1 && extra_id_pv)
+ if (ng_flags->via) {
+ if (ng_flags->via == 1 || ng_flags->via == 2)
+ ret = get_via_branch(msg, ng_flags->via, &viabranch);
+ else if (ng_flags->via == -1 && extra_id_pv)
ret = get_extra_id(msg, &viabranch);
- else if (ng_flags.via == 4 && ng_flags.viabranch.len) {
- viabranch = ng_flags.viabranch;
+ else if (ng_flags->via == 4 && ng_flags->viabranch.len) {
+ viabranch = ng_flags->viabranch;
ret = 1;
} else
ret = -1;
if (ret == -1 || viabranch.len == 0) {
- err = "can't get Via branch/extra ID";
+ *err = "can't get Via branch/extra ID";
goto error;
}
- bencode_dictionary_add_str(ng_flags.dict, "via-branch", &viabranch);
+ bencode_dictionary_add_str(ng_flags->dict, "via-branch", &viabranch);
}
item = bencode_list(bencbuf);
- bencode_dictionary_add(ng_flags.dict, "received-from", item);
+ bencode_dictionary_add(ng_flags->dict, "received-from", item);
if (msg) {
- if (!ng_flags.received_from.len) {
+ if (!ng_flags->received_from.len) {
bencode_list_add_string(item, (msg->rcv.src_ip.af == AF_INET) ? "IP4" : (
(msg->rcv.src_ip.af == AF_INET6) ? "IP6" :
"?"
@@ -2720,11 +2744,11 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
} else {
struct ip_addr *ip_tmp = NULL;
- ip_tmp = str2ip(&ng_flags.received_from);
+ ip_tmp = str2ip(&ng_flags->received_from);
if (!ip_tmp) {
- ip_tmp = str2ip6(&ng_flags.received_from);
+ ip_tmp = str2ip6(&ng_flags->received_from);
if (!ip_tmp) {
- err = "received-from value is not an IP";
+ *err = "received-from value is not an IP";
goto error;
}
}
@@ -2741,41 +2765,72 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
op == OP_BLOCK_MEDIA || op == OP_UNBLOCK_MEDIA ||
op == OP_BLOCK_DTMF || op == OP_UNBLOCK_DTMF ||
op == OP_START_FORWARD || op == OP_STOP_FORWARD) {
- if (ng_flags.directional && !from_tag_exist)
- bencode_dictionary_add_str(ng_flags.dict, "from-tag", &ng_flags.from_tag);
- } else if (ng_flags.directional
+ if (ng_flags->directional && !from_tag_exist)
+ bencode_dictionary_add_str(ng_flags->dict, "from-tag", &ng_flags->from_tag);
+ } else if (ng_flags->directional
|| (msg && ((msg->first_line.type == SIP_REQUEST && op != OP_ANSWER)
|| (msg->first_line.type == SIP_REPLY && op == OP_DELETE)
|| (msg->first_line.type == SIP_REPLY && op == OP_ANSWER))))
{
if (!from_tag_exist)
- bencode_dictionary_add_str(ng_flags.dict, "from-tag", &ng_flags.from_tag);
+ bencode_dictionary_add_str(ng_flags->dict, "from-tag", &ng_flags->from_tag);
if (op != OP_START_MEDIA && op != OP_STOP_MEDIA) {
/* no need of to-tag if we are just playing media */
- if (ng_flags.to && ng_flags.to_tag.s && ng_flags.to_tag.len && !to_tag_exist && !extra_dict)
- bencode_dictionary_add_str(ng_flags.dict, "to-tag", &ng_flags.to_tag);
+ if (ng_flags->to && ng_flags->to_tag.s && ng_flags->to_tag.len && !to_tag_exist && !extra_dict)
+ bencode_dictionary_add_str(ng_flags->dict, "to-tag", &ng_flags->to_tag);
}
}
else {
- if (!ng_flags.to_tag.s || !ng_flags.to_tag.len) {
- err = "No to-tag present";
+ if (!ng_flags->to_tag.s || !ng_flags->to_tag.len) {
+ *err = "No to-tag present";
goto error;
}
if (!from_tag_exist)
- bencode_dictionary_add_str(ng_flags.dict, "from-tag", &ng_flags.to_tag);
+ bencode_dictionary_add_str(ng_flags->dict, "from-tag", &ng_flags->to_tag);
if (!to_tag_exist && !extra_dict)
- bencode_dictionary_add_str(ng_flags.dict, "to-tag", &ng_flags.from_tag);
+ bencode_dictionary_add_str(ng_flags->dict, "to-tag", &ng_flags->from_tag);
}
- bencode_dictionary_add_string(ng_flags.dict, "command", command_strings[op]);
-
- /*** send it out ***/
+ bencode_dictionary_add_string(ng_flags->dict, "command", command_strings[op]);
if (bencbuf->error) {
- err = "out of memory - bencode failed";
+ LM_ERR("bencode failed\n");
goto error;
}
+ if (flags_nt.s)
+ pkg_free(flags_nt.s);
+
+ return 1;
+
+error:
+ if (flags_nt.s)
+ pkg_free(flags_nt.s);
+ return -1;
+}
+
+static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_msg *msg,
+ enum rtpe_operation op, str *flags_str, str *body_in, pv_spec_t *spvar,
+ struct rtpe_set *set, str *snode, bencode_item_t *extra_dict)
+{
+ struct ng_flags_parse ng_flags;
+ bencode_item_t *resp;
+ str error;
+ struct rtpe_node *node, *failed_node;
+ char *cp, *err = NULL;
+ pv_value_t val;
+ struct rtpe_ignore_node *ignore_list = NULL;
+ int ret;
+
+ memset(&ng_flags, 0, sizeof(ng_flags));
+ error.len = 0;
+ error.s = "";
+
+ /*** get & init basic stuff needed ***/
+ if (rtpe_function_call_prepare(bencbuf, msg, op, &ng_flags, flags_str, body_in, extra_dict,&err) < 0)
+ goto error;
+
+ /*** send it out ***/
if (!set && (set=rtpe_ctx_set_get())==NULL )
set = *default_rtpe_set;
@@ -2853,15 +2908,10 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
LM_ERR("setting rtpengine pvar failed\n");
}
- if (flags_nt.s)
- pkg_free(flags_nt.s);
-
return resp;
error:
rtpe_free_ignore_node(ignore_list);
- if (flags_nt.s)
- pkg_free(flags_nt.s);
if (err) {
LM_ERR("%s\n", err);
init_str(&error, err);
@@ -3311,6 +3361,425 @@ rtpengine_delete_f(struct sip_msg* msg, str *flags, pv_spec_t *spvar)
return rtpengine_delete(msg, flags, NULL, NULL, spvar);
}
+static bencode_item_t *rtpe_function_call_process(bencode_buffer_t *bencbuf, char* cp, int ret)
+{
+ bencode_item_t *resp;
+ str error;
+
+ resp = bencode_decode_expect(bencbuf, cp, ret, BENCODE_DICTIONARY);
+ if (!resp) {
+ LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
+ return NULL;
+ }
+ if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
+ if (!bencode_dictionary_get_str(resp, "error-reason", &error))
+ LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
+ else
+ LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
+ return NULL;
+ }
+
+ return resp;
+}
+
+enum async_ret_code resume_async_send_rtpe_command(int fd, struct sip_msg *msg, void *_param)
+{
+ int len = 0, cookielen = 0;
+ static char buf[0x10000];
+ char* cp = buf;
+ bencode_item_t *dict;
+ str oldbody = { 0, 0 };
+ str newbody;
+ struct lump *anchor;
+ pv_value_t val;
+ struct rtpe_ctx *ctx;
+ rtpe_async_param *param = (rtpe_async_param *)_param;
+
+ LM_DBG("Need to resume async rtpe call \n");
+
+ if (param->node->rn_umode == 0) {
+ do {
+ len = read(fd, buf, sizeof(buf) - 1);
+ } while (len == -1 && errno == EINTR);
+ close(fd);
+ if (len <= 0) {
+ LM_ERR("can't read reply from a RTP Engine\n");
+ goto error;
+ }
+ } else {
+ /* UDP got triggered, wait for 1second max to read now */
+ struct timeval tv;
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
+
+ len = recv(fd, buf, sizeof(buf)-1, 0);
+ if (len <= 0) {
+ LM_ERR("can't read reply from a RTP Engine (%d, %d)\n", len, errno);
+ RTPE_IO_ERROR_CLOSE(param->node->idx);
+ goto error;
+ }
+ cookielen = strlen(param->cookie);
+ if (len >= (cookielen - 1) &&
+ memcmp(buf, param->cookie, (cookielen - 1)) == 0) {
+ len -= (cookielen - 1);
+ cp += (cookielen - 1);
+ if (len != 0) {
+ len--;
+ cp++;
+ }
+ } else {
+ LM_ERR("read reply from a RTP Ending FD %d. Cookie does NOT match (%.*s <> %.*s)\n",
+ fd, cookielen - 1, param->cookie, cookielen - 1, buf);
+ goto error;
+ }
+ }
+
+ /* store the value of the selected node */
+ if (param->spvar) {
+ memset(&val, 0, sizeof(pv_value_t));
+ val.flags = PV_VAL_STR;
+ val.rs = param->node->rn_url;
+ if(pv_set_value(msg, param->spvar, (int)EQ_T, &val)<0)
+ LM_ERR("setting rtpengine pvar failed\n");
+ }
+
+ /* process reply */
+ dict = rtpe_function_call_process(param->bencbuf, cp, len);
+ if(!dict) {
+ goto error;
+ }
+
+ if (param->op != OP_DELETE) {
+ LM_DBG("Got reply for a non-delete - yay \n");
+ if (!bencode_dictionary_get_str_dup(dict, "sdp", &newbody)) {
+ LM_ERR("failed to extract sdp body from proxy reply\n");
+ goto error;
+ }
+
+ /* if we have a variable to store into, use it */
+ if (param->bpvar) {
+ memset(&val, 0, sizeof(pv_value_t));
+ val.flags = PV_VAL_STR;
+ val.rs = newbody;
+ if(pv_set_value(msg, param->bpvar, (int)EQ_T, &val)<0)
+ LM_ERR("setting PV failed\n");
+ pkg_free(newbody.s);
+ } else if (extract_body(msg, &oldbody) > 0) {
+ /* otherwise directly set the body of the message */
+ anchor = del_lump(msg, oldbody.s - msg->buf, oldbody.len, 0);
+ if (!anchor) {
+ LM_ERR("del_lump failed\n");
+ goto error;
+ }
+ if (!insert_new_lump_after(anchor, newbody.s, newbody.len, 0)) {
+ LM_ERR("insert_new_lump_after failed\n");
+ goto error;
+ }
+ } else {
+ LM_ERR("cannot parse old body!\n");
+ goto error;
+ }
+ }
+
+ if (param->op == OP_DELETE && rtpengine_stats_used) {
+ /* if statistics are to be used, store stats in the ctx, if possible */
+ if ((ctx = rtpe_ctx_get())) {
+ if (ctx->stats) {
+ rtpe_stats_free(ctx->stats); /* release the buffer */
+ pkg_free(&(ctx->stats->buf));
+ } else
+ ctx->stats = pkg_malloc(sizeof *ctx->stats);
+ if (ctx->stats) {
+ ctx->stats->buf = *(param->bencbuf);
+ ctx->stats->dict = dict;
+ ctx->stats->json.s = 0;
+ /* return here to prevent buffer from being freed */
+ pkg_free(param->bencbuf);
+ pkg_free(param->cookie);
+ pkg_free(param);
+ async_status = ASYNC_DONE_CLOSE_FD;
+ return 1;
+ } else
+ LM_WARN("no more pkg memory - cannot cache stats!\n");
+ }
+ }
+
+ pkg_free(param->cookie);
+ bencode_buffer_free(param->bencbuf);
+ pkg_free(param->bencbuf);
+ pkg_free(param);
+ async_status = ASYNC_DONE_CLOSE_FD;
+ return 1;
+
+error:
+ pkg_free(param->cookie);
+ bencode_buffer_free(param->bencbuf);
+ pkg_free(param->bencbuf);
+ pkg_free(param);
+ async_status = ASYNC_DONE_CLOSE_FD;
+ return -1;
+}
+
+enum async_ret_code timeout_async_send_rtpe_command(int fd, struct sip_msg *msg, void *_param)
+{
+ rtpe_async_param *param = (rtpe_async_param *)_param;
+ LM_ERR("can't read reply from a RTP proxy - TIMEOUT on %s\n",param->node->rn_url.s);
+
+ param->node->rn_disabled = 1;
+ param->node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
+
+ pkg_free(param->cookie);
+ bencode_buffer_free(param->bencbuf);
+ pkg_free(param->bencbuf);
+ pkg_free(param);
+ async_status = ASYNC_DONE_CLOSE_FD;
+ return -1;
+}
+
+static int start_async_send_rtpe_command(struct rtpe_node *node, bencode_item_t *dict, char* cookie, enum async_ret_code *out_fd)
+{
+ struct sockaddr_un addr;
+ int fd=-1, len, vcnt;
+ char buf[0x10000];
+ struct pollfd fds[1];
+ struct iovec *v;
+
+ v = bencode_iovec(dict, &vcnt, 1, 0);
+ if (!v) {
+ LM_ERR("error converting bencode to iovec\n");
+ goto error;
+ }
+
+ len = 0;
+ if (node->rn_umode == 0) {
+ memset(&addr, 0, sizeof(addr));
+ addr.sun_family = AF_LOCAL;
+ strncpy(addr.sun_path, node->rn_address,
+ sizeof(addr.sun_path) - 1);
+#ifdef HAVE_SOCKADDR_SA_LEN
+ addr.sun_len = strlen(addr.sun_path);
+#endif
+
+ fd = socket(AF_LOCAL, SOCK_STREAM, 0);
+ if (fd < 0) {
+ LM_ERR("can't create socket %d \n",errno);
+ goto badproxy;
+ }
+ if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
+ LM_ERR("can't connect to RTP proxy %s (%d:%s)\n",node->rn_url.s,errno,strerror(errno));
+ close(fd);
+ goto badproxy;
+ }
+
+ do {
+ len = writev(fd, v + 1, vcnt);
+ } while (len == -1 && errno == EINTR);
+ if (len <= 0) {
+ LM_ERR("can't send command to RTP proxy %s (%d:%s)\n",node->rn_url.s,
+ errno, strerror(errno));
+ close(fd);
+ goto badproxy;
+ }
+ *out_fd = fd;
+ } else {
+ if (rtpe_socks[node->idx] != -1) {
+ fds[0].fd = rtpe_socks[node->idx];
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+ /* Drain input buffer */
+ while ((poll(fds, 1, 0) == 1) &&
+ ((fds[0].revents & POLLIN) != 0)) {
+ if (fds[0].revents & (POLLERR|POLLNVAL|POLLHUP)) {
+ LM_WARN("error on rtpengine socket %d!\n", rtpe_socks[node->idx]);
+ RTPE_IO_ERROR_CLOSE(rtpe_socks[node->idx]);
+ break;
+ }
+ fds[0].revents = 0;
+ if (recv(rtpe_socks[node->idx], buf, sizeof(buf) - 1, 0) < 0 && errno != EINTR) {
+ LM_WARN("error while draining rtpengine %d!\n", errno);
+ RTPE_IO_ERROR_CLOSE(rtpe_socks[node->idx]);
+ break;
+ }
+ }
+ }
+
+ v[0].iov_base = cookie;
+ v[0].iov_len = strlen(v[0].iov_base);
+ if (rtpe_socks[node->idx] == -1 && !rtpengine_connect_node(node)) {
+ LM_ERR("cannot reconnect RTP engine socket!\n");
+ goto badproxy;
+ }
+
+ fd = socket((node->rn_umode == 6) ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
+ if (fd < 0) {
+ LM_ERR("can't create socket %d \n",errno);
+ goto badproxy;
+ }
+ if (connect(fd, &(node->ai_addr), node->ai_addrlen) < 0) {
+ LM_ERR("can't connect to RTP proxy %s (%d:%s)\n",node->rn_url.s,errno,strerror(errno));
+ close(fd);
+ goto badproxy;
+ }
+ do {
+ len = writev(fd, v, vcnt + 1);
+ } while (len == -1 && (errno == EINTR || errno == ENOBUFS || errno == EMSGSIZE));
+
+ if (len <= 0) {
+ LM_ERR("can't send command to RTP proxy %s (%d:%s)\n",node->rn_url.s,
+ errno, strerror(errno));
+
+ RTPE_IO_ERROR_CLOSE(fd);
+ goto badproxy;
+ }
+ *out_fd = fd;
+ }
+
+ return 1;
+
+badproxy:
+ LM_ERR("proxy <%s> does not respond, disable it\n", node->rn_url.s);
+ node->rn_disabled = 1;
+ node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;
+error:
+ if (fd>0)
+ close(fd);
+
+ *out_fd = ASYNC_NO_IO;
+ return -1;
+}
+
+
+static int rtpe_function_call_async(struct sip_msg *msg, async_ctx *ctx, str *flags_str,
+ pv_spec_t *spvar, pv_spec_t *bpvar, str *body, enum rtpe_operation op)
+{
+ struct ng_flags_parse ng_flags;
+ str oldbody;
+ struct rtpe_node *node;
+ struct rtpe_set *set;
+ int ret, read_fd;
+ rtpe_async_param *param;
+ char* cookie = NULL;
+ char *err;
+
+ bencode_buffer_t *bencbuf = pkg_malloc(sizeof(bencode_buffer_t));
+ memset(&ng_flags, 0, sizeof(ng_flags));
+
+ /*** get & init basic stuff needed ***/
+
+ if (op != OP_DELETE) {
+ if (!body) {
+ if (extract_body(msg, &oldbody) == -1) {
+ LM_ERR("can't extract body from the message\n");
+ goto error;
+ }
+ } else {
+ oldbody = *body;
+ }
+ }
+
+ if(rtpe_function_call_prepare(bencbuf, msg, op, &ng_flags, flags_str, &oldbody, NULL,&err) < 0)
+ goto error;
+
+ /*** send it out ***/
+ if ( (set=rtpe_ctx_set_get())==NULL )
+ set = *default_rtpe_set;
+
+ RTPE_START_READ();
+
+ /* FIXME - failover logic for async is disabled for now */
+ node = select_rtpe_node(ng_flags.call_id, set,NULL);
+ if (!node) {
+ LM_ERR("no available proxies\n");
+ RTPE_STOP_READ();
+ goto error;
+ }
+
+ cookie = gencookie();
+ ret = start_async_send_rtpe_command(node, ng_flags.dict, cookie, &read_fd);
+
+ RTPE_STOP_READ();
+ LM_DBG("async proxy reply: %d\n", ret);
+
+ if (read_fd == ASYNC_NO_IO) {
+ ctx->resume_f = NULL;
+ ctx->resume_param = NULL;
+ bencode_buffer_free(bencbuf);
+ pkg_free(bencbuf);
+ return ret;
+ } else if (read_fd == ASYNC_SYNC) {
+ /* no need for async - transfer already completed! */
+ async_status = ASYNC_SYNC;
+ bencode_buffer_free(bencbuf);
+ pkg_free(bencbuf);
+ return ret;
+ }
+
+ param = pkg_malloc(sizeof(rtpe_async_param));
+ if (!param) {
+ LM_ERR("no more pkg mem\n");
+ goto error;
+ }
+ memset(param, 0, sizeof(rtpe_async_param));
+
+ param->bencbuf = bencbuf;
+ param->op = op;
+ param->node = node;
+ param->cookie = pkg_strdup(cookie);
+ param->bpvar = bpvar;
+ param->spvar = spvar;
+
+ ctx->resume_f = resume_async_send_rtpe_command;
+ ctx->timeout_f = timeout_async_send_rtpe_command;
+ ctx->resume_param = param;
+
+ /* async started with success */
+ async_status = read_fd;
+ return 1;
+
+error:
+ bencode_buffer_free(bencbuf);
+ pkg_free(bencbuf);
+ return -1;
+}
+
+static int
+rtpengine_offer_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar, pv_spec_t *bpvar, str *body)
+{
+ LM_DBG("Async rtpengine_offer\n");
+
+ if (set_rtpengine_set_from_avp(msg) == -1)
+ return -1;
+
+ return rtpe_function_call_async(msg, ctx, flags, spvar, bpvar, body, OP_OFFER);
+}
+
+static int
+rtpengine_answer_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar, pv_spec_t *bpvar, str *body)
+{
+ LM_DBG("Async rtpengine_answer\n");
+
+ if (set_rtpengine_set_from_avp(msg) == -1)
+ return -1;
+
+ if (msg->first_line.type == SIP_REQUEST)
+ if (msg->first_line.u.request.method_value != METHOD_ACK)
+ return -1;
+
+ return rtpe_function_call_async(msg, ctx, flags, spvar, bpvar, body, OP_ANSWER);
+}
+
+static int
+rtpengine_delete_af(struct sip_msg *msg, async_ctx *ctx, str *flags, pv_spec_t *spvar)
+{
+ LM_DBG("Async rtpengine_delete\n");
+
+ if (set_rtpengine_set_from_avp(msg) == -1)
+ return -1;
+
+ return rtpe_function_call_async(msg, ctx, flags, spvar, NULL, NULL, OP_DELETE);
+}
+
/* This function assumes p points to a line of requested type. */
static int
diff --git a/modules/rtpengine/rtpengine.h b/modules/rtpengine/rtpengine.h
index a8c2bdd747..17241ae213 100644
--- a/modules/rtpengine/rtpengine.h
+++ b/modules/rtpengine/rtpengine.h
@@ -45,6 +45,9 @@ struct rtpe_node {
unsigned int rn_recheck_ticks;
unsigned int rn_last_ticks;
int rn_flags;
+ socklen_t ai_addrlen;
+ struct sockaddr ai_addr;
+
struct rtpe_node *rn_next;
};