From 2b1dc02dd4152af3597550f916b89cf24871e1cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Thu, 14 Mar 2024 16:59:43 -0300 Subject: [PATCH] src: stream: pipeline: Fix stream restart logic, use Interval::tick instead of sleep --- src/stream/pipeline/runner.rs | 67 ++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/src/stream/pipeline/runner.rs b/src/stream/pipeline/runner.rs index 68564cd1..277b4b43 100644 --- a/src/stream/pipeline/runner.rs +++ b/src/stream/pipeline/runner.rs @@ -166,41 +166,44 @@ impl PipelineRunner { let mut lost_timestamps: usize = 0; let max_lost_timestamps: usize = 30; - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - if let Some(reason) = finish.recv().await { - return Err(anyhow!("{reason:?}")); - } + let mut period = tokio::time::interval(tokio::time::Duration::from_millis(100)); - if !allow_block { - // Restart pipeline if pipeline position do not change, - // occur if usb connection is lost and gst do not detect it - let pipeline = pipeline_weak - .upgrade() - .context("Unable to access the Pipeline from its weak reference")?; - - if let Some(position) = pipeline.query_position::() { - previous_position = match previous_position { - Some(current_previous_position) => { - if current_previous_position.nseconds() != 0 - && current_previous_position == position - { - lost_timestamps += 1; - } else if lost_timestamps > 0 { - // We are back in track, erase lost timestamps - warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps"); - lost_timestamps = 0; - } - if lost_timestamps == 1 { - warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated"); - } else if lost_timestamps > max_lost_timestamps { - return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})")); + loop { + tokio::select! { + reason = finish.recv() => { + return Err(anyhow!("{reason:?}")); + } + _ = period.tick() => { + if !allow_block { + // Restart pipeline if pipeline position do not change, + // occur if usb connection is lost and gst do not detect it + let pipeline = pipeline_weak + .upgrade() + .context("Unable to access the Pipeline from its weak reference")?; + + if let Some(position) = pipeline.query_position::() { + previous_position = match previous_position { + Some(current_previous_position) => { + if current_previous_position.nseconds() != 0 + && current_previous_position == position + { + lost_timestamps += 1; + } else if lost_timestamps > 0 { + // We are back in track, erase lost timestamps + warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps"); + lost_timestamps = 0; + } + if lost_timestamps == 1 { + warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated"); + } else if lost_timestamps > max_lost_timestamps { + return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})")); + } + + Some(position) + } + None => Some(position), } - - Some(position) } - None => Some(position), } } }