Skip to content

Commit

Permalink
add publish api for gb(/gb/v1/publish)
Browse files Browse the repository at this point in the history
  • Loading branch information
duiniuluantanqin committed Feb 19, 2024
1 parent d768cbf commit 2cdeb6f
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 46 deletions.
262 changes: 221 additions & 41 deletions trunk/src/app/srs_app_gb28181.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_protocol_raw_avc.hpp>
#include <srs_protocol_http_stack.hpp>
#include <srs_app_server.hpp>
#include <srs_protocol_json.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_http_hooks.hpp>

#include <sstream>
using namespace std;
Expand Down Expand Up @@ -218,11 +224,6 @@ void SrsLazyGbSession::on_media_transport(SrsLazyObjectWrapper<SrsLazyGbMediaTcp
media_->resource()->set_cid(cid_);
}

SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn> *SrsLazyGbSession::media_transport()
{
return media_;
}

std::string SrsLazyGbSession::pip()
{
return pip_;
Expand Down Expand Up @@ -455,6 +456,24 @@ srs_error_t SrsGbListener::listen()
return srs_error_wrap(err, "listen");
}

if ((err = listen_api()) != srs_success) {
return srs_error_wrap(err, "listen api");
}

return err;
}

srs_error_t SrsGbListener::listen_api()
{
srs_error_t err = srs_success;

// TODO: FIXME: Fetch api from hybrid manager, not from SRS.
ISrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();

if ((err = http_api_mux->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}

return err;
}

Expand All @@ -481,7 +500,7 @@ srs_error_t SrsGbListener::on_tcp_client(ISrsListener* listener, srs_netfd_t stf
} else if (listener == media_listener_) {
SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>* conn = new SrsLazyObjectWrapper<SrsLazyGbMediaTcpConn>();
SrsLazyGbMediaTcpConn* resource = dynamic_cast<SrsLazyGbMediaTcpConn*>(conn->resource());
resource->setup(conf_, stfd);
resource->setup(stfd);

if ((err = resource->start()) != srs_success) {
srs_freep(conn);
Expand Down Expand Up @@ -546,6 +565,11 @@ std::string SrsLazyGbSipTcpConn::device_id()
return register_->device_id();
}

void SrsLazyGbSipTcpConn::set_device_id(const std::string &id)
{
register_->from_address_user_ = id;
}

void SrsLazyGbSipTcpConn::set_cid(const SrsContextId& cid)
{
trd_->set_cid(cid);
Expand Down Expand Up @@ -1231,12 +1255,10 @@ SrsLazyGbMediaTcpConn::SrsLazyGbMediaTcpConn(SrsLazyObjectWrapper<SrsLazyGbMedia
trd_ = new SrsSTCoroutine("media", this);
buffer_ = new uint8_t[65535];
conn_ = NULL;
conf_ = NULL;

session_ = NULL;
connected_ = false;
nn_rtcp_ = 0;
ssrc_ = 0;
}

SrsLazyGbMediaTcpConn::~SrsLazyGbMediaTcpConn()
Expand All @@ -1246,16 +1268,12 @@ SrsLazyGbMediaTcpConn::~SrsLazyGbMediaTcpConn()
srs_freepa(buffer_);
srs_freep(pack_);
srs_freep(session_);
srs_freep(conf_);
}

void SrsLazyGbMediaTcpConn::setup(SrsConfDirective* conf, srs_netfd_t stfd)
void SrsLazyGbMediaTcpConn::setup(srs_netfd_t stfd)
{
srs_freep(conn_);
conn_ = new SrsTcpConnection(stfd);

srs_freep(conf_);
conf_ = conf->copy();
}

bool SrsLazyGbMediaTcpConn::is_connected()
Expand Down Expand Up @@ -1283,11 +1301,6 @@ std::string SrsLazyGbMediaTcpConn::desc()
return "GB-Media-TCP";
}

uint32_t SrsLazyGbMediaTcpConn::ssrc()
{
return ssrc_;
}

srs_error_t SrsLazyGbMediaTcpConn::start()
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -1477,28 +1490,12 @@ srs_error_t SrsLazyGbMediaTcpConn::bind_session(uint32_t ssrc, SrsLazyObjectWrap
srs_assert(wrapper); // It MUST never be NULL, because this method is in the cycle of coroutine.

// Find exists session for register, might be created by another object and still alive.
SrsLazyObjectWrapper<SrsLazyGbSession>* session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc));
if (!session) {
// Create new GB session.
session = new SrsLazyObjectWrapper<SrsLazyGbSession>();

if ((err = session->resource()->initialize(conf_)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "initialize");
}

if ((err = session->resource()->start()) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "start");
}

_srs_gb_manager->add_with_id(std::to_string(ssrc), session);
}
SrsLazyObjectWrapper<SrsLazyGbSession>* session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc)); // TODO: same ssrc?
if (!session) return err;

_srs_gb_manager->add_with_fast_id(ssrc, session);
session->resource()->on_media_transport(wrapper);
*psession = session->copy();
ssrc_ = ssrc;

return err;
}
Expand Down Expand Up @@ -2127,11 +2124,7 @@ srs_error_t SrsGbMuxer::connect()
// Cleanup the data before connect again.
close();

string stream = session_->sip_transport()->resource()->device_id();
if (stream.empty()) {
stream = std::to_string(session_->media_transport()->resource()->ssrc());
}
string url = srs_string_replace(output_, "[stream]", stream);
string url = srs_string_replace(output_, "[stream]", session_->sip_transport()->resource()->device_id());
srs_trace("Muxer: Convert GB to RTMP %s", url.c_str());

srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
Expand Down Expand Up @@ -2734,3 +2727,190 @@ void srs_sip_parse_address(const std::string& address, std::string& user, std::s

SrsResourceManager* _srs_gb_manager = NULL;

SrsGoApiGbPublish::SrsGoApiGbPublish(SrsConfDirective* conf)
{
conf_ = conf->copy();
security_ = new SrsSecurity();
}

SrsGoApiGbPublish::~SrsGoApiGbPublish()
{
srs_freep(conf_);
srs_freep(security_);
}

// Request:
// POST /gb/v1/publish/
// {
// "clientip": "...","
// "stream": "...",
// "ssrc": "..."
// }
// Response:
// {"port":9000, "is_tcp": true}
srs_error_t SrsGoApiGbPublish::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r)
{
srs_error_t err = srs_success;

SrsJsonObject* res = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, res);

if ((err = do_serve_http(w, r, res)) != srs_success) {
srs_warn("GB error %s", srs_error_desc(err).c_str()); srs_freep(err);
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}

return srs_api_response(w, r, res->dumps());
}

srs_error_t SrsGoApiGbPublish::do_serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, SrsJsonObject *res)
{
srs_error_t err = srs_success;

// For each GB session, we use short-term HTTP connection.
w->header()->set("Connection", "Close");

// Parse req, the request json object, from body.
SrsJsonObject* req = NULL;
SrsAutoFree(SrsJsonObject, req);
if (true) {
string req_json;
if ((err = r->body_read_all(req_json)) != srs_success) {
return srs_error_wrap(err, "read body");
}

SrsJsonAny* json = SrsJsonAny::loads(req_json);
if (!json || !json->is_object()) {
return srs_error_new(ERROR_HTTP_DATA_INVALID, "invalid body %s", req_json.c_str());
}

req = json->to_object();
}

// Fetch params from req object.
SrsJsonAny* prop = NULL;
if ((prop = req->ensure_property_string("stream")) == NULL) {
return srs_error_wrap(err, "not stream");
}
string stream = prop->to_str();

if ((prop = req->ensure_property_string("ssrc")) == NULL) {
return srs_error_wrap(err, "not ssrc");
}
uint64_t ssrc = atoi(prop->to_str().c_str());

string clientip;
if ((prop = req->ensure_property_string("clientip")) != NULL) {
clientip = prop->to_str();
}
if (clientip.empty()){
clientip = dynamic_cast<SrsHttpMessage*>(r)->connection()->remote_ip();
// Overwrite by ip from proxy.
string oip = srs_get_original_ip(r);
if (!oip.empty()) {
clientip = oip;
}
}
// create temp Request, for security check and http hooks.
SrsRequest* request = new SrsRequest();
SrsAutoFree(SrsRequest, request);
request->ip = clientip;

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(request->vhost);
if (parsed_vhost) {
request->vhost = parsed_vhost->arg0();
}

if ((err = security_->check(SrsRtmpConnFlashPublish, clientip, request)) != srs_success) {
return srs_error_wrap(err, "security check");
}

// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_publish(request)) != srs_success) {
return srs_error_wrap(err, "http_hooks_on_publish");
}

if ((err = bind_session(stream, ssrc)) != srs_success) {
return srs_error_wrap(err, "bind session");
}

srs_trace("GB publish stream: %s, ssrc=%lu, clientip=%s", stream.c_str(), ssrc, clientip.c_str());

res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
res->set("server", SrsJsonAny::str(SrsStatistic::instance()->server_id().c_str()));
res->set("service", SrsJsonAny::str(SrsStatistic::instance()->service_id().c_str()));
res->set("pid", SrsJsonAny::str(SrsStatistic::instance()->service_pid().c_str()));

int port = _srs_config->get_stream_caster_listen(conf_);
res->set("port", SrsJsonAny::integer(port));
res->set("is_tcp", SrsJsonAny::boolean(true)); // only tcp supported

return err;
}

srs_error_t SrsGoApiGbPublish::bind_session(std::string stream, uint64_t ssrc)
{
srs_error_t err = srs_success;

SrsLazyObjectWrapper<SrsLazyGbSession>* session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_id(stream));
if (session) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "stream already exists");
}

session = dynamic_cast<SrsLazyObjectWrapper<SrsLazyGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc));
if (session) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "ssrc already exists");
}

// Create new GB session.
session = new SrsLazyObjectWrapper<SrsLazyGbSession>();

if ((err = session->resource()->initialize(conf_)) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "initialize");
}

if ((err = session->resource()->start()) != srs_success) {
srs_freep(session);
return srs_error_wrap(err, "start");
}

session->resource()->sip_transport()->resource()->set_device_id(stream);

_srs_gb_manager->add_with_id(stream, session);
_srs_gb_manager->add_with_fast_id(ssrc, session);

return err;
}

srs_error_t SrsGoApiGbPublish::http_hooks_on_publish(SrsRequest *req)
{
srs_error_t err = srs_success;

if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}

// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;

if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_publish(req->vhost);
if (!conf) {
return err;
}
hooks = conf->args;
}

for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = SrsHttpHooks::on_publish(url, req)) != srs_success) {
return srs_error_wrap(err, "on_publish %s", url.c_str());
}
}

return err;
}
Loading

0 comments on commit 2cdeb6f

Please sign in to comment.