From d9aa1cec84c662f2f7fda025ee023f6dec927bdc Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Mon, 6 Apr 2020 18:28:44 +0200 Subject: [PATCH] Added support for static Opus files to Streaming plugin (#2040) --- Makefile.am | 6 +- conf/janus.plugin.streaming.jcfg.sample.in | 35 +- configure.ac | 1 + html/streamingtest.js | 16 + plugins/janus_streaming.c | 410 ++++++++++++++++++--- 5 files changed, 402 insertions(+), 66 deletions(-) diff --git a/Makefile.am b/Makefile.am index 097a4d8be6..6c0f5f9200 100644 --- a/Makefile.am +++ b/Makefile.am @@ -464,9 +464,9 @@ endif if ENABLE_PLUGIN_STREAMING plugin_LTLIBRARIES += plugins/libjanus_streaming.la plugins_libjanus_streaming_la_SOURCES = plugins/janus_streaming.c -plugins_libjanus_streaming_la_CFLAGS = $(plugins_cflags) $(LIBCURL_CFLAGS) -plugins_libjanus_streaming_la_LDFLAGS = $(plugins_ldflags) $(LIBCURL_LDFLAGS) $(LIBCURL_LIBS) -plugins_libjanus_streaming_la_LIBADD = $(plugins_libadd) $(LIBCURL_LIBADD) +plugins_libjanus_streaming_la_CFLAGS = $(plugins_cflags) $(LIBCURL_CFLAGS) $(OGG_CFLAGS) +plugins_libjanus_streaming_la_LDFLAGS = $(plugins_ldflags) $(LIBCURL_LDFLAGS) $(LIBCURL_LIBS) $(OGG_LDFLAGS) $(OGG_LIBS) +plugins_libjanus_streaming_la_LIBADD = $(plugins_libadd) $(LIBCURL_LIBADD) $(OGG_LIBADD) conf_DATA += conf/janus.plugin.streaming.jcfg.sample stream_DATA += \ plugins/streams/music.mulaw \ diff --git a/conf/janus.plugin.streaming.jcfg.sample.in b/conf/janus.plugin.streaming.jcfg.sample.in index 03a48b58d1..f19c89c4b9 100644 --- a/conf/janus.plugin.streaming.jcfg.sample.in +++ b/conf/janus.plugin.streaming.jcfg.sample.in @@ -94,10 +94,19 @@ general: { #string_ids = true } -gstreamer-sample: { +# +# This is an example of an RTP source stream, which is what you'll need +# in the vast majority of cases: here, the Streaming plugin will bind to +# some ports, and expect media to be sent by an external source (e.g., +# FFmpeg or Gstreamer). This sample listens on 5002 for audio (Opus) and +# 5004 for video (VP8), which is what the sample gstreamer script in the +# plugins/streams folder sends to. Whatever is sent to those ports will +# be the source of a WebRTC broadcast users can subscribe to. +# +rtp-sample: { type = "rtp" id = 1 - description = "Opus/VP8 live stream coming from gstreamer" + description = "Opus/VP8 live stream coming from external source" audio = true video = true audioport = 5002 @@ -109,6 +118,13 @@ gstreamer-sample: { secret = "adminpwd" } +# +# This is a sample of the file-based streaming support. Specifically, +# this simulates a radio broadcast by streaming (in a loop) raw a-Law +# (that is, G.711) frames. Since type is "live", anyone subscribing to +# this mountpoint will listen to the same broadcast as if it were live. +# Notice that file-based streaming supports Opus files too, but no video. +# file-live-sample: { type = "live" id = 2 @@ -119,6 +135,14 @@ file-live-sample: { secret = "adminpwd" } +# +# This is another sample of the file-based streaming support, but using +# the "ondemand" type instead. In this case, the file we're streaming +# contains raw mu-Law (still G.711) frames. Since this is "ondemand", +# anyone subscribing to this mountpoint will listen to their own version +# of the stream, meaning that it will start from the beginning and then +# loop when it's over. On-demand streaming supports Opus files as well. +# file-ondemand-sample: { type = "ondemand" id = 3 @@ -171,10 +195,11 @@ file-ondemand-sample: { #} # -# This is a sample configuration for Opus/VP8 multicast streams. You +# This is a variation of the rtp-sample configuration for Opus/VP8 shown +# before, where multicast support is used to receive the streams. You # need an external script to feed data on those ports, of course. # -#gstreamer-multicast: { +#rtp-multicast: { #type = "rtp" #id = 20 #description = "Opus/VP8 live multicast stream sample" @@ -197,7 +222,7 @@ file-ondemand-sample: { # authentication will only work if you installed libcurl >= 7.45.0) # NOTE WELL: the plugin does NOT transcode, so the RTSP stream MUST be # in a format the browser can digest (e.g., VP8 or H.264 baseline for video) -# Again, you can override payload type, rtpmap and/or fmtp, if needed +# Again, you can override payload type, rtpmap and/or fmtp, if needed. # #rtsp-test: { #type = "rtsp" diff --git a/configure.ac b/configure.ac index f2c49c6f6d..6010f106f5 100644 --- a/configure.ac +++ b/configure.ac @@ -758,6 +758,7 @@ AC_SUBST([OPUS_LIBS]) PKG_CHECK_MODULES([OGG], [ogg], [ + AC_DEFINE(HAVE_LIBOGG) AS_IF([test "x$enable_plugin_voicemail" = "xmaybe"], [enable_plugin_voicemail=yes]) ], diff --git a/html/streamingtest.js b/html/streamingtest.js index 2137f8aeb1..52d6629fab 100644 --- a/html/streamingtest.js +++ b/html/streamingtest.js @@ -147,12 +147,19 @@ $(document).ready(function() { if(jsep !== undefined && jsep !== null) { Janus.debug("Handling SDP as well..."); Janus.debug(jsep); + var stereo = (jsep.sdp.indexOf("stereo=1") !== -1); // Offer from the plugin, let's answer streaming.createAnswer( { jsep: jsep, // We want recvonly audio/video and, if negotiated, datachannels media: { audioSend: false, videoSend: false, data: true }, + customizeSdp: function(jsep) { + if(stereo && jsep.sdp.indexOf("stereo=1") == -1) { + // Make sure that our offer contains stereo too + jsep.sdp = jsep.sdp.replace("useinbandfec=1", "useinbandfec=1;stereo=1"); + } + }, success: function(jsep) { Janus.debug("Got SDP!"); Janus.debug(jsep); @@ -298,6 +305,15 @@ function updateStreamsList() { $('#watch').attr('disabled', true).unbind('click'); var list = result["list"]; Janus.log("Got a list of available streams"); + if(list && Array.isArray(list)) { + list.sort(function(a, b) { + if(!a || a.id < (b ? b.id : 0)) + return -1; + if(!b || b.id < (a ? a.id : 0)) + return 1; + return 0; + }); + } Janus.debug(list); for(var mp in list) { Janus.debug(" >> [" + list[mp]["id"] + "] " + list[mp]["description"] + " (" + list[mp]["type"] + ")"); diff --git a/plugins/janus_streaming.c b/plugins/janus_streaming.c index 08b9826a8e..b5007c5f95 100644 --- a/plugins/janus_streaming.c +++ b/plugins/janus_streaming.c @@ -690,6 +690,10 @@ rtspiface = network interface IP address or device name to listen on when receiv #endif #endif +#ifdef HAVE_LIBOGG +#include +#endif + #include "../debug.h" #include "../apierror.h" #include "../config.h" @@ -816,9 +820,15 @@ static struct janus_json_parameter rtp_parameters[] = { }; static struct janus_json_parameter live_parameters[] = { {"filename", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, + {"audiortpmap", JSON_STRING, 0}, + {"audiofmtp", JSON_STRING, 0}, + {"audiopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} }; static struct janus_json_parameter ondemand_parameters[] = { {"filename", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}, + {"audiortpmap", JSON_STRING, 0}, + {"audiofmtp", JSON_STRING, 0}, + {"audiopt", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE} }; #ifdef HAVE_LIBCURL static struct janus_json_parameter rtsp_parameters[] = { @@ -1053,6 +1063,7 @@ typedef struct janus_streaming_rtp_source { typedef struct janus_streaming_file_source { char *filename; + gboolean opus; } janus_streaming_file_source; /* used for audio/video fd and RTCP fd */ @@ -1124,8 +1135,8 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source( gboolean dodata, const janus_network_address *diface, uint16_t dport, gboolean textdata, gboolean buffermsg); /* Helper to create a file/ondemand live source */ janus_streaming_mountpoint *janus_streaming_create_file_source( - uint64_t id, char *id_str, char *name, char *desc, char *filename, - gboolean live, gboolean doaudio, gboolean dovideo); + uint64_t id, char *id_str, char *name, char *desc, char *filename, gboolean live, + gboolean doaudio, uint8_t acodec, char *artpmap, char *afmtp, gboolean dovideo); /* Helper to create a rtsp live source */ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( uint64_t id, char *id_str, char *name, char *desc, @@ -1263,6 +1274,151 @@ static void janus_streaming_message_free(janus_streaming_message *msg) { g_free(msg); } +#ifdef HAVE_LIBOGG +/* Helper struct to handle the playout of Opus files */ +typedef struct janus_streaming_opus_context { + char *name, *filename; + FILE *file; + ogg_sync_state sync; + ogg_stream_state stream; + ogg_page page; + ogg_packet pkt; + char *oggbuf; + gint state, headers; +} janus_streaming_opus_context; +/* Helper method to open an Opus file, and make sure it's valid */ +static int janus_streaming_opus_context_init(janus_streaming_opus_context *ctx) { + if(ctx == NULL || ctx->file == NULL) + return -1; + fseek(ctx->file, 0, SEEK_SET); + ogg_stream_clear(&ctx->stream); + ogg_sync_clear(&ctx->sync); + if(ogg_sync_init(&ctx->sync) < 0) { + JANUS_LOG(LOG_ERR, "[%s] Error re-initializing Ogg sync state...\n", ctx->name); + return -1; + } + ctx->headers = 0; + return 0; +} +/* Helper method to check if an Ogg page begins with an Ogg stream */ +static gboolean janus_streaming_ogg_is_opus(ogg_page *page) { + ogg_stream_state state; + ogg_packet pkt; + ogg_stream_init(&state, ogg_page_serialno(page)); + ogg_stream_pagein(&state, page); + if(ogg_stream_packetout(&state, &pkt) == 1) { + if(pkt.bytes >= 19 && !memcmp(pkt.packet, "OpusHead", 8)) { + ogg_stream_clear(&state); + return 1; + } + } + ogg_stream_clear(&state); + return FALSE; +} +/* Helper method to traverse the Opus file until we get a packet we can send */ +static int janus_streaming_opus_context_read(janus_streaming_opus_context *ctx, char *buffer, int length) { + if(ctx == NULL || ctx->file == NULL || buffer == NULL) + return -1; + /* Check our current state in processing the Ogg file */ + int read = 0; + if(ctx->state == 0) { + /* Prepare a buffer, and read from the Ogg file... */ + ctx->oggbuf = ogg_sync_buffer(&ctx->sync, 8192); + if(ctx->oggbuf == NULL) { + JANUS_LOG(LOG_ERR, "[%s] ogg_sync_buffer failed...\n", ctx->name); + return -2; + } + read = fread(ctx->oggbuf, 1, 8192, ctx->file); + if(feof(ctx->file)) { + /* FIXME We're doing this forever... should this be configurable? */ + JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", ctx->name, ctx->filename); + if(janus_streaming_opus_context_init(ctx) < 0) + return -3; + return janus_streaming_opus_context_read(ctx, buffer, length); + } + if(ogg_sync_wrote(&ctx->sync, read) < 0) { + JANUS_LOG(LOG_ERR, "[%s] ogg_sync_wrote failed...\n", ctx->name); + return -4; + } + /* Next state: sync pageout */ + ctx->state = 1; + } + if(ctx->state == 1) { + /* Prepare an ogg_page out of the buffer */ + while((read = ogg_sync_pageout(&ctx->sync, &ctx->page)) == 1) { + /* Let's look for an Opus stream, first of all */ + if(ctx->headers == 0) { + if(janus_streaming_ogg_is_opus(&ctx->page)) { + /* This is the start of an Opus stream */ + if(ogg_stream_init(&ctx->stream, ogg_page_serialno(&ctx->page)) < 0) { + JANUS_LOG(LOG_ERR, "[%s] ogg_stream_init failed...\n", ctx->name); + return -5; + } + ctx->headers++; + } else if(!ogg_page_bos(&ctx->page)) { + /* No Opus stream? */ + JANUS_LOG(LOG_ERR, "[%s] No Opus stream...\n", ctx->name); + return -6; + } else { + /* Still waiting for an Opus stream */ + return janus_streaming_opus_context_read(ctx, buffer, length); + } + } + /* Submit the page for packetization */ + if(ogg_stream_pagein(&ctx->stream, &ctx->page) < 0) { + JANUS_LOG(LOG_ERR, "[%s] ogg_stream_pagein failed...\n", ctx->name); + return -7; + } + /* Time to start reading packets */ + ctx->state = 2; + break; + } + if(read != 1) { + /* Go back to reading from the file */ + ctx->state = 0; + return janus_streaming_opus_context_read(ctx, buffer, length); + } + } + if(ctx->state == 2) { + /* Read and process available packets */ + if(ogg_stream_packetout(&ctx->stream, &ctx->pkt) != 1) { + /* Go back to reading pages */ + ctx->state = 1; + return janus_streaming_opus_context_read(ctx, buffer, length); + } else { + /* Skip header packets */ + if(ctx->headers == 1 && ctx->pkt.bytes >= 19 && !memcmp(ctx->pkt.packet, "OpusHead", 8)) { + ctx->headers++; + return janus_streaming_opus_context_read(ctx, buffer, length); + } + if(ctx->headers == 2 && ctx->pkt.bytes >= 16 && !memcmp(ctx->pkt.packet, "OpusTags", 8)) { + ctx->headers++; + return janus_streaming_opus_context_read(ctx, buffer, length); + } + /* Get the packet duration */ + if(length < ctx->pkt.bytes) { + JANUS_LOG(LOG_WARN, "[%s] Buffer too short for Opus packet (%d < %ld)\n", + ctx->name, length, ctx->pkt.bytes); + return -8; + } + memcpy(buffer, ctx->pkt.packet, ctx->pkt.bytes); + length = ctx->pkt.bytes; + return length; + } + } + /* If we got here, continue with the iteration */ + return -9; +} +/* Helper method to cleanup an Opus context */ +static void janus_streaming_opus_context_cleanup(janus_streaming_opus_context *ctx) { + if(ctx == NULL) + return; + if(ctx->headers > 0) + ogg_stream_clear(&ctx->stream); + ogg_sync_clear(&ctx->sync); +} +#endif + /* Helper method to send an RTCP PLI */ static void janus_streaming_rtcp_pli_send(janus_streaming_rtp_source *source) { @@ -1341,6 +1497,11 @@ static void janus_streaming_rtcp_remb_send(janus_streaming_rtp_source *source) { int janus_streaming_init(janus_callbacks *callback, const char *config_path) { #ifdef HAVE_LIBCURL curl_global_init(CURL_GLOBAL_ALL); +#else + JANUS_LOG(LOG_WARN, "libcurl not available, Streaming plugin will not have RTSP support\n"); +#endif +#ifndef HAVE_LIBOGG + JANUS_LOG(LOG_WARN, "libogg not available, Streaming plugin will not have file-based Opus streaming\n"); #endif if(g_atomic_int_get(&stopping)) { /* Still stopping from before */ @@ -1715,13 +1876,16 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { if(pin && pin->value) mp->pin = g_strdup(pin->value); } else if(!strcasecmp(type->value, "live")) { - /* File live source */ + /* File-based live source */ janus_config_item *desc = janus_config_get(config, cat, janus_config_type_item, "description"); janus_config_item *priv = janus_config_get(config, cat, janus_config_type_item, "is_private"); janus_config_item *secret = janus_config_get(config, cat, janus_config_type_item, "secret"); janus_config_item *pin = janus_config_get(config, cat, janus_config_type_item, "pin"); janus_config_item *file = janus_config_get(config, cat, janus_config_type_item, "filename"); janus_config_item *audio = janus_config_get(config, cat, janus_config_type_item, "audio"); + janus_config_item *acodec = janus_config_get(config, cat, janus_config_type_item, "audiopt"); + janus_config_item *artpmap = janus_config_get(config, cat, janus_config_type_item, "audiortpmap"); + janus_config_item *afmtp = janus_config_get(config, cat, janus_config_type_item, "audiofmtp"); janus_config_item *video = janus_config_get(config, cat, janus_config_type_item, "video"); if(file == NULL || file->value == NULL) { JANUS_LOG(LOG_ERR, "Can't add 'live' mountpoint '%s', missing mandatory information...\n", cat->name); @@ -1731,14 +1895,20 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { gboolean is_private = priv && priv->value && janus_is_true(priv->value); gboolean doaudio = audio && audio->value && janus_is_true(audio->value); gboolean dovideo = video && video->value && janus_is_true(video->value); - /* TODO We should support something more than raw a-Law and mu-Law streams... */ + /* We only support audio for file-based streaming at the moment: for streaming + * files using other codecs/formats an external tools should feed us RTP instead */ if(!doaudio || dovideo) { JANUS_LOG(LOG_ERR, "Can't add 'live' mountpoint '%s', we only support audio file streaming right now...\n", cat->name); cl = cl->next; continue; } +#ifdef HAVE_LIBOGG + if(!strstr(file->value, ".opus") && !strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) { + JANUS_LOG(LOG_ERR, "Can't add 'live' mountpoint '%s', unsupported format (we only support Opus and raw mu-Law/a-Law files right now)\n", cat->name); +#else if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) { JANUS_LOG(LOG_ERR, "Can't add 'live' mountpoint '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name); +#endif cl = cl->next; continue; } @@ -1755,8 +1925,12 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { mpid, (char *)(id ? id->value : NULL), (char *)cat->name, desc ? (char *)desc->value : NULL, - (char *)file->value, - TRUE, doaudio, dovideo)) == NULL) { + (char *)file->value, TRUE, + doaudio, + (acodec && acodec->value) ? atoi(acodec->value) : 0, + artpmap ? (char *)artpmap->value : NULL, + afmtp ? (char *)afmtp->value : NULL, + dovideo)) == NULL) { JANUS_LOG(LOG_ERR, "Error creating 'live' mountpoint '%s'...\n", cat->name); cl = cl->next; continue; @@ -1767,13 +1941,16 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { if(pin && pin->value) mp->pin = g_strdup(pin->value); } else if(!strcasecmp(type->value, "ondemand")) { - /* mu-Law file on demand source */ + /* File-based on demand source */ janus_config_item *desc = janus_config_get(config, cat, janus_config_type_item, "description"); janus_config_item *priv = janus_config_get(config, cat, janus_config_type_item, "is_private"); janus_config_item *secret = janus_config_get(config, cat, janus_config_type_item, "secret"); janus_config_item *pin = janus_config_get(config, cat, janus_config_type_item, "pin"); janus_config_item *file = janus_config_get(config, cat, janus_config_type_item, "filename"); janus_config_item *audio = janus_config_get(config, cat, janus_config_type_item, "audio"); + janus_config_item *acodec = janus_config_get(config, cat, janus_config_type_item, "audiopt"); + janus_config_item *artpmap = janus_config_get(config, cat, janus_config_type_item, "audiortpmap"); + janus_config_item *afmtp = janus_config_get(config, cat, janus_config_type_item, "audiofmtp"); janus_config_item *video = janus_config_get(config, cat, janus_config_type_item, "video"); if(file == NULL || file->value == NULL) { JANUS_LOG(LOG_ERR, "Can't add 'ondemand' mountpoint '%s', missing mandatory information...\n", cat->name); @@ -1783,14 +1960,20 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { gboolean is_private = priv && priv->value && janus_is_true(priv->value); gboolean doaudio = audio && audio->value && janus_is_true(audio->value); gboolean dovideo = video && video->value && janus_is_true(video->value); - /* TODO We should support something more than raw a-Law and mu-Law streams... */ + /* We only support audio for file-based streaming at the moment: for streaming + * files using other codecs/formats an external tools should feed us RTP instead */ if(!doaudio || dovideo) { JANUS_LOG(LOG_ERR, "Can't add 'ondemand' mountpoint '%s', we only support audio file streaming right now...\n", cat->name); cl = cl->next; continue; } +#ifdef HAVE_LIBOGG + if(!strstr(file->value, ".opus") && !strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) { + JANUS_LOG(LOG_ERR, "Can't add 'live' mountpoint '%s', unsupported format (we only support Opus and raw mu-Law/a-Law files right now)\n", cat->name); +#else if(!strstr(file->value, ".alaw") && !strstr(file->value, ".mulaw")) { JANUS_LOG(LOG_ERR, "Can't add 'ondemand' mountpoint '%s', unsupported format (we only support raw mu-Law and a-Law files right now)\n", cat->name); +#endif cl = cl->next; continue; } @@ -1807,8 +1990,12 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) { mpid, (char *)(id ? id->value : NULL), (char *)cat->name, desc ? (char *)desc->value : NULL, - (char *)file->value, - FALSE, doaudio, dovideo)) == NULL) { + (char *)file->value, FALSE, + doaudio, + (acodec && acodec->value) ? atoi(acodec->value) : 0, + artpmap ? (char *)artpmap->value : NULL, + afmtp ? (char *)afmtp->value : NULL, + dovideo)) == NULL) { JANUS_LOG(LOG_ERR, "Error creating 'ondemand' mountpoint '%s'...\n", cat->name); cl = cl->next; continue; @@ -2643,7 +2830,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi } mp->is_private = is_private ? json_is_true(is_private) : FALSE; } else if(!strcasecmp(type_text, "live")) { - /* File live source */ + /* File-based live source */ JANUS_VALIDATE_JSON_OBJECT(root, live_parameters, error_code, error_cause, TRUE, JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT); @@ -2660,8 +2847,19 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *audio = json_object_get(root, "audio"); json_t *video = json_object_get(root, "video"); gboolean doaudio = audio ? json_is_true(audio) : FALSE; + uint8_t acodec = 0; + char *artpmap = NULL, *afmtp = NULL; + if(doaudio) { + json_t *audiopt = json_object_get(root, "audiopt"); + acodec = json_integer_value(audiopt); + json_t *audiortpmap = json_object_get(root, "audiortpmap"); + artpmap = (char *)json_string_value(audiortpmap); + json_t *audiofmtp = json_object_get(root, "audiofmtp"); + afmtp = (char *)json_string_value(audiofmtp); + } gboolean dovideo = video ? json_is_true(video) : FALSE; - /* TODO We should support something more than raw a-Law and mu-Law streams... */ + /* We only support audio for file-based streaming at the moment: for streaming + * files using other codecs/formats an external tools should feed us RTP instead */ if(!doaudio || dovideo) { JANUS_LOG(LOG_ERR, "Can't add 'live' stream, we only support audio file streaming right now...\n"); error_code = JANUS_STREAMING_ERROR_CANT_CREATE; @@ -2672,8 +2870,13 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi goto prepare_response; } char *filename = (char *)json_string_value(file); +#ifdef HAVE_LIBOGG + if(!strstr(filename, ".opus") && !strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) { + JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support Opus and raw mu-Law/a-Law files right now)\n"); +#else if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) { JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n"); +#endif error_code = JANUS_STREAMING_ERROR_CANT_CREATE; g_snprintf(error_cause, 512, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)"); janus_mutex_lock(&mountpoints_mutex); @@ -2696,8 +2899,8 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi mpid, mpid_str, name ? (char *)json_string_value(name) : NULL, desc ? (char *)json_string_value(desc) : NULL, - filename, - TRUE, doaudio, dovideo); + filename, TRUE, + doaudio, acodec, artpmap, afmtp, dovideo); janus_mutex_lock(&mountpoints_mutex); g_hash_table_remove(mountpoints_temp, &mpid); janus_mutex_unlock(&mountpoints_mutex); @@ -2709,7 +2912,7 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi } mp->is_private = is_private ? json_is_true(is_private) : FALSE; } else if(!strcasecmp(type_text, "ondemand")) { - /* mu-Law file on demand source */ + /* File-based on demand source */ JANUS_VALIDATE_JSON_OBJECT(root, ondemand_parameters, error_code, error_cause, TRUE, JANUS_STREAMING_ERROR_MISSING_ELEMENT, JANUS_STREAMING_ERROR_INVALID_ELEMENT); @@ -2726,8 +2929,19 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi json_t *audio = json_object_get(root, "audio"); json_t *video = json_object_get(root, "video"); gboolean doaudio = audio ? json_is_true(audio) : FALSE; + uint8_t acodec = 0; + char *artpmap = NULL, *afmtp = NULL; + if(doaudio) { + json_t *audiopt = json_object_get(root, "audiopt"); + acodec = json_integer_value(audiopt); + json_t *audiortpmap = json_object_get(root, "audiortpmap"); + artpmap = (char *)json_string_value(audiortpmap); + json_t *audiofmtp = json_object_get(root, "audiofmtp"); + afmtp = (char *)json_string_value(audiofmtp); + } gboolean dovideo = video ? json_is_true(video) : FALSE; - /* TODO We should support something more than raw a-Law and mu-Law streams... */ + /* We only support audio for file-based streaming at the moment: for streaming + * files using other codecs/formats an external tools should feed us RTP instead */ if(!doaudio || dovideo) { JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, we only support audio file streaming right now...\n"); error_code = JANUS_STREAMING_ERROR_CANT_CREATE; @@ -2738,7 +2952,13 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi goto prepare_response; } char *filename = (char *)json_string_value(file); +#ifdef HAVE_LIBOGG + if(!strstr(filename, ".opus") && !strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) { + JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support Opus and raw mu-Law/a-Law files right now)\n"); +#else if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) { + JANUS_LOG(LOG_ERR, "Can't add 'live' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n"); +#endif JANUS_LOG(LOG_ERR, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n"); error_code = JANUS_STREAMING_ERROR_CANT_CREATE; g_snprintf(error_cause, 512, "Can't add 'ondemand' stream, unsupported format (we only support raw mu-Law and a-Law files right now)"); @@ -2762,8 +2982,8 @@ static json_t *janus_streaming_process_synchronous_request(janus_streaming_sessi mpid, mpid_str, name ? (char *)json_string_value(name) : NULL, desc ? (char *)json_string_value(desc) : NULL, - filename, - FALSE, doaudio, dovideo); + filename, FALSE, + doaudio, acodec, artpmap, afmtp, dovideo); janus_mutex_lock(&mountpoints_mutex); g_hash_table_remove(mountpoints_temp, &mpid); janus_mutex_unlock(&mountpoints_mutex); @@ -5535,8 +5755,8 @@ janus_streaming_mountpoint *janus_streaming_create_rtp_source( /* Helper to create a file/ondemand live source */ janus_streaming_mountpoint *janus_streaming_create_file_source( - uint64_t id, char *id_str, char *name, char *desc, char *filename, - gboolean live, gboolean doaudio, gboolean dovideo) { + uint64_t id, char *id_str, char *name, char *desc, char *filename, gboolean live, + gboolean doaudio, uint8_t acodec, char *artpmap, char *afmtp, gboolean dovideo) { char id_num[30]; if(!string_ids) { g_snprintf(id_num, sizeof(id_num), "%"SCNu64, id); @@ -5565,13 +5785,27 @@ janus_streaming_mountpoint *janus_streaming_create_file_source( return NULL; } /* TODO We should support something more than raw a-Law and mu-Law streams... */ +#ifdef HAVE_LIBOGG + if(!strstr(filename, ".opus") && !strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) { + JANUS_LOG(LOG_ERR, "Can't add 'file' stream, unsupported format (we only support Opus and raw mu-Law/a-Law files right now)\n"); +#else if(!strstr(filename, ".alaw") && !strstr(filename, ".mulaw")) { JANUS_LOG(LOG_ERR, "Can't add 'file' stream, unsupported format (we only support raw mu-Law and a-Law files right now)\n"); +#endif + janus_mutex_lock(&mountpoints_mutex); + g_hash_table_remove(mountpoints_temp, &id); + janus_mutex_unlock(&mountpoints_mutex); + return NULL; + } +#ifdef HAVE_LIBOGG + if(strstr(filename, ".opus") && (artpmap == NULL || strstr(artpmap, "opus/48000") == NULL)) { + JANUS_LOG(LOG_ERR, "Can't add 'file' stream, opus file is not associated with an opus rtpmap\n"); janus_mutex_lock(&mountpoints_mutex); g_hash_table_remove(mountpoints_temp, &id); janus_mutex_unlock(&mountpoints_mutex); return NULL; } +#endif janus_streaming_mountpoint *file_source = g_malloc0(sizeof(janus_streaming_mountpoint)); file_source->id = id; file_source->id_str = g_strdup(id_str); @@ -5602,8 +5836,15 @@ janus_streaming_mountpoint *janus_streaming_create_file_source( file_source_source->filename = g_strdup(filename); file_source->source = file_source_source; file_source->source_destroy = (GDestroyNotify) janus_streaming_file_source_free; - file_source->codecs.audio_pt = strstr(filename, ".alaw") ? 8 : 0; - file_source->codecs.audio_rtpmap = g_strdup(strstr(filename, ".alaw") ? "PCMA/8000" : "PCMU/8000"); + if(strstr(filename, ".opus")) { + file_source_source->opus = TRUE; + file_source->codecs.audio_pt = doaudio ? acodec : -1; + file_source->codecs.audio_rtpmap = doaudio ? g_strdup(artpmap) : NULL; + file_source->codecs.audio_fmtp = doaudio ? (afmtp ? g_strdup(afmtp) : NULL) : NULL; + } else { + file_source->codecs.audio_pt = strstr(filename, ".alaw") ? 8 : 0; + file_source->codecs.audio_rtpmap = g_strdup(strstr(filename, ".alaw") ? "PCMA/8000" : "PCMU/8000"); + } file_source->codecs.video_pt = -1; /* FIXME We don't support video for this type yet */ file_source->codecs.video_rtpmap = NULL; file_source->viewers = NULL; @@ -5993,7 +6234,7 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp /* Resolved */ g_snprintf(vhost, sizeof(vhost), "%s", janus_network_address_string_from_buffer(&addr_buf)); - JANUS_LOG(LOG_WARN, " -- %s\n", vhost); + JANUS_LOG(LOG_VERB, " -- %s\n", vhost); break; } info = info->ai_next; @@ -6170,7 +6411,7 @@ static int janus_streaming_rtsp_connect_to_server(janus_streaming_mountpoint *mp /* Resolved */ g_snprintf(ahost, sizeof(ahost), "%s", janus_network_address_string_from_buffer(&addr_buf)); - JANUS_LOG(LOG_WARN, " -- %s\n", ahost); + JANUS_LOG(LOG_VERB, " -- %s\n", ahost); break; } info = info->ai_next; @@ -6466,7 +6707,7 @@ janus_streaming_mountpoint *janus_streaming_create_rtsp_source( } #endif -/* FIXME Thread to send RTP packets from a file (on demand) */ +/* Thread to send RTP packets from a file (on demand) */ static void *janus_streaming_ondemand_thread(void *data) { JANUS_LOG(LOG_VERB, "Filesource (on demand) RTP thread starting...\n"); janus_streaming_session *session = (janus_streaming_session *)data; @@ -6513,10 +6754,30 @@ static void *janus_streaming_ondemand_thread(void *data) { g_thread_unref(g_thread_self()); return NULL; } - JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename); - /* Buffer */ - char *buf = g_malloc0(1024); char *name = g_strdup(mountpoint->name ? mountpoint->name : "??"); + JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", name, source->filename); + +#ifdef HAVE_LIBOGG + /* Make sure that, if this is an .opus file, we can open it */ + janus_streaming_opus_context opusctx = { 0 }; + if(source->opus) { + opusctx.name = name; + opusctx.filename = source->filename; + opusctx.file = audio; + if(janus_streaming_opus_context_init(&opusctx) < 0) { + g_free(name); + fclose(audio); + janus_refcount_decrease(&session->ref); + janus_refcount_decrease(&mountpoint->ref); + g_thread_unref(g_thread_self()); + return NULL; + } + } +#endif + + /* Buffer */ + char buf[1500]; + memset(buf, 0, sizeof(buf)); /* Set up RTP */ gint16 seq = 1; gint32 ts = 0; @@ -6534,7 +6795,7 @@ static void *janus_streaming_ondemand_thread(void *data) { now.tv_usec = before.tv_usec; time_t passed, d_s, d_us; /* Loop */ - gint read = 0; + gint read = 0, plen = (sizeof(buf)-RTP_HEADER_SIZE); janus_streaming_rtp_relay_packet packet; while(!g_atomic_int_get(&stopping) && !g_atomic_int_get(&mountpoint->destroyed) && !session->stopping && !g_atomic_int_get(&session->destroyed)) { /* See if it's time to prepare a frame */ @@ -6559,22 +6820,26 @@ static void *janus_streaming_ondemand_thread(void *data) { /* If not started or paused, wait some more */ if(!session->started || session->paused || !mountpoint->enabled) continue; - /* Read frame from file... */ - read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio); - if(feof(audio)) { - /* FIXME We're doing this forever... should this be configurable? */ - JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename); - fseek(audio, 0, SEEK_SET); - continue; + if(source->opus) { +#ifdef HAVE_LIBOGG + /* Get the next frame from the Opus file */ + read = janus_streaming_opus_context_read(&opusctx, buf + RTP_HEADER_SIZE, plen); +#endif + } else { + /* Read frame from file... */ + read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio); + if(feof(audio)) { + /* FIXME We're doing this forever... should this be configurable? */ + JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename); + fseek(audio, 0, SEEK_SET); + continue; + } } if(read < 0) break; if(mountpoint->active == FALSE) mountpoint->active = TRUE; - //~ JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n", - //~ header->type, ntohs(header->seq_number), ntohl(header->timestamp)); - //~ JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read); - /* Relay on all sessions */ + /* Relay to the listener */ packet.data = header; packet.length = RTP_HEADER_SIZE + read; packet.is_rtp = TRUE; @@ -6588,13 +6853,16 @@ static void *janus_streaming_ondemand_thread(void *data) { /* Update header */ seq++; header->seq_number = htons(seq); - ts += 160; + ts += (source->opus ? 960 : 160); header->timestamp = htonl(ts); header->markerbit = 0; } JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (ondemand) thread\n", name); +#ifdef HAVE_LIBOGG + if(source->opus) + janus_streaming_opus_context_cleanup(&opusctx); +#endif g_free(name); - g_free(buf); fclose(audio); janus_refcount_decrease(&session->ref); janus_refcount_decrease(&mountpoint->ref); @@ -6602,7 +6870,7 @@ static void *janus_streaming_ondemand_thread(void *data) { return NULL; } -/* FIXME Thread to send RTP packets from a file (live) */ +/* Thread to send RTP packets from a file (live) */ static void *janus_streaming_filesource_thread(void *data) { JANUS_LOG(LOG_VERB, "Filesource (live) thread starting...\n"); janus_streaming_mountpoint *mountpoint = (janus_streaming_mountpoint *)data; @@ -6633,10 +6901,29 @@ static void *janus_streaming_filesource_thread(void *data) { janus_refcount_decrease(&mountpoint->ref); return NULL; } + char *name = g_strdup(mountpoint->name ? mountpoint->name : "??"); JANUS_LOG(LOG_VERB, "[%s] Streaming audio file: %s\n", mountpoint->name, source->filename); + +#ifdef HAVE_LIBOGG + /* Make sure that, if this is an .opus file, we can open it */ + janus_streaming_opus_context opusctx = { 0 }; + if(source->opus) { + opusctx.name = name; + opusctx.filename = source->filename; + opusctx.file = audio; + if(janus_streaming_opus_context_init(&opusctx) < 0) { + g_free(name); + fclose(audio); + janus_refcount_decrease(&mountpoint->ref); + g_thread_unref(g_thread_self()); + return NULL; + } + } +#endif + /* Buffer */ - char *buf = g_malloc0(1024); - char *name = g_strdup(mountpoint->name ? mountpoint->name : "??"); + char buf[1500]; + memset(buf, 0, sizeof(buf)); /* Set up RTP */ gint16 seq = 1; gint32 ts = 0; @@ -6654,7 +6941,7 @@ static void *janus_streaming_filesource_thread(void *data) { now.tv_usec = before.tv_usec; time_t passed, d_s, d_us; /* Loop */ - gint read = 0; + gint read = 0, plen = (sizeof(buf)-RTP_HEADER_SIZE); janus_streaming_rtp_relay_packet packet; while(!g_atomic_int_get(&stopping) && !g_atomic_int_get(&mountpoint->destroyed)) { /* See if it's time to prepare a frame */ @@ -6679,21 +6966,25 @@ static void *janus_streaming_filesource_thread(void *data) { /* If paused, wait some more */ if(!mountpoint->enabled) continue; - /* Read frame from file... */ - read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio); - if(feof(audio)) { - /* FIXME We're doing this forever... should this be configurable? */ - JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename); - fseek(audio, 0, SEEK_SET); - continue; + if(source->opus) { +#ifdef HAVE_LIBOGG + /* Get the next frame from the Opus file */ + read = janus_streaming_opus_context_read(&opusctx, buf + RTP_HEADER_SIZE, plen); +#endif + } else { + /* Read frame from file... */ + read = fread(buf + RTP_HEADER_SIZE, sizeof(char), 160, audio); + if(feof(audio)) { + /* FIXME We're doing this forever... should this be configurable? */ + JANUS_LOG(LOG_VERB, "[%s] Rewind! (%s)\n", name, source->filename); + fseek(audio, 0, SEEK_SET); + continue; + } } if(read < 0) break; if(mountpoint->active == FALSE) mountpoint->active = TRUE; - // JANUS_LOG(LOG_VERB, " ... Preparing RTP packet (pt=%u, seq=%u, ts=%u)...\n", - // header->type, ntohs(header->seq_number), ntohl(header->timestamp)); - // JANUS_LOG(LOG_VERB, " ... Read %d bytes from the audio file...\n", read); /* Relay on all sessions */ packet.data = header; packet.length = RTP_HEADER_SIZE + read; @@ -6710,13 +7001,16 @@ static void *janus_streaming_filesource_thread(void *data) { /* Update header */ seq++; header->seq_number = htons(seq); - ts += 160; + ts += (source->opus ? 960 : 160); header->timestamp = htonl(ts); header->markerbit = 0; } JANUS_LOG(LOG_VERB, "[%s] Leaving filesource (live) thread\n", name); +#ifdef HAVE_LIBOGG + if(source->opus) + janus_streaming_opus_context_cleanup(&opusctx); +#endif g_free(name); - g_free(buf); fclose(audio); janus_refcount_decrease(&mountpoint->ref); return NULL;