Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send camera URL in archive command #63

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions archiver/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use satori_common::{camera_config::CamerasConfig, mqtt::MqttConfig};
use satori_common::mqtt::MqttConfig;
use satori_storage::StorageConfig;
use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};
Expand All @@ -14,8 +14,5 @@ pub(crate) struct Config {

pub(crate) mqtt: MqttConfig,

#[serde(flatten)]
pub(crate) cameras: CamerasConfig,

pub(crate) storage: StorageConfig,
}
13 changes: 5 additions & 8 deletions archiver/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
#[derive(thiserror::Error, Debug)]
pub(crate) enum ArchiverError {
#[error("Storage error: {0}")]
StorageError(#[from] satori_storage::StorageError),

#[error("Camera not found")]
CameraNotFound,
Storage(#[from] satori_storage::StorageError),

#[error("Network error: {0}")]
NetworkError(#[from] reqwest::Error),
Network(#[from] reqwest::Error),

#[error("IO error: {0}")]
IOError(#[from] std::io::Error),
IO(#[from] std::io::Error),

#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
Serialization(#[from] serde_json::Error),

#[error("URL manipulation error")]
UrlError,
Url,
}

pub(crate) type ArchiverResult<T> = Result<T, ArchiverError>;
4 changes: 1 addition & 3 deletions archiver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod task;

use crate::config::Config;
use clap::Parser;
use satori_common::{camera_config::CamerasConfig, mqtt::MqttClient};
use satori_common::mqtt::MqttClient;
use std::{net::SocketAddr, path::PathBuf};
use tracing::{debug, info};

Expand All @@ -24,7 +24,6 @@ pub(crate) struct Cli {

struct Context {
storage: satori_storage::Provider,
cameras: CamerasConfig,
http_client: reqwest::Client,
}

Expand All @@ -39,7 +38,6 @@ async fn main() -> Result<(), ()> {

let context = Context {
storage: config.storage.create_provider(),
cameras: config.cameras,
http_client: reqwest::Client::new(),
};

Expand Down
67 changes: 32 additions & 35 deletions archiver/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{error::ArchiverResult, task::ArchiveTask, Context};
use futures::StreamExt;
use kagiyama::prometheus::registry::Registry;
use satori_common::{mqtt::PublishExt, ArchiveCommand, CameraSegments, Event};
use satori_common::{mqtt::PublishExt, ArchiveCommand, ArchiveSegmentsCommand, Event};
use std::{
collections::VecDeque,
fs::File,
Expand Down Expand Up @@ -141,13 +141,14 @@ impl ArchiveTaskQueue {
}

#[tracing::instrument(skip_all)]
fn handle_archive_segments_message(&mut self, segments: CameraSegments) {
fn handle_archive_segments_message(&mut self, msg: ArchiveSegmentsCommand) {
info!("Queueing archive video segments command");
for segment in segments.segment_list {
for segment in msg.segment_list {
debug!("Adding video segment to queue: {}", segment.display());
self.queue
.push_back(ArchiveTask::CameraSegment(crate::task::CameraSegment {
camera_name: segments.name.clone(),
camera_name: msg.camera_name.clone(),
camera_url: msg.camera_url.clone(),
filename: segment,
}));
}
Expand Down Expand Up @@ -265,7 +266,8 @@ mod test {
use super::*;
use chrono::Utc;
use rumqttc::{Publish, QoS};
use satori_common::EventMetadata;
use satori_common::{ArchiveCommand, ArchiveSegmentsCommand, EventMetadata, Message};
use url::Url;

#[test]
fn test_load_bad_file_gives_empty_queue() {
Expand All @@ -279,12 +281,11 @@ mod test {
let mut queue = ArchiveTaskQueue::default();
assert!(queue.queue.is_empty());

let msg = satori_common::Message::ArchiveCommand(satori_common::ArchiveCommand::Segments(
satori_common::CameraSegments {
name: "camera-1".into(),
segment_list: vec![],
},
));
let msg = Message::ArchiveCommand(ArchiveCommand::Segments(ArchiveSegmentsCommand {
camera_name: "camera-1".into(),
camera_url: Url::parse("http://localhost:8080/stream.m3u8").unwrap(),
segment_list: vec![],
}));
let msg = Publish::new("", QoS::ExactlyOnce, serde_json::to_string(&msg).unwrap());
queue.handle_mqtt_message(msg);
assert!(queue.queue.is_empty());
Expand All @@ -295,12 +296,11 @@ mod test {
let mut queue = ArchiveTaskQueue::default();
assert!(queue.queue.is_empty());

let msg = satori_common::Message::ArchiveCommand(satori_common::ArchiveCommand::Segments(
satori_common::CameraSegments {
name: "camera-1".into(),
segment_list: vec!["one.ts".into(), "two.ts".into()],
},
));
let msg = Message::ArchiveCommand(ArchiveCommand::Segments(ArchiveSegmentsCommand {
camera_name: "camera-1".into(),
camera_url: Url::parse("http://localhost:8080/stream.m3u8").unwrap(),
segment_list: vec!["one.ts".into(), "two.ts".into()],
}));
let msg = Publish::new("", QoS::ExactlyOnce, serde_json::to_string(&msg).unwrap());
queue.handle_mqtt_message(msg);
assert_eq!(queue.queue.len(), 2);
Expand Down Expand Up @@ -330,28 +330,25 @@ mod test {
let mut queue = ArchiveTaskQueue::default();

// Add an event to the queue
let msg = satori_common::Message::ArchiveCommand(
satori_common::ArchiveCommand::EventMetadata(Event {
metadata: EventMetadata {
id: "test-1".into(),
timestamp: Utc::now().into(),
},
start: Utc::now().into(),
end: Utc::now().into(),
reasons: Default::default(),
cameras: Default::default(),
}),
);
let msg = Message::ArchiveCommand(ArchiveCommand::EventMetadata(Event {
metadata: EventMetadata {
id: "test-1".into(),
timestamp: Utc::now().into(),
},
start: Utc::now().into(),
end: Utc::now().into(),
reasons: Default::default(),
cameras: Default::default(),
}));
let msg = Publish::new("", QoS::ExactlyOnce, serde_json::to_string(&msg).unwrap());
queue.handle_mqtt_message(msg);

// Add two segments to the queue
let msg = satori_common::Message::ArchiveCommand(satori_common::ArchiveCommand::Segments(
satori_common::CameraSegments {
name: "camera-1".into(),
segment_list: vec!["one.ts".into(), "two.ts".into()],
},
));
let msg = Message::ArchiveCommand(ArchiveCommand::Segments(ArchiveSegmentsCommand {
camera_name: "camera-1".into(),
camera_url: Url::parse("http://localhost:8080/stream.m3u8").unwrap(),
segment_list: vec!["one.ts".into(), "two.ts".into()],
}));
let msg = Publish::new("", QoS::ExactlyOnce, serde_json::to_string(&msg).unwrap());
queue.handle_mqtt_message(msg);

Expand Down
11 changes: 4 additions & 7 deletions archiver/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,14 @@ impl ArchiveTask {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct CameraSegment {
pub(crate) camera_name: String,
pub(crate) camera_url: Url,
pub(crate) filename: PathBuf,
}

impl CameraSegment {
#[tracing::instrument(skip_all)]
pub(crate) async fn get(&self, context: &Context) -> ArchiverResult<Bytes> {
let url = context
.cameras
.get_url(&self.camera_name)
.ok_or(ArchiverError::CameraNotFound)?;
let url = get_segment_url(url, &self.filename)?;
let url = get_segment_url(self.camera_url.clone(), &self.filename)?;
debug!("Segment URL: {url}");

let req = context.http_client.get(url).send().await?;
Expand All @@ -66,9 +63,9 @@ impl CameraSegment {
fn get_segment_url(hls_url: Url, segment_filename: &Path) -> ArchiverResult<Url> {
let mut url = hls_url;
url.path_segments_mut()
.map_err(|_| ArchiverError::UrlError)?
.map_err(|_| ArchiverError::Url)?
.pop()
.push(segment_filename.to_str().ok_or(ArchiverError::UrlError)?);
.push(segment_filename.to_str().ok_or(ArchiverError::Url)?);
Ok(url)
}

Expand Down
50 changes: 2 additions & 48 deletions common/src/camera_config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use url::Url;

Expand All @@ -8,13 +8,6 @@ pub struct CamerasConfig {
}

impl CamerasConfig {
pub fn get_url(&self, camera_name: &str) -> Option<Url> {
self.cameras
.iter()
.find(|c| c.name == camera_name)
.map(|c| c.url.clone())
}

pub fn into_map(self) -> HashMap<String, Url> {
let mut ret = HashMap::new();
for c in self.cameras {
Expand All @@ -24,47 +17,8 @@ impl CamerasConfig {
}
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CameraConfig {
name: String,
url: Url,
}

#[cfg(test)]
mod test {
use super::*;

fn get_test_cameras_config() -> CamerasConfig {
CamerasConfig {
cameras: vec![
CameraConfig {
name: "camera-1".into(),
url: Url::parse("http://camera-1/stream.m3u8").unwrap(),
},
CameraConfig {
name: "camera-2".into(),
url: Url::parse("http://camera-2/stream.m3u8").unwrap(),
},
CameraConfig {
name: "camera-3".into(),
url: Url::parse("http://camera-3/stream.m3u8").unwrap(),
},
],
}
}

#[test]
fn test_cameras_config_get_url() {
let config = get_test_cameras_config();
assert_eq!(
config.get_url("camera-1"),
Some(Url::parse("http://camera-1/stream.m3u8").unwrap())
);
}

#[test]
fn test_cameras_config_get_url_fail() {
let config = get_test_cameras_config();
assert_eq!(config.get_url("camera-nope"), None);
}
}
2 changes: 1 addition & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod event;
pub use self::event::{CameraSegments, Event, EventMetadata, EventReason};

mod message_schema;
pub use self::message_schema::{ArchiveCommand, Message, TriggerCommand};
pub use self::message_schema::{ArchiveCommand, ArchiveSegmentsCommand, Message, TriggerCommand};

pub mod mqtt;

Expand Down
12 changes: 10 additions & 2 deletions common/src/message_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use chrono::{DateTime, FixedOffset};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use std::time::Duration;
use std::{path::PathBuf, time::Duration};
use url::Url;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
Expand Down Expand Up @@ -37,5 +38,12 @@ pub struct TriggerCommand {
#[serde(tag = "kind", content = "data", rename_all = "snake_case")]
pub enum ArchiveCommand {
EventMetadata(crate::event::Event),
Segments(crate::event::CameraSegments),
Segments(ArchiveSegmentsCommand),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveSegmentsCommand {
pub camera_name: String,
pub camera_url: Url,
pub segment_list: Vec<PathBuf>,
}
1 change: 1 addition & 0 deletions ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ satori-storage.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
18 changes: 12 additions & 6 deletions ctl/src/cli/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use async_trait::async_trait;
use clap::{Parser, Subcommand};
use satori_common::{
mqtt::{AsyncClientExt, MqttClient, MqttConfig, PublishExt},
ArchiveCommand, CameraSegments, Event, EventMetadata, Message, Trigger,
ArchiveCommand, ArchiveSegmentsCommand, Event, EventMetadata, Message, Trigger,
};
use std::{path::PathBuf, time::Duration};
use tracing::{info, warn};
use url::Url;

/// Debugging operations.
#[derive(Debug, Clone, Parser)]
Expand Down Expand Up @@ -70,11 +71,12 @@ impl CliExecute for DebugCommand {
mqtt_client.poll_until_message_is_sent().await;
}
DebugSubcommand::ArchiveSegments(cmd) => {
let segments = CameraSegments {
name: cmd.camera.clone(),
segment_list: cmd.filename.clone(),
};
let message = Message::ArchiveCommand(ArchiveCommand::Segments(segments));
let message =
Message::ArchiveCommand(ArchiveCommand::Segments(ArchiveSegmentsCommand {
camera_name: cmd.camera.clone(),
camera_url: cmd.url.clone(),
segment_list: cmd.filename.clone(),
}));

let mut client = mqtt_client.client();
let topic = mqtt_client.topic();
Expand Down Expand Up @@ -111,6 +113,10 @@ pub(crate) struct DebugArchiveSegmentsCommand {
#[arg(long)]
camera: String,

/// URL of the camera's HLS stream.
#[arg(long)]
url: Url,

/// Filenames of segments to archive.
filename: Vec<PathBuf>,
}
Loading