Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: swarrot/SwarrotBundle
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.0.2
Choose a base ref
...
head repository: swarrot/SwarrotBundle
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.1.0
Choose a head ref
  • 12 commits
  • 7 files changed
  • 6 contributors

Commits on Aug 19, 2014

  1. Improved exception message.

    Before: ```Unknown message type "johndoe". Available are [foobar].```
    
    After: ```Unknown message type "johndoe". Available are [foo,bar].```
    blaugueux committed Aug 19, 2014
    Copy the full SHA
    00a4143 View commit details

Commits on Aug 23, 2014

  1. Merge pull request #22 from blaugueux/patch-2

    Improved exception message.
    Olivier Dolbeau committed Aug 23, 2014
    Copy the full SHA
    51f3d0e View commit details

Commits on Sep 1, 2014

  1. Copy the full SHA
    39f718b View commit details
  2. Copy the full SHA
    2bb8b64 View commit details
  3. Copy the full SHA
    dcca712 View commit details

Commits on Sep 2, 2014

  1. Merge pull request #24 from ThisIsAreku/amqplib

    Added php-amqplib support
    Olivier Dolbeau committed Sep 2, 2014
    Copy the full SHA
    155c9d8 View commit details
  2. Fixed the DI configuration

    This fixes a mistakes introduced in #24.
    Closes #25
    stof committed Sep 2, 2014
    Copy the full SHA
    9373cb3 View commit details
  3. Merge pull request #26 from stof/fix_config

    Fixed the DI configuration
    Olivier Dolbeau committed Sep 2, 2014
    Copy the full SHA
    045194b View commit details

Commits on Sep 18, 2014

  1. Add a retry-attempts option.

    antoox committed Sep 18, 2014
    Copy the full SHA
    98df5fb View commit details
  2. Merge pull request #28 from antoox/add-retry-attempts-option

    Add a retry-attempts option.
    Olivier Dolbeau committed Sep 18, 2014
    Copy the full SHA
    4aa2f5c View commit details
  3. Prepare new version

    odolbeau committed Sep 18, 2014
    Copy the full SHA
    eb465cb View commit details
  4. Merge pull request #29 from odolbeau/prepare-new-verison

    Prepare new version
    Olivier Dolbeau committed Sep 18, 2014
    Copy the full SHA
    280ce31 View commit details
Showing with 148 additions and 10 deletions.
  1. +124 −0 Broker/AmqpLibFactory.php
  2. +1 −1 Broker/Publisher.php
  3. +5 −0 Command/SwarrotCommand.php
  4. +8 −4 DependencyInjection/Configuration.php
  5. +4 −4 DependencyInjection/SwarrotExtension.php
  6. +2 −0 Resources/config/swarrot.xml
  7. +4 −1 composer.json
124 changes: 124 additions & 0 deletions Broker/AmqpLibFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

namespace Swarrot\SwarrotBundle\Broker;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Connection\AMQPSSLConnection;
use Swarrot\Broker\MessageProvider\PhpAmqpLibMessageProvider;
use Swarrot\Broker\MessagePublisher\PhpAmqpLibMessagePublisher;

class AmqpLibFactory implements FactoryInterface
{
protected $connections = array();
protected $channels = array();
protected $messageProviders = array();
protected $messagePublishers = array();

/**
* {@inheritDoc}
*/
public function addConnection($name, array $connection)
{
$this->connections[$name] = $connection;
}

/**
* {@inheritDoc}
*/
public function getMessageProvider($name, $connection)
{
if (!isset($this->messageProviders[$connection][$name])) {
if (!isset($this->messageProviders[$connection])) {
$this->messageProviders[$connection] = array();
}

$channel = $this->getChannel($connection);

$this->messageProviders[$connection][$name] = new PhpAmqpLibMessageProvider($channel, $name);
}

return $this->messageProviders[$connection][$name];
}

/**
* {@inheritDoc}
*/
public function getMessagePublisher($name, $connection)
{
if (!isset($this->messagePublishers[$connection][$name])) {
if (!isset($this->messagePublishers[$connection])) {
$this->messagePublishers[$connection] = array();
}

$channel = $this->getChannel($connection);

$this->messagePublishers[$connection][$name] = new PhpAmqpLibMessagePublisher($channel, $name);
}

return $this->messagePublishers[$connection][$name];
}

/**
* getChannel
*
* @param string $connection
*
* @return AMQPChannel
*/
protected function getChannel($connection)
{
if (isset($this->channels[$connection])) {
return $this->channels[$connection];
}

if (!isset($this->connections[$connection])) {
throw new \InvalidArgumentException(sprintf(
'Unknown connection "%s". Available: [%s]',
$connection,
implode(', ', array_keys($this->connections))
));
}

if (!isset($this->channels[$connection])) {
$this->channels[$connection] = array();
}

if (isset($this->connections[$connection]['ssl']) && $this->connections[$connection]['ssl']) {
if (empty($this->connections[$connection]['ssl_options'])) {
$ssl_opts = array(
'verify_peer' => true
);
} else {
$ssl_opts = array();
foreach ($this->connections[$connection]['ssl_options'] as $key => $value) {
if (!empty($value)) {
$ssl_opts[$key] = $value;
}
}
}

$conn = new AMQPSSLConnection(
$this->connections[$connection]['host'],
$this->connections[$connection]['port'],
$this->connections[$connection]['login'],
$this->connections[$connection]['password'],
$this->connections[$connection]['vhost'],
$ssl_opts
);
} else {
$conn = new AMQPConnection(
$this->connections[$connection]['host'],
$this->connections[$connection]['port'],
$this->connections[$connection]['login'],
$this->connections[$connection]['password'],
$this->connections[$connection]['vhost']
);
}
//$conn->connect();

$this->channels[$connection] = $conn->channel();

return $this->channels[$connection];
}
}
2 changes: 1 addition & 1 deletion Broker/Publisher.php
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ public function publish($messageType, Message $message, array $overridenConfig =
throw new \InvalidArgumentException(sprintf(
'Unknown message type "%s". Available are [%s].',
$messageType,
implode(array_keys($this->messageTypes))
implode(',', array_keys($this->messageTypes))
));
}

5 changes: 5 additions & 0 deletions Command/SwarrotCommand.php
Original file line number Diff line number Diff line change
@@ -63,6 +63,7 @@ protected function configure()
}
if (array_key_exists('retry', $this->processorStack)) {
$this->addOption('no-retry', 'R', InputOption::VALUE_NONE, 'Deactivate retry.');
$this->addOption('retry-attempts', null, InputOption::VALUE_REQUIRED, 'Number of maximum retry attempts (if it exists, override the extra data parameter)');
}
}

@@ -149,6 +150,10 @@ protected function getOptions(InputInterface $input)
if (isset($this->extras['retry_attempts'])) {
$attempts = $this->extras['retry_attempts'];
}

if ($input->hasOption('retry-attempts')) {
$attempts = (int) $input->getOption('retry-attempts');
}
$options['retry_attempts'] = $attempts;
}

12 changes: 8 additions & 4 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
@@ -33,10 +33,6 @@ public function getConfigTreeBuilder()
->scalarNode('provider')
->defaultValue('pecl')
->isRequired()
->validate()
->ifNotInArray(array('pecl', 'amqp_lib'))
->thenInvalid('Invalid provider "%s"')
->end()
->end()
->scalarNode('default_connection')->defaultValue(null)->end()
->scalarNode('default_command')->defaultValue('swarrot.command.base')->cannotBeEmpty()->end()
@@ -51,6 +47,14 @@ public function getConfigTreeBuilder()
->scalarNode('login')->defaultValue('guest')->cannotBeEmpty()->end()
->scalarNode('password')->defaultValue('guest')->end()
->scalarNode('vhost')->defaultValue('/')->cannotBeEmpty()->end()
->booleanNode('ssl')->defaultValue(false)->end()
->arrayNode('ssl_options')
->children()
->booleanNode('verify_peer')->end()
->scalarNode('cafile')->end()
->scalarNode('local_cert')->end()
->end()
->end()
->end()
->end()
->end()
8 changes: 4 additions & 4 deletions DependencyInjection/SwarrotExtension.php
Original file line number Diff line number Diff line change
@@ -29,11 +29,11 @@ public function load(array $configs, ContainerBuilder $container)
$loader = new XmlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
$loader->load('swarrot.xml');

if ('pecl' === $config['provider']) {
$id = 'swarrot.factory.pecl';
} else {
throw new \InvalidArgumentException('Only pecl is supported for now');
$id = 'swarrot.factory.'.$config['provider'];
if (!$container->has($id)) {
throw new \InvalidArgumentException('Unsupported provider');
}

$definition = $container->getDefinition($id);

foreach ($config['connections'] as $name => $connectionConfig) {
2 changes: 2 additions & 0 deletions Resources/config/swarrot.xml
Original file line number Diff line number Diff line change
@@ -4,12 +4,14 @@

<parameters>
<parameter key="swarrot.factory.pecl.class">Swarrot\SwarrotBundle\Broker\PeclFactory</parameter>
<parameter key="swarrot.factory.amqp_lib.class">Swarrot\SwarrotBundle\Broker\AmqpLibFactory</parameter>
<parameter key="swarrot.command.base.class">Swarrot\SwarrotBundle\Command\SwarrotCommand</parameter>
<parameter key="swarrot.publisher.class">Swarrot\SwarrotBundle\Broker\Publisher</parameter>
</parameters>

<services>
<service id="swarrot.factory.pecl" class="%swarrot.factory.pecl.class%" />
<service id="swarrot.factory.amqp_lib" class="%swarrot.factory.amqp_lib.class%" />

<service id="swarrot.command.base" class="%swarrot.command.base.class%" abstract="true">
<argument />
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
@@ -20,6 +20,9 @@
"phpunit/phpunit": "~3.7",
"phpspec/prophecy": "~1.1"
},
"suggest": {
"videlalvaro/php-amqplib": "Required if using amqp_lib"
},
"config": {
"bin-dir": "bin"
},
@@ -28,7 +31,7 @@
},
"extra": {
"branch-alias": {
"dev-master": "1.1-dev"
"dev-master": "1.2-dev"
}
}
}