Skip to content

Commit

Permalink
Merge pull request #9154 from bluetech/refactor-callspec2
Browse files Browse the repository at this point in the history
python: refactor CallSpec2
  • Loading branch information
bluetech committed Oct 4, 2021
2 parents c4557c3 + 570b1fa commit dced00e
Showing 1 changed file with 84 additions and 56 deletions.
140 changes: 84 additions & 56 deletions src/_pytest/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from typing import TYPE_CHECKING
from typing import Union

import attr

import _pytest
from _pytest import fixtures
from _pytest import nodes
Expand All @@ -37,6 +39,7 @@
from _pytest._io import TerminalWriter
from _pytest._io.saferepr import saferepr
from _pytest.compat import ascii_escaped
from _pytest.compat import assert_never
from _pytest.compat import final
from _pytest.compat import get_default_arg_names
from _pytest.compat import get_real_func
Expand Down Expand Up @@ -451,11 +454,12 @@ def _genfunctions(self, name: str, funcobj) -> Iterator["Function"]:
module = modulecol.obj
clscol = self.getparent(Class)
cls = clscol and clscol.obj or None
fm = self.session._fixturemanager

definition = FunctionDefinition.from_parent(self, name=name, callobj=funcobj)
fixtureinfo = definition._fixtureinfo

# pytest_generate_tests impls call metafunc.parametrize() which fills
# metafunc._calls, the outcome of the hook.
metafunc = Metafunc(
definition=definition,
fixtureinfo=fixtureinfo,
Expand All @@ -469,13 +473,13 @@ def _genfunctions(self, name: str, funcobj) -> Iterator["Function"]:
methods.append(module.pytest_generate_tests)
if cls is not None and hasattr(cls, "pytest_generate_tests"):
methods.append(cls().pytest_generate_tests)

self.ihook.pytest_generate_tests.call_extra(methods, dict(metafunc=metafunc))

if not metafunc._calls:
yield Function.from_parent(self, name=name, fixtureinfo=fixtureinfo)
else:
# Add funcargs() as fixturedefs to fixtureinfo.arg2fixturedefs.
fm = self.session._fixturemanager
fixtures.add_funcarg_pseudo_fixture_def(self, metafunc, fm)

# Add_funcarg_pseudo_fixture_def may have shadowed some fixtures
Expand Down Expand Up @@ -894,61 +898,75 @@ def hasnew(obj: object) -> bool:


@final
@attr.s(frozen=True, slots=True, auto_attribs=True)
class CallSpec2:
def __init__(self, metafunc: "Metafunc") -> None:
self.metafunc = metafunc
self.funcargs: Dict[str, object] = {}
self._idlist: List[str] = []
self.params: Dict[str, object] = {}
# Used for sorting parametrized resources.
self._arg2scope: Dict[str, Scope] = {}
self.marks: List[Mark] = []
self.indices: Dict[str, int] = {}

def copy(self) -> "CallSpec2":
cs = CallSpec2(self.metafunc)
cs.funcargs.update(self.funcargs)
cs.params.update(self.params)
cs.marks.extend(self.marks)
cs.indices.update(self.indices)
cs._arg2scope.update(self._arg2scope)
cs._idlist = list(self._idlist)
return cs

def getparam(self, name: str) -> object:
try:
return self.params[name]
except KeyError as e:
raise ValueError(name) from e
"""A planned parameterized invocation of a test function.
@property
def id(self) -> str:
return "-".join(map(str, self._idlist))
Calculated during collection for a given test function's Metafunc.
Once collection is over, each callspec is turned into a single Item
and stored in item.callspec.
"""

def setmulti2(
# arg name -> arg value which will be passed to the parametrized test
# function (direct parameterization).
funcargs: Dict[str, object] = attr.Factory(dict)
# arg name -> arg value which will be passed to a fixture of the same name
# (indirect parametrization).
params: Dict[str, object] = attr.Factory(dict)
# arg name -> arg index.
indices: Dict[str, int] = attr.Factory(dict)
# Used for sorting parametrized resources.
_arg2scope: Dict[str, Scope] = attr.Factory(dict)
# Parts which will be added to the item's name in `[..]` separated by "-".
_idlist: List[str] = attr.Factory(list)
# Marks which will be applied to the item.
marks: List[Mark] = attr.Factory(list)

def setmulti(
self,
*,
valtypes: Mapping[str, "Literal['params', 'funcargs']"],
argnames: Sequence[str],
argnames: Iterable[str],
valset: Iterable[object],
id: str,
marks: Iterable[Union[Mark, MarkDecorator]],
scope: Scope,
param_index: int,
) -> None:
) -> "CallSpec2":
funcargs = self.funcargs.copy()
params = self.params.copy()
indices = self.indices.copy()
arg2scope = self._arg2scope.copy()
for arg, val in zip(argnames, valset):
if arg in self.params or arg in self.funcargs:
if arg in params or arg in funcargs:
raise ValueError(f"duplicate {arg!r}")
valtype_for_arg = valtypes[arg]
if valtype_for_arg == "params":
self.params[arg] = val
params[arg] = val
elif valtype_for_arg == "funcargs":
self.funcargs[arg] = val
else: # pragma: no cover
assert False, f"Unhandled valtype for arg: {valtype_for_arg}"
self.indices[arg] = param_index
self._arg2scope[arg] = scope
self._idlist.append(id)
self.marks.extend(normalize_mark_list(marks))
funcargs[arg] = val
else:
assert_never(valtype_for_arg)
indices[arg] = param_index
arg2scope[arg] = scope
return CallSpec2(
funcargs=funcargs,
params=params,
arg2scope=arg2scope,
indices=indices,
idlist=[*self._idlist, id],
marks=[*self.marks, *normalize_mark_list(marks)],
)

def getparam(self, name: str) -> object:
try:
return self.params[name]
except KeyError as e:
raise ValueError(name) from e

@property
def id(self) -> str:
return "-".join(self._idlist)


@final
Expand Down Expand Up @@ -990,9 +1008,11 @@ def __init__(
#: Class object where the test function is defined in or ``None``.
self.cls = cls

self._calls: List[CallSpec2] = []
self._arg2fixturedefs = fixtureinfo.name2fixturedefs

# Result of parametrize().
self._calls: List[CallSpec2] = []

def parametrize(
self,
argnames: Union[str, List[str], Tuple[str, ...]],
Expand All @@ -1009,9 +1029,18 @@ def parametrize(
_param_mark: Optional[Mark] = None,
) -> None:
"""Add new invocations to the underlying test function using the list
of argvalues for the given argnames. Parametrization is performed
during the collection phase. If you need to setup expensive resources
see about setting indirect to do it rather at test setup time.
of argvalues for the given argnames. Parametrization is performed
during the collection phase. If you need to setup expensive resources
see about setting indirect to do it rather than at test setup time.
Can be called multiple times, in which case each call parametrizes all
previous parametrizations, e.g.
::
unparametrized: t
parametrize ["x", "y"]: t[x], t[y]
parametrize [1, 2]: t[x-1], t[x-2], t[y-1], t[y-2]
:param argnames:
A comma-separated string denoting one or more argument names, or
Expand Down Expand Up @@ -1104,17 +1133,16 @@ def parametrize(
# more than once) then we accumulate those calls generating the cartesian product
# of all calls.
newcalls = []
for callspec in self._calls or [CallSpec2(self)]:
for callspec in self._calls or [CallSpec2()]:
for param_index, (param_id, param_set) in enumerate(zip(ids, parameters)):
newcallspec = callspec.copy()
newcallspec.setmulti2(
arg_values_types,
argnames,
param_set.values,
param_id,
param_set.marks,
scope_,
param_index,
newcallspec = callspec.setmulti(
valtypes=arg_values_types,
argnames=argnames,
valset=param_set.values,
id=param_id,
marks=param_set.marks,
scope=scope_,
param_index=param_index,
)
newcalls.append(newcallspec)
self._calls = newcalls
Expand Down

0 comments on commit dced00e

Please sign in to comment.