From 08fddd3f9c40cc31995bd0f001a64531882b243c Mon Sep 17 00:00:00 2001 From: Haakon Sporsheim Date: Tue, 7 Jan 2025 11:56:40 +0100 Subject: [PATCH] track_local: Get rid of some unnecessary Mutexes and Options --- webrtc/src/rtp_transceiver/rtp_sender/mod.rs | 76 +++++++++---------- webrtc/src/track/track_local/mod.rs | 10 +-- .../track_local/track_local_static_rtp.rs | 16 ++-- 3 files changed, 48 insertions(+), 54 deletions(-) diff --git a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs index 9b8585bfe..d1e35d18f 100644 --- a/webrtc/src/rtp_transceiver/rtp_sender/mod.rs +++ b/webrtc/src/rtp_transceiver/rtp_sender/mod.rs @@ -25,9 +25,7 @@ use crate::rtp_transceiver::{ create_stream_info, PayloadType, RTCRtpEncodingParameters, RTCRtpSendParameters, RTCRtpTransceiver, SSRC, }; -use crate::track::track_local::{ - InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext, TrackLocalWriter, -}; +use crate::track::track_local::{InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext}; pub(crate) struct RTPSenderInternal { pub(crate) stop_called_rx: Arc, @@ -38,8 +36,8 @@ pub(crate) struct TrackEncoding { pub(crate) track: Arc, pub(crate) srtp_stream: Arc, pub(crate) rtcp_interceptor: Arc, - pub(crate) stream_info: Mutex, - pub(crate) context: Mutex, + pub(crate) stream_info: StreamInfo, + pub(crate) context: TrackLocalContext, pub(crate) ssrc: SSRC, @@ -275,12 +273,21 @@ impl RTCRtpSender { None }; + let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone())); + let context = TrackLocalContext { + id: self.id.clone(), + params: super::RTCRtpParameters::default(), + ssrc: 0, + write_stream, + paused: self.paused.clone(), + mid: None, + }; let encoding = TrackEncoding { track, srtp_stream, rtcp_interceptor, - stream_info: Mutex::new(StreamInfo::default()), - context: Mutex::new(TrackLocalContext::default()), + stream_info: StreamInfo::default(), + context, ssrc, rtx, }; @@ -390,9 +397,8 @@ impl RTCRtpSender { .first_mut() .ok_or(Error::ErrRTPSenderNewTrackHasIncorrectEnvelope)?; - let mut context = encoding.context.lock().await; if self.has_sent() { - encoding.track.unbind(&context).await?; + encoding.track.unbind(&encoding.context).await?; } self.seq_trans.reset_offset(); @@ -406,12 +412,12 @@ impl RTCRtpSender { .and_then(|t| t.mid()); let new_context = TrackLocalContext { - id: context.id.clone(), + id: encoding.context.id.clone(), params: self .media_engine .get_rtp_parameters_by_kind(t.kind(), RTCRtpTransceiverDirection::Sendonly), - ssrc: context.ssrc, - write_stream: context.write_stream.clone(), + ssrc: encoding.context.ssrc, + write_stream: encoding.context.write_stream.clone(), paused: self.paused.clone(), mid, }; @@ -419,13 +425,13 @@ impl RTCRtpSender { match t.bind(&new_context).await { Err(err) => { // Re-bind the original track - encoding.track.bind(&context).await?; + encoding.track.bind(&encoding.context).await?; Err(err) } Ok(codec) => { // Codec has changed - context.params.codecs = vec![codec]; + encoding.context.params.codecs = vec![codec]; encoding.track = Arc::clone(t); Ok(()) } @@ -433,8 +439,7 @@ impl RTCRtpSender { } else { if self.has_sent() { for encoding in track_encodings.drain(..) { - let context = encoding.context.lock().await; - encoding.track.unbind(&context).await?; + encoding.track.unbind(&encoding.context).await?; } } else { track_encodings.clear(); @@ -449,7 +454,7 @@ impl RTCRtpSender { if self.has_sent() { return Err(Error::ErrRTPSenderSendAlreadyCalled); } - let track_encodings = self.track_encodings.lock().await; + let mut track_encodings = self.track_encodings.lock().await; if track_encodings.is_empty() { return Err(Error::ErrRTPSenderTrackRemoved); } @@ -461,24 +466,18 @@ impl RTCRtpSender { .and_then(|t| t.upgrade()) .and_then(|t| t.mid()); - for (idx, encoding) in track_encodings.iter().enumerate() { + for (idx, encoding) in track_encodings.iter_mut().enumerate() { let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone())); - let mut context = TrackLocalContext { - id: self.id.clone(), - params: self.media_engine.get_rtp_parameters_by_kind( - encoding.track.kind(), - RTCRtpTransceiverDirection::Sendonly, - ), - ssrc: parameters.encodings[idx].ssrc, - write_stream: Some( - Arc::clone(&write_stream) as Arc - ), - paused: self.paused.clone(), - mid: mid.to_owned(), - }; + encoding.context.params = self.media_engine.get_rtp_parameters_by_kind( + encoding.track.kind(), + RTCRtpTransceiverDirection::Sendonly, + ); + encoding.context.ssrc = parameters.encodings[idx].ssrc; + encoding.context.write_stream = Arc::clone(&write_stream) as _; + encoding.context.mid = mid.to_owned(); - let codec = encoding.track.bind(&context).await?; - let stream_info = create_stream_info( + let codec = encoding.track.bind(&encoding.context).await?; + encoding.stream_info = create_stream_info( self.id.clone(), parameters.encodings[idx].ssrc, codec.payload_type, @@ -486,16 +485,14 @@ impl RTCRtpSender { ¶meters.rtp_parameters.header_extensions, None, ); - context.params.codecs = vec![codec.clone()]; + encoding.context.params.codecs = vec![codec.clone()]; let srtp_writer = Arc::clone(&encoding.srtp_stream) as Arc; let rtp_writer = self .interceptor - .bind_local_stream(&stream_info, srtp_writer) + .bind_local_stream(&encoding.stream_info, srtp_writer) .await; - *encoding.context.lock().await = context; - *encoding.stream_info.lock().await = stream_info; *write_stream.interceptor_rtp_writer.lock().await = Some(rtp_writer); if let (Some(rtx), Some(rtx_codec)) = ( @@ -573,8 +570,9 @@ impl RTCRtpSender { let track_encodings = self.track_encodings.lock().await; for encoding in track_encodings.iter() { - let stream_info = encoding.stream_info.lock().await; - self.interceptor.unbind_local_stream(&stream_info).await; + self.interceptor + .unbind_local_stream(&encoding.stream_info) + .await; encoding.srtp_stream.close().await?; diff --git a/webrtc/src/track/track_local/mod.rs b/webrtc/src/track/track_local/mod.rs index 09f4f2a7b..9963e491a 100644 --- a/webrtc/src/track/track_local/mod.rs +++ b/webrtc/src/track/track_local/mod.rs @@ -47,12 +47,12 @@ pub trait TrackLocalWriter: fmt::Debug { /// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used /// in Interceptors. -#[derive(Default, Debug, Clone)] +#[derive(Debug, Clone)] pub struct TrackLocalContext { pub(crate) id: String, pub(crate) params: RTCRtpParameters, pub(crate) ssrc: SSRC, - pub(crate) write_stream: Option>, + pub(crate) write_stream: Arc, pub(crate) paused: Arc, pub(crate) mid: Option, } @@ -78,7 +78,7 @@ impl TrackLocalContext { /// write_stream returns the write_stream for this TrackLocal. The implementer writes the outbound /// media packets to it - pub fn write_stream(&self) -> Option> { + pub fn write_stream(&self) -> Arc { self.write_stream.clone() } @@ -131,13 +131,13 @@ pub trait TrackLocal { /// TrackBinding is a single bind for a Track /// Bind can be called multiple times, this stores the /// result for a single bind call so that it can be used when writing -#[derive(Default, Debug)] +#[derive(Debug)] pub(crate) struct TrackBinding { id: String, ssrc: SSRC, payload_type: PayloadType, params: RTCRtpParameters, - write_stream: Option>, + write_stream: Arc, sender_paused: Arc, hdr_ext_ids: Vec, } diff --git a/webrtc/src/track/track_local/track_local_static_rtp.rs b/webrtc/src/track/track_local/track_local_static_rtp.rs index 5ec250a42..44e6de8de 100644 --- a/webrtc/src/track/track_local/track_local_static_rtp.rs +++ b/webrtc/src/track/track_local/track_local_static_rtp.rs @@ -150,17 +150,13 @@ impl TrackLocalStaticRTP { } } - if let Some(write_stream) = &b.write_stream { - match write_stream.write_rtp_with_attributes(&pkt, attr).await { - Ok(m) => { - n += m; - } - Err(err) => { - write_errs.push(err); - } + match b.write_stream.write_rtp_with_attributes(&pkt, attr).await { + Ok(m) => { + n += m; + } + Err(err) => { + write_errs.push(err); } - } else { - write_errs.push(Error::new("track binding has none write_stream".to_owned())); } }