Skip to content

Commit

Permalink
Merge branch 'main' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
RichieSK committed May 4, 2024
2 parents 401674a + 3d14213 commit baabcd9
Show file tree
Hide file tree
Showing 21 changed files with 660 additions and 526 deletions.
61 changes: 48 additions & 13 deletions airflow/decorators/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,15 @@ class TaskDecoratorCollection:
# _PythonVirtualenvDecoratedOperator.
requirements: None | Iterable[str] | str = None,
python_version: None | str | int | float = None,
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
Expand All @@ -129,6 +130,13 @@ class TaskDecoratorCollection:
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
Expand All @@ -154,6 +162,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Expand All @@ -164,9 +175,10 @@ class TaskDecoratorCollection:
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
templates_dict: Mapping[str, Any] | None = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
Expand All @@ -176,9 +188,13 @@ class TaskDecoratorCollection:
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -187,6 +203,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch( # type: ignore[misc]
Expand All @@ -211,14 +230,15 @@ class TaskDecoratorCollection:
# _PythonVirtualenvDecoratedOperator.
requirements: None | Iterable[str] | str = None,
python_version: None | str | int | float = None,
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
Expand All @@ -232,9 +252,13 @@ class TaskDecoratorCollection:
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
Expand All @@ -253,6 +277,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Expand All @@ -264,9 +291,10 @@ class TaskDecoratorCollection:
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
templates_dict: Mapping[str, Any] | None = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator.
Expand All @@ -279,9 +307,13 @@ class TaskDecoratorCollection:
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -290,6 +322,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch_external_python(
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_api_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def tutorial_taskflow_api_virtualenv():
"""

@task.virtualenv(
use_dill=True,
serializer="dill", # Use `dill` for advanced serialization.
system_site_packages=False,
requirements=["funcsigs"],
)
Expand Down
19 changes: 17 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ def _handle_failure(
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
fail_stop: bool = False,
) -> None:
"""
Handle Failure for a task instance.
Expand All @@ -903,6 +904,7 @@ def _handle_failure(
context=context,
force_fail=force_fail,
session=session,
fail_stop=fail_stop,
)

_log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "")
Expand Down Expand Up @@ -2966,8 +2968,13 @@ def fetch_handle_failure_context(
context: Context | None = None,
force_fail: bool = False,
session: Session = NEW_SESSION,
fail_stop: bool = False,
):
"""Handle Failure for the TaskInstance."""
"""
Handle Failure for the TaskInstance.
:param fail_stop: if true, stop remaining tasks in dag
"""
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error, session=session
)
Expand Down Expand Up @@ -3030,7 +3037,7 @@ def fetch_handle_failure_context(
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None

if task and task.dag and task.dag.fail_stop:
if task and fail_stop:
_stop_remaining_tasks(task_instance=ti, session=session)
else:
if ti.state == TaskInstanceState.QUEUED:
Expand Down Expand Up @@ -3079,13 +3086,21 @@ def handle_failure(
:param context: Jinja2 context
:param force_fail: if True, task does not retry
"""
if TYPE_CHECKING:
assert self.task
assert self.task.dag
try:
fail_stop = self.task.dag.fail_stop
except Exception:
fail_stop = False
_handle_failure(
task_instance=self,
error=error,
session=session,
test_mode=test_mode,
context=context,
force_fail=force_fail,
fail_stop=fail_stop,
)

def is_eligible_to_retry(self):
Expand Down

0 comments on commit baabcd9

Please sign in to comment.