Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track_local: Get rid of some unnecessary Mutexes and Options #646

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 37 additions & 39 deletions webrtc/src/rtp_transceiver/rtp_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use crate::rtp_transceiver::{
create_stream_info, PayloadType, RTCRtpEncodingParameters, RTCRtpSendParameters,
RTCRtpTransceiver, SSRC,
};
use crate::track::track_local::{
InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext, TrackLocalWriter,
};
use crate::track::track_local::{InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext};

pub(crate) struct RTPSenderInternal {
pub(crate) stop_called_rx: Arc<Notify>,
Expand All @@ -38,8 +36,8 @@ pub(crate) struct TrackEncoding {
pub(crate) track: Arc<dyn TrackLocal + Send + Sync>,
pub(crate) srtp_stream: Arc<SrtpWriterFuture>,
pub(crate) rtcp_interceptor: Arc<dyn RTCPReader + Send + Sync>,
pub(crate) stream_info: Mutex<StreamInfo>,
pub(crate) context: Mutex<TrackLocalContext>,
pub(crate) stream_info: StreamInfo,
pub(crate) context: TrackLocalContext,

pub(crate) ssrc: SSRC,

Expand Down Expand Up @@ -275,12 +273,21 @@ impl RTCRtpSender {
None
};

let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone()));
let context = TrackLocalContext {
id: self.id.clone(),
params: super::RTCRtpParameters::default(),
ssrc: 0,
write_stream,
paused: self.paused.clone(),
mid: None,
};
let encoding = TrackEncoding {
track,
srtp_stream,
rtcp_interceptor,
stream_info: Mutex::new(StreamInfo::default()),
context: Mutex::new(TrackLocalContext::default()),
stream_info: StreamInfo::default(),
context,
ssrc,
rtx,
};
Expand Down Expand Up @@ -390,9 +397,8 @@ impl RTCRtpSender {
.first_mut()
.ok_or(Error::ErrRTPSenderNewTrackHasIncorrectEnvelope)?;

let mut context = encoding.context.lock().await;
if self.has_sent() {
encoding.track.unbind(&context).await?;
encoding.track.unbind(&encoding.context).await?;
}

self.seq_trans.reset_offset();
Expand All @@ -406,35 +412,34 @@ impl RTCRtpSender {
.and_then(|t| t.mid());

let new_context = TrackLocalContext {
id: context.id.clone(),
id: encoding.context.id.clone(),
params: self
.media_engine
.get_rtp_parameters_by_kind(t.kind(), RTCRtpTransceiverDirection::Sendonly),
ssrc: context.ssrc,
write_stream: context.write_stream.clone(),
ssrc: encoding.context.ssrc,
write_stream: encoding.context.write_stream.clone(),
paused: self.paused.clone(),
mid,
};

match t.bind(&new_context).await {
Err(err) => {
// Re-bind the original track
encoding.track.bind(&context).await?;
encoding.track.bind(&encoding.context).await?;

Err(err)
}
Ok(codec) => {
// Codec has changed
context.params.codecs = vec![codec];
encoding.context.params.codecs = vec![codec];
encoding.track = Arc::clone(t);
Ok(())
}
}
} else {
if self.has_sent() {
for encoding in track_encodings.drain(..) {
let context = encoding.context.lock().await;
encoding.track.unbind(&context).await?;
encoding.track.unbind(&encoding.context).await?;
}
} else {
track_encodings.clear();
Expand All @@ -449,7 +454,7 @@ impl RTCRtpSender {
if self.has_sent() {
return Err(Error::ErrRTPSenderSendAlreadyCalled);
}
let track_encodings = self.track_encodings.lock().await;
let mut track_encodings = self.track_encodings.lock().await;
if track_encodings.is_empty() {
return Err(Error::ErrRTPSenderTrackRemoved);
}
Expand All @@ -461,41 +466,33 @@ impl RTCRtpSender {
.and_then(|t| t.upgrade())
.and_then(|t| t.mid());

for (idx, encoding) in track_encodings.iter().enumerate() {
for (idx, encoding) in track_encodings.iter_mut().enumerate() {
let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone()));
let mut context = TrackLocalContext {
id: self.id.clone(),
params: self.media_engine.get_rtp_parameters_by_kind(
encoding.track.kind(),
RTCRtpTransceiverDirection::Sendonly,
),
ssrc: parameters.encodings[idx].ssrc,
write_stream: Some(
Arc::clone(&write_stream) as Arc<dyn TrackLocalWriter + Send + Sync>
),
paused: self.paused.clone(),
mid: mid.to_owned(),
};
encoding.context.params = self.media_engine.get_rtp_parameters_by_kind(
encoding.track.kind(),
RTCRtpTransceiverDirection::Sendonly,
);
encoding.context.ssrc = parameters.encodings[idx].ssrc;
encoding.context.write_stream = Arc::clone(&write_stream) as _;
encoding.context.mid = mid.to_owned();

let codec = encoding.track.bind(&context).await?;
let stream_info = create_stream_info(
let codec = encoding.track.bind(&encoding.context).await?;
encoding.stream_info = create_stream_info(
self.id.clone(),
parameters.encodings[idx].ssrc,
codec.payload_type,
codec.capability.clone(),
&parameters.rtp_parameters.header_extensions,
None,
);
context.params.codecs = vec![codec.clone()];
encoding.context.params.codecs = vec![codec.clone()];

let srtp_writer = Arc::clone(&encoding.srtp_stream) as Arc<dyn RTPWriter + Send + Sync>;
let rtp_writer = self
.interceptor
.bind_local_stream(&stream_info, srtp_writer)
.bind_local_stream(&encoding.stream_info, srtp_writer)
.await;

*encoding.context.lock().await = context;
*encoding.stream_info.lock().await = stream_info;
*write_stream.interceptor_rtp_writer.lock().await = Some(rtp_writer);

if let (Some(rtx), Some(rtx_codec)) = (
Expand Down Expand Up @@ -573,8 +570,9 @@ impl RTCRtpSender {

let track_encodings = self.track_encodings.lock().await;
for encoding in track_encodings.iter() {
let stream_info = encoding.stream_info.lock().await;
self.interceptor.unbind_local_stream(&stream_info).await;
self.interceptor
.unbind_local_stream(&encoding.stream_info)
.await;

encoding.srtp_stream.close().await?;

Expand Down
10 changes: 5 additions & 5 deletions webrtc/src/track/track_local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ pub trait TrackLocalWriter: fmt::Debug {

/// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used
/// in Interceptors.
#[derive(Default, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct TrackLocalContext {
pub(crate) id: String,
pub(crate) params: RTCRtpParameters,
pub(crate) ssrc: SSRC,
pub(crate) write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
pub(crate) write_stream: Arc<dyn TrackLocalWriter + Send + Sync>,
pub(crate) paused: Arc<AtomicBool>,
pub(crate) mid: Option<SmolStr>,
}
Expand All @@ -78,7 +78,7 @@ impl TrackLocalContext {

/// write_stream returns the write_stream for this TrackLocal. The implementer writes the outbound
/// media packets to it
pub fn write_stream(&self) -> Option<Arc<dyn TrackLocalWriter + Send + Sync>> {
pub fn write_stream(&self) -> Arc<dyn TrackLocalWriter + Send + Sync> {
self.write_stream.clone()
}

Expand Down Expand Up @@ -131,13 +131,13 @@ pub trait TrackLocal {
/// TrackBinding is a single bind for a Track
/// Bind can be called multiple times, this stores the
/// result for a single bind call so that it can be used when writing
#[derive(Default, Debug)]
#[derive(Debug)]
pub(crate) struct TrackBinding {
id: String,
ssrc: SSRC,
payload_type: PayloadType,
params: RTCRtpParameters,
write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
write_stream: Arc<dyn TrackLocalWriter + Send + Sync>,
sender_paused: Arc<AtomicBool>,
hdr_ext_ids: Vec<rtp::header::Extension>,
}
Expand Down
16 changes: 6 additions & 10 deletions webrtc/src/track/track_local/track_local_static_rtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,13 @@ impl TrackLocalStaticRTP {
}
}

if let Some(write_stream) = &b.write_stream {
match write_stream.write_rtp_with_attributes(&pkt, attr).await {
Ok(m) => {
n += m;
}
Err(err) => {
write_errs.push(err);
}
match b.write_stream.write_rtp_with_attributes(&pkt, attr).await {
Ok(m) => {
n += m;
}
Err(err) => {
write_errs.push(err);
}
} else {
write_errs.push(Error::new("track binding has none write_stream".to_owned()));
}
}

Expand Down
Loading