Skip to content

Commit

Permalink
src: stream: Completely drop the stream before creating a new one
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso authored and patrickelectric committed Mar 15, 2024
1 parent 8bc71a6 commit 3f6e90d
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 71 deletions.
201 changes: 147 additions & 54 deletions src/stream/manager.rs
Expand Up @@ -49,7 +49,9 @@ impl Manager {
.streams
.values()
.filter_map(|stream| match stream.state.read() {
Ok(guard) => Some(guard.video_and_stream_information.clone()),
Ok(guard) => guard
.as_ref()
.map(|state| state.video_and_stream_information.clone()),
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
None
Expand Down Expand Up @@ -208,30 +210,32 @@ pub fn get_first_sdp_from_source(source: String) -> ClonableResult<gst_sdp::SDPM
};

let Some(result) = manager.streams.values().find_map(|stream| {
let state = match stream.state.read() {
let state_guard = match stream.state.read() {
Ok(guard) => guard,
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
return None;
}
};

if state
.video_and_stream_information
.video_source
.inner()
.source_string()
== source
{
state
.pipeline
.inner_state_as_ref()
.sinks
.values()
.find_map(|sink| sink.get_sdp().ok())
} else {
None
}
state_guard.as_ref().and_then(|state| {
if state
.video_and_stream_information
.video_source
.inner()
.source_string()
== source
{
state
.pipeline
.inner_state_as_ref()
.sinks
.values()
.find_map(|sink| sink.get_sdp().ok())
} else {
None
}
})
}) else {
return Err(Arc::new(anyhow!(
"Failed to find any valid sdp for souce {source:?}"
Expand Down Expand Up @@ -301,26 +305,40 @@ pub async fn get_jpeg_thumbnail_from_source(
}
};

state
.video_and_stream_information
.video_source
.inner()
.source_string()
== source
state.as_ref().is_some_and(|state| {
state
.video_and_stream_information
.video_source
.inner()
.source_string()
== source
})
}) else {
return None;
};

let state = match stream.state.read() {
Ok(guard) => guard,
let state_guard = match stream.state.read() {

Check warning on line 320 in src/stream/manager.rs

View workflow job for this annotation

GitHub Actions / clippy

this `MutexGuard` is held across an `await` point

warning: this `MutexGuard` is held across an `await` point --> src/stream/manager.rs:320:25 | 320 | let state_guard = match stream.state.read() { | ^^^^^^^^^^^ | = help: consider using an async-aware `Mutex` type or ensuring the `MutexGuard` is dropped before calling await note: these are all the `await` points this lock is held through --> src/stream/manager.rs:343:75 | 343 | sinks.find(|sink| matches!(sink, Sink::Image(_))).await | ^^^^^ ... 351 | .await | ^^^^^ = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#await_holding_lock
Ok(guard) => {
if guard.is_none() {
return None;
}
guard
}
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
return None;
}
};

let mut sinks =
futures::stream::iter(state.pipeline.inner_state_as_ref().sinks.values());
let mut sinks = futures::stream::iter(
state_guard
.as_ref()
.unwrap()
.pipeline
.inner_state_as_ref()
.sinks
.values(),
);
let Some(Sink::Image(image_sink)) =
sinks.find(|sink| matches!(sink, Sink::Image(_))).await
else {
Expand Down Expand Up @@ -356,14 +374,21 @@ pub async fn add_stream_and_start(
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
};
for stream in manager.streams.values() {
let state = match stream.state.read() {
Ok(guard) => guard,
let state_guard = match stream.state.read() {
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

state
state_guard
.as_ref()
.unwrap()
.video_and_stream_information
.conflicts_with(&video_and_stream_information)?;
}
Expand All @@ -382,15 +407,27 @@ pub fn remove_stream_by_name(stream_name: &str) -> Result<()> {
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
};
if let Some(stream_id) = &manager.streams.iter().find_map(|(id, stream)| {
let state = match stream.state.read() {
Ok(guard) => guard,
let state_guard = match stream.state.read() {
Ok(guard) => {
if guard.is_none() {
warn!("Stream without State, skipping...");
return None;
}
guard
}
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
return None;
}
};

if state.video_and_stream_information.name == *stream_name {
if state_guard
.as_ref()
.unwrap()
.video_and_stream_information
.name
== *stream_name
{
return Some(*id);
}
None
Expand Down Expand Up @@ -429,7 +466,21 @@ impl WebRTCSessionManagementInterface for Manager {
};

let sink = Sink::WebRTC(WebRTCSink::try_new(bind, sender)?);
stream.state.write().unwrap().pipeline.add_sink(sink)?;

let mut state_guard = match stream.state.write() {
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

state_guard.as_mut().unwrap().pipeline.add_sink(sink)?;

debug!("WebRTC session created: {session_id:?}");

Ok(session_id)
Expand All @@ -450,12 +501,21 @@ impl WebRTCSessionManagementInterface for Manager {
.get_mut(&bind.producer_id)
.context(format!("Producer {:?} not found", bind.producer_id))?;

let mut state = match stream.state.write() {
Ok(guard) => guard,
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
let mut state_guard = match stream.state.write() {
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

state
state_guard
.as_mut()
.unwrap()
.pipeline
.remove_sink(&bind.session_id)
.context(format!("Cannot remove session {:?}", bind.session_id))?;
Expand All @@ -475,18 +535,27 @@ impl WebRTCSessionManagementInterface for Manager {
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
};

let state = match manager
let state_guard = match manager
.streams
.get(&bind.producer_id)
.context(format!("Producer {:?} not found", bind.producer_id))?
.state
.read()
{
Ok(guard) => guard,
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

let sink = state
let sink = state_guard
.as_ref()
.unwrap()
.pipeline
.inner_state_as_ref()
.sinks
Expand Down Expand Up @@ -526,18 +595,27 @@ impl WebRTCSessionManagementInterface for Manager {
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
};

let state = match manager
let state_guard = match manager
.streams
.get(&bind.producer_id)
.context(format!("Producer {:?} not found", bind.producer_id))?
.state
.read()
{
Ok(guard) => guard,
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

let sink = state
let sink = state_guard
.as_ref()
.unwrap()
.pipeline
.inner_state_as_ref()
.sinks
Expand Down Expand Up @@ -565,8 +643,15 @@ impl StreamManagementInterface<StreamStatus> for Manager {
};

let stream_id = match stream.state.read() {
Ok(guard) => guard.pipeline_id,
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard.as_ref().unwrap().pipeline_id
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

if manager.streams.insert(stream_id, stream).is_some() {
Expand Down Expand Up @@ -613,18 +698,26 @@ impl StreamManagementInterface<StreamStatus> for Manager {
.streams
.values()
.filter_map(|stream| {
let state = match stream.state.read() {
Ok(guard) => guard,
let state_guard = match stream.state.read() {
Ok(guard) => {
if guard.is_none() {
warn!("Stream without State, skipping...");
return None;
}
guard
}
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
return None;
}
};

let state_ref = state_guard.as_ref().unwrap();

Some(StreamStatus {
id: state.pipeline_id,
running: state.pipeline.is_running(),
video_and_stream: state.video_and_stream_information.clone(),
id: state_ref.pipeline_id,
running: state_ref.pipeline.is_running(),
video_and_stream: state_ref.video_and_stream_information.clone(),
})
})
.collect())
Expand Down

0 comments on commit 3f6e90d

Please sign in to comment.