Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: stream: Completely drop the stream before creating a new one #361

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
201 changes: 147 additions & 54 deletions src/stream/manager.rs
Expand Up @@ -49,7 +49,9 @@ impl Manager {
.streams
.values()
.filter_map(|stream| match stream.state.read() {
Ok(guard) => Some(guard.video_and_stream_information.clone()),
Ok(guard) => guard
.as_ref()
.map(|state| state.video_and_stream_information.clone()),
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
None
Expand Down Expand Up @@ -208,30 +210,32 @@ pub fn get_first_sdp_from_source(source: String) -> ClonableResult<gst_sdp::SDPM
};

let Some(result) = manager.streams.values().find_map(|stream| {
let state = match stream.state.read() {
let state_guard = match stream.state.read() {
Ok(guard) => guard,
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
return None;
}
};

if state
.video_and_stream_information
.video_source
.inner()
.source_string()
== source
{
state
.pipeline
.inner_state_as_ref()
.sinks
.values()
.find_map(|sink| sink.get_sdp().ok())
} else {
None
}
state_guard.as_ref().and_then(|state| {
if state
.video_and_stream_information
.video_source
.inner()
.source_string()
== source
{
state
.pipeline
.inner_state_as_ref()
.sinks
.values()
.find_map(|sink| sink.get_sdp().ok())
} else {
None
}
})
}) else {
return Err(Arc::new(anyhow!(
"Failed to find any valid sdp for souce {source:?}"
Expand Down Expand Up @@ -301,26 +305,40 @@ pub async fn get_jpeg_thumbnail_from_source(
}
};

state
.video_and_stream_information
.video_source
.inner()
.source_string()
== source
state.as_ref().is_some_and(|state| {
state
.video_and_stream_information
.video_source
.inner()
.source_string()
== source
})
}) else {
return None;
};

let state = match stream.state.read() {
Ok(guard) => guard,
let state_guard = match stream.state.read() {
Ok(guard) => {
if guard.is_none() {
return None;
}
guard
}
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
return None;
}
};

let mut sinks =
futures::stream::iter(state.pipeline.inner_state_as_ref().sinks.values());
let mut sinks = futures::stream::iter(
state_guard
.as_ref()
.unwrap()
.pipeline
.inner_state_as_ref()
.sinks
.values(),
);
let Some(Sink::Image(image_sink)) =
sinks.find(|sink| matches!(sink, Sink::Image(_))).await
else {
Expand Down Expand Up @@ -356,14 +374,21 @@ pub async fn add_stream_and_start(
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
};
for stream in manager.streams.values() {
let state = match stream.state.read() {
Ok(guard) => guard,
let state_guard = match stream.state.read() {
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

state
state_guard
.as_ref()
.unwrap()
.video_and_stream_information
.conflicts_with(&video_and_stream_information)?;
}
Expand All @@ -382,15 +407,27 @@ pub fn remove_stream_by_name(stream_name: &str) -> Result<()> {
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
};
if let Some(stream_id) = &manager.streams.iter().find_map(|(id, stream)| {
let state = match stream.state.read() {
Ok(guard) => guard,
let state_guard = match stream.state.read() {
Ok(guard) => {
if guard.is_none() {
warn!("Stream without State, skipping...");
return None;
}
guard
}
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
return None;
}
};

if state.video_and_stream_information.name == *stream_name {
if state_guard
.as_ref()
.unwrap()
.video_and_stream_information
.name
== *stream_name
{
return Some(*id);
}
None
Expand Down Expand Up @@ -429,7 +466,21 @@ impl WebRTCSessionManagementInterface for Manager {
};

let sink = Sink::WebRTC(WebRTCSink::try_new(bind, sender)?);
stream.state.write().unwrap().pipeline.add_sink(sink)?;

let mut state_guard = match stream.state.write() {
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

state_guard.as_mut().unwrap().pipeline.add_sink(sink)?;

debug!("WebRTC session created: {session_id:?}");

Ok(session_id)
Expand All @@ -450,12 +501,21 @@ impl WebRTCSessionManagementInterface for Manager {
.get_mut(&bind.producer_id)
.context(format!("Producer {:?} not found", bind.producer_id))?;

let mut state = match stream.state.write() {
Ok(guard) => guard,
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
let mut state_guard = match stream.state.write() {
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

state
state_guard
.as_mut()
.unwrap()
.pipeline
.remove_sink(&bind.session_id)
.context(format!("Cannot remove session {:?}", bind.session_id))?;
Expand All @@ -475,18 +535,27 @@ impl WebRTCSessionManagementInterface for Manager {
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
};

let state = match manager
let state_guard = match manager
.streams
.get(&bind.producer_id)
.context(format!("Producer {:?} not found", bind.producer_id))?
.state
.read()
{
Ok(guard) => guard,
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

let sink = state
let sink = state_guard
.as_ref()
.unwrap()
.pipeline
.inner_state_as_ref()
.sinks
Expand Down Expand Up @@ -526,18 +595,27 @@ impl WebRTCSessionManagementInterface for Manager {
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
};

let state = match manager
let state_guard = match manager
.streams
.get(&bind.producer_id)
.context(format!("Producer {:?} not found", bind.producer_id))?
.state
.read()
{
Ok(guard) => guard,
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

let sink = state
let sink = state_guard
.as_ref()
.unwrap()
.pipeline
.inner_state_as_ref()
.sinks
Expand Down Expand Up @@ -565,8 +643,15 @@ impl StreamManagementInterface<StreamStatus> for Manager {
};

let stream_id = match stream.state.read() {
Ok(guard) => guard.pipeline_id,
Err(error) => return Err(anyhow!("Failed locking a Mutex. Reason: {error}")),
Ok(guard) => {
if guard.is_none() {
return Err(anyhow!("Stream without State"));
}
guard.as_ref().unwrap().pipeline_id
}
Err(error) => {
return Err(anyhow!("Failed locking a Mutex. Reason: {error}"));
}
};

if manager.streams.insert(stream_id, stream).is_some() {
Expand Down Expand Up @@ -613,18 +698,26 @@ impl StreamManagementInterface<StreamStatus> for Manager {
.streams
.values()
.filter_map(|stream| {
let state = match stream.state.read() {
Ok(guard) => guard,
let state_guard = match stream.state.read() {
Ok(guard) => {
if guard.is_none() {
warn!("Stream without State, skipping...");
return None;
}
guard
}
Err(error) => {
error!("Failed locking a Mutex. Reason: {error}");
return None;
}
};

let state_ref = state_guard.as_ref().unwrap();

Some(StreamStatus {
id: state.pipeline_id,
running: state.pipeline.is_running(),
video_and_stream: state.video_and_stream_information.clone(),
id: state_ref.pipeline_id,
running: state_ref.pipeline.is_running(),
video_and_stream: state_ref.video_and_stream_information.clone(),
})
})
.collect())
Expand Down