Concurrent connection problems with use of poll #1248

Open
ghost opened this issue Mar 2, 2021 · 22 comments

@ghost commented Mar 2, 2021

Python: Windows 32-bit 2.7.15 & 3.7.9
Psycopg2: 2.7.3.2 upgraded to 2.8.2

Description:
We have a program with multiple threads. We use SQLAlchemy to set up the connections and build the queries.
We have 1 connection pool for all the threads. This used to work fine when we were using psycopg2 2.7.3.2.

Now we are upgrading all our dependencies which includes psycopg2 and are noticing some weird behavior.
We have narrowed it down to 2 things.

  • The upgrade to the newer psycopg2 version
  • The use of connection.poll in one of our threads

If we use connection.poll, it breaks with weird errors such as an SSL verification error, an "SSL wrong version number" error, a "file descriptor cannot be a negative integer" error, or even deadlocks.
We don't seem to have these problems in threads where we only run plain queries.
It doesn't only break the connection in the thread that is executing the poll, but in other threads as well.

@dvarrazzo (Member) commented Mar 2, 2021

What are you using connection.poll() for? Are you using it during connection?

@ghost (Author) commented Mar 2, 2021

Yeah. The thread that uses connection.poll is for database notifications with the LISTEN command.
We pull the psycopg2 connection out of SQLAlchemy and use connection.poll to receive the notifications from the database server.
Like the example shows here: https://www.psycopg.org/docs/advanced.html#asynchronous-notifications
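
For reference, the core of that docs pattern is: wait for the connection's socket to become readable, call poll(), then drain connection.notifies. A minimal standalone sketch of it (the DSN and channel name are placeholders):

import select
import psycopg2
import psycopg2.extensions

conn = psycopg2.connect("")  # placeholder DSN
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute("LISTEN my_channel;")

while True:
    if select.select([conn], [], [], 5.0) != ([], [], []):
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            print("NOTIFY:", notify.pid, notify.channel, notify.payload)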

@dvarrazzo (Member)

If you are receiving SSL connection errors, I think it means you are using poll() on a connection that is not completed yet.

Is your application perhaps polling other connections indiscriminately, instead of just the one used for notifications?

I don't think there is any change related to the connection procedure between the versions you mention. However, the libssl used has probably been upgraded: you should check the NEWS file to find the version change.

Can you change just one of the two things at a time, to work out whether it's a connection error or whether it's about your use of poll()?

@ghost (Author) commented Mar 2, 2021

We have 2 versions of our program, both with the poll and threads. The only difference between the 2 is the version of psycopg2.

We have been narrowing things down to find the specific issue. We cut a lot of code and eventually narrowed it down to the use of poll and the version of psycopg2. If we use the same script in Python with psycopg2 2.7.3.2, it works. When we use it with 2.8.2, it breaks.
Edit: My colleague has tested a bit further; he confirmed it happens in versions higher than 2.7.7.

If it helps I could post our test script.

@dvarrazzo (Member)

Yes please: being able to reproduce the problem is useful.

@ghost (Author) commented Mar 2, 2021

Python 2.7.11
pip==20.2.2
psycopg2==2.8.6
setuptools==44.1.1
SQLAlchemy==1.3.20
wheel==0.35.1

import select
import time
from threading import Thread
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class PollerThread(Thread):
    def __init__(self, connection):
        Thread.__init__(self)
        self.setDaemon(1)
        self.connection = connection
    def run(self):
        while True:
            result = select.select([self.connection], [], [], 3.0)
            if result != ([], [], []):
                self.connection.poll()


if __name__ == "__main__":
    dbUser = ""
    dbPassword = ""
    dbServer = ""
    dbName = ""
    fullDatabaseUrl = str("postgresql+psycopg2://%s:%s@%s/%s" % (dbUser, dbPassword, dbServer, dbName))

    Session = sessionmaker(autocommit=True)
    engine = create_engine(fullDatabaseUrl)
    Session.configure(bind=engine)
    session = Session()
    connection = session.connection().connection.connection  # <- psycopg2 connection object
    dbEventListener = PollerThread(connection)
    dbEventListener.start()
    while True:
        result = session.execute("SELECT * FROM table1")
        data = result.fetchall()
        time.sleep(0.1)

@ghost (Author) commented Mar 2, 2021

Also, thank you for your lightning-quick responses.

@dvarrazzo (Member)

Does the problem also manifest without using SQLAlchemy? Could you provide a use case which doesn't use it?

@ghost (Author) commented Mar 2, 2021

Yes. I just tried it with psycopg2 only. The 2.7 version works, but the 2.8 version crashes.

import select
from threading import Thread
import psycopg2
import time


class PollerThread(Thread):
	def __init__(self, connection):
		Thread.__init__(self)
		self.setDaemon(1)
		self.connection = connection

	def run(self):
		while True:
			result = select.select([self.connection], [], [], 3.0)
			if result != ([], [], []):
				self.connection.poll()


if __name__ == "__main__":
	dbUser = ""
	dbPassword = ""
	dbServer = ""
	dbName = ""

	connection = psycopg2.connect(dbname=dbName, user=dbUser, password=dbPassword, host=dbServer)

	dbEventListener = PollerThread(connection)
	dbEventListener.start()

	while True:
		cursor = connection.cursor()
		result = cursor.execute("SELECT * FROM table1")
		data = cursor.fetchall()
		time.sleep(0.1)

@dvarrazzo (Member)

You cannot use the same connection both for normal querying and for waiting for notifications: poll() will steal the result and cause problems with the connection's internal state.

@ghost (Author) commented Mar 2, 2021

Okay. But it was working in a previous version, so is that a bug?

@dvarrazzo (Member)

I have to verify that: querying and polling should indeed be protected from each other. Testing your script with the latest 2.8 gets stuck somewhere. I only have Python 3.8 handy here, so I cannot install psycopg2 < 2.8.4. Will try with Python 3.6 as soon as I can.

However, in your program, do use a separate connection for polling.
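
A rough sketch of that separation (placeholder DSN and channel name; the listener connection is touched only by LISTEN and poll(), while queries go through a different connection or the SQLAlchemy engine):

import select
import threading
import psycopg2
import psycopg2.extensions

DSN = ""  # placeholder connection string

def listen_forever():
    # dedicated connection: nothing else ever queries it
    listen_conn = psycopg2.connect(DSN)
    listen_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    listen_conn.cursor().execute("LISTEN my_channel;")
    while True:
        if select.select([listen_conn], [], [], 3.0) != ([], [], []):
            listen_conn.poll()
            while listen_conn.notifies:
                print(listen_conn.notifies.pop(0))

threading.Thread(target=listen_forever, daemon=True).start()

# queries use a completely separate connection object
query_conn = psycopg2.connect(DSN)
cur = query_conn.cursor()
cur.execute("SELECT 1")
print(cur.fetchall())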

@dvarrazzo (Member)

I can confirm the bug. Running the script (slightly modified, using "select 1" as the query) works with no problems on branch maint_2_7 with Python 3.6/3.7, and stops or crashes on maint_2_8 with Python 3.7/3.8.

@dvarrazzo (Member)

The likely culprit is the refactoring that happened in _conn_poll_advance_read().

@stratsimir

Hi,

I hit the same bug. After some digging, I found out that pq_get_result_async() is called without proper locking, although such locking is required ("The function should be called with the lock and holding the GIL."). The following patch fixed the issue for me:

--- ../psycopg2-2.8.6/psycopg/connection_int.c	2020-09-05 16:39:27.000000000 -0600
+++ ../psycopg2-2.8.6-patched/psycopg/connection_int.c	2021-06-02 09:17:41.528951575 -0600
@@ -904,8 +904,14 @@ _conn_poll_advance_read(connectionObject

     Dprintf("conn_poll: poll reading");

+    Py_BEGIN_ALLOW_THREADS;
+    pthread_mutex_lock(&(self->lock));
+
     busy = pq_get_result_async(self);

+    pthread_mutex_unlock(&(self->lock));
+    Py_END_ALLOW_THREADS;
+
     switch (busy) {
     case 0: /* result is ready */
         Dprintf("conn_poll: async_status -> ASYNC_DONE");

dvarrazzo added a commit that referenced this issue Jun 2, 2021
@dvarrazzo (Member)

Hi @stratsimir thank you very much: that indeed seems to fix the problem. Will include your patch in the upcoming release.

@dvarrazzo (Member)

Oops, too bad it segfaults in the test suite... https://github.com/psycopg/psycopg2/runs/2732488602?check_suite_focus=true

@dvarrazzo (Member)

Looking more closely at the patch above: pq_get_result_async() calls into the Python API (e.g. PyErr_SetString(), but not only that), so it cannot run without the GIL.

@mfmarche

hi @dvarrazzo
We have been tracking down what I believe is a similar problem to this one. I'm not sure of the status of this issue or whether there is a fix in progress. Using a single connection pool and many gevent co-routines, we eventually see a lockup.

This is using psycopg2 2.9.3.
I have some PSYCOPG_DEBUG output:

[7] pq_send_query: sending ASYNC query:
[7]     SELECT resources.id AS resources_id, applications.id AS applications_id, resources.resource_org_id AS resources_resource_org_id, resources.resource_name AS resources_resource_name, resources.name_slug
[7] psyco_wait
[7] conn_poll: status = 2
[7] conn_poll: status -> CONN_STATUS_*
[7] conn_poll: async_status = ASYNC_WRITE
[7] conn_poll: poll writing
[7] conn_poll: PQflush() = 0
[7] conn_poll: async_status -> ASYNC_READ
[7] conn_poll: returning 1
[7] conn_poll: status = 2
[7] conn_poll: status -> CONN_STATUS_*
[7] conn_poll: async_status = ASYNC_READ
[7] conn_poll: poll reading
[7] pq_get_result_async: calling PQconsumeInput()
[7] pq_get_result_async: calling PQisBusy()
[7] pq_get_result_async: PQisBusy() = 1
[7] conn_poll: returning 1
[7] conn_poll: status = 2
[7] conn_poll: status -> CONN_STATUS_*
[7] conn_poll: async_status = ASYNC_READ
[7] conn_poll: poll reading
[7] pq_get_result_async: calling PQconsumeInput()
[7] pq_get_result_async: calling PQisBusy()
[7] pq_get_result_async: PQisBusy() = 1
[7] conn_poll: returning 1
[7] conn_poll: status = 2
[7] conn_poll: status -> CONN_STATUS_*
[7] conn_poll: async_status = ASYNC_READ
[7] conn_poll: poll reading
[7] pq_get_result_async: calling PQconsumeInput()
[7] pq_get_result_async: calling PQisBusy()
[7] pq_get_result_async: PQisBusy() = 1
[7] conn_poll: returning 1
[7] conn_poll: status = 2
[7] conn_poll: status -> CONN_STATUS_*
[7] conn_poll: async_status = ASYNC_READ
[7] conn_poll: poll reading
[7] pq_get_result_async: calling PQconsumeInput()
[7] pq_get_result_async: calling PQisBusy()
[7] pq_get_result_async: PQisBusy() = 1
[7] conn_poll: returning 1
[7] conn_poll: status = 2
[7] conn_poll: status -> CONN_STATUS_*
[7] conn_poll: async_status = ASYNC_READ
[7] conn_poll: poll reading
[7] pq_get_result_async: calling PQconsumeInput()
[7] pq_get_result_async: calling PQisBusy()
[7] pq_get_result_async: PQisBusy() = 1
[7] conn_poll: returning 1
<HANG HERE>

Interestingly, we captured the data on the wire via tcpdump, and there is a response available for the request. I believe it's stuck waiting for read input. That is what led me to this issue: possibly some contention on the read status.

I did try the fix above, but unfortunately it crashes.

Going to dig in more, but I'm open to suggestions on how to proceed, or to another potential fix if one is in progress.

thanks.

@dvarrazzo (Member)

@mfmarche "pool" or "poll"?

Do you also use the same connection to simultaneously query and get notifications? No, there is no fix in progress. I am not sure the problem here is the same; maybe you can try downgrading to verify it.

@mfmarche

I did mean pool, but that is a SQLAlchemy term; it is really just one connection.

I verified above, using the example test, that it fails in the same way. Your comment that simultaneous querying and polling is not allowed makes sense to me, but with green threads I would assume that needs to be protected by the calls themselves; it shouldn't be up to the callers to provide this protection.

I'm starting to look at _conn_poll_advance_read, trying to see about fitting some mutex in there, but I'm not yet sure how or where. Somehow a connection in progress must be known, and I would assume the poll should bail out and not touch the cursor/connection and cause the breakage.
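
Until something like that exists in the C layer, one caller-side stopgap (a sketch only; under gevent it assumes monkey-patching makes the lock cooperative) is to serialize poll() and query execution on the shared connection with a lock, so the two can never interleave:

import threading

conn_lock = threading.Lock()  # one lock per shared psycopg2 connection

def safe_poll(conn):
    # never let poll() overlap a query on the same connection
    with conn_lock:
        conn.poll()
        notifies = list(conn.notifies)
        del conn.notifies[:]
        return notifies

def safe_query(conn, sql):
    # hold the lock across execute() and the fetch so poll() cannot steal the result
    with conn_lock:
        cur = conn.cursor()
        cur.execute(sql)
        return cur.fetchall()

The select.select() wait itself can stay outside the lock; only poll() and execute()/fetch need to be mutually exclusive.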

@mfmarche

I modified the gevent_wait_callback slightly so that it always retries on a timeout. I introduced a small initial timeout (10 ms). The code looks like this:

import os
from socket import timeout as socket_timeout  # gevent's wait_read/wait_write raise socket.timeout

import psycopg2
from psycopg2 import extensions
from psycogreen.gevent import gevent_wait_callback  # assumed: the stock psycogreen callback is the one being wrapped

GEVENT_INITIAL_TIMEOUT = float(os.environ.get("GEVENT_INITIAL_TIMEOUT", 0.01))
GEVENT_MAX_TIMEOUT = float(os.environ.get("GEVENT_MAX_TIMEOUT", 5))


def patch_psycopg_with_timeout():
    if not hasattr(extensions, "set_wait_callback"):
        raise ImportError(
            "support for coroutines not available in this Psycopg version (%s)"
            % psycopg2.__version__
        )

    extensions.set_wait_callback(gevent_wait_callback_with_timeout)


def get_timeout_backoff(retry_count: int, max_backoff: int = 0) -> float:
    return min(max_backoff, 0.1 * (2 ** retry_count))


def gevent_wait_callback_with_timeout(conn, timeout=None):
    do_timeout = timeout
    if do_timeout is None:
        do_timeout = GEVENT_INITIAL_TIMEOUT
    retry_count = 0
    while 1:
        try:
            return gevent_wait_callback(conn, timeout=do_timeout)
        except socket_timeout:
            if timeout is not None:
                # original timeout, always raise timeout
                raise socket_timeout
            retry_count += 1
            do_timeout = get_timeout_backoff(retry_count, GEVENT_MAX_TIMEOUT)

With this change, no more stalls/lockups are seen, suggesting that the data was ready and had been received before it decided to call wait_read. The system comprises these versions:

cockroachdb v20.2.18
sqlalchemy-cockroachdb 1.4.3
psycopg2 2.9.3
gevent 21.12.0
sqlalchemy 1.4.29

Although not ideal, this workaround appears to do the trick. Any suggestions on what the proper fix would be?
