Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose API for providing a custom Connect impl and pass through the runtime feature #316

Merged
merged 3 commits into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,30 @@ jobs:
- run: cargo check -p deadpool-${{ matrix.crate }}
--features ${{ matrix.feature }}

check-integration-wasm:
name: Check integration (WebAssembly)
strategy:
fail-fast: false
matrix:
crate:
- postgres
feature:
- --features rt_tokio_1
- --features serde --features rt_tokio_1
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
target: wasm32-unknown-unknown

- run: cargo check -p deadpool-${{ matrix.crate }}
--no-default-features
${{ matrix.feature }}
--target wasm32-unknown-unknown

msrv:
name: MSRV
strategy:
Expand Down
8 changes: 6 additions & 2 deletions postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ readme = "README.md"
all-features = true

[features]
default = ["rt_tokio_1"]
default = ["runtime", "rt_tokio_1"]
runtime = ["tokio-postgres/runtime"]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about making this a dependency of the rt_tokio_1 and rt_async-std_1 feature instead? That way the default feature set will just stay the same and it won't break for users of rs_async-std_1 either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea! Will update the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or actually, on second thought, this won't work unfortunately. Because for example on WASM, we do want rt_tokio_1 if we're running on Tokio, just not the runtime feature.

rt_tokio_1 = ["deadpool/rt_tokio_1"]
rt_async-std_1 = ["deadpool/rt_async-std_1"]
serde = ["deadpool/serde", "dep:serde"]
Expand All @@ -27,9 +28,12 @@ serde = { package = "serde", version = "1.0", features = [
"derive",
], optional = true }
tokio = { version = "1.29", features = ["rt"] }
tokio-postgres = "0.7.9"
tokio-postgres = { version = "0.7.9", default-features = false }
tracing = "0.1.37"

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }

[dev-dependencies]
config = { version = "0.14", features = ["json"] }
dotenvy = "0.15.0"
Expand Down
22 changes: 15 additions & 7 deletions postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@

use std::{env, fmt, net::IpAddr, str::FromStr, time::Duration};

use tokio_postgres::config::{
ChannelBinding as PgChannelBinding, LoadBalanceHosts as PgLoadBalanceHosts,
SslMode as PgSslMode, TargetSessionAttrs as PgTargetSessionAttrs,
};

#[cfg(feature = "runtime")]
use super::Pool;
#[cfg(feature = "runtime")]
use crate::{CreatePoolError, PoolBuilder, Runtime};
#[cfg(feature = "runtime")]
use tokio_postgres::{
config::{
ChannelBinding as PgChannelBinding, LoadBalanceHosts as PgLoadBalanceHosts,
SslMode as PgSslMode, TargetSessionAttrs as PgTargetSessionAttrs,
},
tls::{MakeTlsConnect, TlsConnect},
Socket,
};

use crate::{CreatePoolError, PoolBuilder, Runtime};

use super::{Pool, PoolConfig};
use super::PoolConfig;

/// Configuration object.
///
Expand Down Expand Up @@ -94,6 +98,7 @@ pub struct Config {
pub connect_timeout: Option<Duration>,
/// See [`tokio_postgres::Config::keepalives`].
pub keepalives: Option<bool>,
#[cfg(not(target_arch = "wasm32"))]
/// See [`tokio_postgres::Config::keepalives_idle`].
pub keepalives_idle: Option<Duration>,
/// See [`tokio_postgres::Config::target_session_attrs`].
Expand Down Expand Up @@ -146,6 +151,7 @@ impl Config {
Self::default()
}

#[cfg(feature = "runtime")]
/// Creates a new [`Pool`] using this [`Config`].
///
/// # Errors
Expand All @@ -165,6 +171,7 @@ impl Config {
builder.build().map_err(CreatePoolError::Build)
}

#[cfg(feature = "runtime")]
/// Creates a new [`PoolBuilder`] using this [`Config`].
///
/// # Errors
Expand Down Expand Up @@ -264,6 +271,7 @@ impl Config {
if let Some(keepalives) = self.keepalives {
cfg.keepalives(keepalives);
}
#[cfg(not(target_arch = "wasm32"))]
if let Some(keepalives_idle) = self.keepalives_idle {
cfg.keepalives_idle(keepalives_idle);
}
Expand Down
48 changes: 39 additions & 9 deletions postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ use std::{
};

use deadpool::managed;
use tokio::{spawn, task::JoinHandle};
#[cfg(feature = "runtime")]
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio_postgres::{
tls::MakeTlsConnect, tls::TlsConnect, types::Type, Client as PgClient, Config as PgConfig,
Error, IsolationLevel, Socket, Statement, Transaction as PgTransaction,
TransactionBuilder as PgTransactionBuilder,
types::Type, Client as PgClient, Config as PgConfig, Error, IsolationLevel, Statement,
Transaction as PgTransaction, TransactionBuilder as PgTransactionBuilder,
};

#[cfg(feature = "runtime")]
use tokio_postgres::{
tls::{MakeTlsConnect, TlsConnect},
Socket,
};

pub use tokio_postgres;
Expand Down Expand Up @@ -82,6 +89,7 @@ pub struct Manager {
}

impl Manager {
#[cfg(feature = "runtime")]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need to feature gate all of this? As the feature gated functions are generic over T it shouldn't cause any harm, does it? Is the core of the issue the tokio::spawn which isn't supported by the WASM target?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this one (and its friends) need to be gated because they all initialize the Postgres connection through the tokio_postgres::Config. That's what's not supported when you don't have the runtime feature on tokio_postgres (like on WASM), since it requires tokio_postgres to establish the TCP connection on your behalf. So in a world without runtime, we can only keep around the APIs where you bring your own connection.

/// Creates a new [`Manager`] using the given [`tokio_postgres::Config`] and
/// `tls` connector.
pub fn new<T>(pg_config: tokio_postgres::Config, tls: T) -> Self
Expand All @@ -94,6 +102,7 @@ impl Manager {
Self::from_config(pg_config, tls, ManagerConfig::default())
}

#[cfg(feature = "runtime")]
/// Create a new [`Manager`] using the given [`tokio_postgres::Config`], and
/// `tls` connector and [`ManagerConfig`].
pub fn from_config<T>(pg_config: tokio_postgres::Config, tls: T, config: ManagerConfig) -> Self
Expand All @@ -103,10 +112,20 @@ impl Manager {
T::TlsConnect: Sync + Send,
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
Self::from_connect(pg_config, ConfigConnectImpl { tls }, config)
}

/// Create a new [`Manager`] using the given [`tokio_postgres::Config`], and
/// `connect` impl and [`ManagerConfig`].
pub fn from_connect(
pg_config: tokio_postgres::Config,
connect: impl Connect + 'static,
config: ManagerConfig,
) -> Self {
Self {
config,
pg_config,
connect: Box::new(ConnectImpl { tls }),
connect: Box::new(connect),
statement_caches: StatementCaches::default(),
}
}
Expand Down Expand Up @@ -157,24 +176,35 @@ impl managed::Manager for Manager {
}
}

trait Connect: Sync + Send {
/// Describes a mechanism for establishing a connection to a PostgreSQL
/// server via `tokio_postgres`.
pub trait Connect: Sync + Send {
/// Establishes a new `tokio_postgres` connection, returning
/// the associated `Client` and a `JoinHandle` to a tokio task
/// for processing the connection.
fn connect(
&self,
pg_config: &PgConfig,
) -> BoxFuture<'_, Result<(PgClient, JoinHandle<()>), Error>>;
}

struct ConnectImpl<T>
#[cfg(feature = "runtime")]
/// Provides an implementation of [`Connect`] that establishes the connection
/// using the `tokio_postgres` configuration itself.
#[derive(Debug)]
pub struct ConfigConnectImpl<T>
where
T: MakeTlsConnect<Socket> + Clone + Sync + Send + 'static,
T::Stream: Sync + Send,
T::TlsConnect: Sync + Send,
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
tls: T,
/// The TLS connector to use for the connection.
pub tls: T,
}

impl<T> Connect for ConnectImpl<T>
#[cfg(feature = "runtime")]
impl<T> Connect for ConfigConnectImpl<T>
where
T: MakeTlsConnect<Socket> + Clone + Sync + Send + 'static,
T::Stream: Sync + Send,
Expand Down
7 changes: 7 additions & 0 deletions src/managed/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
#[cfg(not(target_arch = "wasm32"))]
use std::time::{Duration, Instant};

/// Statistics regarding an object returned by the pool
#[derive(Clone, Copy, Debug)]
#[must_use]
pub struct Metrics {
#[cfg(not(target_arch = "wasm32"))]
/// The instant when this object was created
pub created: Instant,
#[cfg(not(target_arch = "wasm32"))]
/// The instant when this object was last used
pub recycled: Option<Instant>,
/// The number of times the objects was recycled
pub recycle_count: usize,
}

impl Metrics {
#[cfg(not(target_arch = "wasm32"))]
/// Access the age of this object
pub fn age(&self) -> Duration {
self.created.elapsed()
}
#[cfg(not(target_arch = "wasm32"))]
/// Get the time elapsed when this object was last used
pub fn last_used(&self) -> Duration {
self.recycled.unwrap_or(self.created).elapsed()
Expand All @@ -26,7 +31,9 @@ impl Metrics {
impl Default for Metrics {
fn default() -> Self {
Self {
#[cfg(not(target_arch = "wasm32"))]
created: Instant::now(),
#[cfg(not(target_arch = "wasm32"))]
recycled: None,
recycle_count: 0,
}
Expand Down
10 changes: 8 additions & 2 deletions src/managed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ use std::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, Weak,
},
time::{Duration, Instant},
time::Duration,
};

#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;

use deadpool_runtime::Runtime;
use tokio::sync::{Semaphore, TryAcquireError};

Expand Down Expand Up @@ -409,7 +412,10 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
}

inner.metrics.recycle_count += 1;
inner.metrics.recycled = Some(Instant::now());
#[cfg(not(target_arch = "wasm32"))]
{
inner.metrics.recycled = Some(Instant::now());
}

Ok(Some(unready_obj.ready()))
}
Expand Down