
[Bug] Endless pipeline loop when trying to import corrupted document #408

Open
marcominerva opened this issue Apr 16, 2024 · 15 comments
Labels: enhancement New feature or request


@marcominerva
Contributor

Context / Scenario

In service mode (using a queue), when trying to import a corrupted document (e.g., an invalid PDF file), the decoder throws an exception, but the message is then put back in the queue, generating an endless loop:

warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.100334.6972903.f0f10e85d780488d927f6c03891d20d5' processing failed with exception, 
      putting message back in the queue.
      Message content: {"index":"default","document_id":"98d5b5a2d38e4cf49dbda5b50c4502c2202404161003346000416","execution_id":"e6f9118e170745168f1d0bc1a9cd39b5","steps":["extract","partition","gen_embeddings","save_records"]}

What happened?

If decoding fails with an exception, I expect the document to be marked as failed and not be processed again. I also see that the DataPipelineStatus class has a Failed property, but it is always false:

public DataPipelineStatus ToDataPipelineStatus()
{
    return new DataPipelineStatus
    {
        Completed = this.Complete,
        Failed = false, // TODO
        Empty = this.Files.Count == 0,
        Index = this.Index,
        DocumentId = this.DocumentId,
        Tags = this.Tags,
        Creation = this.Creation,
        LastUpdate = this.LastUpdate,
        Steps = this.Steps,
        RemainingSteps = this.RemainingSteps,
        CompletedSteps = this.CompletedSteps,
    };
}

Importance

I cannot use Kernel Memory

Platform, Language, Versions

Kernel Memory v0.36.240415.2

Relevant log output

dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text, pipeline 'default/577dbc0439a74a0b8a4996be204b45e3202404161021070293739'
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text from file 'Iceberg.pdf' mime type 'application/pdf' using extractor 'Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder'
dbug: Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder[0]
      Extracting text from PDF file
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.102107.1238584.0a0332828e5f4538ba32bc09b4998acc' processing failed with exception, putting message back in the queue. Message content: {"index":"default","document_id":"577dbc0439a74a0b8a4996be204b45e3202404161021070293739","execution_id":"86d031771dc445d1bd085f110075266e","steps":["extract","partition","gen_embeddings","save_records"]}
      UglyToad.PdfPig.Core.PdfDocumentFormatException: Could not find the version header comment at the start of the document.
         at UglyToad.PdfPig.Parser.FileStructure.FileHeaderParser.Parse(ISeekableTokenScanner scanner, IInputBytes inputBytes, Boolean isLenientParsing, ILog log)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.OpenDocument(IInputBytes inputBytes, ISeekableTokenScanner scanner, InternalParsingOptions parsingOptions)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.Open(IInputBytes inputBytes, ParsingOptions options)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.Open(Stream stream, ParsingOptions options)
         at UglyToad.PdfPig.PdfDocument.Open(Stream stream, ParsingOptions options)
         at Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder.DecodeAsync(Stream data, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder.DecodeAsync(BinaryData data, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Handlers.TextExtractionHandler.ExtractTextAsync(FileDetails uploadedFile, BinaryData fileContent, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Handlers.TextExtractionHandler.InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Pipeline.DistributedPipelineOrchestrator.RunPipelineStepAsync(DataPipeline pipeline, IPipelineStepHandler handler, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Pipeline.DistributedPipelineOrchestrator.<>c__DisplayClass5_0.<<AddHandlerAsync>b__0>d.MoveNext()
      --- End of stack trace from previous location ---
         at Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues.<>c__DisplayClass19_0.<<OnDequeue>b__0>d.MoveNext()
info: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message received
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text, pipeline 'default/577dbc0439a74a0b8a4996be204b45e3202404161021070293739'
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text from file 'Iceberg.pdf' mime type 'application/pdf' using extractor 'Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder'
dbug: Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder[0]
      Extracting text from PDF file
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.102107.1238584.0a0332828e5f4538ba32bc09b4998acc' processing failed with exception, putting message back in the queue. Message content: {"index":"default","document_id":"577dbc0439a74a0b8a4996be204b45e3202404161021070293739","execution_id":"86d031771dc445d1bd085f110075266e","steps":["extract","partition","gen_embeddings","save_records"]}
@marcominerva marcominerva added bug Something isn't working triage labels Apr 16, 2024
@dluc
Collaborator

dluc commented Apr 16, 2024

Does this happen only with SimpleQueues? By design SimpleQueues doesn't support poison queues, so that at dev time one can debug without having to worry about the number of retries.

With AzureQueue and RabbitMQ the number of retries should be capped, so that eventually the service stops retrying.

@marcominerva
Contributor Author

I have tried with Azure Queues and verified that the message is moved to a poison queue after a certain number of retries. However, the number of retries is hard-coded:

private const int MaxRetryBeforePoisonQueue = 20;

What do you think about adding this parameter to AzureQueuesConfig?
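Something along these lines, for example (just a sketch; the property name and default value are only a proposal, not existing code):

public class AzureQueuesConfig
{
    // ... existing configuration properties ...

    // Proposed: maximum number of dequeue attempts before a message is
    // moved to the poison queue (currently hard-coded to 20)
    public int MaxRetriesBeforePoisonQueue { get; set; } = 20;
}

The hard-coded constant would then be replaced by this._config.MaxRetriesBeforePoisonQueue when dispatching messages.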

As for RabbitMQ, it seems that the message is always put back in the queue:

public void OnDequeue(Func<string, Task<bool>> processMessageAction)
{
    this._consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
    {
        try
        {
            this._log.LogDebug("Message '{0}' received, expires at {1}", args.BasicProperties.MessageId, args.BasicProperties.Expiration);

            byte[] body = args.Body.ToArray();
            string message = Encoding.UTF8.GetString(body);

            bool success = await processMessageAction.Invoke(message).ConfigureAwait(false);
            if (success)
            {
                this._log.LogTrace("Message '{0}' successfully processed, deleting message", args.BasicProperties.MessageId);
                this._channel.BasicAck(args.DeliveryTag, multiple: false);
            }
            else
            {
                this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue", args.BasicProperties.MessageId);
                this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
            }
        }
#pragma warning disable CA1031 // Must catch all to handle queue properly
        catch (Exception e)
        {
            // Exceptions caught by this block:
            // - message processing failed with exception
            // - failed to delete message from queue
            // - failed to unlock message in the queue
            this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue", args.BasicProperties.MessageId);
            // TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages.
            this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
        }
#pragma warning restore CA1031
    };
}

@dluc
Collaborator

dluc commented Apr 17, 2024

No problem with making it configurable if it helps. If I remember correctly, Azure Queues uses a count to decide when to discard a message, while RabbitMQ uses an expiration date (time to live). I would check the logs for the message "Message '{0}' received, expires at {1}" and see if the message actually expires.

@dluc dluc added enhancement New feature or request and removed bug Something isn't working triage labels Apr 17, 2024
@marcominerva
Contributor Author

If I correctly understand the meaning of the requeue parameter:

this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);

It enqueues the message again in the queue, with a new expiration time, endlessly.
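For reference, this is my understanding of the two options (a minimal sketch of the RabbitMQ client calls, not the actual Kernel Memory code):

// requeue: true  -> the broker puts the same message back in the queue and
//                   delivers it again; no retry counter is tracked here,
//                   so processing can loop forever
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);

// requeue: false -> the message is discarded, or routed to the dead-letter
//                   exchange if one is configured on the queue
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: false);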

@marcominerva
Contributor Author

By the way, there are some issues in RabbitMQPipeline:

this._log.LogDebug("Message '{0}' received, expires at {1}", args.BasicProperties.MessageId, args.BasicProperties.Expiration);

[screenshots: debugger showing args.BasicProperties with all properties null]

It seems that all the properties of BasicProperties are null.

@marcominerva
Contributor Author

@dluc I have made a PR to set the missing properties in RabbitMQ: #454.

After that, I think that RabbitMQPipeline should also handle a maximum number of retries and poison queues, like AzureQueuePipeline does. In the case of RabbitMQ, it seems that the correct approach is based on Quorum Queues and x-delivery-limit: https://www.rabbitmq.com/blog/2020/04/20/rabbitmq-gets-an-ha-upgrade#quorum-queues-in-the-management-ui.
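For example, declaring the queue as a quorum queue with a delivery limit would look roughly like this (just a sketch against the RabbitMQ .NET client; the queue name and the limit value are arbitrary):

var arguments = new Dictionary<string, object>
{
    // quorum queue type is required for x-delivery-limit to be honored
    ["x-queue-type"] = "quorum",

    // after this many deliveries the message is dropped, or dead-lettered
    // if a dead-letter exchange is configured on the queue
    ["x-delivery-limit"] = 20,
};

this._channel.QueueDeclare(
    queue: "km-pipeline",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: arguments);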

dluc added a commit that referenced this issue May 6, 2024
## Motivation and Context (Why the change? What's the scenario?)
See #408 (comment). This PR sets the `MessageId` and `Expiration` properties for RabbitMQ messages.

---------

Co-authored-by: Devis Lucato <devis@microsoft.com>
@dluc
Collaborator

dluc commented May 6, 2024

> @dluc I have made a PR to set the missing properties in RabbitMQ: #454.
>
> After that, I think that RabbitMQPipeline should also handle a maximum number of retries and poison queues, like AzureQueuePipeline does. In the case of RabbitMQ, it seems that the correct approach is based on Quorum Queues and x-delivery-limit: https://www.rabbitmq.com/blog/2020/04/20/rabbitmq-gets-an-ha-upgrade#quorum-queues-in-the-management-ui.

There are a couple of approaches; IIRC all of them require multiple queues per queue. Worth reading:

@marcominerva
Contributor Author

So, do you prefer using such solutions instead of the quorum queue?

@dluc
Collaborator

dluc commented May 7, 2024

> So, do you prefer using such solutions instead of the quorum queue?

Quorum queues don't seem to offer delayed delivery, do they?

When a message delivery fails, we need to allow a configurable retry strategy, e.g.:

  • retry 10 times, waiting 1 minute
  • retry for 24 hours, using exponential backoff

AFAIK with RabbitMQ it will require multiple queues.
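
The usual pattern, as far as I know, is a companion "retry" queue with a per-queue TTL that dead-letters back to the main queue, something along these lines (a rough sketch, queue names and delay are made up):

// Main queue: when a message is rejected with requeue: false it is
// dead-lettered to the retry queue (routed via the default exchange)
this._channel.QueueDeclare("km-pipeline", durable: true, exclusive: false, autoDelete: false,
    arguments: new Dictionary<string, object>
    {
        ["x-dead-letter-exchange"] = "",
        ["x-dead-letter-routing-key"] = "km-pipeline-retry",
    });

// Retry queue: messages wait here for 60 seconds (per-queue TTL), then
// expire and are dead-lettered back to the main queue, which provides
// the delayed redelivery
this._channel.QueueDeclare("km-pipeline-retry", durable: true, exclusive: false, autoDelete: false,
    arguments: new Dictionary<string, object>
    {
        ["x-message-ttl"] = 60000,
        ["x-dead-letter-exchange"] = "",
        ["x-dead-letter-routing-key"] = "km-pipeline",
    });

A retry count would still have to be tracked (e.g. via the x-death header) to eventually move a message to a poison queue instead of retrying forever.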

@marcominerva
Contributor Author

Understood. So at least we need something like this, right?

https://stackoverflow.com/a/73358042/1728189

@marcominerva
Contributor Author

Hi @dluc! I can finally return to this issue.

What do you think about implementing in RabbitMQ the same behavior we have in Azure Queues?

try
{
    if (message.DequeueCount <= this._config.MaxRetriesBeforePoisonQueue)
    {
        bool success = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false);
        if (success)
        {
            this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.MessageId);
            await this.DeleteMessageAsync(message, cancellationToken: default).ConfigureAwait(false);
        }
        else
        {
            var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount);
            this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue with a delay of {1} msecs",
                message.MessageId, backoffDelay.TotalMilliseconds);
            await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false);
        }
    }
    else
    {
        this._log.LogError("Message '{0}' reached max attempts, moving to poison queue", message.MessageId);
        await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
    }
}
#pragma warning disable CA1031 // Must catch all to handle queue properly
catch (Exception e)
{
    // Exceptions caught by this block:
    // - message processing failed with exception
    // - failed to delete message from queue
    // - failed to unlock message in the queue
    // - failed to move message to poison queue
    var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount);
    this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue with a delay of {1} msecs",
        message.MessageId, backoffDelay.TotalMilliseconds);
    // Note: if this fails, the exception is caught by this.DispatchMessages()
    await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false);
}
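
Translated to RabbitMQ, my idea would be roughly the following (only a sketch, not the code of the PR; it assumes a quorum queue, so that the broker provides the x-delivery-count header on redeliveries, a config value like MaxRetriesBeforePoisonQueue, and a hypothetical PublishToPoisonQueueAsync helper):

this._consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
{
    // With quorum queues the broker adds the "x-delivery-count" header to
    // redelivered messages; it is absent on the first delivery
    long deliveryCount = 1;
    if (args.BasicProperties.Headers != null
        && args.BasicProperties.Headers.TryGetValue("x-delivery-count", out object count))
    {
        deliveryCount = Convert.ToInt64(count) + 1;
    }

    string message = Encoding.UTF8.GetString(args.Body.ToArray());

    if (deliveryCount > this._config.MaxRetriesBeforePoisonQueue)
    {
        this._log.LogError("Message '{0}' reached max attempts, moving to poison queue", args.BasicProperties.MessageId);
        await this.PublishToPoisonQueueAsync(message).ConfigureAwait(false); // hypothetical helper
        this._channel.BasicAck(args.DeliveryTag, multiple: false);
        return;
    }

    bool success = await processMessageAction.Invoke(message).ConfigureAwait(false);
    if (success)
    {
        this._channel.BasicAck(args.DeliveryTag, multiple: false);
    }
    else
    {
        // the broker increments x-delivery-count on the next delivery
        this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
    }
};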

@dluc
Collaborator

dluc commented Jun 3, 2024

Hey @marcominerva, that would be nice, if possible. It might be a lot of work considering the extra queues to introduce, so, before starting, do you think we could break it down into multiple steps, with separate PRs?

@marcominerva
Contributor Author

Yes. I can start by creating the extra queues, for example, and then open a draft PR so you can see the work in progress. Do you agree?

@dluc
Collaborator

dluc commented Jun 4, 2024

> Yes. I can start by creating the extra queues, for example, and then open a draft PR so you can see the work in progress. Do you agree?

Yes, please go ahead. What would be the names of the queues? I'm assuming that, if missing or deleted, these extra queues will be automatically recreated without causing errors.

@marcominerva
Contributor Author

Actually, adding poison queue support like in Azure Queues required fewer changes than I thought, so I have made a single PR with all the changes: #648.

Let me know if it is OK, or if you prefer to split it anyway.
