Messenger component: deal with interruption of long running messages processing #54707
Unanswered
victor-upmeet
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Hello,
I use the messenger component to deal with messages that can take up to an hour to process. These processes are run with supervisor in docker containers. During a deployment, new containers are started, and then old ones are stopped. When this happens, supervisor receives and forwards the SIGTERM signal to my processes, which the messenger component handles to stop after the current process finishes the current message (if any).
Problem is, processes have a maximum of 2 minutes to stop after receiving a SIGTERM signal during deployment (tasks are run on AWS with Fargate), after which they are force killed. The message is stored in a SQS queue message, which has a visibility timeout of 2 hours. The consequence is, if my message is being processed for 10 minutes, and then killed, I'll have to wait for 1 hour and 50 minutes for the message to be visible again, and for a worker to continue the process.
These long running processes have intermediate steps, so the issue is not that the message will be reprocessed entirely (it will continue at the last step it previously reached), but rather that there will be a pretty long time between the interruption of the process, and the start of the next one, and I feel like something could be done to fix this.
So, here is the question. Is there a way for my message handler to be notified of the interruption of the process, so that it can stop as soon as possible, and send the message back to the transport so another consumer can process it and finish the job if not already finished?
I thought about listening to SIGTERM / SIGINT signals with pcntl_signal, and throw a RecoverableMessageHandlingException so that the message will be sent back to its transport. It looks like it is the most straightforward way to deal with this. But if it works, it looks like it is something Symfony could take care of, by allowing message handlers to implement some interface having a method called when stop is requested, and return some result, like "has the message been processed or not", and if not, messenger could be in charge of throwing a RecoverableMessageHandlingException. I would be happy to open a PR implementing that, but before starting working on this I would be happy to know what you think about this. Maybe you have a better solution, or maybe there is already something in place.
Thanks!
EDIT: after a quick look at how messenger deals with message handlers, it would be quite hard to implement the described PR, specifically the part where messenger would listen to some results to know if it should throw a RecoverableMessageHandlingException or not. Still, I think it should be its responsability to dispatch somehow a signal to handlers to know that they should stop. If handlers are configured to receive these stop signals, they could do whatever they want (ie. throwing a RecoverableMessageHandlingException if the message is still being processed). A "StoppableMessageHandler" parent class or "StoppableMessageHandlerTrait" trait could even be provided by Symfony to implement a "shouldStop" boolean attribute, a method handling the stop signal setting this attribute to true, and a method throwing a RecoverableMessageHandlingException if shouldStop is true that can be called by the message handler each time it finishes a step.
Beta Was this translation helpful? Give feedback.
All reactions