Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scheduler logic to plan new dag runs by ignoring manual runs #34027

Merged
merged 9 commits into from
Sep 5, 2023

Conversation

Bisk1
Copy link
Contributor

@Bisk1 Bisk1 commented Sep 1, 2023

Fixes #33949

Added filter to exclude manual runs from triggering calculate_dagrun_date_fields method which was causing extra scheduled runs.

Before this change, manual runs were triggering re-calculating next_dagrun_create_after which was supposed to be calculated only after an automated (SCHEDULED or BACKFILL) run is over in order to help scheduler schedule the next DAG run. There is no reason for manual runs to affect scheduled runs so they were excluded with an extra condition.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added the area:Scheduler Scheduler or dag parsing Issues label Sep 1, 2023
# In such case, schedule next only if last_dag_run is finished and was an automated run.
if last_dag_run and not (
last_dag_run.state in State.finished_dr_states
and last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual bugfix: previously we didn't check run type so manual runs were messing with scheduling decisions. Compare it with

or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED),

where exact same filtering is applied before eventually invoking calculate_dagrun_date_fields for filtered dag runs.

@@ -1465,11 +1478,9 @@ def _schedule_dag_run(
return callback
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
# Check if DAG not scheduled then skip interval calculation to same scheduler runtime
if dag_run.state in State.finished_dr_states:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part was pulled into _should_update_dag_next_dagruns

self,
dag: DAG,
dag_model: DagModel,
last_dag_run: DagRun | None = None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are 3 places where this method is used:

  • when dag run is finished successfully
  • when dag run fails
  • when new dag runs were created
    The first 2 cases are similar and we look at last dag run to make this decision, but in the 3rd case there is no previous dag run so I made this parameter optional and skip checking it if not provided

Copy link
Contributor

@ephraimbuddy ephraimbuddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good but the renaming should be in a separate PR if it's actually needed

Bisk1 pushed a commit to Bisk1/airflow that referenced this pull request Sep 3, 2023
Follow-up to discussion in apache#34027

 Some variables used when invoking calculate_dagrun_date_fields where renamed to make it explicit that this method expected automated dag runs only - clarifying naming should make it less likely for maintainers to accidentally misuse this method again.
@Bisk1
Copy link
Contributor Author

Bisk1 commented Sep 3, 2023

Looks good but the renaming should be in a separate PR if it's actually needed
@ephraimbuddy

Ok I removed it and moved to separate PR #34049

Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

@eladkal eladkal added this to the Airflow 2.7.2 milestone Sep 5, 2023
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Sep 5, 2023
@ephraimbuddy ephraimbuddy merged commit 20d8142 into apache:main Sep 5, 2023
42 checks passed
@Bisk1 Bisk1 deleted the fix-33949 branch September 5, 2023 14:16
ephraimbuddy pushed a commit that referenced this pull request Oct 5, 2023
)

* Fix manual task triggering scheduled tasks

Fixes #33949

* fix static checks

* static checks

* add unit test

* static check

* Undo renaming

* Update airflow/jobs/scheduler_job_runner.py

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>

* use keyword-only arguments for last_dag_run and total_active_runs

---------

Co-authored-by: daniel.dylag <danieldylag1990@gmail.com>
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit 20d8142)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler Scheduler or dag parsing Issues type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Manual DAG triggers with Logical Date in the Past trigger a second run when schedule is timedelta
5 participants