Skip to content

Commit

Permalink
Merge pull request #26 from fhteam/maksimru-fix/laravel-5.7
Browse files Browse the repository at this point in the history
Maksimru fix/laravel 5.7
  • Loading branch information
god1dog authored Apr 22, 2019
2 parents 0ea949e + ccc620d commit 0cecf28
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 85 deletions.
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,54 @@ In your ```config/queue.php``` file you have to provide the following:
'exchange_name' => null,
'exchange_type' => null,
'exchange_flags' => null,
'keepalive' > false,
'heartbeat' => 0,
'retry_after' => 0,
),
),
```

In your ```config/app.php``` add ```'Forumhouse\LaravelAmqp\ServiceProvider\LaravelAmqpServiceProvider'``` to the list of service
providers registered.

Improved worker stability (PHP 7.1+ is required)
------------

For better stability please add following code in app/Exceptions/Handler.php:

```php
class Handler extends ExceptionHandler
{
```

to

```php
class Handler extends ExceptionHandler
{
use AMQPFailureDetector;
```

And

```php
public function report(Exception $exception)
{
parent::report($exception);
}
```

to

```php
public function report(Exception $exception)
{
$this->catchAMQPConnectionFailure($exception);
parent::report($exception);
}
```


Usage
------------

Expand Down
13 changes: 8 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
"minimum-stability": "stable",
"license": "GPL-2.0",
"require": {
"php": ">=5.4.0",
"php-amqplib/php-amqplib": "2.6.*"
"php": ">=7.1",
"php-amqplib/php-amqplib": "2.6.*",
"ext-json": "*"
},
"require-dev": {
"laravel/framework": ">=5.3.0",
"laravel/framework": ">=5.7.0",
"squizlabs/php_codesniffer": "1.*",
"orchestra/testbench": "~3.4",
"phpunit/phpunit": "~6.1"
"orchestra/testbench": "3.8.*"
},
"autoload": {
"psr-4": {
"Forumhouse\\LaravelAmqp\\": "src/",
"Forumhouse\\LaravelAmqp\\Tests\\": "tests/"
}
},
"suggest": {
"ext-posix": "Restart workers if connection is lost to avoid unlimited loop"
}
}
4 changes: 2 additions & 2 deletions src/Connectors/AmqpConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public function connect(array $config)
}

return new AMQPQueue(
$connection, $config['queue'], $config['queue_flags'], $config['declare_queues'],
$connection, $config['queue'], $config['queue_flags'], isset($config['declare_queues']) ? $config['declare_queues'] : true,
$config['message_properties'], $config['channel_id'],
$config['exchange_name'], $config['exchange_type'], $config['exchange_flags']
$config['exchange_name'], $config['exchange_type'], $config['exchange_flags'], (isset($config['retry_after']) ? $config['retry_after'] : 0)
);
}
}
12 changes: 10 additions & 2 deletions src/Jobs/AMQPJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public function release($delay = 0)
$body['attempts'] = $this->attempts() + 1;
$job = $body['job'];

//if retry_after option is set use it on failure instead of traditional delay
if(isset($body['data']['retryAfter']) && $body['data']['retryAfter'] > 0)
$delay = $body['data']['retryAfter'];

/** @var QueueContract $queue */
$queue = $this->container['queue']->connection();
if ($delay > 0) {
Expand Down Expand Up @@ -119,10 +123,14 @@ public function getQueue()
* Get the job identifier.
*
* @return string
* @throws \OutOfBoundsException
*/
public function getJobId()
{
return $this->amqpMessage->get('message_id');
try {
return $this->amqpMessage->get('message_id');
} catch (\OutOfBoundsException $exception){
return null;
}
}

}
127 changes: 60 additions & 67 deletions src/Queue/AMQPQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Illuminate\Queue\InvalidPayloadException;
use Illuminate\Queue\Queue;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
Expand All @@ -30,7 +29,7 @@ class AMQPQueue extends Queue implements QueueContract
const EXCHANGE_TYPE_FANOUT = 'fanout';

/**
* @var AMQPConnection Connection to amqp compatible server
* @var AMQPStreamConnection Connection to amqp compatible server
*/
protected $connection;

Expand Down Expand Up @@ -69,6 +68,11 @@ class AMQPQueue extends Queue implements QueueContract
*/
private $declareQueues;

/**
* @var int
*/
private $retryAfter;

/**
* @param AMQPStreamConnection $connection
* @param string $defaultQueueName Default queue name
Expand All @@ -83,6 +87,7 @@ class AMQPQueue extends Queue implements QueueContract
* @param string $exchangeName Exchange name
* @param mixed $exchangeType Exchange type
* @param mixed $exchangeFlags Exchange flags
* @param mixed $retryAfter Optional timeout for failed jobs
*/
public function __construct(
AMQPStreamConnection $connection,
Expand All @@ -93,7 +98,8 @@ public function __construct(
$defaultChannelId = null,
$exchangeName = '',
$exchangeType = null,
$exchangeFlags = []
$exchangeFlags = [],
$retryAfter = 0
) {
$this->connection = $connection;
$this->defaultQueueName = $defaultQueueName ?: 'default';
Expand All @@ -103,6 +109,7 @@ public function __construct(
$this->defaultChannelId = $defaultChannelId;
$this->exchangeName = $exchangeName;
$this->channel = $connection->channel($this->defaultChannelId);
$this->retryAfter = $retryAfter;

if ($exchangeName !== null) {
$this->declareExchange($exchangeName, $exchangeType, $exchangeFlags);
Expand Down Expand Up @@ -135,22 +142,29 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
call_user_func_array([$this->channel, 'exchange_declare'], $flags);
}

/**
* @return array
*/
public function getCustomMessageOptions(){
return ['retryAfter' => $this->retryAfter];
}

/**
* Push a new job onto the queue.
*
* @param string $job Job implementation class name
* @param mixed $data Job custom data. Usually array
* @param string $queue Queue name, if different from the default one
*
* @throws InvalidPayloadException
* @throws \Illuminate\Queue\InvalidPayloadException
* @throws AMQPException
* @return bool Always true
*/
public function push($job, $data = '', $queue = null)
{
$queue = $this->prepareQueue($queue);
$amqpMessage = $this->prepareMessage($job, $data);
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue));
$payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
$this->channel->basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));

return true;
}
Expand All @@ -163,14 +177,14 @@ public function push($job, $data = '', $queue = null)
* @param string $queue Queue name, if different from the default one
*
* @return bool
* @throws InvalidPayloadException
* @throws \Illuminate\Queue\InvalidPayloadException
* @throws AMQPException
*/
public function addMessageToBatch($job, $data = '', $queue = null)
{
$queue = $this->prepareQueue($queue);
$amqpMessage = $this->prepareMessage($job, $data);
$this->channel->batch_basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue));
$payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
$this->channel->batch_basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));

return true;
}
Expand Down Expand Up @@ -270,9 +284,6 @@ protected function getRoutingKey($queue)
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
// NB: DYNAMIC PRIORITY IS NOT IMPLEMENTED FOR RAW MESSAGES,
// NB: NEED TO SET THE $this->messageProperties['priority'] FIELD

$queue = $this->prepareQueue($queue);
$amqpPayload = new AMQPMessage($payload, $this->messageProperties);
$this->channel->basic_publish($amqpPayload, $this->exchangeName, $queue);
Expand All @@ -288,7 +299,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
* @param string $queue Queue name, if different from the default one
*
* @return bool Always true
* @throws InvalidPayloadException
* @throws \Illuminate\Queue\InvalidPayloadException
* @throws AMQPException
*/
public function later($delay, $job, $data = '', $queue = null)
Expand All @@ -300,8 +311,8 @@ public function later($delay, $job, $data = '', $queue = null)
$queue = $this->prepareQueue($queue);
$delayedQueueName = $this->declareDelayedQueue($queue, $delay);

$amqpMessage = $this->prepareMessage($job, $data);
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $delayedQueueName);
$payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
$this->channel->basic_publish($payload, $this->exchangeName, $delayedQueueName);
return true;
}

Expand Down Expand Up @@ -334,7 +345,7 @@ public function declareDelayedQueue($destinationQueueName, $delay)
'arguments' => new AMQPTable([
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => $destinationQueueName,
'x-message-ttl' => $delay * 1000,
'x-message-ttl' => intval($delay * 1000),
]),
]);

Expand Down Expand Up @@ -414,73 +425,55 @@ private function prepareQueue($queue)
return $queue;
}



// FNX - ADD DYNAMIC PRIORITY TO MESSAGE

/**
* @param string $job
* @param mixed $data
* Create a payload string from the given job and data.
*
* @param string $job
* @param string $queue
* @param mixed $data
* @return string
*
* @return AMQPMessage
* @throws InvalidPayloadException
* @throws \Illuminate\Queue\InvalidPayloadException
*/
protected function prepareMessage($job, $data)
protected function createPayload($job, $queue, $data = '')
{
$payloadJson = $this->createPayload($job, $data);
$arrPayload = json_decode($payloadJson, true);

// OVERRIDE PRIORITY
// NB: IMPLEMENT QUEUE DEFAULT PRIORITY USING THE FIELD `message_properties['priority']` IN THE CONFIG
$props = $this->messageProperties;
if (!empty($arrPayload['priority']) && $arrPayload['priority'] > 0)
$props['priority'] = $arrPayload['priority'];

return new AMQPMessage($payloadJson, $props);
$data = is_array($data) ? array_merge($data, $this->getCustomMessageOptions()) : $this->getCustomMessageOptions();
$payload = json_encode($this->createPayloadArray($job, $queue, $data));
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidPayloadException(
'Unable to JSON encode payload. Error code: '.json_last_error()
);
}

return $payload;
}

// FNX - OVERRIDE PAYLOAD GENERATION
/**
* Create a payload for an object-based queue handler.
*
* @param mixed $job
* @param string $queue
* @return array
*/
protected function createObjectPayload($job)
protected function createObjectPayload($job, $queue)
{
return [
'data' => [
'command' => serialize(clone $job),
'commandName' => get_class($job),
],
$payload = $this->withCreatePayloadHooks($queue, [
'displayName' => $this->getDisplayName($job),
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => empty($job->tries) ? null : $job->tries,
'timeout' => empty($job->timeout) ? null : $job->timeout,
'maxTries' => $job->tries ?? null,
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'priority' => empty($job->priority) ? null : $job->priority,
];
}
/**
* Create a typical, string based queue payload array.
*
* @param string $job
* @param mixed $payload
*
* @return array
*/
protected function createStringPayload($job, $payload)
{
return [
'displayName' => is_string($job) ? explode('@', $job)[0] : null,
'job' => $job,
'maxTries' => $payload['maxTries'],
'timeout' => $payload['timeout'],
'data' => $payload['data'],
// BUG-FIX FOR ATTEMPTS HERE (OTHERWISE THIS FUNCTION IS THE SAME AS THE BASE IMPLEMENTATION):
'attempts' => $payload['attempts'],
'priority' => empty($payload['priority']) ? null : $payload['priority'],
];
'data' => [
'commandName' => $job,
'command' => $job,
],
]);

return array_merge($payload, [
'data' => array_merge([
'commandName' => get_class($job),
'command' => serialize(clone $job),
],$this->getCustomMessageOptions()),
]);
}
}
Loading

0 comments on commit 0cecf28

Please sign in to comment.