Skip to content

Abstract some Message Queuing system using the Adapter pattern

License

Notifications You must be signed in to change notification settings

benjamindulau/BarbeQ

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

43 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

BarbeQ

Abstracts message queuing systems using the Adapter pattern

Build Status

Work In Progress

This is a work in progress and unfinished business!

Usage example

producer.php

<?php

// producer.php — publishes a single message to the "test" queue.

require_once 'autoload.php';

use BarbeQ\Adapter\AmqpAdapter;
use BarbeQ\BarbeQ;
use BarbeQ\Model\Message;
use Symfony\Component\EventDispatcher\EventDispatcher;

// BarbeQ is given two independent dispatcher instances.
$messageDispatcher = new EventDispatcher();
$dispatcher = new EventDispatcher();

// AMQP: connection, exchange and queue settings, in that order.
$adapter = new AmqpAdapter(
    array('host' => 'localhost'),
    array('name' => 'test_direct'),
    array(
        array('name' => 'test'),
    )
);

// PDO (alternative adapter; uncomment to use it instead of AMQP)
//$pdo = new \PDO('mysql:dbname=barbeq', 'root', '');
//$adapter = new \BarbeQ\Adapter\PdoAdapter($pdo, array(
//    'table' => 'queuing',
//));

$barbeQ = new BarbeQ($adapter, $messageDispatcher, $dispatcher);

// Message body to publish on the "test" queue.
$payload = array(
    'id' => 1,
    'foo' => 'bar',
);

$barbeQ->cook('test', new Message($payload));
// or $barbeQ->publish(...), same action

consumer.php

<?php

// consumer.php — registers a consumer on the "test" queue and prints a trace
// line before and after each consumed message.

require_once 'autoload.php';

use Symfony\Component\EventDispatcher\EventDispatcher;
use BarbeQ\Adapter\AmqpAdapter;
use BarbeQ\BarbeQ;
// The listeners below type-hint ConsumeEvent. Without an import the name
// resolves to the global \ConsumeEvent and the type check fails when the
// listener is invoked.
// NOTE(review): namespace assumed to be BarbeQ\Event — confirm against the library source.
use BarbeQ\Event\ConsumeEvent;
use Acme\PocBundle\Consumer\TestConsumer;

// BarbeQ is given two independent dispatcher instances.
$messageDispatcher = new EventDispatcher();
$dispatcher = new EventDispatcher();

// AMQP
$connection = array('host' => 'localhost');
$exchange = array('name' => 'test_direct');
$queues = array(array('name' => 'test'));
$adapter = new AmqpAdapter($connection, $exchange, $queues);

// PDO (alternative adapter; uncomment to use it instead of AMQP)
//$pdo = new \PDO('mysql:dbname=barbeq', 'root', '');
//$adapter = new \BarbeQ\Adapter\PdoAdapter($pdo, array(
//    'table' => 'queuing',
//));

$barbeQ = new BarbeQ($adapter, $messageDispatcher, $dispatcher);

// Attach the consumer to the same queue name the producer publishes to.
$testConsumer = new TestConsumer();
$barbeQ->addConsumer('test', $testConsumer);

// Trace what's happening
$barbeQ->addListener('barbeq.pre_consume', function(ConsumeEvent $event) {
    echo sprintf("Start consuming message #%d\n", $event->getMessage()->getMetadataValue('index'));
});

$barbeQ->addListener('barbeq.post_consume', function(ConsumeEvent $event) {
    echo sprintf("Memory: %s, Time: %0.04fs\n", $event->getMessage()->getMemory(), $event->getMessage()->getTime());
});

// NOTE(review): the second argument presumably caps how many messages are consumed — confirm.
$barbeQ->eat('test', 5);
// or $barbeQ->consume(...), same action

Testing

$ php composer.phar update --dev
$ phpunit

License

This library is released under the MIT license. See the complete license in the repository:

LICENSE

Credits

Logo by Yuminette

About

Abstract some Message Queuing system using the Adapter pattern

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages