Skip to content

Commit

Permalink
feat: Add dynamic_* connection methods to Pool/PoolOptions
Browse files Browse the repository at this point in the history
This allows external updates of the ConnectionOptions used when a new
connection needs to be opened for the pool.  The primary use case
is to support dynamically updated (read: rotated) credentials used
by systems like AWS RDS.

Closes launchbadge#445
  • Loading branch information
moatra committed Sep 9, 2022
1 parent ddffaa7 commit ab17025
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 10 deletions.
16 changes: 12 additions & 4 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_intrusive::sync::{Semaphore, SemaphoreReleaser};
use std::cmp;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::Poll;

use crate::pool::options::PoolConnectionMetadata;
Expand All @@ -20,7 +20,7 @@ use futures_util::FutureExt;
use std::time::{Duration, Instant};

pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
pub(super) connect_options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
pub(super) semaphore: Semaphore,
pub(super) size: AtomicU32,
Expand All @@ -33,7 +33,7 @@ pub(crate) struct PoolInner<DB: Database> {
impl<DB: Database> PoolInner<DB> {
pub(super) fn new_arc(
options: PoolOptions<DB>,
connect_options: <DB::Connection as Connection>::Options,
connect_options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Arc<Self> {
let capacity = options.max_connections as usize;

Expand Down Expand Up @@ -294,7 +294,15 @@ impl<DB: Database> PoolInner<DB> {

// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
match sqlx_rt::timeout(
timeout,
self.connect_options
.read()
.expect("external config updater panicked")
.connect(),
)
.await
{
// successfully established connection
Ok(Ok(mut raw)) => {
// See comment on `PoolOptions::after_connect`
Expand Down
51 changes: 47 additions & 4 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -282,6 +282,23 @@ impl<DB: Database> Pool<DB> {
PoolOptions::<DB>::new().connect_with(options).await
}

/// Create a new connection pool with a default pool configuration and
/// the given `ConnectOptions`, and immediately establish one connection.
///
/// The default configuration is mainly suited for testing and light-duty applications.
/// For production applications, you'll likely want to make at least few tweaks.
///
/// The `RwLock`-guarded ConnectionOptions allows external updates to the options used when the
/// pool opens any new connections. This is useful, for example, if the credentials needed
/// to connect to the database are routinely updated.
///
/// See [`PoolOptions::new()`] for details.
pub async fn dynamic_connect_with(
options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Result<Self, Error> {
PoolOptions::<DB>::new().dynamic_connect_with(options).await
}

/// Create a new connection pool with a default pool configuration and
/// the given connection URL.
///
Expand Down Expand Up @@ -315,6 +332,25 @@ impl<DB: Database> Pool<DB> {
PoolOptions::<DB>::new().connect_lazy_with(options)
}

/// Create a new connection pool with a default pool configuration and
/// the given `ConnectOptions`.
///
/// The pool will establish connections only as needed.
///
/// The default configuration is mainly suited for testing and light-duty applications.
/// For production applications, you'll likely want to make at least few tweaks.
///
/// The `RwLock`-guarded ConnectionOptions allows external updates to the options used when the
/// pool opens any new connections. This is useful, for example, if the credentials needed
/// to connect to the database are routinely updated.
///
/// See [`PoolOptions::new()`] for details.
pub fn dynamic_connect_lazy_with(
options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Self {
PoolOptions::<DB>::new().dynamic_connect_lazy_with(options)
}

/// Retrieves a connection from the pool.
///
/// The total time this method is allowed to execute is capped by
Expand Down Expand Up @@ -490,8 +526,11 @@ impl<DB: Database> Pool<DB> {
}

/// Get the connection options for this pool
pub fn connect_options(&self) -> &<DB::Connection as Connection>::Options {
&self.0.connect_options
pub fn connect_options(&self) -> RwLockReadGuard<<DB::Connection as Connection>::Options> {
self.0
.connect_options
.read()
.expect("external config updater panicked")
}

/// Get the options for this pool
Expand All @@ -514,7 +553,11 @@ impl Pool<Any> {
///
/// Determined by the connection URL.
pub fn any_kind(&self) -> AnyKind {
self.0.connect_options.kind()
self.0
.connect_options
.read()
.expect("external config updater panicked")
.kind()
}
}

Expand Down
35 changes: 33 additions & 2 deletions sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::pool::inner::PoolInner;
use crate::pool::Pool;
use futures_core::future::BoxFuture;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

/// Configuration options for [`Pool`][super::Pool].
Expand Down Expand Up @@ -439,7 +439,23 @@ impl<DB: Database> PoolOptions<DB> {
/// The total number of connections opened is <code>min(1, [min_connections][Self::min_connections])</code>.
pub async fn connect_with(
self,
options: <DB::Connection as Connection>::Options,
options: RwLock<<DB::Connection as Connection>::Options>,
) -> Result<Pool<DB>, Error> {
self.dynamic_connect_with(Arc::new(RwLock::new(options)))
}

/// Create a new pool from this `PoolOptions` and immediately open at least one connection.
///
/// This ensures the configuration is correct.
///
/// The total number of connections opened is <code>min(1, [min_connections][Self::min_connections])</code>.
///
/// The `RwLock`-guarded ConnectionOptions allows external updates to the options used when the
/// pool opens any new connections. This is useful, for example, if the credentials needed
/// to connect to the database are routinely updated.
pub async fn dynamic_connect_with(
self,
options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Result<Pool<DB>, Error> {
// Don't take longer than `acquire_timeout` starting from when this is called.
let deadline = Instant::now() + self.acquire_timeout;
Expand Down Expand Up @@ -480,6 +496,21 @@ impl<DB: Database> PoolOptions<DB> {
/// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
/// optimistically establish that many connections for the pool.
pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
self.dynamic_connect_lazy_with(Arc::new(RwLock::new(options)))
}

/// Create a new pool from this `PoolOptions`, but don't open any connections right now.
///
/// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
/// optimistically establish that many connections for the pool.
///
/// The `RwLock`-guarded ConnectionOptions allows external updates to the options used when the
/// pool opens any new connections. This is useful, for example, if the credentials needed
/// to connect to the database are routinely updated.
pub fn dynamic_connect_lazy_with(
self,
options: Arc<RwLock<<DB::Connection as Connection>::Options>>,
) -> Pool<DB> {
// `min_connections` is guaranteed by the idle reaper now.
Pool(PoolInner::new_arc(self, options))
}
Expand Down

0 comments on commit ab17025

Please sign in to comment.