Skip to content

Commit

Permalink
Merge pull request #216 from aio-libs/pool-recylce
Browse files Browse the repository at this point in the history
Pool recylce
  • Loading branch information
jettify committed Oct 15, 2017
2 parents 4b75b3f + f24fc7f commit f05419e
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 10 deletions.
7 changes: 7 additions & 0 deletions aiomysql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def __init__(self, host="localhost", user=None, password="",
self._db = db
self._no_delay = no_delay
self._echo = echo
self._last_usage = self._loop.time()

self._unix_socket = unix_socket
if charset:
Expand Down Expand Up @@ -240,6 +241,11 @@ def echo(self):
"""Return echo mode status."""
return self._echo

@property
def last_usage(self):
"""Return time() when connection was used."""
return self._last_usage

@property
def loop(self):
return self._loop
Expand Down Expand Up @@ -375,6 +381,7 @@ def cursor(self, cursor=None):
:raises TypeError: cursor_class is not a subclass of Cursor.
"""
self._ensure_alive()
self._last_usage = self._loop.time()
if cursor is not None and not issubclass(cursor, Cursor):
raise TypeError('Custom cursor must be subclass of Cursor')

Expand Down
21 changes: 15 additions & 6 deletions aiomysql/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@
_PoolAcquireContextManager, create_future, create_task)


def create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs):
def create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1,
loop=None, **kwargs):
coro = _create_pool(minsize=minsize, maxsize=maxsize, echo=echo,
loop=loop, **kwargs)
pool_recycle=pool_recycle, loop=loop, **kwargs)
return _PoolContextManager(coro)


@asyncio.coroutine
def _create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs):
def _create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1,
loop=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()

pool = Pool(minsize=minsize, maxsize=maxsize, echo=echo, loop=loop,
**kwargs)
pool = Pool(minsize=minsize, maxsize=maxsize, echo=echo,
pool_recycle=pool_recycle, loop=loop, **kwargs)
if minsize > 0:
with (yield from pool._cond):
yield from pool._fill_free_pool(False)
Expand All @@ -32,7 +34,7 @@ def _create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs):
class Pool(asyncio.AbstractServer):
"""Connection pool"""

def __init__(self, minsize, maxsize, echo, loop, **kwargs):
def __init__(self, minsize, maxsize, echo, pool_recycle, loop, **kwargs):
if minsize < 0:
raise ValueError("minsize should be zero or greater")
if maxsize < minsize:
Expand All @@ -48,6 +50,7 @@ def __init__(self, minsize, maxsize, echo, loop, **kwargs):
self._closing = False
self._closed = False
self._echo = echo
self._recycle = pool_recycle

@property
def echo(self):
Expand Down Expand Up @@ -153,6 +156,12 @@ def _fill_free_pool(self, override_min):
if conn._reader.at_eof():
self._free.pop()
conn.close()

elif (self._recycle > -1 and
self._loop.time() - conn.last_usage > self._recycle):
self._free.pop()
conn.close()

else:
self._free.rotate()
n += 1
Expand Down
9 changes: 5 additions & 4 deletions aiomysql/sa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@


def create_engine(minsize=1, maxsize=10, loop=None,
dialect=_dialect, **kwargs):
dialect=_dialect, pool_recycle=-1, **kwargs):
"""A coroutine for Engine creation.
Returns Engine instance with embedded connection pool.
The pool has *minsize* opened connections to PostgreSQL server.
"""
coro = _create_engine(minsize=minsize, maxsize=maxsize, loop=loop,
dialect=dialect, **kwargs)
dialect=dialect, pool_recycle=pool_recycle, **kwargs)
return _EngineContextManager(coro)


@asyncio.coroutine
def _create_engine(minsize=1, maxsize=10, loop=None,
dialect=_dialect, **kwargs):
dialect=_dialect, pool_recycle=-1, **kwargs):

if loop is None:
loop = asyncio.get_event_loop()
pool = yield from aiomysql.create_pool(minsize=minsize, maxsize=maxsize,
loop=loop, **kwargs)
loop=loop,
pool_recycle=pool_recycle, **kwargs)
conn = yield from pool.acquire()
try:
return Engine(dialect, pool, **kwargs)
Expand Down
21 changes: 21 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,3 +497,24 @@ def test_cancelled_connection(pool_creator, loop):
res = yield from cur2.fetchall()
# If we receive [(1, 0)] - we retrieved old cursor's values
assert list(res) == [(2, 0)]


@asyncio.coroutine
def test_pool_with_connection_recycling(pool_creator, loop):
pool = yield from pool_creator(minsize=1,
maxsize=1,
pool_recycle=3)
with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
val = yield from cur.fetchone()
assert (1,) == val

yield from asyncio.sleep(5, loop=loop)

assert 1 == pool.freesize
with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
val = yield from cur.fetchone()
assert (1,) == val

0 comments on commit f05419e

Please sign in to comment.