Skip to content

Commit

Permalink
split receiver and sender, this makes for a better API
Browse files Browse the repository at this point in the history
  • Loading branch information
grim7reaper committed Jan 8, 2024
1 parent d104e2e commit 79e0ab7
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 69 deletions.
136 changes: 68 additions & 68 deletions sqlx-postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,77 +373,77 @@ enum PgCopyBothCommand {
CopyDone { from_client: bool },
}

#[derive(Clone)]
pub struct PgCopyBoth {
recv_stream: flume::r#async::RecvStream<'static, Result<Bytes>>,
send_tx: flume::Sender<PgCopyBothCommand>,
}

impl PgCopyBoth {
// Open a duplex connection allowing high-speed bulk data transfer to and from the server.
//
// # Example
//
// ```rust,no_run
// # use sqlx_postgres::{
// # PgCopyBoth, PgConnectOptions, PgPoolOptions, PgReplicationMode,
// # };
// // Connection must be configured with a replication mode!
// let options = PgConnectOptions::new()
// .host("0.0.0.0")
// .replication_mode(PgReplicationMode::Logical);
//
// let pool = PgPoolOptions::new()
// .connect_with(options)
// .await
// .expect("failed to connect to postgres");
//
// let query = format!(
// r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
// "test_slot", "0/1573178", "test_publication",
// );
// let mut duplex = PgCopyBoth::new(&pool, query.as_str())
// .await
// .expect("start replication");
// // Read data from the server.
// while let Some(data) = duplex.next().await {
// println!("data: {:?}", data);
// // And send some back (e.g. keepalive).
// duplex.send(Vec::new()).await?;
// }
// // Connection closed.
// ```
pub async fn new(pool: &Pool<Postgres>, statement: &str) -> Result<Self, Error> {
// Setup upstream/downstream channels.
let (recv_tx, recv_rx) = flume::bounded(1);
let (send_tx, send_rx) = flume::bounded(1);

crate::rt::spawn({
let pool = pool.clone();
async move {
if let Err(err) = copy_both_handler(pool, recv_tx.clone(), send_rx).await {
let _ignored = recv_tx.send_async(Err(err)).await;
}
pub struct PgCopyBothSender(flume::Sender<PgCopyBothCommand>);
pub struct PgCopyBothReceiver(flume::r#async::RecvStream<'static, Result<Bytes>>);

// Open a duplex connection allowing high-speed bulk data transfer to and from the server.
//
// # Example
//
// ```rust,no_run
// # use sqlx_postgres::{
// # pg_copy_both, PgConnectOptions, PgPoolOptions, PgReplicationMode,
// # };
// // Connection must be configured with a replication mode!
// let options = PgConnectOptions::new()
// .host("0.0.0.0")
// .replication_mode(PgReplicationMode::Logical);
//
// let pool = PgPoolOptions::new()
// .connect_with(options)
// .await
// .expect("failed to connect to postgres");
//
// let query = format!(
// r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
// "test_slot", "0/1573178", "test_publication",
// );
// let (copy_tx, copy_recv) = pg_copy_both(&pool, query.as_str())
// .await
// .expect("start replication");
// // Read data from the server.
// while let Some(data) = copy_recv.next().await {
// println!("data: {:?}", data);
// // And send some back (e.g. keepalive).
// copy_tx.send(Vec::new()).await?;
// }
// // Connection closed.
// ```
pub async fn pg_copy_both(
pool: &Pool<Postgres>,
statement: &str,
) -> Result<(PgCopyBothSender, PgCopyBothReceiver), Error> {
// Setup upstream/downstream channels.
let (recv_tx, recv_rx) = flume::bounded(1);
let (send_tx, send_rx) = flume::bounded(1);

crate::rt::spawn({
let pool = pool.clone();
async move {
if let Err(err) = copy_both_handler(pool, recv_tx.clone(), send_rx).await {
let _ignored = recv_tx.send_async(Err(err)).await;
}
});
}
});

// Execute the given statement to switch into CopyBoth mode.
let mut buf = Vec::new();
Query(statement).encode(&mut buf);
send_tx
.send_async(PgCopyBothCommand::Begin(buf))
.await
.map_err(|_err| Error::WorkerCrashed)?;
// Execute the given statement to switch into CopyBoth mode.
let mut buf = Vec::new();
Query(statement).encode(&mut buf);
send_tx
.send_async(PgCopyBothCommand::Begin(buf))
.await
.map_err(|_err| Error::WorkerCrashed)?;

Ok(Self {
recv_stream: recv_rx.into_stream(),
send_tx,
})
}
Ok((
PgCopyBothSender(send_tx),
PgCopyBothReceiver(recv_rx.into_stream()),
))
}

impl PgCopyBothSender {
/// Send a chunk of `COPY` data.
pub async fn send(&self, data: impl Into<Vec<u8>>) -> Result<()> {
self.send_tx
self.0
.send_async(PgCopyBothCommand::CopyData(data.into()))
.await
.map_err(|_err| Error::WorkerCrashed)?;
Expand All @@ -453,7 +453,7 @@ impl PgCopyBoth {

/// Signal that the CopyBoth mode is complete.
pub async fn finish(self) -> Result<()> {
self.send_tx
self.0
.send_async(PgCopyBothCommand::CopyDone { from_client: true })
.await
.map_err(|_err| Error::WorkerCrashed)?;
Expand All @@ -462,11 +462,11 @@ impl PgCopyBoth {
}
}

impl Stream for PgCopyBoth {
impl Stream for PgCopyBothReceiver {
type Item = Result<Bytes, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.recv_stream).poll_next(cx)
Pin::new(&mut self.0).poll_next(cx)
}
}

Expand Down
2 changes: 1 addition & 1 deletion sqlx-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use advisory_lock::{PgAdvisoryLock, PgAdvisoryLockGuard, PgAdvisoryLockKey};
pub use arguments::{PgArgumentBuffer, PgArguments};
pub use column::PgColumn;
pub use connection::PgConnection;
pub use copy::{PgCopyBoth, PgCopyIn};
pub use copy::{pg_copy_both, PgCopyBothReceiver, PgCopyBothSender, PgCopyIn};
pub use database::Postgres;
pub use error::{PgDatabaseError, PgErrorPosition};
pub use listener::{PgListener, PgNotification};
Expand Down

0 comments on commit 79e0ab7

Please sign in to comment.