Skip to content

Commit

Permalink
Rename variables for dag runs (#34049)
Browse files Browse the repository at this point in the history
Co-authored-by: daniel.dylag <danieldylag1990@gmail.com>
  • Loading branch information
Bisk1 and daniel.dylag committed Sep 5, 2023
1 parent 8d312b8 commit d81fe09
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2939,12 +2939,12 @@ def bulk_write_to_db(
session.add(orm_dag)
orm_dags.append(orm_dag)

most_recent_runs: dict[str, DagRun] = {}
dag_id_to_last_automated_run: dict[str, DagRun] = {}
num_active_runs: dict[str, int] = {}
# Skip these queries entirely if no DAGs can be scheduled to save time.
if any(dag.timetable.can_be_scheduled for dag in dags):
# Get the latest dag run for each existing dag as a single query (avoid n+1 query)
most_recent_subq = (
# Get the latest automated dag run for each existing dag as a single query (avoid n+1 query)
last_automated_runs_subq = (
select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date"))
.where(
DagRun.dag_id.in_(existing_dags),
Expand All @@ -2953,13 +2953,13 @@ def bulk_write_to_db(
.group_by(DagRun.dag_id)
.subquery()
)
most_recent_runs_iter = session.scalars(
last_automated_runs = session.scalars(
select(DagRun).where(
DagRun.dag_id == most_recent_subq.c.dag_id,
DagRun.execution_date == most_recent_subq.c.max_execution_date,
DagRun.dag_id == last_automated_runs_subq.c.dag_id,
DagRun.execution_date == last_automated_runs_subq.c.max_execution_date,
)
)
most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
dag_id_to_last_automated_run = {run.dag_id: run for run in last_automated_runs}

# Get number of active dagruns for all dags we are processing as a single query.
num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session)
Expand Down Expand Up @@ -2993,15 +2993,15 @@ def bulk_write_to_db(
orm_dag.timetable_description = dag.timetable.description
orm_dag.processor_subdir = processor_subdir

run: DagRun | None = most_recent_runs.get(dag.dag_id)
if run is None:
data_interval = None
last_automated_run: DagRun | None = dag_id_to_last_automated_run.get(dag.dag_id)
if last_automated_run is None:
last_automated_data_interval = None
else:
data_interval = dag.get_run_data_interval(run)
last_automated_data_interval = dag.get_run_data_interval(last_automated_run)
if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs:
orm_dag.next_dagrun_create_after = None
else:
orm_dag.calculate_dagrun_date_fields(dag, data_interval)
orm_dag.calculate_dagrun_date_fields(dag, last_automated_data_interval)

dag_tags = set(dag.tags or {})
orm_dag_tags = list(orm_dag.tags or [])
Expand Down Expand Up @@ -3667,27 +3667,27 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[
def calculate_dagrun_date_fields(
self,
dag: DAG,
most_recent_dag_run: None | datetime | DataInterval,
last_automated_dag_run: None | datetime | DataInterval,
) -> None:
"""
Calculate ``next_dagrun`` and ``next_dagrun_create_after``.
:param dag: The DAG object
:param most_recent_dag_run: DataInterval (or datetime) of most recent run of this dag, or none
:param last_automated_dag_run: DataInterval (or datetime) of most recent automated run of this dag, or None
if not yet scheduled.
"""
most_recent_data_interval: DataInterval | None
if isinstance(most_recent_dag_run, datetime):
last_automated_data_interval: DataInterval | None
if isinstance(last_automated_dag_run, datetime):
warnings.warn(
"Passing a datetime to `DagModel.calculate_dagrun_date_fields` is deprecated. "
"Provide a data interval instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
most_recent_data_interval = dag.infer_automated_data_interval(most_recent_dag_run)
last_automated_data_interval = dag.infer_automated_data_interval(last_automated_dag_run)
else:
most_recent_data_interval = most_recent_dag_run
next_dagrun_info = dag.next_dagrun_info(most_recent_data_interval)
last_automated_data_interval = last_automated_dag_run
next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
if next_dagrun_info is None:
self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None
else:
Expand Down

0 comments on commit d81fe09

Please sign in to comment.