Skip to content

Commit

Permalink
Add Pipeline::buffer()
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Mar 1, 2024
1 parent d97d7a3 commit 38d9f2c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 8 deletions.
11 changes: 8 additions & 3 deletions src/Internal/ConcurrentFlatMapIterator.php
Expand Up @@ -24,9 +24,14 @@ final class ConcurrentFlatMapIterator implements ConcurrentIterator
* @param ConcurrentIterator<T> $iterator
* @param \Closure(T, int):iterable<R> $flatMap
*/
public function __construct(ConcurrentIterator $iterator, int $concurrency, bool $ordered, \Closure $flatMap)
{
$queue = new QueueState;
public function __construct(
ConcurrentIterator $iterator,
int $bufferSize,
int $concurrency,
bool $ordered,
\Closure $flatMap,
) {
$queue = new QueueState($bufferSize);
$this->iterator = new ConcurrentQueueIterator($queue);
$order = $ordered ? new Sequence : null;

Expand Down
4 changes: 2 additions & 2 deletions src/Internal/ConcurrentIterableIterator.php
Expand Up @@ -20,7 +20,7 @@ final class ConcurrentIterableIterator implements ConcurrentIterator
/**
* @param iterable<T> $iterable
*/
public function __construct(iterable $iterable)
public function __construct(iterable $iterable, int $bufferSize = 0)
{
if (\is_array($iterable)) {
$this->iterator = new ConcurrentArrayIterator($iterable);
Expand All @@ -36,7 +36,7 @@ public function __construct(iterable $iterable)
$iterable = $iterable->getIterator();
}

$queue = new QueueState();
$queue = new QueueState($bufferSize);
$this->iterator = new ConcurrentQueueIterator($queue);

async(static function () use ($queue, $iterable): void {
Expand Down
11 changes: 9 additions & 2 deletions src/Internal/FlatMapOperation.php
Expand Up @@ -23,6 +23,7 @@ public static function getStopMarker(): object
* @param \Closure(T, int):iterable<R> $flatMap
*/
public function __construct(
private readonly int $bufferSize,
private readonly int $concurrency,
private readonly bool $ordered,
private readonly \Closure $flatMap
Expand All @@ -45,9 +46,15 @@ public function __invoke(ConcurrentIterator $source): ConcurrentIterator
yield $item;
}
}
})());
})(), $this->bufferSize);
}

return new ConcurrentFlatMapIterator($source, $this->concurrency, $this->ordered, $this->flatMap);
return new ConcurrentFlatMapIterator(
$source,
$this->bufferSize,
$this->concurrency,
$this->ordered,
$this->flatMap,
);
}
}
24 changes: 23 additions & 1 deletion src/Pipeline.php
Expand Up @@ -8,6 +8,7 @@
use Amp\Pipeline\Internal\ConcurrentClosureIterator;
use Amp\Pipeline\Internal\ConcurrentIterableIterator;
use Amp\Pipeline\Internal\FlatMapOperation;
use Amp\Pipeline\Internal\IntermediateOperation;
use Amp\Pipeline\Internal\Sequence;
use Amp\Pipeline\Internal\SortOperation;
use function Amp\delay;
Expand Down Expand Up @@ -96,10 +97,15 @@ public static function concat(array $pipelines): self
));
}

/** @var non-negative-int */
private int $bufferSize = 0;

/** @var positive-int */
private int $concurrency = 1;

private bool $ordered = true;

/** @var list<IntermediateOperation> */
private array $intermediateOperations = [];

private bool $used = false;
Expand All @@ -119,6 +125,17 @@ public function __destruct()
}
}

public function buffer(int $bufferSize): self
{
if ($bufferSize < 0) {
throw new \ValueError('Argument #1 ($bufferSize) must be non-negative, got ' . $bufferSize);
}

$this->bufferSize = $bufferSize;

return $this;
}

public function concurrent(int $concurrency): self
{
if ($concurrency < 1) {
Expand Down Expand Up @@ -318,7 +335,12 @@ public function flatMap(\Closure $flatMap): self
throw new \Error('Pipeline consumption has already been started');
}

$this->intermediateOperations[] = new FlatMapOperation($this->concurrency, $this->ordered, $flatMap);
$this->intermediateOperations[] = new FlatMapOperation(
$this->bufferSize,
$this->concurrency,
$this->ordered,
$flatMap,
);

/** @var self<R> */
return $this;
Expand Down

0 comments on commit 38d9f2c

Please sign in to comment.