This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Feature request: Do not try to recover output checkpoints for plugins that don't use it #896

Open
sundbry opened this issue May 23, 2019 · 3 comments

Comments


sundbry commented May 23, 2019

Many output plugins do not use checkpoint state. It would be nice if the system did not bother to read/write these checkpoint files when they are only going to be empty anyway. This should give us a couple of benefits:

  1. Anti-fragility in resuming jobs (especially when the output/structure of the job changes but the input stays the same)
  2. Reduced S3 load and costs

To keep the interface flexible, I imagine we could extend the plugin protocol with a runtime check for plugin features, such as whether output checkpointing is supported.
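A minimal sketch of what such a feature check might look like — every name here is hypothetical and not part of the current Onyx plugin protocol:

```clojure
;; Sketch only: the protocol name and predicate are hypothetical,
;; not part of the existing Onyx plugin interfaces.
(defprotocol CheckpointedOutput
  (checkpointed? [this]
    "True when this output plugin actually stores checkpoint state."))

;; A stateless output plugin would simply opt out:
(defrecord StatelessOutput []
  CheckpointedOutput
  (checkpointed? [_] false))

;; The peer's recovery step could then skip the storage round-trip
;; entirely. read-checkpoint here stands in for the existing
;; onyx.peer.resume-point/read-checkpoint call seen in the trace below.
(defn recover-output [plugin read-checkpoint]
  (when (and (satisfies? CheckpointedOutput plugin)
             (checkpointed? plugin))
    (read-checkpoint)))
```

Plugins that don't satisfy the protocol would be treated as non-checkpointing by default, so existing plugins would keep working unchanged.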

Thoughts?

(Typical stack trace when I resume a job and output checkpoint recovery fails because I changed the job definition:)

ERROR 2019-05-23 09:23:29,085 service.data.job.core: {:message Onyx lifecycle exception,  :phase :lifecycle/recover-output}
com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: A07B7892DD6BE81F; S3 Extended Request ID: AOGX9hab+QAnQIuIk9C1gVVVSOPTZRUeBNkzRIbUE/vxnk7wlDS/OWqqquH/M9GNnWNUr4DWyF8=), S3 Extended Request ID: AOGX9hab+QAnQIuIk9C1gVVVSOPTZRUeBNkzRIbUE/vxnk7wlDS/OWqqquH/M9GNnWNUr4DWyF8=
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
        at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
        at onyx.storage.s3$read_checkpointed_bytes.invokeStatic(s3.clj:102)
        at onyx.storage.s3$read_checkpointed_bytes.invoke(s3.clj:100)
        at onyx.storage.s3$eval49560$fn__49562$fn__49564.invoke(s3.clj:241)
        at onyx.storage.s3$eval49560$fn__49562.invoke(s3.clj:239)
        at clojure.lang.MultiFn.invoke(MultiFn.java:284)
        at onyx.peer.resume_point$read_checkpoint.invokeStatic(resume_point.clj:56)
        at onyx.peer.resume_point$read_checkpoint.invoke(resume_point.clj:51)
        at onyx.peer.resume_point$recover_output.invokeStatic(resume_point.clj:112)
        at onyx.peer.resume_point$recover_output.invoke(resume_point.clj:106)
        at onyx.peer.task_lifecycle$recover_output.invokeStatic(task_lifecycle.clj:486)
        at onyx.peer.task_lifecycle$recover_output.invoke(task_lifecycle.clj:479)
        at onyx.peer.task_lifecycle.TaskStateMachine.exec(task_lifecycle.clj:1070)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invokeStatic(task_lifecycle.clj:550)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invoke(task_lifecycle.clj:540)
        at onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__43880.invoke(task_lifecycle.clj:1155)
        at clojure.core.async$thread_call$fn__11217.invoke(async.clj:442)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

jgerman commented Oct 14, 2019

We actually see this issue in our long-running jobs even when no changes to the job have been made. The stack trace is almost identical, except that you're using S3 and we're using ZooKeeper.

I'm pretty sure ours is due to a task rebooting, but I haven't figured out why it happens, or why it consistently appears to be the same output task.


sundbry commented Oct 14, 2019

@jgerman it sounds like your issue is unrelated to missing keys when loading checkpoints.

Check the back-pressure on your tasks. It is the most frustrating and hardest-to-identify cause of random peer timeouts. Despite the brief section in the user guide, Onyx 0.14 does not handle back-pressure all that well. When the preceding task is pushing data out faster than the downstream tasks can consume it, it can stop processing heartbeat messages.

If you have a lot of back-pressure, consider writing to an intermediate Kafka topic to decouple the imbalance between producer and consumer speed. In my experience, tuning the idle sleeps in the config, as the user guide suggests, is not that effective.

http://www.onyxplatform.org/docs/user-guide/0.14.x/#backpressure
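For reference, the decoupling amounts to splitting the workflow at the hot edge: the first job's output task writes to an intermediate topic via onyx-kafka, and a second job reads from it at its own pace. A rough sketch of the two catalog entries — the topic name, hosts, serializer fns, and batch sizes are all placeholders, and the exact keys should be checked against the onyx-kafka README for your version:

```clojure
;; Job 1: replace the slow downstream edge with a Kafka output task.
;; All values below are placeholders.
{:onyx/name :write-intermediate
 :onyx/plugin :onyx.plugin.kafka/write-messages
 :onyx/type :output
 :onyx/medium :kafka
 :kafka/topic "intermediate-topic"
 :kafka/zookeeper "zk1:2181"
 :kafka/serializer-fn :my.app/serialize
 :onyx/batch-size 50}

;; Job 2: a separate job consumes the topic at its own pace, so
;; back-pressure on the slow side no longer stalls the producer's
;; heartbeats.
{:onyx/name :read-intermediate
 :onyx/plugin :onyx.plugin.kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "intermediate-topic"
 :kafka/zookeeper "zk1:2181"
 :kafka/offset-reset :earliest
 :kafka/deserializer-fn :my.app/deserialize
 :onyx/batch-size 50}
```

The topic acts as a durable buffer, so the producer job can run flat out while the consumer job drains it at whatever rate the slow task sustains.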


jgerman commented Oct 14, 2019

Thanks! Any suggestions for measuring back-pressure? I've only looked at it indirectly via consumer lag on our Kafka topics.

I've definitely seen issues with checkpoints not being written, which I assumed was due to heartbeats (barriers?) not being processed.
