Skip to content

Commit

Permalink
feat: Add dynamic_* connection methods to Pool/PoolOptions
Browse files Browse the repository at this point in the history
This allows external updates of the ConnectionOptions used when a new
connection needs to be opened for the pool.  The primary use case
is to support dynamically updated (read: rotated) credentials used
by systems like AWS RDS.

Closes launchbadge#445
  • Loading branch information
moatra committed Sep 9, 2022
1 parent ddffaa7 commit 27c82c3
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 9 deletions.
16 changes: 12 additions & 4 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_intrusive::sync::{Semaphore, SemaphoreReleaser};
use std::cmp;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::Poll;

use crate::pool::options::PoolConnectionMetadata;
Expand All @@ -20,7 +20,7 @@ use futures_util::FutureExt;
use std::time::{Duration, Instant};

pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
pub(super) connect_options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
pub(super) semaphore: Semaphore,
pub(super) size: AtomicU32,
Expand All @@ -33,7 +33,7 @@ pub(crate) struct PoolInner<DB: Database> {
impl<DB: Database> PoolInner<DB> {
pub(super) fn new_arc(
options: PoolOptions<DB>,
connect_options: <DB::Connection as Connection>::Options,
connect_options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Arc<Self> {
let capacity = options.max_connections as usize;

Expand Down Expand Up @@ -292,9 +292,17 @@ impl<DB: Database> PoolInner<DB> {
loop {
let timeout = deadline_as_timeout::<DB>(deadline)?;

// clone the options so they can be used across async await points without
// having to hold the RwLock's read guard
let opts = self
.connect_options
.read()
.expect("external config updater panicked")
.clone();

// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
match sqlx_rt::timeout(timeout, opts.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
Expand Down
77 changes: 73 additions & 4 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ use futures_core::FusedFuture;
use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -282,6 +283,23 @@ impl<DB: Database> Pool<DB> {
PoolOptions::<DB>::new().connect_with(options).await
}

/// Create a new connection pool with a default pool configuration and
/// the given `ConnectOptions`, and immediately establish one connection.
///
/// The default configuration is mainly suited for testing and light-duty applications.
/// For production applications, you'll likely want to make at least few tweaks.
///
/// The `RwLock`-guarded ConnectionOptions allows external updates to the options used when the
/// pool opens any new connections. This is useful, for example, if the credentials needed
/// to connect to the database are routinely updated.
///
/// See [`PoolOptions::new()`] for details.
pub async fn dynamic_connect_with(
options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Result<Self, Error> {
PoolOptions::<DB>::new().dynamic_connect_with(options).await
}

/// Create a new connection pool with a default pool configuration and
/// the given connection URL.
///
Expand Down Expand Up @@ -315,6 +333,25 @@ impl<DB: Database> Pool<DB> {
PoolOptions::<DB>::new().connect_lazy_with(options)
}

/// Create a new connection pool with a default pool configuration and
/// the given `ConnectOptions`.
///
/// The pool will establish connections only as needed.
///
/// The default configuration is mainly suited for testing and light-duty applications.
/// For production applications, you'll likely want to make at least few tweaks.
///
/// The `RwLock`-guarded ConnectionOptions allows external updates to the options used when the
/// pool opens any new connections. This is useful, for example, if the credentials needed
/// to connect to the database are routinely updated.
///
/// See [`PoolOptions::new()`] for details.
pub fn dynamic_connect_lazy_with(
options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Self {
PoolOptions::<DB>::new().dynamic_connect_lazy_with(options)
}

/// Retrieves a connection from the pool.
///
/// The total time this method is allowed to execute is capped by
Expand Down Expand Up @@ -490,8 +527,36 @@ impl<DB: Database> Pool<DB> {
}

/// Get the connection options for this pool
pub fn connect_options(&self) -> &<DB::Connection as Connection>::Options {
&self.0.connect_options
pub fn connect_options(&self) -> RwLockReadGuard<'_, <DB::Connection as Connection>::Options> {
self.0
.connect_options
.read()
.expect("external config updater panicked")
}

/// Get the connection options for this pool
pub fn connect_options_mut(
&self,
) -> RwLockWriteGuard<'_, <DB::Connection as Connection>::Options> {
self.0
.connect_options
.write()
.expect("external config udpater panicked or already locked")
}

/// Callback-based updater for this pool's connection options
pub fn update_connect_options<
Fn: FnOnce(&mut <DB::Connection as Connection>::Options) -> (),
>(
&self,
callback: Fn,
) {
let mut guard = self
.0
.connect_options
.write()
.expect("external config updater panicked or already locked");
callback(guard.deref_mut())
}

/// Get the options for this pool
Expand All @@ -514,7 +579,11 @@ impl Pool<Any> {
///
/// Determined by the connection URL.
pub fn any_kind(&self) -> AnyKind {
self.0.connect_options.kind()
self.0
.connect_options
.read()
.expect("external config updater panicked")
.kind()
}
}

Expand Down
34 changes: 33 additions & 1 deletion sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::pool::inner::PoolInner;
use crate::pool::Pool;
use futures_core::future::BoxFuture;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

/// Configuration options for [`Pool`][super::Pool].
Expand Down Expand Up @@ -440,6 +440,23 @@ impl<DB: Database> PoolOptions<DB> {
pub async fn connect_with(
self,
options: <DB::Connection as Connection>::Options,
) -> Result<Pool<DB>, Error> {
self.dynamic_connect_with(Arc::new(RwLock::new(options)))
.await
}

/// Create a new pool from this `PoolOptions` and immediately open at least one connection.
///
/// This ensures the configuration is correct.
///
/// The total number of connections opened is <code>min(1, [min_connections][Self::min_connections])</code>.
///
/// The `RwLock`-guarded ConnectionOptions allows external updates to the options used when the
/// pool opens any new connections. This is useful, for example, if the credentials needed
/// to connect to the database are routinely updated.
pub async fn dynamic_connect_with(
self,
options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Result<Pool<DB>, Error> {
// Don't take longer than `acquire_timeout` starting from when this is called.
let deadline = Instant::now() + self.acquire_timeout;
Expand Down Expand Up @@ -480,6 +497,21 @@ impl<DB: Database> PoolOptions<DB> {
/// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
/// optimistically establish that many connections for the pool.
pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
self.dynamic_connect_lazy_with(Arc::new(RwLock::new(options)))
}

/// Create a new pool from this `PoolOptions`, but don't open any connections right now.
///
/// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
/// optimistically establish that many connections for the pool.
///
/// The `RwLock`-guarded ConnectionOptions allows external updates to the options used when the
/// pool opens any new connections. This is useful, for example, if the credentials needed
/// to connect to the database are routinely updated.
pub fn dynamic_connect_lazy_with(
self,
options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Pool<DB> {
// `min_connections` is guaranteed by the idle reaper now.
Pool(PoolInner::new_arc(self, options))
}
Expand Down

0 comments on commit 27c82c3

Please sign in to comment.