Skip to content

uginroot/async-symfony-process

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Description

Existing libraries work with an early generated pool of processes, this is poorly suited if there are a many processes and is generally not suitable if after the execution of the process it may be necessary to execute an additional one. This library to solve this problem.

This library works on the basis of the symfony/process

Install

composer require uginroot/async-symfony-process

Use

Basic

In the simplest case, you only need to specify a function that will generate new processes. If the function returns zero instead of a process, then execution will end after all current processes have finished executing.

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

$queue = range(1, 10);

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->execute();

Callback

If you need the result of a process, then you need to set a callback function that will be called after the completion of the process.

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;
use Uginroot\AsyncSymfonyProcess\ProcessWrapper;

$queue = range(1, 10);
$results = [];

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

$callback = static function(ProcessWrapper $processWrapper) use (&$results):void{
    $results[] = (int)$processWrapper->getOutput();
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setCallback($callback);
$pool->execute();

Output listener

If the process is interactive, or you need to somehow react to the errors that the process generates, then you need to set the output listener. It will receive an instance of the current process, output type (Process :: ERR or Process :: OUT) and data generated by the process.

use Symfony\Component\Process\InputStream;
use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

$queue = range(1, 10);

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    $process = Process::fromShellCommandline(sprintf('echo %d', $value));
    $process->setInput(new InputStream());
    return $process;
};

$outputListener = static function(Process $process, string $type, string $data):void{
    /** @var InputStream $input */
    $input = $process->getInput();
    if($type === Process::ERR){
        $input->write('exit;');
    } elseif ($type === Process::OUT){
        if($data === 'Say yes:'){
            $input->write('yes');
        }
    }
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setOutputListener($outputListener);
$pool->execute();

While listener

To perform any actions at each iteration of the process execution loop, you must set a loop listener.

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

$queue = range(1, 10);

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

$whileListener = static function():void{
    // pass
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setWhileListener($whileListener);
$pool->execute();

Endless execution loop

In order for the process execution cycle not to stop when the process factory returns zero, it is necessary to inform this when configuring the pool. You can react and replenish the list of processes in any of the called functions, for example, in the whileListener. If you need to halt infinite execution, then you need to specify this to the pool.

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;

$queue = [];
$iteration = 0;

$processFactory = static function() use (&$queue):?Process{
    if(count($queue) === 0){
        return null;
    }
    
    $value = array_shift($queue);
    return Process::fromShellCommandline(sprintf('echo %d', $value));
};

$whileListener = static function() use (&$queue, &$iteration, &$pool):void{
    $iteration++;
    
    if($iteration === 20){
        // Will end execution after executed and 
        // newly generated processes have finished
        $pool->setIsEternal(false);
    }
    
    if($iteration % 5 === 0){
        $queue[] = $iteration;
    }
};

$pool = new Pool();
$pool->setProcessFactory($processFactory);
$pool->setWhileListener($whileListener);
$pool->setIsEternal(true);
$pool->execute();

Use in class

use Symfony\Component\Process\Process;
use Uginroot\AsyncSymfonyProcess\Pool;
use Uginroot\AsyncSymfonyProcess\ProcessWrapper;

class AsyncProcess
{
    private array $indexes;
    private array $results = [];

    public function __construct() {
        $this->indexes = range(1, 10);
    }

    public function processFactory():?Process
    {
        if(count($this->indexes) === 0){
            return null;
        }

        $index = array_shift($this->indexes);
        return Process::fromShellCommandline(sprintf('echo %d', $index));
    }

    public function processCallback(ProcessWrapper $processWrapper):void
    {
        $index = (int)$processWrapper->getOutput();
        $this->results[] = $index;
    }

    public function run():void
    {
        $pool = new Pool();
        $pool
            ->setProcessFactory([$this, 'processFactory'])
            ->setCallback([$this, 'processCallback'])
            ->execute()
        ;
    }
}