psycopg2.OperationalError: PQexec not allowed during COPY BOTH when running drop_replication_slot #1456

Open
AzeemIqbal opened this issue May 23, 2022 · 8 comments


AzeemIqbal commented May 23, 2022

This is a bug tracker

Please complete the following information:

  • OS: Docker image - FROM python:3.9-slim-buster
  • Psycopg version: psycopg2-binary==2.9.3
  • Python version: 3.9
  • PostgreSQL version: 11.13
  • pip version: pip 22.0.4

Describe the bug

We are trying out logical replication on a Postgres RDS instance and are running a Docker image on ECS to consume the wal2json data. Before exiting the process, we drop the replication slot on a timeout condition so that we don't run out of storage on the RDS instance. We previously tried cur.consume_stream, but eventually the container would run for some time without printing anything, and storage usage on the RDS instance would explode.

When running
cur.drop_replication_slot('wal2json_test_slot')

We get

File "/usr/local/lib/python3.9/site-packages/psycopg2/extras.py", line 563, in drop_replication_slot  
self.execute(command)  
psycopg2.OperationalError: PQexec not allowed during COPY BOTH

The slot does seem to drop successfully, so I don't know why we get this error in our logs.

The script:

import sys
from datetime import datetime, timedelta
from select import select

import psycopg2

# kinesis_client, STREAM_NAME and my_connection are created elsewhere (not shown).

def consume(msg):
    # Forward the payload to Kinesis, then acknowledge it to the server.
    print("Attempting to put to Kinesis")
    kinesis_client.put_record(StreamName=STREAM_NAME, Data=msg.payload, PartitionKey="default")
    msg.cursor.send_feedback(flush_lsn=msg.data_start)
    print("Success")

cur = my_connection.cursor()
try:
    print("Attempting to drop old replication slot")
    cur.drop_replication_slot('wal2json_test_slot')
    print("Old replication slot dropped")
except psycopg2.Error:
    # For example, the slot does not exist yet.
    pass
cur.create_replication_slot('wal2json_test_slot', output_plugin='wal2json')
cur.start_replication(slot_name='wal2json_test_slot', options={'pretty-print': 0}, decode=True)

# cur.consume_stream(consume)

last_msg_time = datetime.now()
status_interval = 10.0

print("Starting Replication")
while True:
    print("Checking for messages")
    if datetime.now() - last_msg_time > timedelta(minutes=10):
        print("Not received a message in 10 minutes, dropping replication slot and exiting")
        # This call raises "OperationalError: PQexec not allowed during COPY BOTH".
        cur.drop_replication_slot('wal2json_test_slot')
        sys.exit()
    msg = cur.read_message()
    if msg:
        last_msg_time = msg.send_time
        print(f"Message received at: {last_msg_time.isoformat()}")
        consume(msg)
    else:
        print("No message received, waiting")
        now = datetime.now()
        # Wait for new data, but no longer than the time left until the next status update.
        timeout = status_interval - (now - cur.feedback_timestamp).total_seconds()
        sel = select([cur], [], [], max(0, timeout))

@srik506

srik506 commented Jul 22, 2022

Were you able to resolve it? I am having the same issues with dropping a slot.

@mhkarimi1383

I'm having the same problem.

@dvarrazzo
Member

Are you people using the same connection to run queries? I understand postgres doesn't like it and you must use a separate connection which is not involved in the replication.

Or am I not understanding correctly?

@mhkarimi1383

> Are you people using the same connection to run queries? I understand postgres doesn't like it and you must use a separate connection which is not involved in the replication.
>
> Or am I not understanding correctly?

I have a single connection, and it is used only for replication. I am not running any other queries on it.

@dvarrazzo
Member

Ah OK, so it's the internal query issued by cur.drop_replication_slot() that causes the problem.

@mhkarimi1383

> Ah OK, so it's the internal query issued by cur.drop_replication_slot() that causes the problem.

Should I do it with another connection?

@dvarrazzo
Member

I am not familiar with the replication stack, but I think you need to stop the generator before you can run any command on the same connection.

You can try stopping the generator, or you can drop the slot from another connection, although the latter seems more brutal.
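
To make the second option concrete, here is a minimal sketch of dropping the slot from a separate, ordinary (non-replication) connection. It is only an illustration under assumptions, not something taken from the psycopg2 docs: the helper name drop_slot_from_other_connection and its connect argument are hypothetical, and it assumes the streaming connection has already been closed, because PostgreSQL refuses to drop a slot that is still active. pg_drop_replication_slot() is the server-side SQL function that removes a slot.

```python
def drop_slot_from_other_connection(connect, slot_name):
    """Drop a replication slot using a fresh, non-replication connection.

    `connect` is a hypothetical zero-argument callable that returns a new
    DB-API connection, for example: lambda: psycopg2.connect(DSN).
    """
    conn = connect()
    try:
        # Assumption: run the statement outside an explicit transaction block.
        conn.autocommit = True
        cur = conn.cursor()
        try:
            # Server-side function; fails if the slot is still in use.
            cur.execute("SELECT pg_drop_replication_slot(%s)", (slot_name,))
        finally:
            cur.close()
    finally:
        conn.close()
```

In the timeout branch of the script above, this would probably look like my_connection.close() followed by drop_slot_from_other_connection(lambda: psycopg2.connect(DSN), 'wal2json_test_slot'), where DSN is a placeholder for your own connection string. The server may need a moment to release the slot after the streaming connection closes, so a short retry might be necessary.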

@marcosschroh

I am having the same problem. In my case the query connection and the replication connection are different.
