AsyncCollection IsEmpty #264

Open
romerod opened this issue Sep 12, 2022 · 3 comments

romerod commented Sep 12, 2022

We are using the AsyncCollection to queue messages that have to be sent to a server. The send operation usually takes some time, and sending multiple messages at once is faster than sending each message separately.

Currently we have something like this:

var cancelledToken = new CancellationToken(true);

while (true)
{
     var buffer = new Message[10000];
     // wait for the first message
     buffer[0] = await asyncQueue.TakeAsync();
     var count = 1;
     while (count < buffer.Length)
     {
          try
          {
               // take currently available items
               buffer[count] = await asyncQueue.TakeAsync(cancelledToken);
               count++;
          }
          catch (TaskCanceledException)
          {
               // queue is currently empty
               break;
          }
     }

     await SendMessagesAsync(buffer, count);
}

This works great, except that the debug output is flooded with TaskCanceledExceptions, especially when the functionality is used in a request/response scenario where messages do not pile up.

Would it be possible to expose the Empty and/or Count property or have a method which gets a specified number of items which are currently available?

romerod commented Jun 15, 2023

Any suggestions on this?

@CZEMacLeod

@romerod @StephenCleary It seems to me that the inner wait loop in DoTakeAsync should probably be guarded on the cancellation token also:

// Current:
while (Empty && !_completed)

// Proposed:
while (Empty && !_completed && !cancellationToken.IsCancellationRequested)
{
  if (sync)
  {
      _completedOrNotEmpty.Wait(cancellationToken);
  }
  else
  {
      await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
  }
}

I appreciate that as this is async, it might still have the occasional TaskCanceledException if the token was cancelled between the test and the wait, but it would eliminate the problem in this scenario where the token is always cancelled.

I think DoOutputAvailableAsync also needs the change, and then the main loop could be modified to be

var cancelledToken = new CancellationToken(true);
const int maxBatchSize = 10000;

while (true)
{
  var buffer = new List<Message>(maxBatchSize);

  // wait for the first message
  buffer.Add(await asyncQueue.TakeAsync());
  // a List's capacity is not its Count, so bound the loop by maxBatchSize
  while (buffer.Count < maxBatchSize && await asyncQueue.OutputAvailableAsync(cancelledToken))
  {
    try
    {
      // take currently available items
      var message = await asyncQueue.TakeAsync(cancelledToken);
      if (message == null) break;
      buffer.Add(message);
    }
    catch (InvalidOperationException)   // Thrown if the producer completes
    {
      break;
    }
  }

  await SendMessagesAsync(buffer, buffer.Count);
}

I think that perhaps the AsyncCollection should have a TryTakeAsync(CancellationToken cancellationToken) method which would be almost the same as the TakeAsync implementation, except not throwing the InvalidOperationException if the underlying take fails.

@romerod As this is open source, you could just include a private copy of the AsyncCollection class, modified for your needs if necessary...

@StephenCleary
Owner

> Would it be possible to expose the Empty and/or Count property or have a method which gets a specified number of items which are currently available?

No; there's too much potential for misuse; with concurrent/asynchronous collections, any Empty or Count property can be outdated by the time your code even gets the value.

However, there is a better way to meet your need: a Try* method. In the past, I've avoided Try* methods because the meaning of "try" is ambiguous (the most common meaning is "don't throw", but it can also mean "don't behave asynchronously", which would be the meaning in this case). Since System.Threading.Channels has adopted the Try* approach with their high-performance asynchronous collections, I'm comfortable adding Try* methods to mine as well.
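For reference, the Try* pattern mentioned above already covers the batching scenario from this issue when System.Threading.Channels is used. The following is only a minimal sketch, not the planned AsyncCollection API: `TakeBatchAsync` is a hypothetical helper name, and it assumes a single consumer, so at least one item is still buffered after `WaitToReadAsync` returns true.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: waits asynchronously for data, then drains whatever is
// already buffered (up to maxBatchSize) without waiting and without exceptions.
static async Task<List<T>> TakeBatchAsync<T>(ChannelReader<T> reader, int maxBatchSize)
{
    var batch = new List<T>();

    // Returns false once the channel is completed and empty; the caller then
    // receives an empty batch.
    if (!await reader.WaitToReadAsync())
        return batch;

    // TryRead never waits: it returns false as soon as nothing is buffered.
    while (batch.Count < maxBatchSize && reader.TryRead(out var item))
        batch.Add(item);

    return batch;
}

var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("a");
channel.Writer.TryWrite("b");
channel.Writer.TryWrite("c");

var first = await TakeBatchAsync(channel.Reader, maxBatchSize: 2);
var second = await TakeBatchAsync(channel.Reader, maxBatchSize: 2);
Console.WriteLine($"{first.Count} then {second.Count}"); // prints "2 then 1"
```

Here "try" means "don't behave asynchronously": an empty channel is reported through the `false` return value of `TryRead`, so no pre-cancelled token and no TaskCanceledException are involved.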

StephenCleary self-assigned this Jun 19, 2023