Skip to content

Commit

Permalink
add: process support queue & map
Browse files Browse the repository at this point in the history
add: process event support
add: process timeout support
add: mutex example
fix: readme
fix: lock bugs
  • Loading branch information
cotrufoa committed Apr 1, 2023
1 parent 629a7cf commit f1ba6fa
Show file tree
Hide file tree
Showing 4 changed files with 427 additions and 268 deletions.
68 changes: 60 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# mprocess

Multi process task execution based on swoole, support queue/lock/atomic

### Installation
Expand All @@ -9,8 +10,7 @@ composer require zv/mprocess

### basic


```
```php

$process = new \ZV\MProcess(5);

Expand All @@ -35,9 +35,9 @@ $process->do(function ($process_index, $process_count) use ($task_list) {

```

### queue
### queue

```
```php

$process = new \ZV\MProcess(5);

Expand All @@ -61,9 +61,9 @@ $process->do(function ($process_index, $process_count) use ($process) {

```

### loop task use multi process
### loop task under multi process

```
```php

$tasks = range(1, 10);
$process = new \ZV\MProcess(count($tasks), 1024 * 10);
Expand Down Expand Up @@ -92,10 +92,9 @@ $process->loop(function ($task_data) use ($process) {

```


### locker && atomic

```
```php

$tasks = range(1, 20);
$process = new \ZV\MProcess(count($tasks), 1024 * 10);
Expand All @@ -116,4 +115,57 @@ $process->loop(function ($id) use ($process) {

```

### event

```php

$tasks = range(1, 20);
$process = new \ZV\MProcess(count($tasks), 1024 * 10);
// start count($tasks) process loop
$process->on_init(function() use($tasks) {
// init process by master process
foreach ($tasks as $id) {
$process->push($id);
}
})->loop(function ($id) use ($process) {
// push new task
$process->lock(function () use ($process, $id) {
if ($process->incr('tasks') < 5) {
$log = sprintf('[PID=%s][GotTask=%d]' . PHP_EOL, getmypid(), $id);
echo $log;
}
});
})->on_done(function(){
// when all process work done
})->on_timeout(function(){
// when process wait timeout
})->wait(600);// wait all process for 10 miniutes

```

### map && queue

```php

$tasks = range(1, 20);
$process = new \ZV\MProcess(count($tasks), 1024 * 10, 64);

// set key index
$process['test'] = 1;

// push to queue
foreach ($tasks as $id) {
$process->push($id);
}

// loop queue
$process->loop(function ($id) use ($process) {
echo $id,PHP_EOL;
})->wait();

// keep in memory
echo $process['test'] == 1, PHP_EOL;

```

more example see example/
46 changes: 46 additions & 0 deletions example/mutex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

include '../src/MProcess.php';
include '../src/QueueTable.php';
$retry = 100;
while ($retry--) {
$steps = rand(1, 9) * (10 ** rand(1, 3));
$zoom = rand(3, 9);
$process = new ZV\MProcess(10, $steps, 64);
$process['test'] = $steps;
$process->on_init(function () use ($process, $steps) {
for ($i = 0; $i < $steps; $i++) {
$process->push(9999);
}
})->do(function ($index, $count) use ($zoom, $process, $steps) {
if ($index == 0) {
for ($i = 0; $i < $steps * ($zoom - 1); $i++) {
$process->push(999);
}
}
while (true) {
// test for pop/shift
$task = rand(0, 1) ? $process->shift() : $process->pop();
//
$task > 0 && $process->incr('result');
if ($task === null) {
break;
}
}
})->wait();


echo '======TestRound(' . $retry . ')======', PHP_EOL;

if ($process->atomic('result') != $steps * $zoom) {
echo 'Failed=', $process->atomic('result'), PHP_EOL;
} else {
echo 'Succed=', $process->atomic('result'), PHP_EOL;
}
if ($process['test'] != $steps) {
echo 'KeyFailed', PHP_EOL;
} else {
echo 'KeySucced', PHP_EOL;
}
echo PHP_EOL;
}

0 comments on commit f1ba6fa

Please sign in to comment.