Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Way to cancel pending tokens #31

Open
Janpot opened this issue May 31, 2019 · 3 comments
Open

Way to cancel pending tokens #31

Janpot opened this issue May 31, 2019 · 3 comments

Comments

@Janpot
Copy link

Janpot commented May 31, 2019

I find I have the need to do a series of concurrent tasks, with a max concurrency, but also with a max total run time. After this time, whatever hasn't finished or started can be ignored.

I propose to add a new API

sema.cancel()

Which rejects all pending .acquire() calls. This way I can do the following

const timeout = setTimeout(() => sema.cancel(), 5000);
await Promise.all(jobs.map(async job => {
  try {
    sema.acquire();
  } catch (err) {
    if (err.code === 'CANCELLED') {
      return null
    }
    throw err
  }

  try {
    await doWork(job);
  } finally {
    sema.release();
  }
}));
clearTimeout(timeout);

If accepted I can make a PR

@OlliV
Copy link
Collaborator

OlliV commented Jul 23, 2019

Hi,

Sorry I have missed this issue. Sounds really good to me. How would that work then, by throwing maybe?

@Janpot
Copy link
Author

Janpot commented Jul 23, 2019

@OlliV I currently use a homegrown mini-version of async-sema that implements just the functionality I needed from it. But it's, apart from the cancellation, API-compatible with it. When .cancel is called it simply rejects all pending acquire calls with an error that has a .code property of 'CANCELLED'. Anything similar to this to work for me. I'll include the code for that:

Code
/**
 * @typedef {{ start: () => void, cancel: () => void }} Task
 */

// This is a very simplified version of https://github.com/zeit/async-sema
// But one that supports cancelling pending tokens
// can be replaced when/if this gets resolved
//   https://github.com/zeit/async-sema/issues/31
class AsyncSema {
  constructor (tokens = 1) {
    this._tokens = tokens;
    /** @type {Task[]} */
    this._taskQueue = [];
  }

  _createTask () {
    const task = { start: () => {}, cancel: () => {} };
    /** @type {Promise<void>} */
    const promise = new Promise((resolve, reject) => {
      task.start = () => resolve();
      task.cancel = () => reject(Object.assign(new Error('Task cancelled'), {
        code: 'CANCELLED'
      }));
    });
    this._taskQueue.push(task);
    return promise;
  }

  _scheduleNextTask () {
    if (this._tokens > 0) {
      const nextTask = this._taskQueue.shift();
      if (nextTask) {
        this._tokens -= 1;
        nextTask.start();
        this._scheduleNextTask();
      }
    }
  }

  async acquire () {
    const promise = this._createTask();
    this._scheduleNextTask();
    return promise;
  }

  release () {
    this._tokens += 1;
    this._scheduleNextTask();
  }

  async cancel () {
    const toCancel = this._taskQueue.splice(0, this._taskQueue.length);
    for (const task of toCancel) {
      task.cancel();
    }
  }
}

@OlliV
Copy link
Collaborator

OlliV commented Aug 12, 2019

Cool! 👍
Sorry for the delay, it's hard to keep up with all the notifications.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants