Using PGReplicationStream to consume and output the data, it always throws an error after running for a while, showing "PGStream is closed" #3152

Open
jackin853 opened this issue Mar 5, 2024 · 4 comments

@jackin853

Please read https://stackoverflow.com/help/minimal-reproducible-example

Describe the issue
I created a logical replication slot for a database table. When I generate a large volume of changes (for example, 300,000 records) and consume them with PGReplicationStream, the program always fails after running for a while with the error "PGStream is closed" and then stops. A packet capture shows that the client is always the side that initiates the connection close, which is strange. Why does the client close the connection?

Here is an example code:

String url = "jdbc:postgresql://localhost:5432/test";
Properties props = new Properties();
PGProperty.USER.set(props, "postgres");
PGProperty.PASSWORD.set(props, "postgres");
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

Connection con = DriverManager.getConnection(url, props);
PGConnection replConnection = con.unwrap(PGConnection.class);

replConnection.getReplicationAPI()
    .createReplicationSlot()
    .logical()
    .withSlotName("demo_logical_slot")
    .withOutputPlugin("test_decoding")
    .make();

PGReplicationStream stream =
    replConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName("demo_logical_slot")
    .withSlotOption("include-xids", false)
    .withSlotOption("skip-empty-xacts", true)
    .withStatusInterval(20, TimeUnit.SECONDS)
    .start();

while (true) {
    // non-blocking receive; returns null when no message is pending
    ByteBuffer msg = stream.readPending();

    if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
    }

    int offset = msg.arrayOffset();
    byte[] source = msg.array();
    int length = source.length - offset;
    System.out.println(new String(source, offset, length));

    //feedback
    stream.setAppliedLSN(stream.getLastReceiveLSN());
    stream.setFlushedLSN(stream.getLastReceiveLSN());
}
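
A side note on the payload decoding in the loop above: `source.length - offset` prints the right bytes only when the buffer's readable region runs from `arrayOffset()` to the very end of the backing array. A minimal sketch of a decoder that relies only on standard `java.nio.ByteBuffer` accessors (`position()`, `remaining()`) is shown below. The class name `ReplicationPayloads` and the method `decode` are hypothetical names used for illustration, UTF-8 is an assumed encoding, and nothing here is claimed to be the cause of the "PGStream is closed" error.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReplicationPayloads {

    /** Decodes the readable bytes of a buffer as UTF-8 without changing its position. */
    static String decode(ByteBuffer msg) {
        if (msg.hasArray()) {
            // Readable region starts at arrayOffset() + position() and spans remaining() bytes.
            return new String(msg.array(), msg.arrayOffset() + msg.position(),
                    msg.remaining(), StandardCharsets.UTF_8);
        }
        // Direct or read-only buffers expose no backing array: copy via a duplicate
        // so the caller's position stays untouched.
        byte[] copy = new byte[msg.remaining()];
        msg.duplicate().get(copy);
        return new String(copy, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] backing = "xxBEGINyy".getBytes(StandardCharsets.UTF_8);
        // A slice of a larger array, with unrelated bytes before and after the payload.
        ByteBuffer slice = ByteBuffer.wrap(backing, 2, 5).slice();
        System.out.println(decode(slice)); // prints "BEGIN"
    }
}
```

In the consumer loop this would replace the three lines that compute `offset`, `source` and `length`, for example `System.out.println(ReplicationPayloads.decode(msg));`.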

Driver Version?
43.3.3
Java Version?
1.8
OS Version?
centos7 and windows 10 all the same issue
PostgreSQL Version?
11.19

@davecramer
Member

I presume you mean you are using 42.3.3.
The first thing I'd do is upgrade to the most recent version.
As for why it is closing, I have no clue. It should not be.

@jackin853
Author

I generated the data with a single SQL statement, so all rows are inserted in one transaction: "insert into t_test SELECT generate_series(1,500000) as key,repeat( chr(int4(random()*26)+65),4);". If I insert only one row at a time, there is no problem. Also, if I add the following statement just before the line "ByteBuffer msg = stream.readPending();", the error occurs less frequently: "if(replConnection == null) { System.out.println("current connection is null"); }".

@davecramer
Member

First off, let's make sure you are using the latest code. Can you upgrade to 42.7.2?

@davecramer
Member

I just ran your test above on my Mac laptop and it ran fine.
