This repository has been archived by the owner on Sep 2, 2022. It is now read-only.

Improve update and savepoint handling #420

Open. Wants to merge 7 commits into base: master.

Conversation

@elanv (Contributor) commented Feb 23, 2021

Purpose of this PR

Currently, savepoint handling and its related routines are scattered across several places, which makes it difficult to enhance this operator. This PR organizes them so that savepoint-related routines can be improved and extended in the future. It also improves the update, cancel, and recovery features that depend on the savepoint routines.

Changes

  • Make the job deploy phase explicit with new job states.
  • Organize savepoint routines.
  • Fix some savepoint-related issues.
  • Improve update stability.
  • Change the job stop process applied when updating and cancelling a job.
  • Elaborate the update/restart strategy further.

Details

  • Organize and fix savepoint routine

    • Organize Savepoint handling and related routines in one place
    • Add field to configure the latest savepoint age for update
      • SavepointMaxAgeForUpdateSeconds
    • Auto savepoint
      • Delete lastSavepointTriggerTime and lastSavepointTriggerID: duplicated with status.savepoint
      • Change the first trigger to be based on status.job.startTime and delete SavepointTriggerReasonScheduledInitial
    • Savepoint state
      • Add a routine to derive the state from HTTP code
      • Get rid of operator's own savepoint timeout error
  • Change job stop behavior when updating and cancelling a job

    • Flink 1.9 introduced the stop API, which supports exactly-once semantics, but for compatibility down to version 1.8, "cancel with savepoint" is applied first. In the future, add a flinkVersion field and support "stop with savepoint" on 1.9 or higher.
    • Apply "cancel with savepoint" API
  • Improve update process

  • Elaborate update/restart strategy
    Limit the job state age from which the job can be restarted when auto-restarting from failure, updating a stopped job, or updating a running job with takeSavepointOnUpdate false.

    • Add a field to limit maximum savepoint age to restore for job on restart.
      • MaxStateAgeToRestoreSeconds
  • Add new job deployment states

    • Deploying, DeployFailed, Restarting
      [image: diagram of the new job deployment states]
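The version gate described in the job-stop change above can be sketched against Flink's REST API. The endpoint paths (`/jobs/:jobid/stop` on 1.9+, `/jobs/:jobid/savepoints` with `cancel-job` before that) come from the Flink REST API; the function name, version check, and wiring are only an illustration, not this operator's actual code:

```go
package main

import "fmt"

// stopJobRequest builds the Flink REST call used to stop a job with a
// savepoint. Flink 1.9+ has a dedicated "stop" endpoint with exactly-once
// semantics; older versions fall back to "cancel with savepoint".
// This is a sketch, not the operator's implementation.
func stopJobRequest(flinkMajor, flinkMinor int, jobID, savepointDir string) (path string, body []byte) {
	if flinkMajor > 1 || (flinkMajor == 1 && flinkMinor >= 9) {
		// "Stop with savepoint": POST /jobs/:jobid/stop
		path = fmt.Sprintf("/jobs/%s/stop", jobID)
		body = []byte(fmt.Sprintf(`{"targetDirectory": %q, "drain": false}`, savepointDir))
		return
	}
	// "Cancel with savepoint": POST /jobs/:jobid/savepoints with cancel-job=true
	path = fmt.Sprintf("/jobs/%s/savepoints", jobID)
	body = []byte(fmt.Sprintf(`{"target-directory": %q, "cancel-job": true}`, savepointDir))
	return
}

func main() {
	p, b := stopJobRequest(1, 8, "abc123", "gs://bucket/savepoints")
	fmt.Println(p, string(b))
	p, b = stopJobRequest(1, 9, "abc123", "gs://bucket/savepoints")
	fmt.Println(p, string(b))
}
```

Keeping the request construction behind one function like this is what would let a future flinkVersion field switch behavior in a single place.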

@elanv elanv mentioned this pull request Feb 25, 2021
@elanv elanv changed the title Improve update and savepoint handling [WIP] Improve update and savepoint handling Feb 26, 2021
@functicons (Collaborator) commented

Thanks for the PRs! Let me know when it is ready for review.

@shashken (Contributor) commented Mar 3, 2021

@elanv Thank you for doing this! you are awesome :)
I opened issue #427; I think it's the root cause of the problem I mitigated by introducing some of the fields you are removing here. I'd appreciate your feedback there too.

Also, I see that the takeSavepointOnUpgrade flow I added (nice rename, BTW) takes the SP synchronously, and while that happens the operator fails to submit new job clusters (it just waits for the SP to finish, then submits them). Do you think that's something we can change as well, or is it proving too difficult at the moment?

Last thing: do you think it's possible to leverage the new SavepointMaxAgeToRestoreSeconds (or a new field) to make the job submitter fail if no SP is provided for it? (To reduce the chance of a deployment error where a job that must start from an SP does not.)

@syucream syucream mentioned this pull request Mar 9, 2021
@elanv (Contributor, author) commented Mar 17, 2021

I plan to write additional unit tests after sufficient testing. A diagram of the new job states applied in the new commit is also attached to the PR description.
@shashken Thanks for the comments. I will organize my thoughts and answer later.

@elanv (Contributor, author) commented Mar 26, 2021

@shashken Sorry for the late response.

The savepoint issue is specific to the auto savepoint feature. When auto savepoint is enabled, the operator triggers a savepoint as soon as the job starts, but the savepoint fails if some tasks of the Flink job have not started yet. In that case the savepoint status is updated as failed, and the next iteration of the reconciliation loop starts immediately because the status change triggers it. In the following iterations the failing routine is likely to repeat, because there is not enough time between iterations for the tasks to come up. Basically, the savepoint should not be triggered as soon as the job starts, so I have fixed this PR to trigger the first savepoint only after the configured interval has passed.

I still do not understand in which case takeSavepointOnUpgrade is needed. When a Flink job is updated, the job is expected to resume from its last state, but when takeSavepointOnUpgrade is false, I'm not sure which state the job should be restored from. Could you explain more about the field?

MaxStateAgeToRestoreSeconds is for auto-restarting from failure or updating a job from a stopped state. With this field, the Flink operator will restart the Flink job only when the recorded savepoint meets the age condition. Does that fit your intention too?

@shashken (Contributor) commented Apr 1, 2021

@elanv
I am making a PR for the concurrent reconcile calls and will mention you there to take a look once it's ready.

Regarding the takeSavepointOnUpgrade flag, we need to take care of 3 situations:

  1. When a job fails or gets updated, what is the acceptable age for the state, to prevent human errors/bugs? (MaxStateAgeToRestoreSeconds covers this perfectly ✔️ )
  2. When a job is updated, when do we trigger a new SP? This is not covered by MaxStateAgeToRestoreSeconds.
    (For example, say we trigger an SP every 24h and it is acceptable to restore from an SP up to 24h old; if we update the job and the SP is 23h old, we would probably still like to trigger a new one.)
    Maybe another argument can cover this case?
  3. When a job is upgraded, the default should be to take a fresh SP if needed (as mentioned in the point above), but sometimes the user might not want to wait for an SP to be created, so an option to disable SP creation on upgrade would be nice.
    (The first 2 cases are more important IMO.)
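Case 2 above maps naturally onto a threshold like the SavepointMaxAgeForUpdateSeconds field this PR adds: on update, take a fresh savepoint only when the latest one is older than the configured limit. A minimal sketch, with hypothetical function and parameter names:

```go
package main

import "fmt"

// needsFreshSavepointForUpdate sketches how a savepoint-max-age-for-update
// threshold could work: when a job is updated, a new savepoint is triggered
// only if the latest recorded one is older than the configured limit.
func needsFreshSavepointForUpdate(latestAgeSeconds, maxAgeForUpdateSeconds int) bool {
	return latestAgeSeconds > maxAgeForUpdateSeconds
}

func main() {
	// 23h-old savepoint with a 1h limit for updates: take a fresh one.
	fmt.Println(needsFreshSavepointForUpdate(23*3600, 3600)) // true
	// A 10-minute-old savepoint is fresh enough to reuse.
	fmt.Println(needsFreshSavepointForUpdate(600, 3600)) // false
}
```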

- fix handling failed auto savepoint
- fix validations and tests related to changes
- improve update routine
- change the behavior of handling unexpected jobs
- add a constraint for update: when takeSavepointOnUpdate is true, latest savepoint age should be less than maxStateAgeToRestore
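The update constraint in the last commit note can be sketched as a validation step; the identifiers here are illustrative, only the rule itself (with takeSavepointOnUpdate true, the latest savepoint's age must stay below maxStateAgeToRestore) comes from the commit message:

```go
package main

import "fmt"

// validateUpdateSavepointAge sketches the new update constraint: when
// takeSavepointOnUpdate is true, the latest savepoint's age must be less
// than maxStateAgeToRestore before the update may proceed.
func validateUpdateSavepointAge(takeSavepointOnUpdate bool, savepointAgeSeconds, maxStateAgeSeconds int) error {
	if takeSavepointOnUpdate && savepointAgeSeconds >= maxStateAgeSeconds {
		return fmt.Errorf("latest savepoint age %ds exceeds maxStateAgeToRestore %ds",
			savepointAgeSeconds, maxStateAgeSeconds)
	}
	return nil
}

func main() {
	fmt.Println(validateUpdateSavepointAge(true, 7200, 3600)) // error: savepoint too old
	fmt.Println(validateUpdateSavepointAge(true, 600, 3600))  // <nil>
}
```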
@elanv elanv changed the title [WIP] Improve update and savepoint handling Improve update and savepoint handling Apr 5, 2021
@elanv (Contributor, author) commented Apr 5, 2021

@functicons Could you review this PR? I think this work is almost complete. Now it would be nice to add the documentation and tests while the review proceeds.

@elanv (Contributor, author) commented Apr 5, 2021

@shashken I have assumed that the job must resume from its last state when an update is triggered, so I took takeSavepointOnUpdate == true to mean a savepoint must always be taken before the job is updated. On that assumption, the second issue can be resolved with takeSavepointOnUpdate == false and autoSavepointSeconds == 3600. If it is not appropriate to take savepoints that often, IMO it seems efficient to take the savepoint at update time with takeSavepointOnUpdate == true. If use cases like the one in the issue must be supported, the update strategy needs further elaboration.

And in the last commit, I also applied maxStateAgeToRestoreSeconds to the takeSavepointOnUpdate == false case, to prevent updating a running job from a savepoint that is too old.

Successfully merging this pull request may close these issues.

FlinkOperator crashes when deploying a new Job in a FlinkCluster