Skip to content

Commit

Permalink
Add isComplete() to ConcurrentIterator (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Apr 8, 2022
1 parent de21737 commit dc38b82
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/ConcurrentArrayIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public function getPosition(): int
return $position;
}

public function isComplete(): bool
{
return $this->position >= $this->size;
}

public function dispose(): void
{
$this->disposed ??= new DisposedException;
Expand Down
5 changes: 5 additions & 0 deletions src/ConcurrentChainedIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public function getPosition(): int
return $this->iterators[$position]->getPosition();
}

public function isComplete(): bool
{
return $this->position->get() !== null;
}

public function dispose(): void
{
foreach ($this->iterators as $iterator) {
Expand Down
5 changes: 5 additions & 0 deletions src/ConcurrentIterableIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public function getPosition(): int
return $this->iterator->getPosition();
}

public function isComplete(): bool
{
return $this->iterator->isComplete();
}

public function dispose(): void
{
$this->iterator->dispose();
Expand Down
6 changes: 6 additions & 0 deletions src/ConcurrentIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public function getValue(): mixed;
*/
public function getPosition(): int;

/**
* @return bool {@code true} if the iterator has completed (either successfully or with an error) or {@code false}
* if the iterator may still emit more values.
*/
public function isComplete(): bool;

/**
* Disposes the iterator, indicating the consumer is no longer interested in the iterator output.
*/
Expand Down
5 changes: 5 additions & 0 deletions src/Internal/ConcurrentClosureIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public function getPosition(): int
return $this->queue->getPosition();
}

public function isComplete(): bool
{
return $this->queue->isConsumed() || $this->queue->isDisposed();
}

public function dispose(): void
{
$this->queue->dispose();
Expand Down
5 changes: 5 additions & 0 deletions src/Internal/ConcurrentFlatMapIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public function dispose(): void
$this->iterator->dispose();
}

public function isComplete(): bool
{
return $this->iterator->isComplete();
}

public function getIterator(): \Traversable
{
while ($this->continue()) {
Expand Down
5 changes: 5 additions & 0 deletions src/Internal/ConcurrentQueueIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public function dispose(): void
$this->state->dispose();
}

public function isComplete(): bool
{
return $this->state->isConsumed() || $this->state->isDisposed();
}

public function getIterator(): \Traversable
{
while ($this->state->continue()) {
Expand Down
5 changes: 5 additions & 0 deletions src/Internal/QueueState.php
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ public function getPosition(): int
return $position;
}

public function hasPending(): bool
{
return \array_key_exists($this->consumePosition, $this->emittedValues);
}

/**
* @see Pipeline::dispose()
*/
Expand Down
5 changes: 5 additions & 0 deletions test/ConcurrentDelayedArrayIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public function getPosition(): int
return $position;
}

public function isComplete(): bool
{
return $this->position >= $this->size;
}

public function dispose(): void
{
$this->disposed ??= new DisposedException;
Expand Down
10 changes: 10 additions & 0 deletions test/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ public function testEmit(): void
self::assertNull($future->await());

self::assertFalse($this->queue->isComplete());
self::assertFalse($iterator->isComplete());

$this->queue->complete();

self::assertTrue($this->queue->isComplete());
self::assertTrue($iterator->isComplete());

self::assertFalse($continue->await());
}
Expand All @@ -54,6 +56,7 @@ public function testFail(): void
self::assertTrue($this->queue->isComplete());

$pipeline = $this->queue->pipe()->getIterator();
self::assertTrue($pipeline->isComplete());

try {
$pipeline->continue();
Expand Down Expand Up @@ -123,6 +126,8 @@ public function testEmitAfterContinue(): void
return $pipeline->getValue();
});

delay(0); // Enter async function above.

$backPressure = $this->queue->pushAsync($value);

self::assertSame($value, $future->await());
Expand All @@ -136,7 +141,11 @@ public function testEmitAfterContinue(): void

self::assertNull($backPressure->await());

self::assertFalse($pipeline->isComplete());

$this->queue->complete();

self::assertTrue($pipeline->isComplete());
}

public function testContinueAfterComplete(): void
Expand Down Expand Up @@ -210,6 +219,7 @@ public function testEmitAfterDisposal(): void
{
$pipeline = $this->queue->pipe();
$pipeline->dispose();
self::assertTrue($pipeline->getIterator()->isComplete());
self::assertTrue($this->queue->isDisposed());

try {
Expand Down

0 comments on commit dc38b82

Please sign in to comment.