Async iteration is slow #383

Open
conartist6 opened this issue Nov 20, 2020 · 35 comments
@conartist6
Member

One of my goals for this library is to create APIs that will be suitable for working with streaming text, e.g. streams of characters read from disk on demand. The problem is that the async iteration API is relatively slow because it involves Promises, which only resolve at the end of an event loop turn. Thus there is one turn of overhead per character. Ouch.

The problem is the AsyncIterator API itself. The spec says an async iterator has a next() method which returns a Promise<{done, value}>. It also specifies the execution order for promises, which is to say that something like setImmediate is needed before the promise can resolve.
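
To make the cost concrete, here is a minimal sketch of that deferral (illustration only, not code from the library): even when the yielded value is already known, the step can only arrive on a later microtask.

const iter = (async function* () {
  yield 1; // the value is available immediately
})();

let delivered = false;
iter.next().then(() => {
  delivered = true;
});
console.log(delivered); // false -- the step is delivered on a later microtask, never synchronously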

If the problem is at this level, then I think the solution is too. I think the solution is a new kind of iterator: an "asyncish" iterator which returns EITHER {done, value} or Promise<{done, value}>. It should be possible to reflectively distinguish between these two kinds of return from next(). It would also be necessary to define a new symbol, perhaps Symbol.for('asyncishIterator'), whose method returns objects conforming to that interface.
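
As a rough sketch of what a producer might look like (the symbol is the proposed one; buffer and refill are hypothetical stand-ins for whatever actually feeds the stream):

// buffer and refill are hypothetical placeholders, not part of any real API
const buffer = ['already-read chunk'];
const refill = () => Promise.resolve(buffer.push('next chunk'));

const asyncishIterable = {
  [Symbol.for('asyncishIterator')]() {
    return {
      next() {
        // data already on hand comes back as a plain step, in the same tick
        if (buffer.length) return { done: false, value: buffer.shift() };
        // otherwise fall back to a Promise<{done, value}>
        return refill().then(() => ({ done: false, value: buffer.shift() }));
      },
    };
  },
};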

From there, there's a lot that could be done. A good consumer of asyncish iterators would be able to fall back to async-only iterators if only Symbol.asyncIterator were defined. A good producer of asyncish iterators would also define Symbol.asyncIterator to return an async-only iterator in case the consumer could not support asyncish iteration -- it would be relatively trivial to create an iterator wrapper which forces {done, value} into a Promise if it is not in one already.
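
For example, that wrapper might be sketched like this (assuming the asyncish iterator shape described above):

function asAsyncIterator(asyncishIter) {
  return {
    next(arg) {
      // force every step into a Promise, restoring the standard async protocol
      return Promise.resolve(asyncishIter.next(arg));
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}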

There would be considerations around error handling, but I think none insurmountable.

Then finally the question would be how to work with such iterators. There I think coroutines would be useful. The Babel transform that converts for await...of loops into generators would be the ideal place to start, as it works in the desired way. The generator code could be modified to do such a conversion when generating the async versions of our methods.

@conartist6
Member Author

I'm going to have to think about how I might benchmark to see what there is to be gained before I undertake such a large amount of work.

@conartist6
Member Author

The bug report that started me thinking about this is here

@lukiano

lukiano commented Jan 19, 2021

which returns EITHER {done, value} or Promise<{done, value}>

Hi, out of curiosity, isn't this similar to zalgo promises? No ordering guarantees.

@lukiano

lukiano commented Jan 19, 2021

Never mind. I see you're acknowledging the issue in https://es.discourse.group/t/syncandasynciterator/554/17 .

@conartist6
Member Author

Yeah, I don't think it's an issue. It's generally acceptable to have APIs that return a value or a Promise. The original article about releasing Zalgo specifically flags that as a good way to handle code which is sometimes synchronous. Also, the existing async iterator protocol allows value to be (or not be) a Promise. If it is, it will be awaited inside the for await...of loop; that await is usually unnecessary, and it's one reason that async iteration is slow.
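
One common place this shows up is a sync iterable of promises consumed with for await...of -- each value is awaited before the loop body runs, costing a tick whether or not it was needed. A tiny illustration:

async function demo() {
  for await (const value of [Promise.resolve('a'), 'b']) {
    console.log(value); // 'a', then 'b' -- both values were awaited
  }
}

demo();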

@conartist6
Member Author

OK I'm back to this. I had planned to put my build system ahead of it in my work queue, but now I think I may actually need (or very, very much want) to have this solved in order to build the build system. Time to dive back into iter-tools!!

@conartist6
Member Author

The hard thing about fixing the problem is that I'm going to have to write my own code to transpile generators down to classes in order to have the level of control I need over the various promises. I do think it still makes sense to write our sources as generators, mostly because it's a significant convenience for readability.

@conartist6
Member Author

It irks me that I couldn't get anyone who works on the language spec to engage seriously on this topic.

@conartist6
Member Author

OK maybe instead of transpiling to classes I could base my transpilation off of babel's plugin which transpiles async generators down to coroutines of sync generators. It gets me from where I am now, working with language primitives that aren't flexible enough to do what I want, to something that is at least flexible. The transpilation is ugly as sin, but maybe it could be cleaned up.

@conartist6
Member Author

Let's lay that all out. The input is:

async function *asyncMap(source, fn) {
  for await (const value of source) {
    yield fn(value);
  }
}

And once formatted the output is:

'use strict';

function _awaitAsyncGenerator(value) {
  return new _AwaitValue(value);
}

function _wrapAsyncGenerator(fn) {
  return function () {
    return new _AsyncGenerator(fn.apply(this, arguments));
  };
}

function _AsyncGenerator(gen) {
  var front, back;
  function send(key, arg) {
    return new Promise(function (resolve, reject) {
      var request = { key: key, arg: arg, resolve: resolve, reject: reject, next: null };
      if (back) {
        back = back.next = request;
      } else {
        front = back = request;
        resume(key, arg);
      }
    });
  }
  function resume(key, arg) {
    try {
      var result = gen[key](arg);
      var value = result.value;
      var wrappedAwait = value instanceof _AwaitValue;
      Promise.resolve(wrappedAwait ? value.wrapped : value).then(
        function (arg) {
          if (wrappedAwait) {
            resume(key === 'return' ? 'return' : 'next', arg);
            return;
          }
          settle(result.done ? 'return' : 'normal', arg);
        },
        function (err) {
          resume('throw', err);
        },
      );
    } catch (err) {
      settle('throw', err);
    }
  }
  function settle(type, value) {
    switch (type) {
      case 'return':
        front.resolve({ value: value, done: true });
        break;
      case 'throw':
        front.reject(value);
        break;
      default:
        front.resolve({ value: value, done: false });
        break;
    }
    front = front.next;
    if (front) {
      resume(front.key, front.arg);
    } else {
      back = null;
    }
  }
  this._invoke = send;
  if (typeof gen.return !== 'function') {
    this.return = undefined;
  }
}

_AsyncGenerator.prototype[
  (typeof Symbol === 'function' && Symbol.asyncIterator) || '@@asyncIterator'
] = function () {
  return this;
};

_AsyncGenerator.prototype.next = function (arg) {
  return this._invoke('next', arg);
};

_AsyncGenerator.prototype.throw = function (arg) {
  return this._invoke('throw', arg);
};

_AsyncGenerator.prototype.return = function (arg) {
  return this._invoke('return', arg);
};

function _AwaitValue(value) {
  this.wrapped = value;
}

function _asyncIterator(iterable) {
  var method,
    async,
    sync,
    retry = 2;
  for (
    'undefined' != typeof Symbol && ((async = Symbol.asyncIterator), (sync = Symbol.iterator));
    retry--;

  ) {
    if (async && null != (method = iterable[async])) return method.call(iterable);
    if (sync && null != (method = iterable[sync]))
      return new AsyncFromSyncIterator(method.call(iterable));
    (async = '@@asyncIterator'), (sync = '@@iterator');
  }
  throw new TypeError('Object is not async iterable');
}

function AsyncFromSyncIterator(s) {
  function AsyncFromSyncIteratorContinuation(r) {
    if (Object(r) !== r) return Promise.reject(new TypeError(r + ' is not an object.'));
    var done = r.done;
    return Promise.resolve(r.value).then(function (value) {
      return { value: value, done: done };
    });
  }
  return (
    (AsyncFromSyncIterator = function (s) {
      (this.s = s), (this.n = s.next);
    }),
    (AsyncFromSyncIterator.prototype = {
      s: null,
      n: null,
      next: function () {
        return AsyncFromSyncIteratorContinuation(this.n.apply(this.s, arguments));
      },
      return: function (value) {
        var ret = this.s.return;
        return void 0 === ret
          ? Promise.resolve({ value: value, done: !0 })
          : AsyncFromSyncIteratorContinuation(ret.apply(this.s, arguments));
      },
      throw: function (value) {
        var thr = this.s.return;
        return void 0 === thr
          ? Promise.reject(value)
          : AsyncFromSyncIteratorContinuation(thr.apply(this.s, arguments));
      },
    }),
    new AsyncFromSyncIterator(s)
  );
}

function asyncMap(_x, _x2) {
  return _asyncMap.apply(this, arguments);
}

function _asyncMap() {
  _asyncMap = _wrapAsyncGenerator(function* (source, fn) {
    var _iteratorAbruptCompletion = false;
    var _didIteratorError = false;

    var _iteratorError;

    try {
      for (
        var _iterator = _asyncIterator(source), _step;
        (_iteratorAbruptCompletion = !(_step = yield _awaitAsyncGenerator(_iterator.next())).done);
        _iteratorAbruptCompletion = false
      ) {
        const value = _step.value;
        yield fn(value);
      }
    } catch (err) {
      _didIteratorError = true;
      _iteratorError = err;
    } finally {
      try {
        if (_iteratorAbruptCompletion && _iterator.return != null) {
          yield _awaitAsyncGenerator(_iterator.return());
        }
      } finally {
        if (_didIteratorError) {
          throw _iteratorError;
        }
      }
    }
  });
  return _asyncMap.apply(this, arguments);
}

@conartist6
Member Author

There's a lot to digest in this code. Like, what the heck is going on here??

export function asyncMap(_x, _x2) {
  return _asyncMap.apply(this, arguments);
}

function _asyncMap() {
  _asyncMap = _wrapAsyncGenerator(function* (source, fn) {
    // ...
  });
  return _asyncMap.apply(this, arguments);
}

Why do we overwrite the definition of _asyncMap on the first call to it?

Actually I suspect it's in case a generator makes recursive calls to itself as part of its implementation, which would explain why it's a) present and b) unnecessary here.
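
A sketch of my reading of the pattern (wrap below is a hypothetical stand-in for _wrapAsyncGenerator): the first call swaps the definition so that recursive references go through the already-wrapped function instead of re-wrapping on every call.

let wrapCount = 0;
const wrap = (fn) => (wrapCount++, (...args) => fn(...args)); // hypothetical stand-in for _wrapAsyncGenerator

function countdown(n) {
  countdown = wrap(function (n) {
    return n === 0 ? [] : [n, ...countdown(n - 1)]; // recursion hits the wrapped countdown
  });
  return countdown.apply(this, arguments);
}

countdown(3);
console.log(wrapCount); // 1 -- wrapping happened once, not once per recursive call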

@conartist6
Member Author

conartist6 commented Apr 13, 2022

OK here's the whole thing again but cleaned up to be more stylistically in line with the rest of iter-tools. It's possible I've already broken something, but maybe not...

class _AwaitValue {
  constructor(value) {
    this.wrapped = value;
  }
}

function _wrapAsyncGenerator(fn) {
  return function () {
    return new _AsyncGenerator(fn.apply(this, arguments));
  };
}

class _AsyncGenerator {
  constructor(gen) {
    this._gen = gen;
    this._front = undefined;
    this._back = undefined;

    if (typeof gen.return !== 'function') {
      this.return = undefined;
    }
  }

  [Symbol.asyncIterator]() {
    return this;
  }

  _send(method, arg) {
    return new Promise((resolve, reject) => {
      const request = { method, arg, resolve, reject, next: null };
      if (this._back) {
        this._back = this._back.next = request;
      } else {
        this._front = this._back = request;
        this._resume(method, arg);
      }
    });
  }

  _resume(method, arg) {
    try {
      const step = this._gen[method](arg);
      const { value } = step;
      const wrappedAwait = value instanceof _AwaitValue;
      Promise.resolve(wrappedAwait ? value.wrapped : value).then(
        (arg) => {
          if (wrappedAwait) {
            // the sync generator hit an `await`
            this._resume(method === 'return' ? 'return' : 'next', arg);
          } else {
            // the sync generator hit a `yield`
            this._settle(step.done ? 'return' : 'normal', arg);
          }
        },
        (err) => {
          this._resume('throw', err);
        },
      );
    } catch (err) {
      this._settle('throw', err);
    }
  }

  _settle(type, value) {
    let front = this._front;
    switch (type) {
      case 'return':
        front.resolve({ value, done: true });
        break;
      case 'throw':
        front.reject(value);
        break;
      default:
        front.resolve({ value, done: false });
        break;
    }
    this._front = front = front.next;
    if (front) {
      this._resume(front.method, front.arg);
    } else {
      this._back = null;
    }
  }

  next(arg) {
    return this._send('next', arg);
  }

  throw(arg) {
    return this._send('throw', arg);
  }

  return(arg) {
    return this._send('return', arg);
  }
}

function _asyncIterator(iterable) {
  let method;

  if ((method = iterable[Symbol.asyncIterator]) != null) {
    return method.call(iterable);
  }
  if ((method = iterable[Symbol.iterator]) != null) {
    return new SyncAsAsyncIterator(method.call(iterable));
  }
  throw new TypeError('Object is not async iterable');
}

function AsyncFromSyncIteratorContinuation(step) {
  if (Object(step) !== step) return Promise.reject(new TypeError(step + ' is not an object.'));
  const done = step.done;
  return Promise.resolve(step.value).then((value) => {
    return { value, done };
  });
}

class SyncAsAsyncIterator {
  constructor(iter) {
    this._iter = iter;
  }

  next(...args) {
    const iter = this._iter;
    return AsyncFromSyncIteratorContinuation(iter.next(...args));
  }

  return(value, ...args) {
    const iter = this._iter;
    return undefined === iter.return
      ? Promise.resolve({ value, done: true })
      : AsyncFromSyncIteratorContinuation(iter.return(value, ...args));
  }

  throw(value, ...args) {
    const iter = this._iter;
    return undefined === iter.throw
      ? Promise.reject(value)
      : AsyncFromSyncIteratorContinuation(iter.throw(value, ...args));
  }
}

export function asyncMap(...args) {
  return _asyncMap(...args);
}

function _asyncMap() {
  _asyncMap = _wrapAsyncGenerator(function* (source, fn) {
    let _iterAbruptCompletion = false;
    let _didIterError = false;
    let _iter;
    let _iterError;

    try {
      _iter = _asyncIterator(source);
      for (
        let _step;
        (_iterAbruptCompletion = !(_step = yield new _AwaitValue(_iter.next())).done);
        _iterAbruptCompletion = false
      ) {
        const value = _step.value;
        yield fn(value);
      }
    } catch (err) {
      _didIterError = true;
      _iterError = err;
    } finally {
      try {
        if (_iterAbruptCompletion && _iter.return != null) {
          yield new _AwaitValue(_iter.return());
        }
      } finally {
        if (_didIterError) {
          throw _iterError;
        }
      }
    }
  });
  return _asyncMap.apply(this, arguments);
}

@conartist6
Member Author

conartist6 commented Apr 13, 2022

Yeah ok, now that I understand this all (more or less) it definitely won't be too hard to shift it around so that I can have semi-sync-ness. The core of the optimization will be:

class _AsyncGenerator {
  _send(method, arg) {
    const step = this._gen[method](arg);
    const wrappedAwait = step.value instanceof _AwaitValue;
    return wrappedAwait
      ? new Promise((resolve, reject) => {
        // ...
        })
      : step;
  }
}

We'll also want to make sure that the synchronous coroutine doesn't make unnecessary awaits in for await...of loops. The updated code there would look like this:

      for (
        let _step, __step;
        (__step = _iter.next()),
          (_iterAbruptCompletion = !(_step =
            !(__step instanceof Promise) && !(__step.value instanceof Promise)
              ? __step
              : yield new _AwaitValue(__step)).done);
        _iterAbruptCompletion = false
      ) {

@conartist6
Member Author

conartist6 commented Apr 14, 2022

Now: am I going to need two sets of methods or three? Can "asyncish" methods completely replace async ones??

A lot of code will depend on next() returning a promise, but that's fine because we'd never change the async iteration protocol itself. We'd just add an asyncish iteration protocol.

But! Under the hood, a shared implementation really would work differently. It would attempt to use asyncish sources when possible, and the timing of callbacks might be fundamentally altered, especially in methods that mix sync and async data like asyncConcat(array, asyncIterable). In such a case the user would see the perf benefits for items coming from the array, but some callbacks would fire in the same tick instead of in later ticks, which could break code that has come to depend on at least one tick passing before a callback is called.
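
A minimal sketch of that timing difference (deliver here is a hypothetical helper, just to show the two paths):

function deliver(step, callback) {
  if (step instanceof Promise) {
    step.then((s) => callback(s.value)); // strict async path: callback runs on a later microtask
  } else {
    callback(step.value); // asyncish path: callback runs in the same tick
  }
}

deliver({ done: false, value: 1 }, (v) => console.log('sync item', v));
deliver(Promise.resolve({ done: false, value: 2 }), (v) => console.log('async item', v));
console.log('end of tick');
// prints: sync item 1, end of tick, async item 2
// if both steps were Promises, 'end of tick' would print before either item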

I'm thinking that's probably fine in most of the workflows I primarily intend to support, but I think the unaltered versions are too valuable not to include and could be called, say, @iter-tools/async/strict.

@conartist6
Member Author

Next problem: I had been putting off doing all this until I could use the new macrome to do it. However, I'm coming here because I kind of want this done for the purposes of building macrome. I'd like for my accessor layer to consist purely of some functions over streams, and expect that the caller will feed a peekerator. That means the regex engine will go from (usually) 0 awaits per character to about 5 though, meaning a pretty huge slowdown.

Technically anything that makes macrome usable will be a huge win, and if I'm able to immediately use macrome to fix the perf problems in iter-tools, then I can use iter-tools to fix the perf problems in macrome. Wheeeee. I think this is probably what I should do.

@conartist6
Member Author

I'm also still hoping that once I write all this up I can use it to pressure TC39 into acknowledging that a) there is a problem, b) they created it and c) my proposed solution makes sense.

@conartist6
Member Author

The more I look into my ideal solution, though, the more I think it'll be really snarled with the perf problems. Not only will I need to read all the headers character by character through the endless awaits, but the concurrent nature of the work will destroy locality, and to do hashing I'll need to read entire documents character by character through several layers of tools (forkerate, hash, peekerate, peekerator.asIterator). 4 * 3 = 12 completely useless awaits, likely accompanied by complete context switches, for every single character. UGH!!!

@conartist6
Member Author

conartist6 commented Apr 14, 2022

So maybe I should just use the current version of macrome to rebuild iter-tools. It'd seriously complicate the contribution process for a little while, but perhaps I could find a workaround and short-term difficulties in the contribution process are probably fine in a library nobody is contributing to.

@conartist6
Member Author

Actually though, even better: I can use the macrome-2 branch to finish integrating with the current version of macrome and integrate the perf fixes. Then I can do my special branch dance -- the one that only works when each commit is a package -- to have both branches bootstrap each other, until by the time the dance is done I have a solution that is ready for the world. OK, let's do that.

@conartist6
Member Author

conartist6 commented Apr 14, 2022

OK! I'm not going to get into any of the other changes I imagine being involved with deploying this in the long term since I just want to work out my dependency snarl. Here's what I managed to generate as impls/$map/async-map.js:

/* @macrome
 * @generatedby /generate/generators/impls/index.cjs
 * @generatedfrom ./$map.js#1643907550751
 * This file is autogenerated. Please do not edit it directly.
 * When editing run `npx macrome watch` then change the file this is generated from.
 */
import { _awaitAsyncGenerator, _wrapAsyncGenerator, _asyncIterator } from '../../internal/asyncish';

import { asyncIterableCurry } from '../../internal/async-iterable.js';

export function __asyncMap(_x, _x2) {
  return _$__map.apply(this, arguments);
}

function _$__map() {
  _$__map = _wrapAsyncGenerator(function* (source, func) {
    let c = 0;
    let _iteratorAbruptCompletion = false;
    let _didIteratorError = false;
    let _iterator;
    let _iteratorError;

    try {
      _iterator = _asyncIterator(source);
      for (
        let _step, _step2;
        (_step2 = _iterator.next()),
          (_iteratorAbruptCompletion = !(_step =
            !(_step2 instanceof Promise) && !(_step2.value instanceof Promise)
              ? _step2
              : yield _awaitAsyncGenerator(_step2)).done);
        _iteratorAbruptCompletion = false
      ) {
        const value = _step.value;
        yield yield _awaitAsyncGenerator(func(value, c++));
      }
    } catch (err) {
      _didIteratorError = true;
      _iteratorError = err;
    } finally {
      try {
        if (_iteratorAbruptCompletion && _iterator.return != null) {
          yield _awaitAsyncGenerator(_iterator.return());
        }
      } finally {
        if (_didIteratorError) {
          throw _iteratorError;
        }
      }
    }
  });

  return _$__map.apply(this, arguments);
}

export const asyncMap = /*#__PURE__*/ asyncIterableCurry(__asyncMap);

@conartist6
Member Author

Tests (mostly) pass!

@conartist6
Member Author

Let's see now. Anywhere I was making classes by hand, that is to say writing async next() {}, I'll need to update. peekerate and forkerate definitely do this, and both are used in the macrome code I'm trying to rework.
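
Roughly, this is the kind of change that means (buffered and refill here are hypothetical fields, not real iter-tools internals):

class BeforeIter {
  async next() {
    // even when the value is already in memory, an async method defers by a tick
    return { done: false, value: this.buffered };
  }
}

class AfterIter {
  next() {
    return this.buffered !== undefined
      ? { done: false, value: this.buffered } // plain step, usable in the same tick
      : this.refill().then((value) => ({ done: false, value })); // a promise only when we must wait
  }
}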

@conartist6
Member Author

conartist6 commented Apr 14, 2022

Asyncish peekerate is an interesting challenge since we can't really hide the fact that peekr.advance may or may not return a promise, which is particularly awkward for a method that returns this for chaining (well, for type checking actually).

@conartist6
Member Author

That's sooo ugly. Consuming that peekr when the input is mixed would have to look like:

let peekr = await asyncishPeekerate(source);
while(!peekr.done) {
  doStuff(peekr.value);
  const peekr_ = peekr.advance();
  // js
  if(peekr_ instanceof Promise) await peekr_;
  // ts
  peekr = peekr_ instanceof Promise ? await peekr_ : peekr_;
}

@conartist6
Member Author

It occurs to me that it probably makes sense for an async peekr to block access to current while it is advancing, since accessing it after advance has been called but before it has settled would be an implicit race condition.

@conartist6
Member Author

conartist6 commented Apr 14, 2022

The obvious thing now that I have a moment to think about it is for current to become a promise. Usage then looks like:

const peekr = await asyncishPeekerate(source);
while(!peekr.done) {
  doStuff(peekr.value);
  peekr.advance();
  if (isPromise(peekr.current)) await peekr.current;
}

... peekr.done and peekr.value could actually throw if peekr.current is a promise. If I made done and value into promises some of the time it would risk errors with coercion, e.g. treating done: Promise as if it were done: true.
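
Something like this guard, sketched against a hypothetical peekerator shape (_current holds the pending promise, _step the last settled step):

class AsyncishPeekr {
  get done() {
    if (this._current instanceof Promise) {
      throw new Error('Cannot read done while advance() is still settling');
    }
    return this._step.done;
  }

  get value() {
    if (this._current instanceof Promise) {
      throw new Error('Cannot read value while advance() is still settling');
    }
    return this._step.value;
  }
}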

@conartist6
Member Author

It's perf testing time!

@conartist6
Member Author

conartist6 commented Apr 16, 2022

Welp, I've tested, and my fancy asyncish optimization strategy does, ahem, fuck-all:

  • The "optimized" version is about a 40% slowdown over just using a native for await loop.
  • The "optimized" version is exactly the same speed as (actually very slightly slower than) the transpiled version with no attempt at optimization.

In addition:

  • My async-of-sync parser outperforms my async parser by 10x.
  • A parser composed out of iter-tools functionality (two asyncSplitOn's, an asyncMap, and an asyncStr) is another 70% slower than one not made by composition.
  • Just loading the whole file into memory and using a few string.split calls is 3x faster than even the async-of-sync parser, and thus 30x faster than any character-stream-based approach.

My full test cases and results are here. Times include reading the file from disk. The input was a ~5MB csv file (about 82 chunks on my system) with randomly generated data. Node v17.7.1 for ARM.

@conartist6
Member Author

I think next I want to try to build a parser that loads the data synchronously (but still in chunks) so that I can see whether the slow part is generators or asyncness...

@conartist6
Member Author

A fully sync chunked parser is about 2x slower than async-of-sync. That leaves a healthy chunk of the perf difference purely down to all the awaits.

@conartist6
Member Author

conartist6 commented Apr 16, 2022

Hang on a sec. My optimization is not actually triggering yet... Hope remains!

@conartist6
Member Author

Ah, the optimization is being poisoned by my failure to correctly eliminate unnecessary promises in yield*, which is part of join.

@conartist6
Member Author

IT WORKS

@conartist6
Member Author

This is so cool. The transpiled code with my optimization is now 3x faster than unoptimized untranspiled code. I can feed this optimization directly into iter-tools in a new major version, unlocking a lot of the potential of async iteration immediately! I can finish iter-tools@8 and really start promoting iter-tools to the public. I'll also use the newly-performant code in macrome, which needs it for its streaming parsing.

@conartist6
Member Author

I've started a new thread on the TC39 forums: https://es.discourse.group/t/take-2-generator-prototype-symbol-mixediterator/1308/5
