Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX #162 "attached to a different loop", and add a new test case #164

Merged
merged 8 commits into from
Jun 2, 2020
76 changes: 50 additions & 26 deletions pytest_asyncio/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,61 @@ def pytest_pycollect_makeitem(collector, name, obj):
return list(collector._genfunctions(name, obj))


class FixtureStripper:
"""Include additional Fixture, and then strip them"""
REQUEST = "request"
EVENT_LOOP = "event_loop"

def __init__(self, fixturedef):
self.fixturedef = fixturedef
self.to_strip = set()

def add(self, name):
"""Add fixture name to fixturedef
and record in to_strip list (If not previously included)"""
if name in self.fixturedef.argnames:
return
self.fixturedef.argnames += (name, )
self.to_strip.add(name)

def get_and_strip_from(self, name, data_dict):
"""Strip name from data, and return value"""
result = data_dict[name]
if name in self.to_strip:
del data_dict[name]
return result

@pytest.hookimpl(trylast=True)
def pytest_fixture_post_finalizer(fixturedef, request):
"""Called after fixture teardown"""
if fixturedef.argname == "event_loop":
# Set empty loop policy, so that subsequent get_event_loop() provides a new loop
asyncio.set_event_loop_policy(None)



@pytest.hookimpl(hookwrapper=True)
def pytest_fixture_setup(fixturedef, request):
"""Adjust the event loop policy when an event loop is produced."""
if fixturedef.argname == "event_loop" and 'asyncio' in request.keywords:
if fixturedef.argname == "event_loop":
outcome = yield
loop = outcome.get_result()
policy = asyncio.get_event_loop_policy()
try:
old_loop = policy.get_event_loop()
except RuntimeError as exc:
if 'no current event loop' not in str(exc):
raise
old_loop = None
policy.set_event_loop(loop)
fixturedef.addfinalizer(lambda: policy.set_event_loop(old_loop))
return

if isasyncgenfunction(fixturedef.func):
# This is an async generator function. Wrap it accordingly.
generator = fixturedef.func

strip_request = False
if 'request' not in fixturedef.argnames:
fixturedef.argnames += ('request', )
strip_request = True
fixture_stripper = FixtureStripper(fixturedef)
fixture_stripper.add(FixtureStripper.EVENT_LOOP)
fixture_stripper.add(FixtureStripper.REQUEST)


def wrapper(*args, **kwargs):
request = kwargs['request']
if strip_request:
del kwargs['request']
loop = fixture_stripper.get_and_strip_from(FixtureStripper.EVENT_LOOP, kwargs)
request = fixture_stripper.get_and_strip_from(FixtureStripper.REQUEST, kwargs)

gen_obj = generator(*args, **kwargs)

Expand All @@ -96,21 +121,26 @@ async def async_finalizer():
msg = "Async generator fixture didn't stop."
msg += "Yield only once."
raise ValueError(msg)
asyncio.get_event_loop().run_until_complete(async_finalizer())
loop.run_until_complete(async_finalizer())

request.addfinalizer(finalizer)
return asyncio.get_event_loop().run_until_complete(setup())
return loop.run_until_complete(setup())

fixturedef.func = wrapper
elif inspect.iscoroutinefunction(fixturedef.func):
coro = fixturedef.func

fixture_stripper = FixtureStripper(fixturedef)
fixture_stripper.add(FixtureStripper.EVENT_LOOP)

def wrapper(*args, **kwargs):
loop = fixture_stripper.get_and_strip_from(FixtureStripper.EVENT_LOOP, kwargs)

async def setup():
res = await coro(*args, **kwargs)
return res

return asyncio.get_event_loop().run_until_complete(setup())
return loop.run_until_complete(setup())

fixturedef.func = wrapper
yield
Expand Down Expand Up @@ -144,15 +174,9 @@ def wrap_in_sync(func, _loop):
def inner(**kwargs):
coro = func(**kwargs)
if coro is not None:
task = asyncio.ensure_future(coro, loop=_loop)
try:
loop = asyncio.get_event_loop()
except RuntimeError as exc:
if 'no current event loop' not in str(exc):
raise
loop = _loop
task = asyncio.ensure_future(coro, loop=loop)
try:
loop.run_until_complete(task)
_loop.run_until_complete(task)
except BaseException:
# run_until_complete doesn't get the result from exceptions
# that are not subclasses of `Exception`. Consume all
Expand Down
51 changes: 51 additions & 0 deletions tests/async_fixtures/test_async_fixtures_with_finalizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio
import contextlib
import functools
import pytest


@pytest.mark.asyncio
async def test_module_with_event_loop_finalizer(port1):
await asyncio.sleep(0.01)
assert port1

@pytest.mark.asyncio
async def test_module_with_get_event_loop_finalizer(port2):
await asyncio.sleep(0.01)
assert port2

@pytest.fixture(scope="module")
def event_loop():
"""Change event_loop fixture to module level."""
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
yield loop
loop.close()


@pytest.fixture(scope="module")
async def port1(request, event_loop):
def port_finalizer(finalizer):
async def port_afinalizer():
# await task inside get_event_loop()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This task is not awaited inside get_event_loop(), but in loop.run_until_complete().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

# RantimeError is raised if task is created on a different loop
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: RuntimeError

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

await finalizer
event_loop.run_until_complete(port_afinalizer())

worker = asyncio.ensure_future(asyncio.sleep(0.2))
request.addfinalizer(functools.partial(port_finalizer, worker))
return True


@pytest.fixture(scope="module")
async def port2(request, event_loop):
def port_finalizer(finalizer):
async def port_afinalizer():
# await task inside get_event_loop()
# if loop is different a RuntimeError is raised
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to align the two comments from port1 and port2 and then highlight the difference of the two fixtures (first uses event_loop, second uses asyncio.get_event_loop)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

await finalizer
asyncio.get_event_loop().run_until_complete(port_afinalizer())

worker = asyncio.ensure_future(asyncio.sleep(0.2))
request.addfinalizer(functools.partial(port_finalizer, worker))
return True