Skip to content

Commit c849339

Browse files
authoredFeb 22, 2023
feat(client): add client::connect::capture_connection() (#3144)
Add `capture_connection` functionality. This allows callers to retrieve the `Connected` struct of the connection that was used internally by Hyper. This is in service of #2605. Although this uses `http::Extensions` under the hood, the API exposed explicitly hides that detail.
1 parent 37ed5a2 commit c849339

File tree

3 files changed

+223
-6
lines changed

3 files changed

+223
-6
lines changed
 

‎src/client/client.rs

+11-4
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,21 @@ use http::uri::{Port, Scheme};
1010
use http::{Method, Request, Response, Uri, Version};
1111
use tracing::{debug, trace, warn};
1212

13+
use crate::body::{Body, HttpBody};
14+
use crate::client::connect::CaptureConnectionExtension;
15+
use crate::common::{
16+
exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin,
17+
Poll,
18+
};
19+
use crate::rt::Executor;
20+
1321
use super::conn;
1422
use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
1523
use super::pool::{
1624
self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
1725
};
1826
#[cfg(feature = "tcp")]
1927
use super::HttpConnector;
20-
use crate::body::{Body, HttpBody};
21-
use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll};
22-
use crate::rt::Executor;
2328

2429
/// A Client to make outgoing HTTP requests.
2530
///
@@ -238,7 +243,9 @@ where
238243
})
239244
}
240245
};
241-
246+
req.extensions_mut()
247+
.get_mut::<CaptureConnectionExtension>()
248+
.map(|conn| conn.set(&pooled.conn_info));
242249
if pooled.is_http1() {
243250
if req.version() == Version::HTTP_2 {
244251
warn!("Connection is HTTP/1, but request requires HTTP/2");

‎src/client/connect/mod.rs

+179-1
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,11 @@
8282
use std::fmt;
8383
use std::fmt::{Debug, Formatter};
8484
use std::sync::atomic::{AtomicBool, Ordering};
85+
use std::ops::Deref;
8586
use std::sync::Arc;
8687

8788
use ::http::Extensions;
89+
use tokio::sync::watch;
8890

8991
cfg_feature! {
9092
#![feature = "tcp"]
@@ -146,6 +148,114 @@ impl PoisonPill {
146148
}
147149
}
148150

151+
/// [`CaptureConnection`] allows callers to capture [`Connected`] information
152+
///
153+
/// To capture a connection for a request, use [`capture_connection`].
154+
#[derive(Debug, Clone)]
155+
pub struct CaptureConnection {
156+
rx: watch::Receiver<Option<Connected>>,
157+
}
158+
159+
/// Capture the connection for a given request
160+
///
161+
/// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait.
162+
/// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon
163+
/// as the connection is established.
164+
///
165+
/// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none.
166+
///
167+
/// # Examples
168+
///
169+
/// **Synchronous access**:
170+
/// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been
171+
/// established. This is ideal for situations where you are certain the connection has already
172+
/// been established (e.g. after the response future has already completed).
173+
/// ```rust
174+
/// use hyper::client::connect::{capture_connection, CaptureConnection};
175+
/// let mut request = http::Request::builder()
176+
/// .uri("http://foo.com")
177+
/// .body(())
178+
/// .unwrap();
179+
///
180+
/// let captured_connection = capture_connection(&mut request);
181+
/// // some time later after the request has been sent...
182+
/// let connection_info = captured_connection.connection_metadata();
183+
/// println!("we are connected! {:?}", connection_info.as_ref());
184+
/// ```
185+
///
186+
/// **Asynchronous access**:
187+
/// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the
188+
/// connection is available.
189+
///
190+
/// ```rust
191+
/// # #[cfg(feature = "runtime")]
192+
/// # async fn example() {
193+
/// use hyper::client::connect::{capture_connection, CaptureConnection};
194+
/// let mut request = http::Request::builder()
195+
/// .uri("http://foo.com")
196+
/// .body(hyper::Body::empty())
197+
/// .unwrap();
198+
///
199+
/// let mut captured = capture_connection(&mut request);
200+
/// tokio::task::spawn(async move {
201+
/// let connection_info = captured.wait_for_connection_metadata().await;
202+
/// println!("we are connected! {:?}", connection_info.as_ref());
203+
/// });
204+
///
205+
/// let client = hyper::Client::new();
206+
/// client.request(request).await.expect("request failed");
207+
/// # }
208+
/// ```
209+
pub fn capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection {
210+
let (tx, rx) = CaptureConnection::new();
211+
request.extensions_mut().insert(tx);
212+
rx
213+
}
214+
215+
/// TxSide for [`CaptureConnection`]
216+
///
217+
/// This is inserted into `Extensions` to allow Hyper to back channel connection info
218+
#[derive(Clone)]
219+
pub(crate) struct CaptureConnectionExtension {
220+
tx: Arc<watch::Sender<Option<Connected>>>,
221+
}
222+
223+
impl CaptureConnectionExtension {
224+
pub(crate) fn set(&self, connected: &Connected) {
225+
self.tx.send_replace(Some(connected.clone()));
226+
}
227+
}
228+
229+
impl CaptureConnection {
230+
/// Internal API to create the tx and rx half of [`CaptureConnection`]
231+
pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
232+
let (tx, rx) = watch::channel(None);
233+
(
234+
CaptureConnectionExtension { tx: Arc::new(tx) },
235+
CaptureConnection { rx },
236+
)
237+
}
238+
239+
/// Retrieve the connection metadata, if available
240+
pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
241+
self.rx.borrow()
242+
}
243+
244+
/// Wait for the connection to be established
245+
///
246+
/// If a connection was established, this will always return `Some(...)`. If the request never
247+
/// successfully connected (e.g. DNS resolution failure), this method will never return.
248+
pub async fn wait_for_connection_metadata(
249+
&mut self,
250+
) -> impl Deref<Target = Option<Connected>> + '_ {
251+
if self.rx.borrow().is_some() {
252+
return self.rx.borrow();
253+
}
254+
let _ = self.rx.changed().await;
255+
self.rx.borrow()
256+
}
257+
}
258+
149259
pub(super) struct Extra(Box<dyn ExtraInner>);
150260

151261
#[derive(Clone, Copy, Debug, PartialEq)]
@@ -233,7 +343,6 @@ impl Connected {
233343

234344
// Don't public expose that `Connected` is `Clone`, unsure if we want to
235345
// keep that contract...
236-
#[cfg(feature = "http2")]
237346
pub(super) fn clone(&self) -> Connected {
238347
Connected {
239348
alpn: self.alpn.clone(),
@@ -394,6 +503,7 @@ pub(super) mod sealed {
394503
#[cfg(test)]
395504
mod tests {
396505
use super::Connected;
506+
use crate::client::connect::CaptureConnection;
397507

398508
#[derive(Clone, Debug, PartialEq)]
399509
struct Ex1(usize);
@@ -452,4 +562,72 @@ mod tests {
452562
assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99)));
453563
assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup")));
454564
}
565+
566+
#[test]
567+
fn test_sync_capture_connection() {
568+
let (tx, rx) = CaptureConnection::new();
569+
assert!(
570+
rx.connection_metadata().is_none(),
571+
"connection has not been set"
572+
);
573+
tx.set(&Connected::new().proxy(true));
574+
assert_eq!(
575+
rx.connection_metadata()
576+
.as_ref()
577+
.expect("connected should be set")
578+
.is_proxied(),
579+
true
580+
);
581+
582+
// ensure it can be called multiple times
583+
assert_eq!(
584+
rx.connection_metadata()
585+
.as_ref()
586+
.expect("connected should be set")
587+
.is_proxied(),
588+
true
589+
);
590+
}
591+
592+
#[tokio::test]
593+
async fn async_capture_connection() {
594+
let (tx, mut rx) = CaptureConnection::new();
595+
assert!(
596+
rx.connection_metadata().is_none(),
597+
"connection has not been set"
598+
);
599+
let test_task = tokio::spawn(async move {
600+
assert_eq!(
601+
rx.wait_for_connection_metadata()
602+
.await
603+
.as_ref()
604+
.expect("connection should be set")
605+
.is_proxied(),
606+
true
607+
);
608+
// can be awaited multiple times
609+
assert!(
610+
rx.wait_for_connection_metadata().await.is_some(),
611+
"should be awaitable multiple times"
612+
);
613+
614+
assert_eq!(rx.connection_metadata().is_some(), true);
615+
});
616+
// can't be finished, we haven't set the connection yet
617+
assert_eq!(test_task.is_finished(), false);
618+
tx.set(&Connected::new().proxy(true));
619+
620+
assert!(test_task.await.is_ok());
621+
}
622+
623+
#[tokio::test]
624+
async fn capture_connection_sender_side_dropped() {
625+
let (tx, mut rx) = CaptureConnection::new();
626+
assert!(
627+
rx.connection_metadata().is_none(),
628+
"connection has not been set"
629+
);
630+
drop(tx);
631+
assert!(rx.wait_for_connection_metadata().await.is_none());
632+
}
455633
}

‎tests/client.rs

+33-1
Original file line numberDiff line numberDiff line change
@@ -1121,10 +1121,11 @@ mod dispatch_impl {
11211121
use http::Uri;
11221122
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
11231123
use tokio::net::TcpStream;
1124+
use tokio_test::block_on;
11241125

11251126
use super::support;
11261127
use hyper::body::HttpBody;
1127-
use hyper::client::connect::{Connected, Connection, HttpConnector};
1128+
use hyper::client::connect::{capture_connection, Connected, Connection, HttpConnector};
11281129
use hyper::Client;
11291130

11301131
#[test]
@@ -1533,6 +1534,37 @@ mod dispatch_impl {
15331534
assert_eq!(connects.load(Ordering::Relaxed), 0);
15341535
}
15351536

1537+
#[test]
1538+
fn capture_connection_on_client() {
1539+
let _ = pretty_env_logger::try_init();
1540+
1541+
let _rt = support::runtime();
1542+
let connector = DebugConnector::new();
1543+
1544+
let client = Client::builder().build(connector);
1545+
1546+
let server = TcpListener::bind("127.0.0.1:0").unwrap();
1547+
let addr = server.local_addr().unwrap();
1548+
thread::spawn(move || {
1549+
let mut sock = server.accept().unwrap().0;
1550+
//drop(server);
1551+
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
1552+
sock.set_write_timeout(Some(Duration::from_secs(5)))
1553+
.unwrap();
1554+
let mut buf = [0; 4096];
1555+
sock.read(&mut buf).expect("read 1");
1556+
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
1557+
.expect("write 1");
1558+
});
1559+
let mut req = Request::builder()
1560+
.uri(&*format!("http://{}/a", addr))
1561+
.body(Body::empty())
1562+
.unwrap();
1563+
let captured_conn = capture_connection(&mut req);
1564+
block_on(client.request(req)).expect("200 OK");
1565+
assert!(captured_conn.connection_metadata().is_some());
1566+
}
1567+
15361568
#[test]
15371569
fn client_keep_alive_0() {
15381570
let _ = pretty_env_logger::try_init();

0 commit comments

Comments
 (0)
Please sign in to comment.