Skip to content

Commit

Permalink
Add merge
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 20, 2023
1 parent f4df01b commit 4f36887
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 9 deletions.
107 changes: 107 additions & 0 deletions src/Internal/ConcurrentMergedIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<?php declare(strict_types=1);

namespace Amp\Pipeline\Internal;

use Amp\Cancellation;
use Amp\DeferredCancellation;
use Amp\Future;
use Amp\Pipeline\ConcurrentIterator;
use Amp\Pipeline\Queue;
use Revolt\EventLoop;
use function Amp\async;

/**
* @internal
*
* @template-covariant T
* @template-implements ConcurrentIterator<T>
*/
final class ConcurrentMergedIterator implements ConcurrentIterator
{
/** @var ConcurrentIterator<T> */
private readonly ConcurrentIterator $iterator;

private readonly DeferredCancellation $deferredCancellation;

/**
* @param ConcurrentIterator<T>[] $iterators
*/
public function __construct(array $iterators)
{
foreach ($iterators as $key => $iterator) {
if (!$iterator instanceof ConcurrentIterator) {
throw new \TypeError(\sprintf(
'Argument #1 ($iterators) must be of type array<%s>, %s given at key %s',
ConcurrentIterator::class,
\get_debug_type($iterator),
$key
));
}
}

$queue = new Queue(\count($iterators));
$this->iterator = $queue->iterate();

$this->deferredCancellation = $deferredCancellation = new DeferredCancellation();
$cancellation = $this->deferredCancellation->getCancellation();

$futures = [];
foreach ($iterators as $iterator) {
$futures[] = async(static function () use ($iterator, $queue, $cancellation): void {
try {
while ($iterator->continue($cancellation)) {
if ($queue->isComplete()) {
return;
}

$queue->push($iterator->getValue());
}
} finally {
$iterator->dispose();
}
});
}

EventLoop::queue(static function () use ($futures, $queue, $deferredCancellation): void {
try {
Future\await($futures);
$queue->complete();
} catch (\Throwable $exception) {
$queue->error($exception);
} finally {
$deferredCancellation->cancel();
}
});
}

public function continue(?Cancellation $cancellation = null): bool
{
return $this->iterator->continue($cancellation);
}

public function getValue(): mixed
{
return $this->iterator->getValue();
}

public function getPosition(): int
{
return $this->iterator->getPosition();
}

public function isComplete(): bool
{
return $this->iterator->isComplete();
}

public function dispose(): void
{
$this->iterator->dispose();
$this->deferredCancellation->cancel();
}

public function getIterator(): \Traversable
{
return $this->iterator;
}
}
46 changes: 37 additions & 9 deletions src/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

namespace Amp\Pipeline;

use Amp\Cancellation;
use Amp\Pipeline\Internal\ConcurrentArrayIterator;
use Amp\Pipeline\Internal\ConcurrentChainedIterator;
use Amp\Pipeline\Internal\ConcurrentClosureIterator;
use Amp\Pipeline\Internal\ConcurrentIterableIterator;
use Amp\Pipeline\Internal\ConcurrentMergedIterator;
use Amp\Pipeline\Internal\FlatMapOperation;
use Amp\Pipeline\Internal\Sequence;
use Amp\Pipeline\Internal\SortOperation;
Expand Down Expand Up @@ -58,7 +60,7 @@ public static function fromIterable(\Closure|iterable $iterable): self
*
* @template Ts
*
* @param \Closure(\Amp\Cancellation): Ts $supplier Elements to emit.
* @param \Closure(Cancellation): Ts $supplier Elements to emit.
*
* @return self<Ts>
*/
Expand All @@ -68,31 +70,57 @@ public static function generate(\Closure $supplier): Pipeline
}

/**
* Concatenates the given pipelines into a single pipeline in sequential order.
* Merges the given iterables into a single pipeline. The returned pipeline emits a value anytime one of the
* merged iterables produces a value.
*
* @template Ts
*
* @param array<iterable<Ts>> $pipelines
*f
* @return self<Ts>
*/
public static function merge(array $pipelines): self
{
return new self(new ConcurrentMergedIterator(self::mapToConcurrentIterators($pipelines)));
}

/**
* Concatenates the given iterables into a single pipeline in sequential order.
*
* The prior pipeline must complete before values are taken from any subsequent pipelines.
*
* @template Ts
*
* @param iterable<array-key, Ts>[] $pipelines
* @param array<iterable<Ts>> $pipelines
*
* @return self<Ts>
*/
public static function concat(array $pipelines): self
{
foreach ($pipelines as $key => $pipeline) {
if (!\is_iterable($pipeline)) {
return new self(new ConcurrentChainedIterator(self::mapToConcurrentIterators($pipelines)));
}

/**
* @template Tk of array-key
* @template Ts
*
* @param array<Tk, iterable<Ts>> $iterables
*
* @return array<Tk, ConcurrentIterator<Ts>>
*/
private static function mapToConcurrentIterators(array $iterables): array
{
foreach ($iterables as $key => $iterable) {
if (!\is_iterable($iterable)) {
throw new \TypeError(\sprintf(
'Argument #1 ($pipelines) must be of type array<iterable>, %s given at key %s',
\get_debug_type($pipeline),
\get_debug_type($iterable),
$key,
));
}
}

return new self(new ConcurrentChainedIterator(
\array_map(static fn (iterable $pipeline) => self::fromIterable($pipeline)->getIterator(), $pipelines)
));
return \array_map(static fn (iterable $pipeline) => self::fromIterable($pipeline)->getIterator(), $iterables);
}

private int $concurrency = 1;
Expand Down
123 changes: 123 additions & 0 deletions test/MergeTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
<?php

namespace Amp\Pipeline;

use Amp\Future;
use Amp\PHPUnit\AsyncTestCase;
use Amp\PHPUnit\TestException;
use function Amp\async;
use function Amp\delay;

class MergeTest extends AsyncTestCase
{
public function getArrays(): array
{
return [
[[\range(1, 3), \range(4, 6)], [1, 4, 2, 5, 3, 6]],
[[\range(1, 5), \range(6, 8)], [1, 6, 2, 7, 3, 8, 4, 5]],
[[\range(1, 4), \range(5, 10)], [1, 5, 2, 6, 3, 7, 4, 8, 9, 10]],
];
}

/**
* @dataProvider getArrays
*
* @param array $array
* @param array $expected
*/
public function testMerge(array $array, array $expected): void
{
$pipelines = \array_map(static function (array $iterator): Pipeline {
return Pipeline::fromIterable($iterator)->tap(fn () => delay(0.01));
}, $array);

$pipeline = Pipeline::merge($pipelines);

self::assertSame($expected, $pipeline->toArray());
}

/**
* @depends testMerge
*/
public function testMergeWithDelayedYields(): void
{
$pipelines = [];
$values1 = [$this->asyncValue(0.01, 1), $this->asyncValue(0.05, 2), $this->asyncValue(0.07, 3)];
$values2 = [$this->asyncValue(0.02, 4), $this->asyncValue(0.04, 5), $this->asyncValue(0.06, 6)];
$expected = [1, 4, 5, 2, 6, 3];

$pipelines[] = Pipeline::fromIterable(function () use ($values1) {
foreach ($values1 as $value) {
yield $value->await();
}
});

$pipelines[] = Pipeline::fromIterable(function () use ($values2) {
foreach ($values2 as $value) {
yield $value->await();
}
});

$pipeline = Pipeline::merge($pipelines);

self::assertSame($expected, $pipeline->toArray());
}

/**
* @depends testMerge
*/
public function testDisposedMerge(): void
{
$pipelines = [];

$pipelines[] = Pipeline::fromIterable([1, 2, 3, 4, 5])->tap(fn () => delay(0.1));
$pipelines[] = Pipeline::fromIterable([6, 7, 8, 9, 10])->tap(fn () => delay(0.1));

$iterator = Pipeline::merge($pipelines)->getIterator();

$this->expectException(DisposedException::class);
$this->setTimeout(0.3);

while ($iterator->continue()) {
if ($iterator->getValue() === 7) {
$iterator->dispose();
}
}
}

/**
* @depends testMerge
*/
public function testMergeWithFailedPipeline(): void
{
$exception = new TestException;
$generator = Pipeline::fromIterable(static function () use ($exception) {
yield 1; // Emit once before failing.
throw $exception;
});

$pipeline = Pipeline::merge([$generator, Pipeline::fromIterable(\range(1, 5))]);

try {
$pipeline->forEach(fn () => null);
self::fail("The exception used to fail the pipeline should be thrown from continue()");
} catch (TestException $reason) {
self::assertSame($exception, $reason);
}
}

public function testNonPipeline(): void
{
$this->expectException(\TypeError::class);

Pipeline::merge([1]);
}

private function asyncValue(float $delay, mixed $value): Future
{
return async(static function () use ($delay, $value): mixed {
delay($delay);
return $value;
});
}
}

0 comments on commit 4f36887

Please sign in to comment.