diff --git a/Cargo.lock b/Cargo.lock index a4398a7..c315a1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2309,6 +2309,7 @@ dependencies = [ "serde", "serde_with", "tokio", + "tokio-stream", "tokio-util", "tower-http", "tracing", @@ -2966,6 +2967,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 39b9f87..389d781 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ serde_json = "1.0.111" tempfile = "3.9.0" thiserror = "1.0.56" tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "signal", "process"] } +tokio-stream = { version = "0.1.14", features = ["sync"] } tokio-util = { version = "0.7.10", features = ["codec"] } toml = "0.8" tower-http = { version = "0.5.1", features = ["fs"] } diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 6ed5483..bff6d5b 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -18,6 +18,7 @@ satori-common.workspace = true serde.workspace = true serde_with.workspace = true tokio.workspace = true +tokio-stream.workspace = true tokio-util.workspace = true tower-http.workspace = true tracing.workspace = true diff --git a/agent/src/ffmpeg/streamer.rs b/agent/src/ffmpeg/streamer.rs index 73686a5..9fe0e1f 100644 --- a/agent/src/ffmpeg/streamer.rs +++ b/agent/src/ffmpeg/streamer.rs @@ -1,4 +1,5 @@ use crate::{config::Config, jpeg_frame_decoder::JpegFrameDecoder}; +use bytes::Bytes; use futures::StreamExt; use nix::{ sys::signal::{self, Signal}, @@ -11,7 +12,7 @@ use std::{ use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, - sync::broadcast, + sync::broadcast::Sender, task::JoinHandle, }; use tokio_util::codec::FramedRead; @@ -24,26 +25,20 @@ pub(crate) struct Streamer { terminate: Arc>, ffmpeg_pid: Arc>>, handle: Option>, - jpeg_tx: broadcast::Sender, + jpeg_tx: Sender, } impl Streamer { - pub(crate) fn new(config: Config) -> Self { - let (tx, _) = broadcast::channel(8); - + pub(crate) fn new(config: Config, jpeg_tx: Sender) -> Self { Self { config, terminate: Arc::new(Mutex::new(false)), ffmpeg_pid: Default::default(), handle: None, - jpeg_tx: tx, + jpeg_tx, } } - pub(crate) fn jpeg_subscribe(&self) -> broadcast::Receiver { - self.jpeg_tx.subscribe() - } - #[tracing::instrument(skip_all)] pub(crate) async fn start(&mut self) { let config = self.config.clone(); diff --git a/agent/src/main.rs b/agent/src/main.rs index 998dcd7..afa9b99 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -4,12 +4,13 @@ mod jpeg_frame_decoder; mod utils; use axum::{ + body::Body, http::header, response::{Html, IntoResponse}, routing::get, Router, }; -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use clap::Parser; use metrics_exporter_prometheus::PrometheusBuilder; use std::{ @@ -19,7 +20,8 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; -use tokio::{net::TcpListener, task::JoinSet}; +use tokio::net::TcpListener; +use tokio_stream::wrappers::BroadcastStream; use tower_http::services::ServeDir; use tracing::{debug, info, warn}; @@ -85,27 +87,30 @@ async fn main() { // Create video output directory fs::create_dir_all(&config.video_directory).expect("should be able to create output directory"); + // Channel for JPEG frames + let (jpeg_tx, mut jpeg_rx) = tokio::sync::broadcast::channel(8); + // Start streamer - let mut streamer = ffmpeg::Streamer::new(config.clone()); + let mut streamer = ffmpeg::Streamer::new(config.clone(), jpeg_tx); streamer.start().await; - let mut tasks = JoinSet::<()>::new(); - // Configure HTTP server listener let listener = TcpListener::bind(&cli.http_server_address) .await .unwrap_or_else(|_| panic!("tcp listener should bind to {}", cli.http_server_address)); + // Configure HTTP server endpoints let frame_image = SharedImageData::default(); + let (jpeg_multipart_tx, _) = tokio::sync::broadcast::channel::(8); - // Configure HTTP server routes let app = { let frame_image = frame_image.clone(); + let jpeg_multipart_tx = jpeg_multipart_tx.clone(); Router::new() .route("/player", get(Html(include_str!("player.html")))) .route( - "/frame.jpg", + "/jpeg", get(move || async move { match frame_image.lock().unwrap().as_ref() { Some(image) => { @@ -115,34 +120,49 @@ async fn main() { } }), ) + .route( + "/mjpeg", + get(move || async move { + let stream = BroadcastStream::new(jpeg_multipart_tx.subscribe()); + let body = Body::from_stream(stream); + + ( + [( + header::CONTENT_TYPE, + "multipart/x-mixed-replace; boundary=frame", + )], + body, + ) + .into_response() + }), + ) .nest_service("/", ServeDir::new(config.video_directory.clone())) }; // Start HTTP server info!("Starting HTTP server on {}", cli.http_server_address); - tasks.spawn(async move { + let server_handle = tokio::spawn(async move { axum::serve(listener, app).await.unwrap(); }); let mut metrics_interval = tokio::time::interval(Duration::from_secs(30)); - let mut jpeg_rx = streamer.jpeg_subscribe(); loop { tokio::select! { Ok(image) = jpeg_rx.recv() => { + let mut body = bytes::BytesMut::new(); + body.put_slice(b"--frame\r\n"); + body.put_slice(format!("{}: image/jpeg\r\n", header::CONTENT_TYPE).as_bytes()); + body.put_slice(format!("{}: {}\r\n", header::CONTENT_LENGTH, image.len()).as_bytes()); + body.put_slice(b"\r\n"); + body.put_slice(&image); + let _ = jpeg_multipart_tx.send(body.into()); + frame_image.lock().unwrap().replace(image); } _ = metrics_interval.tick() => { update_segment_count_metric(&config); update_disk_usage_metric(&config); } - task = tasks.join_next() => { - match task { - None => tokio::time::sleep(Duration::from_secs(5)).await, - Some(task) => { - info!("Task exits: {:?}", task); - } - } - } _ = tokio::signal::ctrl_c() => { info!("Exiting"); break; @@ -153,7 +173,10 @@ async fn main() { // Stop streamer streamer.stop().await; - tasks.shutdown().await; + // Stop server + info!("Stopping HTTP server"); + server_handle.abort(); + let _ = server_handle.await; } #[tracing::instrument(skip_all)]