Add support for CopyBoth mode #2924

Open
wants to merge 6 commits into
base: main

Conversation

grim7reaper

Does the groundwork to be able to talk replication with PostgreSQL, i.e. support "replication" connection and CopyBoth "protocol".

Fixes: #2923

@grim7reaper
Author

To test, you can do something like this.

In a local DB, create a table and set up replication for it:

ALTER TABLE <your-table> REPLICA IDENTITY FULL;
CREATE PUBLICATION test_publication FOR TABLE <your-table>;

Then run a snippet like this (the replication slot it names, test_slot, must already exist):

use futures::prelude::*;
use sqlx::postgres::{PgConnectOptions, PgCopyBoth, PgPoolOptions, PgReplicationMode};

#[tokio::main]
async fn main() {
    // Connection must be configured with a replication mode!
    let options = PgConnectOptions::new()
        .host("0.0.0.0")
        .username("postgres")
        .password("postgres")
        .replication_mode(PgReplicationMode::Logical);

    let pool = PgPoolOptions::new()
        .connect_with(options)
        .await
        .expect("failed to connect to postgres");
    let starting_pos = "0/1573178"; // CHANGE THIS
    let query = format!(
        r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
        "test_slot", starting_pos, "test_publication",
    );
    let mut duplex = PgCopyBoth::new(&pool, query.as_str())
        .await
        .expect("start replication");
    // Read data from the server.
    while let Some(data) = duplex.next().await {
        println!("data: {:?}", data);
    }
}

Then insert some rows into the table; you should receive some bytes for each new row.
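For anyone trying this out, here is a rough sketch of how the bytes coming out of the stream could be decoded. It assumes each item is the payload of one CopyData message (header already stripped), laid out as the XLogData ('w') and primary keepalive ('k') messages described in the streaming replication protocol docs. The names `ReplicationMessage`, `parse_replication_message` and `format_lsn` are made up for illustration and are not part of this PR or of sqlx.

```rust
/// Messages a walsender wraps inside CopyData during streaming replication.
/// (Hypothetical type for illustration; not part of sqlx.)
#[derive(Debug, PartialEq)]
pub enum ReplicationMessage<'a> {
    /// Tag 'w': a chunk of WAL data.
    XLogData {
        wal_start: u64,
        wal_end: u64,
        server_clock: i64,
        data: &'a [u8],
    },
    /// Tag 'k': primary keepalive. `reply_requested` means the server
    /// wants a status update as soon as possible.
    Keepalive {
        wal_end: u64,
        server_clock: i64,
        reply_requested: bool,
    },
}

/// Reads a big-endian u64 at offset `at`, or `None` if the buffer is too short.
fn read_u64(buf: &[u8], at: usize) -> Option<u64> {
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(buf.get(at..at + 8)?);
    Some(u64::from_be_bytes(bytes))
}

/// Decodes one CopyData payload. Returns `None` for truncated input
/// or for a tag this sketch does not know about.
pub fn parse_replication_message<'a>(buf: &'a [u8]) -> Option<ReplicationMessage<'a>> {
    match *buf.first()? {
        b'w' => Some(ReplicationMessage::XLogData {
            wal_start: read_u64(buf, 1)?,
            wal_end: read_u64(buf, 9)?,
            server_clock: read_u64(buf, 17)? as i64,
            data: buf.get(25..)?,
        }),
        b'k' => Some(ReplicationMessage::Keepalive {
            wal_end: read_u64(buf, 1)?,
            server_clock: read_u64(buf, 9)? as i64,
            reply_requested: *buf.get(17)? != 0,
        }),
        _ => None,
    }
}

/// Formats an LSN the way PostgreSQL prints it: high 32 bits, slash, low 32 bits, in hex.
pub fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn as u32)
}
```

`format_lsn` prints positions in the same `X/Y` form as `starting_pos` in the snippet above. For logical replication, the `data` slice would still need to be decoded as a pgoutput message, which this sketch does not attempt.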

@grim7reaper
Author

Anything I can do to help/make the review easier for you? 🙂

Collaborator

@abonander left a comment

I would prefer to see an API more like PgListener which holds a Pool internally. Maybe call it PgReplicationPool and use the existing functionality to create child pools so it can steal permits from the parent:

impl PgReplicationPool {
    pub async fn connect(url: &str, mode: PgReplicationMode) -> crate::Result<Self> {
        // ...
    }

    pub fn from_pool(pool: PgPool, mode: PgReplicationMode) -> Self {
        let pool_options = pool.options().clone();
        let connect_options = pool.connect_options()
            .clone()
            .replication_mode(mode);

        Self(pool_options.parent(pool).connect_lazy_with(connect_options))
    }

    pub async fn start_replication(&self, statement: &str) -> crate::Result<PgReplication> {
        // ...
    }
}

This is also missing a potential use-case where the user doesn't want to involve a Pool at all.

sqlx-postgres/src/copy.rs (outdated)
pub async fn pg_copy_both(
    pool: &Pool<Postgres>,
    statement: &str,
) -> Result<(PgCopyBothSender, PgCopyBothReceiver), Error> {
Collaborator

Splitting it into sender and receiver can be done at a higher level, IMO. You're asking the user to give up a lot of control in what is already a niche, low-level API.

Author

In what way do we give up a lot of control with this approach?
Having two handles seems more flexible (the user can group them back into a structure if needed).
I also thought it was nice to have a "channel-like" API here, as you have the upstream stream of commands and the downstream stream of data.

If I go the single-object way, I need either public fields or an into_sth method to split it in two, right?

Author

For now I've made a structure with two public fields (so that the caller can destructure it if needed).
Does that sound good to you, or did you have something else in mind?
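To make the shape concrete, here is a minimal sketch of the "one struct, two public fields" idea. It uses std channels as stand-ins for the real sender and receiver; `CopyBothHandle` and `loopback` are hypothetical names for illustration only, not the types in this PR.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the duplex handle: a single value
/// whose two halves are public, so callers can take it apart.
pub struct CopyBothHandle {
    /// Upstream half: data going to the server.
    pub sender: Sender<Vec<u8>>,
    /// Downstream half: data coming from the server.
    pub receiver: Receiver<Vec<u8>>,
}

/// Builds a loopback handle for illustration: whatever is sent
/// on `sender` comes back on `receiver`.
pub fn loopback() -> CopyBothHandle {
    let (sender, receiver) = channel();
    CopyBothHandle { sender, receiver }
}
```

A caller who wants the channel-like split can write `let CopyBothHandle { sender, receiver } = loopback();` and move each half into its own task, while a caller who prefers a single object keeps the struct whole.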

sqlx-postgres/src/options/mod.rs (outdated)
sqlx-postgres/src/advisory_lock.rs (outdated)
/// Send a chunk of `COPY` data.
pub async fn send(&self, data: impl Into<Vec<u8>>) -> Result<()> {
    self.0
        .send_async(PgCopyBothCommand::CopyData(data.into()))
Collaborator

Another reason not to do the sender/receiver split at this level: send_async() is not strictly cancel-safe, see zesterer/flume#104 (comment).

Author

Hmm, good point!

How can I avoid/solve this?

Author

Any suggestions or pointers, @abonander?

}

/// Signal that the CopyBoth mode is complete.
pub async fn finish(self) -> Result<()> {
Collaborator

How does the user get the CommandComplete responses with the number of rows changed?

Author

I'm not sure there is a CommandComplete involved in the case of CopyBoth: the command is never really complete, it's a stream of WAL events.
It seems the graceful shutdown is an exchange of CopyDone messages. Quoting the PostgreSQL protocol documentation:

Both the backend and the frontend may then send CopyData messages until either end sends a CopyDone message. After the client sends a CopyDone message, the connection goes from copy-both mode to copy-out mode, and the client may not send any more CopyData messages. Similarly, when the server sends a CopyDone message, the connection goes into copy-in mode, and the server may not send any more CopyData messages. After both sides have sent a CopyDone message, the copy mode is terminated, and the backend reverts to the command-processing mode.
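The shutdown sequence in that paragraph can be read as a small state machine. The sketch below is only a model of the transitions as quoted. `CopyState`, `Event` and the method names are hypothetical, and ignoring a repeated CopyDone is a simplification chosen here, not something the docs specify.

```rust
/// Connection mode while a CopyBoth exchange winds down.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CopyState {
    /// Both sides may send CopyData.
    CopyBoth,
    /// Client sent CopyDone; only the server may still send CopyData.
    CopyOut,
    /// Server sent CopyDone; only the client may still send CopyData.
    CopyIn,
    /// Both sides sent CopyDone; the backend is back to command processing.
    CommandProcessing,
}

/// The two events that move the connection between modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    ClientCopyDone,
    ServerCopyDone,
}

impl CopyState {
    /// Applies one CopyDone event and returns the resulting mode.
    pub fn on(self, event: Event) -> CopyState {
        match (self, event) {
            (CopyState::CopyBoth, Event::ClientCopyDone) => CopyState::CopyOut,
            (CopyState::CopyBoth, Event::ServerCopyDone) => CopyState::CopyIn,
            (CopyState::CopyOut, Event::ServerCopyDone) => CopyState::CommandProcessing,
            (CopyState::CopyIn, Event::ClientCopyDone) => CopyState::CommandProcessing,
            // Simplification: a second CopyDone from the same side, or any
            // event after termination, leaves the state unchanged.
            (state, _) => state,
        }
    }

    /// Whether the client is still allowed to send CopyData in this mode.
    pub fn client_may_send_data(self) -> bool {
        self == CopyState::CopyBoth || self == CopyState::CopyIn
    }
}
```

Either side can start the shutdown, and in this model the copy only ends once both CopyDone events have been seen, which matches the point that there may be nothing like a row count to report.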

sqlx-postgres/src/options/mod.rs (outdated)
Author

@grim7reaper left a comment

Thanks for the thorough review 🙏

I would prefer to see an API more like PgListener which holds a Pool internally. Maybe call it PgReplicationPool and use the existing functionality to create child pools so it can steal permits from the parent:

Oh, interesting, I didn't know about this "parent" feature. I'll give it a try.
What does it mean to "steal permits from the parent"?

Your API definitely looks better; I'll try to rework the PR to go that way.
I'm not sure I'll have time to address the changes right now, but I'll try to get back to it next week if possible.

Cf. https://www.postgresql.org/docs/current/protocol-replication.html

Basically, we need to instruct the backend to go into logical
replication walsender mode in order to be able to use commands like
`CREATE_REPLICATION_SLOT` which are necessary to be able to seamlessly
transition from an initial snapshot into the WAL streaming.

Copy-both mode is initiated when a backend in walsender mode executes a
START_REPLICATION statement.

Don't use sqlx_postgres as its API isn't stable.
Use sqlx itself as a dev-dependency instead.

Use something similar to PgListener
@abonander
Collaborator

What does it mean to "steal permits from the parent"?

When one pool is a child of another, they effectively share a total max_connections.

To acquire a connection from the pool, the acquire() call must first get a permit from the pool semaphore, which represents the right to have a connection. It may then use this permit either to pop a connection from the idle queue, or open a new one if the pool size is below max_connections.

When a child pool's semaphore is exhausted, but it is below max_connections, it attempts to acquire a semaphore permit from the parent pool. When the checked-out connection using that permit is returned to the child pool, the permit is added to the child pool's semaphore, and further calls to acquire() behave normally. When a child pool is closed, any permits remaining in its semaphore are returned to the parent pool.

This does mean that the child pool can potentially exhaust the permits of the parent pool, so it's important that the sum of all child pools' max_connections is not larger than the parent's. This wasn't really a concern when designing the feature because it was only meant to be a part of #[sqlx::test]'s functionality, to allow multiple tests to run concurrently without exhausting the available connections on the server, so child pools were meant to be short-lived.

I thought about just having the child pool share the same semaphore as the parent pool, but that would have been a problem because the child pool couldn't keep connections in its own idle queue while returning permits to the parent, as other pools sharing the same parent wouldn't know about them, so the total number of open connections could easily exceed the maximum.

If exhaustion ends up becoming a problem, I think I would try addressing that by having the child pool return a permit to the parent when it closes a connection, instead of its own semaphore. And I would recommend having a short idle_timeout to make sure child pools can't hog permits for too long.
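The permit flow described above can be modelled with plain counters. This is only a toy sketch: the real pool uses an async semaphore, and `ParentPool`, `ChildPool` and their methods are hypothetical names, not sqlx's internals. It also assumes the child starts with an empty semaphore of its own.

```rust
/// Toy model of the parent pool's semaphore: a count of free permits.
pub struct ParentPool {
    pub permits: usize,
}

/// Toy model of a child pool.
pub struct ChildPool {
    /// Free permits sitting in the child's own semaphore.
    pub permits: usize,
    /// Permits attached to connections that are currently checked out.
    pub checked_out: usize,
    pub max_connections: usize,
}

impl ChildPool {
    /// Assumption: a new child holds no permits of its own.
    pub fn new(max_connections: usize) -> ChildPool {
        ChildPool { permits: 0, checked_out: 0, max_connections }
    }

    /// Tries to get a permit: first from the child's own semaphore, then,
    /// if the child is below `max_connections`, from the parent.
    pub fn acquire(&mut self, parent: &mut ParentPool) -> bool {
        if self.permits > 0 {
            self.permits -= 1;
            self.checked_out += 1;
            true
        } else if self.permits + self.checked_out < self.max_connections && parent.permits > 0 {
            parent.permits -= 1; // "steal" a permit from the parent
            self.checked_out += 1;
            true
        } else {
            false
        }
    }

    /// Returning a connection puts its permit into the child's semaphore,
    /// not the parent's, so the child keeps it for later acquires.
    pub fn release(&mut self) {
        if self.checked_out > 0 {
            self.checked_out -= 1;
            self.permits += 1;
        }
    }

    /// Closing the child hands its remaining free permits back to the parent.
    pub fn close(&mut self, parent: &mut ParentPool) {
        parent.permits += self.permits;
        self.permits = 0;
    }
}
```

In this model, a parent with 2 permits and a child with `max_connections` of 2 ends up with the parent at 0 after two acquires, and the parent stays at 0 after the connections are released until the child is closed. That is the "hogging" case a short idle_timeout is meant to limit.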

@grim7reaper
Author

Thanks for the detailed explanation

If exhaustion ends up becoming a problem, I think I would try addressing that by having the child pool return a permit to the parent when it closes a connection, instead of its own semaphore. And I would recommend having a short idle_timeout to make sure child pools can't hog permits for too long.

I don't think exhaustion would be an issue in this use case, since the replication only uses a single (long-lived) connection.
