Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/tcp/: Call take_error on tokio TcpStream #2725

Merged
merged 5 commits into from Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion transports/tcp/CHANGELOG.md
Expand Up @@ -2,6 +2,10 @@

- Update to `libp2p-core` `v0.34.0`.

- Call `TcpStream::take_error` in tokio `Provider` to report connection
establishment errors early. See also [PR 2458] for the related async-io
change.

# 0.33.0

- Update to `libp2p-core` `v0.33.0`.
Expand All @@ -16,7 +20,9 @@

# 0.31.1 [2022-02-02]

- Call `TcpSocket::take_error` to report connection establishment errors early.
- Call `TcpSocket::take_error` to report connection establishment errors early. See [PR 2458].

[PR 2458]: https://github.com/libp2p/rust-libp2p/pull/2458

# 0.31.0 [2022-01-27]

Expand Down
2 changes: 1 addition & 1 deletion transports/tcp/Cargo.toml
Expand Up @@ -21,7 +21,7 @@ libc = "0.2.80"
libp2p-core = { version = "0.34.0", path = "../../core", default-features = false }
log = "0.4.11"
socket2 = { version = "0.4.0", features = ["all"] }
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net"], optional = true }
tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, features = ["net"], optional = true }

[features]
default = ["async-io"]
Expand Down
14 changes: 14 additions & 0 deletions transports/tcp/src/provider/tokio.rs
Expand Up @@ -64,8 +64,22 @@ impl Provider for Tcp {

fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result<Self::Stream>> {
async move {
// Taken from [`tokio_crate::net::TcpStream::connect_mio`].

let stream = tokio_crate::net::TcpStream::try_from(s)?;

// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
// writable we check for `take_socket_error` to see if the connect
// actually hit an error or not.
//
// If all that succeeded then we ship everything on up.
stream.writable().await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at fn writable this seems to be doing sth different than what connect_mio calls under the hood

poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;

vs

 pub async fn writable(&self) -> io::Result<()> {
    self.ready(Interest::WRITABLE).await?;
    Ok(())
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I was following the recommendation in the last paragraph of the poll_write_ready docs:


    /// Polls for write readiness.
    ///
    /// If the tcp stream is not currently ready for writing, this method will
    /// store a clone of the `Waker` from the provided `Context`. When the tcp
    /// stream becomes ready for writing, `Waker::wake` will be called on the
    /// waker.
    ///
    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
    /// the `Waker` from the `Context` passed to the most recent call is
    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
    /// second, independent waker.)
    ///
    /// This function is intended for cases where creating and pinning a future
    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
    /// preferred, as this supports polling from multiple tasks at once.

https://github.com/tokio-rs/tokio/blob/55078ffec3bba78803ff646f3933a23834789e4e/tokio/src/net/tcp/stream.rs#L793-L807

Do you see any problems with that @dignifiedquire?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that makes sense, seems like using writable is the right call to make here 👍


if let Some(e) = stream.take_error()? {
return Err(e);
}

Ok(TcpStream(stream))
}
.boxed()
Expand Down