diff --git a/src/mavlink/mavlink_camera.rs b/src/mavlink/mavlink_camera.rs index a4eacdb5..58427ff3 100644 --- a/src/mavlink/mavlink_camera.rs +++ b/src/mavlink/mavlink_camera.rs @@ -138,8 +138,9 @@ impl MavlinkCameraInner { let component_id = camera.component.component_id; let system_id = camera.component.system_id; + let mut period = tokio::time::interval(tokio::time::Duration::from_secs(1)); loop { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + period.tick().await; let header = mavlink::MavHeader { system_id, diff --git a/src/stream/gst/utils.rs b/src/stream/gst/utils.rs index 7c8a62a0..d6cbd697 100644 --- a/src/stream/gst/utils.rs +++ b/src/stream/gst/utils.rs @@ -78,8 +78,10 @@ pub async fn wait_for_element_state_async( ) -> Result<()> { let mut trials = 1000 * timeout_time_secs / polling_time_millis; + let mut period = tokio::time::interval(tokio::time::Duration::from_millis(polling_time_millis)); + loop { - tokio::time::sleep(tokio::time::Duration::from_millis(polling_time_millis)).await; + period.tick().await; let Some(element) = element_weak.upgrade() else { return Err(anyhow!("Cannot access Element")); diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 741ed80a..af4e9981 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -98,8 +98,9 @@ impl Stream { let mut video_and_stream_information = video_and_stream_information; + let mut period = tokio::time::interval(tokio::time::Duration::from_millis(100)); loop { - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + period.tick().await; if !state .read() diff --git a/src/stream/pipeline/runner.rs b/src/stream/pipeline/runner.rs index 68564cd1..277b4b43 100644 --- a/src/stream/pipeline/runner.rs +++ b/src/stream/pipeline/runner.rs @@ -166,41 +166,44 @@ impl PipelineRunner { let mut lost_timestamps: usize = 0; let max_lost_timestamps: usize = 30; - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - if let Some(reason) = finish.recv().await { - return Err(anyhow!("{reason:?}")); - } + let mut period = tokio::time::interval(tokio::time::Duration::from_millis(100)); - if !allow_block { - // Restart pipeline if pipeline position do not change, - // occur if usb connection is lost and gst do not detect it - let pipeline = pipeline_weak - .upgrade() - .context("Unable to access the Pipeline from its weak reference")?; - - if let Some(position) = pipeline.query_position::() { - previous_position = match previous_position { - Some(current_previous_position) => { - if current_previous_position.nseconds() != 0 - && current_previous_position == position - { - lost_timestamps += 1; - } else if lost_timestamps > 0 { - // We are back in track, erase lost timestamps - warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps"); - lost_timestamps = 0; - } - if lost_timestamps == 1 { - warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated"); - } else if lost_timestamps > max_lost_timestamps { - return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})")); + loop { + tokio::select! { + reason = finish.recv() => { + return Err(anyhow!("{reason:?}")); + } + _ = period.tick() => { + if !allow_block { + // Restart pipeline if pipeline position do not change, + // occur if usb connection is lost and gst do not detect it + let pipeline = pipeline_weak + .upgrade() + .context("Unable to access the Pipeline from its weak reference")?; + + if let Some(position) = pipeline.query_position::() { + previous_position = match previous_position { + Some(current_previous_position) => { + if current_previous_position.nseconds() != 0 + && current_previous_position == position + { + lost_timestamps += 1; + } else if lost_timestamps > 0 { + // We are back in track, erase lost timestamps + warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps"); + lost_timestamps = 0; + } + if lost_timestamps == 1 { + warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated"); + } else if lost_timestamps > max_lost_timestamps { + return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})")); + } + + Some(position) + } + None => Some(position), } - - Some(position) } - None => Some(position), } } }