New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for CopyBoth mode #2924
base: main
Are you sure you want to change the base?
Conversation
To test you can do something like that. In a local DB, create a table and setup replication for it: ALTER TABLE <your-table> REPLICA IDENTITY FULL;
CREATE PUBLICATION test_publication FOR TABLE <your-table>; Then run use futures::prelude::*;
use sqlx::postgres::{PgConnectOptions, PgCopyBoth, PgPoolOptions, PgReplicationMode};
#[tokio::main]
async fn main() {
// Connection must be configured with a replication mode!
let options = PgConnectOptions::new()
.host("0.0.0.0")
.username("postgres")
.password("postgres")
.replication_mode(PgReplicationMode::Logical);
let pool = PgPoolOptions::new()
.connect_with(options)
.await
.expect("failed to connect to postgres");
let starting_pos = "0/1573178"; // CHANGE THIS
let query = format!(
r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
"test_slot", starting_pos, "test_publication",
);
let mut duplex = PgCopyBoth::new(&pool, query.as_str())
.await
.expect("start replication");
// Read data from the server.
while let Some(data) = duplex.next().await {
println!("data: {:?}", data);
}
} Then do some insert in the table, and you should receive some bytes (new row inserted). |
Anything I can do to help/make the review easier for you? 🙂 |
79e0ab7
to
419cf06
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to see an API more like PgListener
which holds a Pool
internally. Maybe call it PgReplicationPool
and use the existing functionality to create child pools so it can steal permits from the parent:
impl PgReplicationPool {
pub async fn connect(url: &str, mode; PgReplicationMode) -> crate::Result<Self> {
// ...
}
pub fn from_pool(pool: PgPool, mode: PgReplicationMode) -> Self {
let pool_options = pool.options().clone();
let connect_options = pool.connect_options()
.clone()
.replication_mode(mode);
Self(pool_options.parent(pool).connect_lazy_with(connect_options))
}
pub async fn start_replication(&self, statement: &str) -> crate::Result<PgReplication> {
// ...
}
}
This is also missing a potential use-case where the user doesn't want to involve a Pool
at all.
sqlx-postgres/src/copy.rs
Outdated
pub async fn pg_copy_both( | ||
pool: &Pool<Postgres>, | ||
statement: &str, | ||
) -> Result<(PgCopyBothSender, PgCopyBothReceiver), Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Splitting it into sender and receiver can be done at a higher level, IMO. You're asking the user to give up a lot of control in what is already a niche, low-level API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what way do we give up a lot of control with this approach?
Having two handle seems more flexible (user can group them back into a structure if needed).
+
I thought that it was nice to have a "channel-like" API here as you have the upstream stream of command and the downstream stream of data.
If I go the single-object way I either need to have public fields of an into_sth
to split it it two, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now I've made a structure with two public field (so that the caller can deconstruct if needed).
Does it sounds good to you or you had something else in mind?
sqlx-postgres/src/copy.rs
Outdated
/// Send a chunk of `COPY` data. | ||
pub async fn send(&self, data: impl Into<Vec<u8>>) -> Result<()> { | ||
self.0 | ||
.send_async(PgCopyBothCommand::CopyData(data.into())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another reason not to do the sender/receiver split at this level, send_async()
is not strictly cancel-safe: zesterer/flume#104 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hum good point!
How can I avoid/solve this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any suggestion/pointer @abonander ?
sqlx-postgres/src/copy.rs
Outdated
} | ||
|
||
/// Signal that the CopyBoth mode is complete. | ||
pub async fn finish(self) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does the user get the CommandComplete
responses with the number of rows changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure there is a CommandComplete
involved in the case of CopyBoth
: the command is never really complete, it's a stream of WAL events.
Seems like the graceful shutdown is an exchange of CopyDone
message.
[Both the backend and the frontend may then send CopyData messages until either end sends a CopyDone message. After the client sends a CopyDone message, the connection goes from copy-both mode to copy-out mode, and the client may not send any more CopyData messages. Similarly, when the server sends a CopyDone message, the connection goes into copy-in mode, and the server may not send any more CopyData messages. After both sides have sent a CopyDone message, the copy mode is terminated, and the backend reverts to the command-processing mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the thorough review 🙏
I would prefer to see an API more like
PgListener
which holds aPool
internally. Maybe call itPgReplicationPool
and use the existing functionality to create child pools so it can steal permits from the parent:
Ho interesting, didn't know about this "parent" thing. Will give it a try.
What does it mean to "steal permits from the parent"?
Your API definitely looks better, I'll try to rework the PR to go that way.
Not sure I'll have time to address the changes right now, but I'll try to get back to it next week if possible
sqlx-postgres/src/copy.rs
Outdated
pub async fn pg_copy_both( | ||
pool: &Pool<Postgres>, | ||
statement: &str, | ||
) -> Result<(PgCopyBothSender, PgCopyBothReceiver), Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what way do we give up a lot of control with this approach?
Having two handle seems more flexible (user can group them back into a structure if needed).
+
I thought that it was nice to have a "channel-like" API here as you have the upstream stream of command and the downstream stream of data.
If I go the single-object way I either need to have public fields of an into_sth
to split it it two, right?
sqlx-postgres/src/copy.rs
Outdated
/// Send a chunk of `COPY` data. | ||
pub async fn send(&self, data: impl Into<Vec<u8>>) -> Result<()> { | ||
self.0 | ||
.send_async(PgCopyBothCommand::CopyData(data.into())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hum good point!
How can I avoid/solve this?
Cf. https://www.postgresql.org/docs/current/protocol-replication.html Basically, we need to instruct the backend to go into logical replication walsender mode in order to be able to use commands like `CREATE_REPLICATION_SLOT` which are necessary to be able to seamlessly transition from an initial snapshot into the WAL streaming.
Copy-both mode is initiated when a backend in walsender mode executes a START_REPLICATION statement.
Also remove an unecessary mut
Don't use sqlx_postgres as its API isn't stable. Use sqlx itself as a dev-dependency instead.
419cf06
to
0bdb8d9
Compare
Use something similar to PgListener
0d1bcf6
to
3c8008c
Compare
When one pool is a child of another, they effectively share a total To acquire a connection from the pool, the When a child pool's semaphore is exhausted, but it is below This does mean that the child pool can potentially exhaust the permits of the parent pool, so it's important that the sum of all child pools' I thought about just having the child pool share the same semaphore as the parent pool, but that would have been a problem because the child pool couldn't keep connections in its own idle queue while returning permits to the parent, as other pools sharing the same parent wouldn't know about them, so the total number of open connections could easily exceed the maximum. If exhaustion ends up becoming a problem, I think I would try addressing that by having the child pool return a permit to the parent when it closes a connection, instead of its own semaphore. And I would recommend having a short |
Thanks for the detailed explanation
I don't think exhaustion would be an issue in this use case, since the replication only uses a single (long-lived) connection. |
Does the groundwork to be able to talk replication with PostgreSQL, i.e. support "replication" connection and CopyBoth "protocol".
Fixes: #2923