Skip to content

Commit

Permalink
Send camera URL with archive command
Browse files Browse the repository at this point in the history
Avoids the need to store camera configs in the archiver config.
  • Loading branch information
DanNixon committed Dec 12, 2023
1 parent f9bad44 commit 2c47c17
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 175 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions archiver/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use satori_common::{camera_config::CamerasConfig, mqtt::MqttConfig};
use satori_common::mqtt::MqttConfig;
use satori_storage::StorageConfig;
use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};
Expand All @@ -14,8 +14,5 @@ pub(crate) struct Config {

pub(crate) mqtt: MqttConfig,

#[serde(flatten)]
pub(crate) cameras: CamerasConfig,

pub(crate) storage: StorageConfig,
}
13 changes: 5 additions & 8 deletions archiver/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
#[derive(thiserror::Error, Debug)]
pub(crate) enum ArchiverError {
#[error("Storage error: {0}")]
StorageError(#[from] satori_storage::StorageError),

#[error("Camera not found")]
CameraNotFound,
Storage(#[from] satori_storage::StorageError),

#[error("Network error: {0}")]
NetworkError(#[from] reqwest::Error),
Network(#[from] reqwest::Error),

#[error("IO error: {0}")]
IOError(#[from] std::io::Error),
IO(#[from] std::io::Error),

#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
Serialization(#[from] serde_json::Error),

#[error("URL manipulation error")]
UrlError,
Url,
}

pub(crate) type ArchiverResult<T> = Result<T, ArchiverError>;
4 changes: 1 addition & 3 deletions archiver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod task;

use crate::config::Config;
use clap::Parser;
use satori_common::{camera_config::CamerasConfig, mqtt::MqttClient};
use satori_common::mqtt::MqttClient;
use std::{net::SocketAddr, path::PathBuf};
use tracing::{debug, info};

Expand All @@ -24,7 +24,6 @@ pub(crate) struct Cli {

struct Context {
storage: satori_storage::Provider,
cameras: CamerasConfig,
http_client: reqwest::Client,
}

Expand All @@ -39,7 +38,6 @@ async fn main() -> Result<(), ()> {

let context = Context {
storage: config.storage.create_provider(),
cameras: config.cameras,
http_client: reqwest::Client::new(),
};

Expand Down
67 changes: 32 additions & 35 deletions archiver/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{error::ArchiverResult, task::ArchiveTask, Context};
use futures::StreamExt;
use kagiyama::prometheus::registry::Registry;
use satori_common::{mqtt::PublishExt, ArchiveCommand, CameraSegments, Event};
use satori_common::{mqtt::PublishExt, ArchiveCommand, ArchiveSegmentsCommand, Event};
use std::{
collections::VecDeque,
fs::File,
Expand Down Expand Up @@ -141,13 +141,14 @@ impl ArchiveTaskQueue {
}

#[tracing::instrument(skip_all)]
fn handle_archive_segments_message(&mut self, segments: CameraSegments) {
fn handle_archive_segments_message(&mut self, msg: ArchiveSegmentsCommand) {
info!("Queueing archive video segments command");
for segment in segments.segment_list {
for segment in msg.segment_list {
debug!("Adding video segment to queue: {}", segment.display());
self.queue
.push_back(ArchiveTask::CameraSegment(crate::task::CameraSegment {
camera_name: segments.name.clone(),
camera_name: msg.camera_name.clone(),
camera_url: msg.camera_url.clone(),
filename: segment,
}));
}
Expand Down Expand Up @@ -265,7 +266,8 @@ mod test {
use super::*;
use chrono::Utc;
use rumqttc::{Publish, QoS};
use satori_common::EventMetadata;
use satori_common::{ArchiveCommand, ArchiveSegmentsCommand, EventMetadata, Message};
use url::Url;

#[test]
fn test_load_bad_file_gives_empty_queue() {
Expand All @@ -279,12 +281,11 @@ mod test {
let mut queue = ArchiveTaskQueue::default();
assert!(queue.queue.is_empty());

let msg = satori_common::Message::ArchiveCommand(satori_common::ArchiveCommand::Segments(
satori_common::CameraSegments {
name: "camera-1".into(),
segment_list: vec![],
},
));
let msg = Message::ArchiveCommand(ArchiveCommand::Segments(ArchiveSegmentsCommand {
camera_name: "camera-1".into(),
camera_url: Url::parse("http://localhost:8080/stream.m3u8").unwrap(),
segment_list: vec![],
}));
let msg = Publish::new("", QoS::ExactlyOnce, serde_json::to_string(&msg).unwrap());
queue.handle_mqtt_message(msg);
assert!(queue.queue.is_empty());
Expand All @@ -295,12 +296,11 @@ mod test {
let mut queue = ArchiveTaskQueue::default();
assert!(queue.queue.is_empty());

let msg = satori_common::Message::ArchiveCommand(satori_common::ArchiveCommand::Segments(
satori_common::CameraSegments {
name: "camera-1".into(),
segment_list: vec!["one.ts".into(), "two.ts".into()],
},
));
let msg = Message::ArchiveCommand(ArchiveCommand::Segments(ArchiveSegmentsCommand {
camera_name: "camera-1".into(),
camera_url: Url::parse("http://localhost:8080/stream.m3u8").unwrap(),
segment_list: vec!["one.ts".into(), "two.ts".into()],
}));
let msg = Publish::new("", QoS::ExactlyOnce, serde_json::to_string(&msg).unwrap());
queue.handle_mqtt_message(msg);
assert_eq!(queue.queue.len(), 2);
Expand Down Expand Up @@ -330,28 +330,25 @@ mod test {
let mut queue = ArchiveTaskQueue::default();

// Add an event to the queue
let msg = satori_common::Message::ArchiveCommand(
satori_common::ArchiveCommand::EventMetadata(Event {
metadata: EventMetadata {
id: "test-1".into(),
timestamp: Utc::now().into(),
},
start: Utc::now().into(),
end: Utc::now().into(),
reasons: Default::default(),
cameras: Default::default(),
}),
);
let msg = Message::ArchiveCommand(ArchiveCommand::EventMetadata(Event {
metadata: EventMetadata {
id: "test-1".into(),
timestamp: Utc::now().into(),
},
start: Utc::now().into(),
end: Utc::now().into(),
reasons: Default::default(),
cameras: Default::default(),
}));
let msg = Publish::new("", QoS::ExactlyOnce, serde_json::to_string(&msg).unwrap());
queue.handle_mqtt_message(msg);

// Add two segments to the queue
let msg = satori_common::Message::ArchiveCommand(satori_common::ArchiveCommand::Segments(
satori_common::CameraSegments {
name: "camera-1".into(),
segment_list: vec!["one.ts".into(), "two.ts".into()],
},
));
let msg = Message::ArchiveCommand(ArchiveCommand::Segments(ArchiveSegmentsCommand {
camera_name: "camera-1".into(),
camera_url: Url::parse("http://localhost:8080/stream.m3u8").unwrap(),
segment_list: vec!["one.ts".into(), "two.ts".into()],
}));
let msg = Publish::new("", QoS::ExactlyOnce, serde_json::to_string(&msg).unwrap());
queue.handle_mqtt_message(msg);

Expand Down
11 changes: 4 additions & 7 deletions archiver/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,14 @@ impl ArchiveTask {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct CameraSegment {
pub(crate) camera_name: String,
pub(crate) camera_url: Url,
pub(crate) filename: PathBuf,
}

impl CameraSegment {
#[tracing::instrument(skip_all)]
pub(crate) async fn get(&self, context: &Context) -> ArchiverResult<Bytes> {
let url = context
.cameras
.get_url(&self.camera_name)
.ok_or(ArchiverError::CameraNotFound)?;
let url = get_segment_url(url, &self.filename)?;
let url = get_segment_url(self.camera_url.clone(), &self.filename)?;
debug!("Segment URL: {url}");

let req = context.http_client.get(url).send().await?;
Expand All @@ -66,9 +63,9 @@ impl CameraSegment {
fn get_segment_url(hls_url: Url, segment_filename: &Path) -> ArchiverResult<Url> {
let mut url = hls_url;
url.path_segments_mut()
.map_err(|_| ArchiverError::UrlError)?
.map_err(|_| ArchiverError::Url)?
.pop()
.push(segment_filename.to_str().ok_or(ArchiverError::UrlError)?);
.push(segment_filename.to_str().ok_or(ArchiverError::Url)?);
Ok(url)
}

Expand Down
50 changes: 2 additions & 48 deletions common/src/camera_config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use url::Url;

Expand All @@ -8,13 +8,6 @@ pub struct CamerasConfig {
}

impl CamerasConfig {
pub fn get_url(&self, camera_name: &str) -> Option<Url> {
self.cameras
.iter()
.find(|c| c.name == camera_name)
.map(|c| c.url.clone())
}

pub fn into_map(self) -> HashMap<String, Url> {
let mut ret = HashMap::new();
for c in self.cameras {
Expand All @@ -24,47 +17,8 @@ impl CamerasConfig {
}
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CameraConfig {
name: String,
url: Url,
}

#[cfg(test)]
mod test {
use super::*;

fn get_test_cameras_config() -> CamerasConfig {
CamerasConfig {
cameras: vec![
CameraConfig {
name: "camera-1".into(),
url: Url::parse("http://camera-1/stream.m3u8").unwrap(),
},
CameraConfig {
name: "camera-2".into(),
url: Url::parse("http://camera-2/stream.m3u8").unwrap(),
},
CameraConfig {
name: "camera-3".into(),
url: Url::parse("http://camera-3/stream.m3u8").unwrap(),
},
],
}
}

#[test]
fn test_cameras_config_get_url() {
let config = get_test_cameras_config();
assert_eq!(
config.get_url("camera-1"),
Some(Url::parse("http://camera-1/stream.m3u8").unwrap())
);
}

#[test]
fn test_cameras_config_get_url_fail() {
let config = get_test_cameras_config();
assert_eq!(config.get_url("camera-nope"), None);
}
}
2 changes: 1 addition & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod event;
pub use self::event::{CameraSegments, Event, EventMetadata, EventReason};

mod message_schema;
pub use self::message_schema::{ArchiveCommand, Message, TriggerCommand};
pub use self::message_schema::{ArchiveCommand, ArchiveSegmentsCommand, Message, TriggerCommand};

pub mod mqtt;

Expand Down
12 changes: 10 additions & 2 deletions common/src/message_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use chrono::{DateTime, FixedOffset};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use std::time::Duration;
use std::{path::PathBuf, time::Duration};
use url::Url;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
Expand Down Expand Up @@ -37,5 +38,12 @@ pub struct TriggerCommand {
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum ArchiveCommand {
EventMetadata(crate::event::Event),
Segments(crate::event::CameraSegments),
Segments(ArchiveSegmentsCommand),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveSegmentsCommand {
pub camera_name: String,
pub camera_url: Url,
pub segment_list: Vec<PathBuf>,
}
1 change: 1 addition & 0 deletions ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ satori-storage.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
18 changes: 12 additions & 6 deletions ctl/src/cli/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use async_trait::async_trait;
use clap::{Parser, Subcommand};
use satori_common::{
mqtt::{AsyncClientExt, MqttClient, MqttConfig, PublishExt},
ArchiveCommand, CameraSegments, Event, EventMetadata, Message, Trigger,
ArchiveCommand, ArchiveSegmentsCommand, Event, EventMetadata, Message, Trigger,
};
use std::{path::PathBuf, time::Duration};
use tracing::{info, warn};
use url::Url;

/// Debugging operations.
#[derive(Debug, Clone, Parser)]
Expand Down Expand Up @@ -70,11 +71,12 @@ impl CliExecute for DebugCommand {
mqtt_client.poll_until_message_is_sent().await;
}
DebugSubcommand::ArchiveSegments(cmd) => {
let segments = CameraSegments {
name: cmd.camera.clone(),
segment_list: cmd.filename.clone(),
};
let message = Message::ArchiveCommand(ArchiveCommand::Segments(segments));
let message =
Message::ArchiveCommand(ArchiveCommand::Segments(ArchiveSegmentsCommand {
camera_name: cmd.camera.clone(),
camera_url: cmd.url.clone(),
segment_list: cmd.filename.clone(),
}));

let mut client = mqtt_client.client();
let topic = mqtt_client.topic();
Expand Down Expand Up @@ -111,6 +113,10 @@ pub(crate) struct DebugArchiveSegmentsCommand {
#[arg(long)]
camera: String,

/// URL of the camera's HLS stream.
#[arg(long)]
url: Url,

/// Filenames of segments to archive.
filename: Vec<PathBuf>,
}
Loading

0 comments on commit 2c47c17

Please sign in to comment.