Bad completion of some futures in io::copy_bidirectional #6519

Closed

Armillus opened this issue Apr 27, 2024 · 8 comments
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-io Module: tokio/io

Comments

Armillus commented Apr 27, 2024

Version

tokio v1.37.0

Platform

Debian 10, but also WSL 2, Windows, and Debian 11 (untested on other platforms)

Description

It seems I've hit an edge case in the function io::copy_bidirectional. Unfortunately, the minimal reproducible example provided below is a bit long, so I'll describe the problem here instead.

It might be a misunderstanding or a bad implementation on my side, but I believe it's a genuinely tricky problem that only shows up in very rare circumstances.

Minimal reproducible example

The gist can be found here.

Context

I have a tokio-based server that acts as a proxy: it takes data from TCP clients and distributes it to the appropriate backend services. The data is not transformed; it is just a Vec<u8> that can be moved around. Data flows in both directions, but the TCP clients send much more than they receive.

io::copy_bidirectional worked fine for most of my use cases, but I hit exactly one edge case where the issue shows up, since several conditions need to be met at once.

Note that my AsyncWrite and AsyncRead implementations worked perfectly fine with a custom, manual loop based on tokio::select! (with or without a correct poll_flush implementation).
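
For reference, a manual loop of that kind is shaped roughly like the sketch below. This is not my exact code; the generic bounds and the 8 KiB buffers are only illustrative assumptions:

use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

// Rough sketch of a manual bidirectional copy loop built on tokio::select!.
async fn manual_copy<A, B>(a: A, b: B) -> io::Result<()>
where
    A: AsyncRead + AsyncWrite,
    B: AsyncRead + AsyncWrite,
{
    let (mut a_rd, mut a_wr) = io::split(a);
    let (mut b_rd, mut b_wr) = io::split(b);
    let (mut buf_a, mut buf_b) = (vec![0u8; 8 * 1024], vec![0u8; 8 * 1024]);

    loop {
        tokio::select! {
            // Data arrived from `a`: forward it to `b` and flush right away.
            res = a_rd.read(&mut buf_a) => {
                let n = res?;
                if n == 0 { break; }
                b_wr.write_all(&buf_a[..n]).await?;
                b_wr.flush().await?;
            }
            // Data arrived from `b`: forward it to `a` and flush right away.
            res = b_rd.read(&mut buf_b) => {
                let n = res?;
                if n == 0 { break; }
                a_wr.write_all(&buf_b[..n]).await?;
                a_wr.flush().await?;
            }
        }
    }

    Ok(())
}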

Origin of the problem

I use io::copy_bidirectional between a TcpStream and a Stream of my own, which sends and receives data using bounded mpsc channels. At the beginning, I only implemented poll_write and poll_read, and my implementation of poll_flush was a no-op. However, data sent through the channel looked like it was "corrupted" at some point.

The service in charge of processing this data complained that a packet was incomplete and aborted the connection. I then implemented poll_flush properly, which led to a mutual halt where both the service and my Stream were waiting for data. Digging further, I noticed that io::copy_bidirectional did not wait for my poll_flush future to complete before calling poll_flush again after several write operations.
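
For illustration, poll_write in this setup essentially just appends to the internal write buffer; a simplified sketch (not my exact code, using the same field names as the flushing snippet below) looks like this:

// Simplified sketch: poll_write only buffers the bytes locally; the actual
// channel send happens later, in poll_flush.
fn poll_write(
    mut self: Pin<&mut Self>,
    _cx: &mut Context<'_>,
    buf: &[u8],
) -> Poll<std::io::Result<usize>> {
    self.write_buffer.extend_from_slice(buf);
    Poll::Ready(Ok(buf.len()))
}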

For reference, my flushing code roughly looks like this (simplified for the issue):

// Reuse the in-flight flush future if there is one; otherwise start a new one
// that sends the currently buffered bytes through the channel.
let mut flushing_future = self.flushing_future.take().unwrap_or_else(|| {
    let sender = self.sender.clone();
    let mut data = Vec::new();

    data.append(&mut self.write_buffer);

    Box::pin(async move {
        // Simulating some slight back-pressure
        tokio::time::sleep(Duration::from_millis(1)).await;

        sender.send(data).await
    })
});

match flushing_future.as_mut().poll(cx) {
    Poll::Pending => {
        // Keep the future around so that the next poll_flush call resumes it.
        self.flushing_future = Some(flushing_future);

        Poll::Pending
    },
    Poll::Ready(Err(_)) => Poll::Ready(Err(std::io::ErrorKind::Other.into())),
    Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
}

Understanding of the problem

After a while, I decided to dig into the details of io::copy_bidirectional. I added some debug output to be sure, and I could finally confirm that my problem was related to this code:

// Try flushing when the reader has no progress to avoid deadlock
// when the reader depends on buffered writer.
if self.need_flush {
    ready!(writer.as_mut().poll_flush(cx))?;
    #[cfg(any(
        feature = "fs",
        feature = "io-std",
        feature = "net",
        feature = "process",
        feature = "rt",
        feature = "signal",
        feature = "sync",
        feature = "time",
    ))]
    coop.made_progress();
    self.need_flush = false;
}

According to the comments, a flush may be performed when there is buffered data to write and the read operation is pending, as an optimization. However, I realized that when this flushing future is itself pending, poll_copy returns, which leads to the task being rescheduled and eventually to a new call.

Ignoring some tokio details that are not part of the problem, the function then re-enters the main loop and reaches the poll_fill_buf match. If the read operation is no longer pending, self.buf is filled again, then written out, and finally it may be flushed again. To summarize, the sequence is as follows:

In poll_copy:

  • after a while, self.buf holds some data and self.need_flush is true
  • poll_fill_buf is pending
  • poll_flush is performed but is pending (let's call it F)
  • poll_copy is returned from, then called again
  • poll_fill_buf is performed again and immediately returns a result
  • poll_write_buf is called several times in a row (because of my buffering, it returns immediately)
  • poll_flush is performed (let's call it F'), but F has just finished, so it returns Poll::Ready(Ok(()))
  • poll_copy thinks that F' has completed, but in fact it was only F that completed, and F' was never even started

Hence, the main issue is that when a flush is started while a read operation is pending, it will likely never be driven to completion, so when poll_copy calls poll_flush again, it will likely return Poll::Ready(Ok(())) even though the second future was never even started. While this does not show up as long as the data flow is "uninterrupted", it becomes a problem when both ends of the connection are waiting for data that is never flushed because of this issue.

Moreover, I suspect that this optimization was also responsible for my earlier issue, before I implemented poll_flush properly. My guess is that it triggers a similar problem, starting a future that is never driven to completion.

Solution

In my case, with the same poll_flush implementation on my side, I was able to fix the problem by copying this code just before the loop.
It works because, as soon as the function is called again, it first checks that any poll_flush triggered during a pending read has completed properly. Of course, this likely has a performance cost, but it now works.

Note that it does not fix the problem I had without poll_flush, but I think that is just another check to perform to drive yet another future to completion (see the end of the previous section).

The only other ways I see to fix the problem are either to use a manual loop with tokio::select!, which does the same as io::copy_bidirectional but less efficiently, or to replace the Option<Future> in my Stream with a Vec<Future>, which does not make sense in my opinion.

Conclusion

If that's not clear enough, I can provide more information or rework the minimal reproducible example.
If my analysis is correct, I'd be happy to provide a PR with my solution (which I'm sure can be improved).

Otherwise, I would be happy to know what I'm missing.

Sorry for the long read :)

Armillus added the A-tokio Area: The main tokio crate and C-bug Category: This is a bug. labels Apr 27, 2024
mox692 added the M-io Module: tokio/io label Apr 30, 2024
mox692 (Member) commented Apr 30, 2024

I was able to fix the problem by copying this code just before the loop.

This could be a fix, but I think that ideally it should be implemented in such a way that reads into the buffer do not stop when there is room in the buffer but the flushing future is pending.

Armillus (Author) commented May 1, 2024

I was able to fix the problem by copying this code just before the loop.

This could be a fix, but I think that ideally it should be implemented in such a way that reads into the buffer do not stop when there is room in the buffer but the flushing future is pending.

I agree, that would definitely be more efficient. As I imagine it, this solution would look like this:

loop {
    // If our buffer is empty, then we need to read some data to
    // continue.
    if self.pos == self.cap && !self.read_done {
       // This code remains the same
    }

    // If a flush future is in progress, do not write until it is finished
    if self.need_flush {
        ready!(writer.as_mut().poll_flush(cx))?;
        #[cfg(any(
          feature = "fs",
          feature = "io-std",
          feature = "net",
          feature = "process",
          feature = "rt",
          feature = "signal",
          feature = "sync",
          feature = "time",
        ))]
        coop.made_progress();
        self.need_flush = false;
    }
    
    // Remaining code is untouched
}

Note that any error during flushing is ignored in my code, since it is also ignored elsewhere in the function. The flushing code could probably be factored out, since it is the same as the one used in the pending-read case.

Besides, I think we can fix the other, similar problem coming from this optimization (the read future may never be properly polled to completion) by changing the condition at the top of the loop:

loop {
    // If our buffer is empty, then we need to read some data to
    // continue. Otherwise, if we can read a bit more data, do it
    // to improve the chances of a large write
    let is_readable = (self.pos == self.cap) || (self.cap < self.buf.len());
    
    if is_readable && !self.read_done {
       // This code remains the same
    }

    // Remaining code is untouched
}

Armillus (Author) commented May 3, 2024

While digging into the first issue I had (with just a poll_write() implementation), I was able to confirm that this optimization is currently only partially effective. Indeed, the current implementation will not poll a pending read future again before the next poll_write(), which is not fatal, but it makes the whole thing useless if the reader is not immediately ready.

Hence, I've reworked my fixes and implemented them in the related PR. With those fixes, everything works as expected, both in the real project where I spotted the issue and in the minimal example provided here.

The fix related to the flush future is definitely the most important one, since it breaks the poll_flush contract, but the modification of the read condition also seemed to improve the overall performance a bit in my case (a high workload with most of the data flowing from one side).

Darksonn (Contributor) commented May 4, 2024

If poll_flush returns Ready and you haven't flushed everything, then your IO resource is incorrect.

Armillus (Author) commented May 4, 2024

If poll_flush returns Ready and you haven't flushed everything, then your IO resource is incorrect.

So if I write N bytes of data, then start to flush (and get a Poll::Pending), then write another X bytes of data, and eventually flush again and get a Poll::Ready(Ok(())), that necessarily means that all N + X bytes of data have been flushed properly?

Darksonn (Contributor) commented May 4, 2024

Yes. If your IO resource relies on futures internally, then the poll_flush call that finishes the flush future for the N bytes must not return Ready. Instead, that call to poll_flush should start another flush operation for the remaining X bytes. You don't return Ready until the X bytes have also been flushed.

Armillus (Author) commented May 4, 2024

Ok, thank you very much, I had not understood it this way. The documentation is not very explicit on this point in my opinion, but maybe I'm the only one who thinks so and I simply misread its wording.
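
To make sure I understand correctly, poll_flush on my side should then be shaped roughly like the sketch below (simplified, reusing the field names from my earlier snippet and assuming the struct is Unpin): it keeps draining until both the in-flight future and the write buffer are empty, and only then returns Ready.

use std::{
    future::Future,
    io,
    pin::Pin,
    task::{ready, Context, Poll},
};
use tokio::sync::mpsc;

type FlushFuture =
    Pin<Box<dyn Future<Output = Result<(), mpsc::error::SendError<Vec<u8>>>> + Send>>;

struct ChannelStream {
    sender: mpsc::Sender<Vec<u8>>,
    write_buffer: Vec<u8>,
    flushing_future: Option<FlushFuture>,
}

impl ChannelStream {
    // Shaped like AsyncWrite::poll_flush; only returns Ready once everything
    // written so far has actually been sent through the channel.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        loop {
            // Drive any in-flight flush to completion first.
            if let Some(fut) = self.flushing_future.as_mut() {
                let res = ready!(fut.as_mut().poll(cx));
                self.flushing_future = None;
                if res.is_err() {
                    return Poll::Ready(Err(io::ErrorKind::Other.into()));
                }
            }

            // Nothing left to send: the flush is genuinely complete.
            if self.write_buffer.is_empty() {
                return Poll::Ready(Ok(()));
            }

            // Bytes were written after the previous flush started: start a new
            // send for them instead of returning Ready early.
            let sender = self.sender.clone();
            let data = std::mem::take(&mut self.write_buffer);
            self.flushing_future = Some(Box::pin(async move { sender.send(data).await }));
        }
    }
}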

Hence, I guess the only part of this issue and the related PR that still makes sense is the optimization. It's a minor detail, but with the current implementation, if the read operation is pending, the next write may happen before the next read, without giving the read operation a chance to be polled a second time. In other words, we could potentially allow for larger writes by trying to read a second time before writing again when both operations are pending. What do you think @Darksonn?

Darksonn (Contributor) commented May 4, 2024

Reading before writing to include more data in the write makes sense to me.

I'll close this issue, but if you want to adapt the PR to optimize something, then that's fine with me.

Darksonn closed this as not planned May 4, 2024