-
Hi, I am trying to hook up a SQLAlchemy 1.4 engine and a global scoped asynchronous session in behave. The approach is to have a parent transaction, with each test running in a nested transaction that gets rolled back when the test completes. Unfortunately, this does not work yet (see the error below). The application under test uses an asynchronous engine and an asynchronous session created using sessionmaker:
def _create_session_factory(engine) -> sessionmaker[AsyncSession]:
factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
factory.configure(bind=engine)
return factory
In the before_scenario hook, the following line raises "RuntimeError: no running event loop":
context.session = context.Session(bind=context.connection, loop=loop)
The full code listing for setting up the test environment is listed below.
import asyncio
import logging
from behave.api.async_step import use_or_create_async_context
from behave.log_capture import capture
from behave.runner import Context
from behave.model import Scenario
from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncSession, async_scoped_session
from sqlalchemy.ext.asyncio.engine import async_engine_from_config
from sqlalchemy.orm.session import sessionmaker
from fastapi_graphql.server.config import Settings
# Root logger handle (getLogger() is called without a name).
# NOTE(review): the hooks in this listing log through the ``logging`` module
# functions directly rather than through this name.
logger = logging.getLogger()
@capture(level=logging.INFO)
def before_all(context: Context) -> None:
    """Prepare the event loop, the async database engine and the session registry.

    The objects later hooks rely on are stored on ``context``: ``engine``,
    ``connection``, ``factory`` and ``Session``.
    """
    logging.info("Setting up logging for behave tests...")
    context.config.setup_logging()

    logging.info("Setting up async context...")
    use_or_create_async_context(context)
    event_loop = context.async_context.loop
    asyncio.set_event_loop(event_loop)

    logging.info("Configuring db engine...")
    settings = Settings()
    engine_config = settings.dict()
    # async_engine_from_config() reads the keys carrying the given prefix,
    # so the database URL is exposed under "sqlalchemy_url".
    engine_config["sqlalchemy_url"] = settings.db_url
    db_engine = async_engine_from_config(engine_config, prefix="sqlalchemy_")
    logging.info(f"Db engine configured for connecting to: {settings.db_url}")

    logging.info("Creating a global session instance")
    # The engine is bound here directly; no separate configure(bind=...) call.
    session_factory = sessionmaker(
        bind=db_engine, class_=AsyncSession, expire_on_commit=False
    )
    # NOTE(review): asyncio.current_task can only be evaluated while a
    # coroutine is running; using the registry from a synchronous hook is
    # what produces "RuntimeError: no running event loop".
    # NOTE(review): the registry is given an instantiated session
    # (session_factory()) rather than the factory itself - confirm intended.
    session_registry = async_scoped_session(
        session_factory(), scopefunc=asyncio.current_task
    )

    context.engine = db_engine
    context.connection = event_loop.run_until_complete(db_engine.connect())
    context.factory = session_factory
    context.Session = session_registry
@capture(level=logging.INFO)
def after_all(context: Context) -> None:
    """Close the shared connection and then dispose of the database engine."""
    run_sync = context.async_context.loop.run_until_complete

    logging.info("Closing connection")
    run_sync(context.connection.close())

    logging.info("Closing database engine...")
    run_sync(context.engine.dispose())
    logging.info("Database engine closed")
@capture(level=logging.INFO)
def before_scenario(context: Context, scenario: Scenario) -> None:
    """Open an outer transaction and a session with a SAVEPOINT for one scenario.

    Stores the outer transaction on ``context.transaction`` and the session on
    ``context.session``, then registers a listener that re-opens the SAVEPOINT
    whenever a session transaction ends.
    """
    loop = context.async_context.loop

    logging.info("Starting a transaction...")
    context.transaction = loop.run_until_complete(context.connection.begin())
    logging.info("Transaction started...")

    logging.info("Creating a db session...")
    breakpoint()
    # -- ERROR POSITION: the next call raises
    #    "RuntimeError: no running event loop".
    context.session = context.Session(bind=context.connection, loop=loop)
    logging.info("Db session created")
    breakpoint()

    logging.info("Starting a nested transaction...")
    context.session.begin_nested()
    logging.info("Nested transaction started...")

    @event.listens_for(context.session, "after_transaction_end")
    def restart_savepoint(db_session, transaction):
        """Re-establish the SAVEPOINT after a session transaction ends.

        Needed for tests that call services which roll back inside
        try/except blocks: the session keeps operating inside a SAVEPOINT,
        so a test may roll back its own "transaction" while the enclosing
        outer transaction is never committed.
        """
        # NOTE(review): the check inspects context.transaction rather than
        # the ``transaction`` argument passed to the listener - confirm.
        outer = context.transaction
        if not outer.nested or outer._parent.nested:
            return
        # Expire state the way a top-level session.commit() normally does,
        # then start the next SAVEPOINT.
        context.session.expire_all()
        context.session.begin_nested()
@capture(level=logging.INFO)
def after_scenario(context: Context, scenario: Scenario) -> None:
    """Remove the scoped session, then roll back the scenario's outer transaction."""
    logging.info("Closing db session...")
    run_sync = asyncio.get_event_loop().run_until_complete
    run_sync(context.Session.remove())
    logging.info("Db session closed")

    logging.info("Rolling back transaction...")
    run_sync(context.transaction.rollback())
    logging.info("Rolled back transaction")
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
The core problem is the scopefunc=asyncio.current_task argument that is used when creating the Session object(-factory):
# PART OF: before_all() hook
Session = async_scoped_session(factory(), scopefunc=asyncio.current_task)
OTHERWISE:
async def before_scenario_coroutine(context):
    """Coroutine form of the per-scenario setup (body elided in the reply)."""
    await ...  # DETAILS left out HERE (see above).
def before_scenario(context, scenario):
    """Synchronous behave hook that drives the setup coroutine via asyncio.run()."""
    setup = before_scenario_coroutine(context)
    asyncio.run(setup)
HINT:
SEE ALSO:
|
Beta Was this translation helpful? Give feedback.
The core problem is the asyncio.current_task() function that is used as scopefunc=asyncio.current_task when creating the Session object(-factory). It can only be executed while running in a coroutine scope because it will call asyncio.get_running_loop().
OTHERWISE:
The asyncio development has continued and has provided other functionality, probably inspired by anyio.run() or others. Nowadays, you should probably use asyncio.run() or asyncio.Runner in the behave hooks instead of using the event loop directly.