Help with timed events on a event loop structure #234

Open
fabracht opened this issue Apr 19, 2023 · 0 comments

Hello,
I'm migrating my event loop from the mio crate to io-uring and I'm running into some problems. I haven't been able to find the culprit, but I suspect I'm doing something wrong when registering the timer and when trying to reset it.
The socket trigger works (you can run nc -u 127.0.0.1 44571 in a second terminal and watch the messages come in). The loop duration also triggers properly and the loop breaks as expected. The problem is with the timers: I can't spot what I'm doing wrong, so I thought I'd ask the experts for help.
I've pasted the code below.

I'm running this in a VM on my M2 Mac, so I'm building with this linker setting: linker = "aarch64-linux-gnu-gcc"
I don't have a non-ARM machine to test on, so I can't rule out that this is related. I don't think it is the culprit, but I thought it was worth mentioning.

event_loop.rs

use io_uring::{opcode, types};
use std::{
    collections::HashMap,
    io,
    os::fd::{AsRawFd, RawFd},
    time::Duration,
};

pub type CommonError = Box<dyn std::error::Error>;

#[derive(Debug)]
pub struct Itimerspec {
    pub it_interval: Duration,
    pub it_value: Duration,
}

pub type TimedSources<T> = (
    RawFd,
    usize,
    Box<dyn FnMut(&mut T) -> Result<i32, CommonError>>,
);

pub struct PolledSource<T: AsRawFd> {
    fd: T,
    pub callback: Box<dyn FnMut(&mut T) -> Result<i32, CommonError> + 'static>,
}

pub struct UringEventLoop<T: AsRawFd> {
    ring: io_uring::IoUring,
    pub sources: HashMap<usize, PolledSource<T>>,
    timed_sources: HashMap<usize, TimedSources<T>>,
    next_token: usize,
}

impl<T: AsRawFd> UringEventLoop<T> {
    pub fn new(event_capacity: usize) -> Self {
        Self {
            ring: io_uring::IoUring::new(event_capacity.try_into().unwrap()).unwrap(),
            sources: HashMap::new(),
            timed_sources: HashMap::new(),
            next_token: 0,
        }
    }

    pub fn generate_token(&mut self) -> usize {
        let token = self.next_token;
        self.next_token += 1;
        token
    }

    pub fn register_event_source<F>(
        &mut self,
        event_source: T,
        callback: F,
    ) -> Result<usize, CommonError>
    where
        F: FnMut(&mut T) -> Result<i32, CommonError> + 'static,
    {
        let token = self.generate_token();
        let fd = event_source.as_raw_fd();
        let polled_source = PolledSource {
            fd: event_source,
            callback: Box::new(callback),
        };
        let _ = self.sources.insert(token, polled_source);
        eprintln!("Registering event source with token {}", token);
        let poll_e = opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _)
            .multi(true) // multishot: keep the poll armed after each completion
            .build()
            .user_data(token as _);

        let (submitter, mut submit_queue, _) = self.ring.split();

        loop {
            if submit_queue.is_full() {
                submitter.submit()?;
            }
            submit_queue.sync(); // sync with the real current queue
            match unsafe { submit_queue.push(&poll_e) } {
                Ok(_) => break,
                Err(_) => continue,
            };
        }
        eprintln!("Registered event source with token {}", token);
        Ok(token)
    }

    pub fn run(&mut self) -> Result<(), CommonError> {
        let (submitter, mut _submit_queue, mut completion_queue) = self.ring.split();
        for timed_entry in &self.timed_sources {
            eprintln!("-------  timed_entry = {:?} -----------", timed_entry.0);
        }
        'outer: loop {
            eprintln!("Running loop");
            // Submit queued events and wait
            match submitter.submit_and_wait(1) {
                Ok(_) => (),
                Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => eprintln!("EBUSY"),
                Err(err) => return Err(err.into()),
            }
            eprintln!("Finished waiting");
            // Sync with the real current queue
            completion_queue.sync();
            eprintln!("completion_queue.len() = {}", completion_queue.len());
            // Process events from the completion queue
            for completion_event in &mut completion_queue {
                let result = completion_event.result();
                let token_index = completion_event.user_data() as usize;

                // Check for errors in the event result. -ETIME is the normal
                // result of a timeout that expired, so it is not treated as an error.
                if result < 0 && result != -libc::ETIME {
                    let error = io::Error::from_raw_os_error(-result);
                    eprintln!("token {} error {:?}", token_index, error);
                    return Err(error.into());
                }

                eprintln!("token {} ready", token_index);

                if let Some(polled_source) = self.sources.get_mut(&token_index) {
                    let fd = &mut polled_source.fd;
                    eprintln!("Running polled source");
                    polled_source.callback.as_mut()(fd)?;
                } else if let Some(timed_source) = self.timed_sources.get_mut(&token_index) {
                    eprintln!("Running timed source");
                    if let Some(polled_source) = self.sources.get_mut(&timed_source.1) {
                        let fd = &mut polled_source.fd;
                        timed_source.2.as_mut()(fd)?;
                    }
                    eprintln!("Resetting timer"); // Doing manually for the sake of simplicity in the example
                    let updated_time_spec = Itimerspec {
                        it_interval: Duration::from_secs(0),
                        it_value: Duration::from_millis(250),
                    };
                    Self::reset_timer(&mut _submit_queue, timed_source, &updated_time_spec)?;
                } else {
                    eprintln!("Max duration on token {} reached. Exiting", token_index);
                    break 'outer;
                }
            }
        }

        Ok(())
    }

    pub fn add_duration(&mut self, time_spec: Itimerspec) -> Result<usize, CommonError> {
        let token = self.generate_token();

        let timespec = types::Timespec::new()
            .nsec(time_spec.it_value.subsec_nanos() as u32)
            .sec(time_spec.it_value.as_secs() as u64);
        eprintln!("timespec = {:?}", timespec);
        let timeout = opcode::Timeout::new(&timespec as _)
            .build()
            .user_data(token as u64);
        let (submitter, mut submit_queue, _) = self.ring.split();

        loop {
            if submit_queue.is_full() {
                submitter.submit()?;
            }
            submit_queue.sync(); // sync with the real current queue
            match unsafe { submit_queue.push(&timeout) } {
                Ok(_) => break,
                Err(e) => {
                    eprintln!("Error pushing timeout: {:?}", e);
                    continue;
                }
            };
        }
        eprintln!("Registered duration {:?} with token {}", timeout, token);
        Ok(token)
    }

    pub fn add_timer<F>(
        &mut self,
        time_spec: Itimerspec,
        token: &usize,
        callback: F,
    ) -> Result<usize, CommonError>
    where
        F: FnMut(&mut T) -> Result<i32, CommonError> + 'static,
    {
        let new_token = self.add_duration(time_spec)?;
        if let Some(polled_source) = self.sources.get_mut(token) {
            self.timed_sources.insert(
                new_token,
                (polled_source.fd.as_raw_fd(), *token, Box::new(callback)),
            );
            eprintln!(
                "Registered timer with token {} for source with token {}",
                new_token, token
            );
        }

        Ok(new_token)
    }

    fn reset_timer(
        submit_queue: &mut io_uring::SubmissionQueue,
        timed_source: &mut TimedSources<T>,
        updated_time_spec: &Itimerspec,
    ) -> Result<(), CommonError> {
        let (_, source_token, _) = timed_source;

        // Create a new timespec with the updated Itimerspec
        let new_timespec = types::Timespec::new()
            .nsec(updated_time_spec.it_value.subsec_nanos() as u32)
            .sec(updated_time_spec.it_value.as_secs() as u64);

        let new_timeout = opcode::TimeoutUpdate::new(*source_token as u64, &new_timespec as _)
            .build()
            .user_data(*source_token as u64);

        // Replace the old timer with the new one
        loop {
            submit_queue.sync(); // Sync with the real current queue
            match unsafe { submit_queue.push(&new_timeout) } {
                Ok(_) => break,
                Err(e) => {
                    eprintln!("Error pushing new timeout: {:?}", e);
                    continue;
                }
            };
        }
        eprintln!("Reset timer with token {} ", source_token);

        Ok(())
    }
}
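
One thing that may be worth checking in the listing above: `add_duration` and `reset_timer` each build a `types::Timespec` as a local variable and push an entry that only holds a pointer to it, while the actual submit happens later in `run`. As far as I understand, the kernel reads the timespec when the entry is submitted, so the value probably has to outlive the function that created it. Separately, `reset_timer` passes the source token (not the timer's own token) both to `TimeoutUpdate::new` and to `user_data`, and a one-shot timeout that has already expired may no longer exist to be updated, so pushing a fresh `opcode::Timeout` under the timer's token might be what is needed. The following is a minimal sketch of the storage side only, under those assumptions. `KernelTimespec` and `TimerStore` are hypothetical names; `KernelTimespec` is a stand-in for `io_uring::types::Timespec` so the sketch builds without the crate.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical stand-in for `io_uring::types::Timespec`.
/// The layout mirrors the kernel's `struct __kernel_timespec`.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct KernelTimespec {
    pub tv_sec: i64,
    pub tv_nsec: i64,
}

impl From<Duration> for KernelTimespec {
    fn from(d: Duration) -> Self {
        Self {
            tv_sec: d.as_secs() as i64,
            tv_nsec: i64::from(d.subsec_nanos()),
        }
    }
}

/// Owns one heap-allocated timespec per timer token, so a pointer handed to
/// a submission entry stays valid for as long as the store keeps the entry.
#[derive(Default)]
pub struct TimerStore {
    specs: HashMap<u64, Box<KernelTimespec>>,
}

impl TimerStore {
    /// Stores (or overwrites in place) the timespec for `timer_token`.
    /// Returns the pointer and the `user_data` to use for the timeout entry,
    /// so a re-armed timer completes under the same token as before.
    pub fn arm(&mut self, timer_token: u64, delay: Duration) -> (*const KernelTimespec, u64) {
        let slot = self
            .specs
            .entry(timer_token)
            .or_insert_with(|| Box::new(KernelTimespec::from(delay)));
        // Overwrite the boxed value; the heap address does not change.
        **slot = KernelTimespec::from(delay);
        (&**slot as *const KernelTimespec, timer_token)
    }

    /// Read-only access to the stored timespec for a token.
    pub fn get(&self, timer_token: u64) -> Option<&KernelTimespec> {
        self.specs.get(&timer_token).map(|boxed| &**boxed)
    }
}
```

With something like this as a field of the event loop, `arm` would be called before building the timeout entry, and the returned pointer and token would go into `opcode::Timeout::new(...)` and `.user_data(...)` respectively. Boxing each value keeps its address stable even when the map itself reallocates.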

main.rs

use std::{
    net::{IpAddr, Ipv4Addr, SocketAddr},
    os::fd::{AsRawFd, RawFd},
    time::Duration,
};

use event_loop::Itimerspec;

use crate::event_loop::UringEventLoop;

mod event_loop;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let udp_socket = std::net::UdpSocket::bind("127.0.0.1:44571")?;
    udp_socket.set_nonblocking(true)?;
    let udp_fd = udp_socket.as_raw_fd();
    let mut event_loop: UringEventLoop<RawFd> = UringEventLoop::new(1024);

    let socket_token = event_loop.register_event_source(udp_fd, move |fd| {
        let mut buffer = [0u8; 1024];

        let mut sockaddr = libc::sockaddr_in {
            sin_family: libc::AF_INET as libc::sa_family_t,
            sin_port: 0,
            sin_addr: libc::in_addr { s_addr: 0 },
            sin_zero: [0; 8],
        };

        // `recvfrom` takes the address length as an in/out `socklen_t`, not a `usize`
        let mut addr_len = std::mem::size_of::<libc::sockaddr_in>() as libc::socklen_t;

        // Receive the message using `recvfrom` from the libc crate
        let n: isize = unsafe {
            libc::recvfrom(
                fd.as_raw_fd(),
                buffer.as_mut_ptr() as *mut _,
                buffer.len(),
                0,
                &mut sockaddr as *mut _ as *mut libc::sockaddr,
                &mut addr_len,
            )
        };

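        // Convert the sender address to a `SocketAddr`. Both fields are in
        // network byte order: the in-memory bytes of `s_addr` are already
        // a.b.c.d, and the port has to be converted to host order.
        let ip_bytes = sockaddr.sin_addr.s_addr.to_ne_bytes();
        let socket_addr = SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(
                ip_bytes[0],
                ip_bytes[1],
                ip_bytes[2],
                ip_bytes[3],
            )),
            u16::from_be(sockaddr.sin_port),
        );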
        println!("Received {} bytes from {}", n, socket_addr);
        Ok(0)
    })?;

    let it_interval1 = Duration::from_millis(0);
    let it_value1 = Duration::from_millis(250);
    let time_spec = Itimerspec {
        it_interval: it_interval1,
        it_value: it_value1,
    };
    event_loop.add_timer(time_spec, &socket_token, move |_socket| {
        println!("Timer fired");
        // Do send_to operation with socket
        Ok(0)
    })?;

    let it_interval = Duration::from_millis(0);
    let it_value = Duration::from_millis(1000);
    event_loop.add_duration(Itimerspec {
        it_interval,
        it_value,
    })?;

    event_loop.run()?;

    Ok(())
}
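
`main.rs` registers three kinds of things: a polled socket, a timer tied to that socket, and a bare duration that ends the loop. In `run`, the duration is recognised only because its token is missing from both maps, which also means any unexpected completion would end the loop, and a completion carrying the socket's token is always treated as socket readiness. A minimal sketch of making the kind of each token explicit might look like the following. `Registration`, `Action` and `route` are hypothetical names for illustration, and `ETIME` is hard-coded to its Linux value so the sketch builds without the `libc` crate.

```rust
use std::collections::HashMap;

/// Linux value of `ETIME`; the `libc` crate exposes it as `libc::ETIME`.
const ETIME: i32 = 62;

/// What a token was registered as.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Registration {
    /// A polled file descriptor.
    Source,
    /// A timer that acts on the source with the given token.
    Timer { source: u64 },
    /// The overall run-time limit of the loop.
    Deadline,
}

/// What the loop should do with one completion.
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    RunSource,
    FireTimer { source: u64 },
    Stop,
    /// Nothing to do, e.g. an unknown token.
    Ignore,
    /// A real failure, carrying the positive errno.
    Fail(i32),
}

/// Decides how to handle a completion from its token and result code.
pub fn route(registry: &HashMap<u64, Registration>, token: u64, result: i32) -> Action {
    let kind = match registry.get(&token) {
        Some(kind) => kind,
        None => return Action::Ignore,
    };
    match (kind, result) {
        // An expired timeout completes with -ETIME; that is the normal case.
        (Registration::Timer { source }, r) if r == -ETIME => {
            Action::FireTimer { source: *source }
        }
        (Registration::Deadline, r) if r == -ETIME => Action::Stop,
        // A poll completion reports the ready event mask as a non-negative value.
        (Registration::Source, r) if r >= 0 => Action::RunSource,
        // Any other negative result is reported as an error.
        (_, r) if r < 0 => Action::Fail(-r),
        _ => Action::Ignore,
    }
}
```

The loop body would then match on the returned `Action` instead of probing the two maps in turn, so a stray completion is skipped rather than being mistaken for the deadline.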

Cargo.toml

[package]
name = "eq-uring"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
io-uring = "0.6.0"
libc = "0.2.141"