Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename variables for dag runs #34049

Merged
merged 1 commit into from
Sep 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 19 additions & 19 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2931,12 +2931,12 @@ def bulk_write_to_db(
session.add(orm_dag)
orm_dags.append(orm_dag)

most_recent_runs: dict[str, DagRun] = {}
dag_id_to_last_automated_run: dict[str, DagRun] = {}
num_active_runs: dict[str, int] = {}
# Skip these queries entirely if no DAGs can be scheduled to save time.
if any(dag.timetable.can_be_scheduled for dag in dags):
# Get the latest dag run for each existing dag as a single query (avoid n+1 query)
most_recent_subq = (
# Get the latest automated dag run for each existing dag as a single query (avoid n+1 query)
last_automated_runs_subq = (
select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date"))
.where(
DagRun.dag_id.in_(existing_dags),
Expand All @@ -2945,13 +2945,13 @@ def bulk_write_to_db(
.group_by(DagRun.dag_id)
.subquery()
)
most_recent_runs_iter = session.scalars(
last_automated_runs = session.scalars(
select(DagRun).where(
DagRun.dag_id == most_recent_subq.c.dag_id,
DagRun.execution_date == most_recent_subq.c.max_execution_date,
DagRun.dag_id == last_automated_runs_subq.c.dag_id,
DagRun.execution_date == last_automated_runs_subq.c.max_execution_date,
)
)
most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
dag_id_to_last_automated_run = {run.dag_id: run for run in last_automated_runs}

# Get number of active dagruns for all dags we are processing as a single query.
num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session)
Expand Down Expand Up @@ -2985,15 +2985,15 @@ def bulk_write_to_db(
orm_dag.timetable_description = dag.timetable.description
orm_dag.processor_subdir = processor_subdir

run: DagRun | None = most_recent_runs.get(dag.dag_id)
if run is None:
data_interval = None
last_automated_run: DagRun | None = dag_id_to_last_automated_run.get(dag.dag_id)
if last_automated_run is None:
last_automated_data_interval = None
else:
data_interval = dag.get_run_data_interval(run)
last_automated_data_interval = dag.get_run_data_interval(last_automated_run)
if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs:
orm_dag.next_dagrun_create_after = None
else:
orm_dag.calculate_dagrun_date_fields(dag, data_interval)
orm_dag.calculate_dagrun_date_fields(dag, last_automated_data_interval)

dag_tags = set(dag.tags or {})
orm_dag_tags = list(orm_dag.tags or [])
Expand Down Expand Up @@ -3659,27 +3659,27 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[
def calculate_dagrun_date_fields(
self,
dag: DAG,
most_recent_dag_run: None | datetime | DataInterval,
last_automated_dag_run: None | datetime | DataInterval,
) -> None:
"""
Calculate ``next_dagrun`` and ``next_dagrun_create_after``.

:param dag: The DAG object
:param most_recent_dag_run: DataInterval (or datetime) of most recent run of this dag, or none
:param last_automated_dag_run: DataInterval (or datetime) of the most recent automated run of this dag, or None
if not yet scheduled.
"""
most_recent_data_interval: DataInterval | None
if isinstance(most_recent_dag_run, datetime):
last_automated_data_interval: DataInterval | None
if isinstance(last_automated_dag_run, datetime):
warnings.warn(
"Passing a datetime to `DagModel.calculate_dagrun_date_fields` is deprecated. "
"Provide a data interval instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
most_recent_data_interval = dag.infer_automated_data_interval(most_recent_dag_run)
last_automated_data_interval = dag.infer_automated_data_interval(last_automated_dag_run)
else:
most_recent_data_interval = most_recent_dag_run
next_dagrun_info = dag.next_dagrun_info(most_recent_data_interval)
last_automated_data_interval = last_automated_dag_run
next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
if next_dagrun_info is None:
self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None
else:
Expand Down