Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stream recreation logic, prefer Interval::tick to time::sleep in tokio loops #362

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/mavlink/mavlink_camera.rs
Expand Up @@ -138,8 +138,9 @@ impl MavlinkCameraInner {
let component_id = camera.component.component_id;
let system_id = camera.component.system_id;

let mut period = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
period.tick().await;

let header = mavlink::MavHeader {
system_id,
Expand Down
4 changes: 3 additions & 1 deletion src/stream/gst/utils.rs
Expand Up @@ -78,8 +78,10 @@ pub async fn wait_for_element_state_async(
) -> Result<()> {
let mut trials = 1000 * timeout_time_secs / polling_time_millis;

let mut period = tokio::time::interval(tokio::time::Duration::from_millis(polling_time_millis));

loop {
tokio::time::sleep(tokio::time::Duration::from_millis(polling_time_millis)).await;
period.tick().await;

let Some(element) = element_weak.upgrade() else {
return Err(anyhow!("Cannot access Element"));
Expand Down
3 changes: 2 additions & 1 deletion src/stream/mod.rs
Expand Up @@ -98,8 +98,9 @@ impl Stream {

let mut video_and_stream_information = video_and_stream_information;

let mut period = tokio::time::interval(tokio::time::Duration::from_millis(100));
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
period.tick().await;

if !state
.read()
Expand Down
67 changes: 35 additions & 32 deletions src/stream/pipeline/runner.rs
Expand Up @@ -166,41 +166,44 @@ impl PipelineRunner {
let mut lost_timestamps: usize = 0;
let max_lost_timestamps: usize = 30;

loop {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

if let Some(reason) = finish.recv().await {
return Err(anyhow!("{reason:?}"));
}
Comment on lines -172 to -174
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not clear, the wrong logic was here. We were only going to the if below (which triggers the stream recreation process) after this future ends, which would only be Some(reason), which would return, or None (closed), which would not necessarily happen.

let mut period = tokio::time::interval(tokio::time::Duration::from_millis(100));

if !allow_block {
// Restart pipeline if pipeline position do not change,
// occur if usb connection is lost and gst do not detect it
let pipeline = pipeline_weak
.upgrade()
.context("Unable to access the Pipeline from its weak reference")?;

if let Some(position) = pipeline.query_position::<gst::ClockTime>() {
previous_position = match previous_position {
Some(current_previous_position) => {
if current_previous_position.nseconds() != 0
&& current_previous_position == position
{
lost_timestamps += 1;
} else if lost_timestamps > 0 {
// We are back in track, erase lost timestamps
warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps");
lost_timestamps = 0;
}
if lost_timestamps == 1 {
warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated");
} else if lost_timestamps > max_lost_timestamps {
return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})"));
loop {
tokio::select! {
reason = finish.recv() => {
return Err(anyhow!("{reason:?}"));
}
_ = period.tick() => {
if !allow_block {
// Restart pipeline if pipeline position do not change,
// occur if usb connection is lost and gst do not detect it
let pipeline = pipeline_weak
.upgrade()
.context("Unable to access the Pipeline from its weak reference")?;

if let Some(position) = pipeline.query_position::<gst::ClockTime>() {
previous_position = match previous_position {
Some(current_previous_position) => {
if current_previous_position.nseconds() != 0
&& current_previous_position == position
{
lost_timestamps += 1;
} else if lost_timestamps > 0 {
// We are back in track, erase lost timestamps
warn!("Position normalized, but didn't changed for {lost_timestamps} timestamps");
lost_timestamps = 0;
}
if lost_timestamps == 1 {
warn!("Position did not change for {lost_timestamps}, silently tracking until {max_lost_timestamps}, then the stream will be recreated");
} else if lost_timestamps > max_lost_timestamps {
return Err(anyhow!("Pipeline lost too many timestamps (max. was {max_lost_timestamps})"));
}

Some(position)
}
None => Some(position),
}

Some(position)
}
None => Some(position),
}
}
}
Expand Down