Skip to content

Commit

Permalink
Fix tokio panic
Browse files Browse the repository at this point in the history
This might not be needed after all because of
tokio-rs/tokio#5710, but since I already wrote
the code, I created a PR anyway.
  • Loading branch information
Hocuri committed May 23, 2023
1 parent 5b435d1 commit ebfe917
Show file tree
Hide file tree
Showing 16 changed files with 65 additions and 29 deletions.
3 changes: 2 additions & 1 deletion deltachat-jsonrpc/src/api/mod.rs
Expand Up @@ -6,6 +6,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
pub use deltachat::accounts::Accounts;
use deltachat::message::get_msg_read_receipts;
use deltachat::qr::Qr;
use deltachat::tools;
use deltachat::{
chat::{
self, add_contact_to_chat, forward_msgs, get_chat_media, get_chat_msgs, get_chat_msgs_ex,
Expand Down Expand Up @@ -152,7 +153,7 @@ impl CommandApi {
impl CommandApi {
/// Test function.
async fn sleep(&self, delay: f64) {
tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await
tools::sleep(std::time::Duration::from_secs_f64(delay)).await
}

// ---------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions examples/simple.rs
@@ -1,10 +1,10 @@
use deltachat::chat::{self, ChatId};
use deltachat::chatlist::*;
use deltachat::config;
use deltachat::contact::*;
use deltachat::context::*;
use deltachat::message::Message;
use deltachat::stock_str::StockStrings;
use deltachat::{chatlist::*, tools};
use deltachat::{EventType, Events};
use tempfile::tempdir;

Expand Down Expand Up @@ -80,7 +80,7 @@ async fn main() {
}

// wait for the message to be sent out
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tools::sleep(std::time::Duration::from_secs(1)).await;

log::info!("fetching chats..");
let chats = Chatlist::try_load(&ctx, 0, None, None).await.unwrap();
Expand Down
13 changes: 13 additions & 0 deletions python/tests/test_3_offline.py
Expand Up @@ -481,6 +481,19 @@ def test_create_contact(self, acfactory):
contact2 = ac1.create_contact("display1 <x@example.org>", "real")
assert contact2.name == "real"

def test_send_lots_of_offline_msgs(self, acfactory):
ac1 = acfactory.get_pseudo_configured_account()
ac1.set_config("configured_mail_server", "example.org")
ac1.set_config("configured_mail_user", "example.org")
ac1.set_config("configured_mail_pw", "example.org")
ac1.set_config("configured_send_server", "example.org")
ac1.set_config("configured_send_user", "example.org")
ac1.set_config("configured_send_pw", "example.org")
ac1.start_io()
chat = ac1.create_contact("some1@example.org", name="some1").create_chat()
for i in range(50):
chat.send_text("hello")

def test_create_chat_simple(self, acfactory):
ac1 = acfactory.get_pseudo_configured_account()
contact1 = ac1.create_contact("some1@example.org", name="some1")
Expand Down
7 changes: 5 additions & 2 deletions src/accounts.rs
Expand Up @@ -510,7 +510,10 @@ impl AccountConfig {
#[cfg(test)]
mod tests {
use super::*;
use crate::stock_str::{self, StockMessage};
use crate::{
stock_str::{self, StockMessage},
tools,
};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_account_new_open() {
Expand Down Expand Up @@ -732,7 +735,7 @@ mod tests {

// Test that event emitter does not return `None` immediately.
let duration = std::time::Duration::from_millis(1);
assert!(tokio::time::timeout(duration, event_emitter.recv())
assert!(tools::timeout(duration, event_emitter.recv())
.await
.is_err());

Expand Down
2 changes: 1 addition & 1 deletion src/authres.rs
Expand Up @@ -669,7 +669,7 @@ Authentication-Results: dkim=";

// Sleep to make sure key reset is ignored because of DKIM failure
// and not because reordering is suspected.
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

let bob2 = tcm.unconfigured().await;
bob2.configure_addr("bob@example.net").await;
Expand Down
15 changes: 8 additions & 7 deletions src/chat.rs
Expand Up @@ -3798,6 +3798,7 @@ mod tests {
use crate::message::delete_msgs;
use crate::receive_imf::receive_imf;
use crate::test_utils::TestContext;
use crate::tools;
use tokio::fs;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -4107,17 +4108,17 @@ mod tests {

add_contact_to_chat(&alice, alice_chat_id, claire_id).await?;
let add2 = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

add_contact_to_chat(&alice, alice_chat_id, daisy_id).await?;
let add3 = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 4);

remove_contact_from_chat(&alice, alice_chat_id, claire_id).await?;
let remove1 = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

remove_contact_from_chat(&alice, alice_chat_id, daisy_id).await?;
let remove2 = alice.pop_sent_msg().await;
Expand Down Expand Up @@ -4154,11 +4155,11 @@ mod tests {

send_text_msg(&alice, alice_chat_id, "populate".to_string()).await?;
let add = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

remove_contact_from_chat(&alice, alice_chat_id, claire_id).await?;
let remove1 = alice.pop_sent_msg().await;
tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
tools::sleep(std::time::Duration::from_millis(1100)).await;

remove_contact_from_chat(&alice, alice_chat_id, daisy_id).await?;
let remove2 = alice.pop_sent_msg().await;
Expand Down Expand Up @@ -4752,9 +4753,9 @@ mod tests {
.await
.unwrap()
.chat_id;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
tools::sleep(std::time::Duration::from_millis(1000)).await;
let chat_id2 = t.get_self_chat().await.id;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
tools::sleep(std::time::Duration::from_millis(1000)).await;
let chat_id3 = create_group_chat(&t, ProtectionStatus::Unprotected, "foo")
.await
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/contact.rs
Expand Up @@ -16,7 +16,7 @@ use regex::Regex;
use rusqlite::OptionalExtension;
use serde::{Deserialize, Serialize};
use tokio::task;
use tokio::time::{timeout, Duration};
use tokio::time::Duration;

use crate::aheader::EncryptPreference;
use crate::chat::ChatId;
Expand All @@ -32,6 +32,7 @@ use crate::mimeparser::AvatarAction;
use crate::param::{Param, Params};
use crate::peerstate::{Peerstate, PeerstateVerifiedStatus};
use crate::sql::{self, params_iter};
use crate::tools::timeout;
use crate::tools::{
duration_to_str, get_abs_path, improve_single_line_input, strip_rtlo_characters, time,
EmailAddress,
Expand Down
2 changes: 1 addition & 1 deletion src/ephemeral.rs
Expand Up @@ -71,7 +71,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{ensure, Result};
use async_channel::Receiver;
use serde::{Deserialize, Serialize};
use tokio::time::timeout;

use crate::chat::{send_msg, ChatId};
use crate::constants::{DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH};
Expand All @@ -84,6 +83,7 @@ use crate::message::{Message, MessageState, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::sql::{self, params_iter};
use crate::stock_str;
use crate::tools::timeout;
use crate::tools::{duration_to_str, time};

/// Ephemeral timer value.
Expand Down
3 changes: 2 additions & 1 deletion src/imap/idle.rs
Expand Up @@ -8,6 +8,7 @@ use futures_lite::FutureExt;
use super::session::Session;
use super::Imap;
use crate::imap::{client::IMAP_TIMEOUT, FolderMeaning};
use crate::tools;
use crate::{context::Context, scheduler::InterruptInfo};

const IDLE_TIMEOUT: Duration = Duration::from_secs(23 * 60);
Expand Down Expand Up @@ -97,7 +98,7 @@ impl Session {
}
}

let mut session = tokio::time::timeout(Duration::from_secs(15), handle.done())
let mut session = tools::timeout(Duration::from_secs(15), handle.done())
.await
.with_context(|| format!("{folder_name}: IMAP IDLE protocol timed out"))?
.with_context(|| format!("{folder_name}: IMAP IDLE failed"))?;
Expand Down
3 changes: 2 additions & 1 deletion src/imex.rs
Expand Up @@ -790,6 +790,7 @@ mod tests {
use crate::pgp::{split_armored_data, HEADER_AUTOCRYPT, HEADER_SETUPCODE};
use crate::stock_str::StockMessage;
use crate::test_utils::{alice_keypair, TestContext};
use crate::tools;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_render_setup_file() {
Expand Down Expand Up @@ -963,7 +964,7 @@ mod tests {
// The database is still unconfigured;
// fill the config cache with the old value.
context2.is_configured().await.ok();
tokio::time::sleep(Duration::from_micros(1)).await;
tools::sleep(Duration::from_micros(1)).await;
}

// Assert that the config cache has the new value now.
Expand Down
3 changes: 2 additions & 1 deletion src/imex/transfer.rs
Expand Up @@ -598,6 +598,7 @@ mod tests {
use crate::chat::{get_chat_msgs, send_msg, ChatItem};
use crate::message::{Message, Viewtype};
use crate::test_utils::TestContextManager;
use crate::tools;

use super::*;

Expand Down Expand Up @@ -629,7 +630,7 @@ mod tests {
get_backup(&ctx1, provider.qr()).await.unwrap();

// Make sure the provider finishes without an error.
tokio::time::timeout(Duration::from_secs(30), provider)
tools::timeout(Duration::from_secs(30), provider)
.await
.expect("timed out")
.expect("error in provider");
Expand Down
2 changes: 1 addition & 1 deletion src/location.rs
Expand Up @@ -6,7 +6,6 @@ use std::time::Duration;
use anyhow::{ensure, Context as _, Result};
use async_channel::Receiver;
use quick_xml::events::{BytesEnd, BytesStart, BytesText};
use tokio::time::timeout;

use crate::chat::{self, ChatId};
use crate::contact::ContactId;
Expand All @@ -15,6 +14,7 @@ use crate::events::EventType;
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::stock_str;
use crate::tools::timeout;
use crate::tools::{duration_to_str, time};

/// Location record.
Expand Down
2 changes: 1 addition & 1 deletion src/net.rs
Expand Up @@ -6,11 +6,11 @@ use std::time::Duration;

use anyhow::{Context as _, Error, Result};
use tokio::net::{lookup_host, TcpStream};
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;

use crate::context::Context;
use crate::tools::time;
use crate::tools::timeout;

pub(crate) mod http;
pub(crate) mod session;
Expand Down
10 changes: 5 additions & 5 deletions src/scheduler.rs
Expand Up @@ -16,13 +16,13 @@ use crate::context::Context;
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType;
use crate::imap::{FolderMeaning, Imap};
use crate::job;
use crate::location;
use crate::log::LogExt;
use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql;
use crate::tools::time;
use crate::tools::{duration_to_str, maybe_add_time_based_warnings};
use crate::{job, tools};

pub(crate) mod connectivity;

Expand Down Expand Up @@ -674,7 +674,7 @@ async fn smtp_loop(
"smtp got rate limited, waiting for {} until can send again",
duration_to_str(duration_until_can_send)
);
tokio::time::timeout(duration_until_can_send, async {
tools::timeout(duration_until_can_send, async {
idle_interrupt_receiver.recv().await.unwrap_or_default()
})
.await
Expand All @@ -701,7 +701,7 @@ async fn smtp_loop(
"smtp has messages to retry, planning to retry {} seconds later", timeout
);
let duration = std::time::Duration::from_secs(timeout);
tokio::time::timeout(duration, async {
tools::timeout(duration, async {
idle_interrupt_receiver.recv().await.unwrap_or_default()
})
.await
Expand Down Expand Up @@ -865,12 +865,12 @@ impl Scheduler {
// Actually shutdown tasks.
let timeout_duration = std::time::Duration::from_secs(30);
for b in once(self.inbox).chain(self.oboxes.into_iter()) {
tokio::time::timeout(timeout_duration, b.handle)
tools::timeout(timeout_duration, b.handle)
.await
.log_err(context)
.ok();
}
tokio::time::timeout(timeout_duration, self.smtp_handle)
tools::timeout(timeout_duration, self.smtp_handle)
.await
.log_err(context)
.ok();
Expand Down
6 changes: 3 additions & 3 deletions src/test_utils.rs
Expand Up @@ -37,7 +37,7 @@ use crate::message::{update_msg_state, Message, MessageState, MsgId, Viewtype};
use crate::mimeparser::MimeMessage;
use crate::receive_imf::receive_imf;
use crate::stock_str::StockStrings;
use crate::tools::EmailAddress;
use crate::tools::{self, EmailAddress};

#[allow(non_upper_case_globals)]
pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avatar900x900.png");
Expand Down Expand Up @@ -432,7 +432,7 @@ impl TestContext {
break row;
}
if start.elapsed() < timeout {
tokio::time::sleep(Duration::from_millis(100)).await;
tools::sleep(Duration::from_millis(100)).await;
} else {
return None;
}
Expand Down Expand Up @@ -954,7 +954,7 @@ impl EventTracker {
/// If no matching events are ready this will wait for new events to arrive and time out
/// after 10 seconds.
pub async fn get_matching<F: Fn(&EventType) -> bool>(&self, event_matcher: F) -> EventType {
tokio::time::timeout(Duration::from_secs(10), async move {
tools::timeout(Duration::from_secs(10), async move {
loop {
let event = self.recv().await.unwrap();
if event_matcher(&event.typ) {
Expand Down
16 changes: 15 additions & 1 deletion src/tools.rs
Expand Up @@ -4,6 +4,7 @@
#![allow(missing_docs)]

use std::borrow::Cow;
use std::cmp::min;
use std::fmt;
use std::io::{Cursor, Write};
use std::mem;
Expand All @@ -14,7 +15,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{bail, Context as _, Result};
use base64::Engine as _;
use chrono::{Local, NaiveDateTime, NaiveTime, TimeZone};
use futures::{StreamExt, TryStreamExt};
use futures::{Future, StreamExt, TryStreamExt};
use mailparse::dateparse;
use mailparse::headers::Headers;
use mailparse::MailHeaderMap;
Expand Down Expand Up @@ -710,6 +711,19 @@ pub(crate) fn strip_rtlo_characters(input_str: &str) -> String {
input_str.replace(|char| RTLO_CHARACTERS.contains(&char), "")
}

const THIRTY_YEARS: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 10);

pub fn sleep(duration: Duration) -> tokio::time::Sleep {
tokio::time::sleep(min(THIRTY_YEARS, duration))
}

pub fn timeout<F>(duration: Duration, future: F) -> tokio::time::Timeout<F>
where
F: Future,
{
tokio::time::timeout(min(THIRTY_YEARS, duration), future)
}

#[cfg(test)]
mod tests {
#![allow(clippy::indexing_slicing)]
Expand Down

0 comments on commit ebfe917

Please sign in to comment.