Skip to content

Commit

Permalink
Only heartbeat if necessary in backfill loop (apache#39399)
Browse files Browse the repository at this point in the history
Currently, backfill sleeps for a minute in every iteration, which is extremely slow.

The reason is that each iteration waits synchronously until a heartbeat is necessary. Since the loop is otherwise fast, this results in waits of up to a minute between syncs.

With this change, if we don't add sleep(1), the loop will run very fast and generate tons of logs. So I sleep for one second in each iteration to slow it down just a bit.
  • Loading branch information
dstandish authored and RodrigoGanancia committed May 10, 2024
1 parent e4078c4 commit c04183d
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS
from airflow.timetables.base import DagRunInfo
from airflow.utils import helpers, timezone
from airflow.utils.configuration import conf as airflow_conf, tmp_configuration_copy
from airflow.utils.configuration import tmp_configuration_copy
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
Expand Down Expand Up @@ -475,10 +475,8 @@ def _process_backfill_task_instances(
"""
executed_run_dates = []

is_unit_test = airflow_conf.getboolean("core", "unit_test_mode")

while (ti_status.to_run or ti_status.running) and not ti_status.deadlocked:
self.log.debug("*** Clearing out not_ready list ***")
self.log.debug("Clearing out not_ready list")
ti_status.not_ready.clear()

# we need to execute the tasks bottom to top
Expand Down Expand Up @@ -697,7 +695,7 @@ def _per_task_process(key, ti: TaskInstance, session):
self.log.debug(e)

perform_heartbeat(
job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=is_unit_test
job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
)
# execute the tasks in the queue
executor.heartbeat()
Expand Down Expand Up @@ -749,6 +747,7 @@ def to_keep(key: TaskInstanceKey) -> bool:

self._log_progress(ti_status)
session.commit()
time.sleep(1)

# return updated status
return executed_run_dates
Expand Down

0 comments on commit c04183d

Please sign in to comment.