Skip to content

1.0.0 Beta 3

Pre-release
Pre-release
Compare
Choose a tag to compare
@trowski trowski released this 30 Jan 20:03
c2a5be2
  • Pipeline has been changed from an interface to a final class. ConcurrentIterator acts as the interface replacement
  • Pipeline::pipe() has been removed in favor of operator methods directly on Pipeline, such as map() and filter()
  • Emitter has been renamed to Queue
    • yield() has been renamed to push()
    • emit() has been renamed to pushAsync()
  • All functions in the Amp\Pipeline have been removed.
    • fromIterable() is available as Pipeline::fromIterable()
    • concat() is now Pipeline::concat()
    • Most operators are available directly on Pipeline
  • Added Pipeline::generate() that invokes a closure to create each pipeline value.

Example of using Pipeline for concurrency:

use Amp\Pipeline\Pipeline;
use function Amp\delay;

$pipeline = Pipeline::fromIterable(function (): \Generator {
    for ($i = 0; $i < 100; ++$i) {
        yield $i;
    }
});

$results = $pipeline->concurrent(10)
        ->tap(fn () => delay(\random_int(1, 10) / 10))  // Delay for 0.1 to 1 seconds, simulating I/O.
        ->map(fn (int $input): int => $input * 10)
        ->filter(fn (int $input) => $input % 3 === 0); // Filter only values divisible by 3.

foreach ($results as $value) {
    echo $value, "\n";
}