diff --git a/README.md b/README.md index 045b14c..695ea18 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,9 @@ In your ```config/queue.php``` file you have to provide the following: 'exchange_name' => null, 'exchange_type' => null, 'exchange_flags' => null, + 'keepalive' > false, + 'heartbeat' => 0, + 'retry_after' => 0, ), ), ``` @@ -48,6 +51,44 @@ In your ```config/queue.php``` file you have to provide the following: In your ```config/app.php``` add ```'Forumhouse\LaravelAmqp\ServiceProvider\LaravelAmqpServiceProvider'``` to the list of service providers registered. +Improved worker stability (PHP 7.1+ is required) +------------ + +For better stability please add following code in app/Exceptions/Handler.php: + +```php +class Handler extends ExceptionHandler +{ +``` + +to + +```php +class Handler extends ExceptionHandler +{ + use AMQPFailureDetector; +``` + +And + +```php +public function report(Exception $exception) +{ + parent::report($exception); +} +``` + +to + +```php +public function report(Exception $exception) +{ + $this->catchAMQPConnectionFailure($exception); + parent::report($exception); +} +``` + + Usage ------------ diff --git a/composer.json b/composer.json index cfe0a25..f9f2f28 100644 --- a/composer.json +++ b/composer.json @@ -4,19 +4,22 @@ "minimum-stability": "stable", "license": "GPL-2.0", "require": { - "php": ">=5.4.0", - "php-amqplib/php-amqplib": "2.6.*" + "php": ">=7.1", + "php-amqplib/php-amqplib": "2.6.*", + "ext-json": "*" }, "require-dev": { - "laravel/framework": ">=5.3.0", + "laravel/framework": ">=5.7.0", "squizlabs/php_codesniffer": "1.*", - "orchestra/testbench": "~3.4", - "phpunit/phpunit": "~6.1" + "orchestra/testbench": "3.8.*" }, "autoload": { "psr-4": { "Forumhouse\\LaravelAmqp\\": "src/", "Forumhouse\\LaravelAmqp\\Tests\\": "tests/" } + }, + "suggest": { + "ext-posix": "Restart workers if connection is lost to avoid unlimited loop" } } diff --git a/src/Connectors/AmqpConnector.php b/src/Connectors/AmqpConnector.php index c7568a6..f4d6e39 100644 --- a/src/Connectors/AmqpConnector.php +++ b/src/Connectors/AmqpConnector.php @@ -48,9 +48,9 @@ public function connect(array $config) } return new AMQPQueue( - $connection, $config['queue'], $config['queue_flags'], $config['declare_queues'], + $connection, $config['queue'], $config['queue_flags'], isset($config['declare_queues']) ? $config['declare_queues'] : true, $config['message_properties'], $config['channel_id'], - $config['exchange_name'], $config['exchange_type'], $config['exchange_flags'] + $config['exchange_name'], $config['exchange_type'], $config['exchange_flags'], (isset($config['retry_after']) ? $config['retry_after'] : 0) ); } } diff --git a/src/Jobs/AMQPJob.php b/src/Jobs/AMQPJob.php index ec9c31c..55f0b36 100644 --- a/src/Jobs/AMQPJob.php +++ b/src/Jobs/AMQPJob.php @@ -70,6 +70,10 @@ public function release($delay = 0) $body['attempts'] = $this->attempts() + 1; $job = $body['job']; + //if retry_after option is set use it on failure instead of traditional delay + if(isset($body['data']['retryAfter']) && $body['data']['retryAfter'] > 0) + $delay = $body['data']['retryAfter']; + /** @var QueueContract $queue */ $queue = $this->container['queue']->connection(); if ($delay > 0) { @@ -119,10 +123,14 @@ public function getQueue() * Get the job identifier. * * @return string - * @throws \OutOfBoundsException */ public function getJobId() { - return $this->amqpMessage->get('message_id'); + try { + return $this->amqpMessage->get('message_id'); + } catch (\OutOfBoundsException $exception){ + return null; + } } + } diff --git a/src/Queue/AMQPQueue.php b/src/Queue/AMQPQueue.php index b90ce95..72e2440 100644 --- a/src/Queue/AMQPQueue.php +++ b/src/Queue/AMQPQueue.php @@ -9,7 +9,6 @@ use Illuminate\Queue\InvalidPayloadException; use Illuminate\Queue\Queue; use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -30,7 +29,7 @@ class AMQPQueue extends Queue implements QueueContract const EXCHANGE_TYPE_FANOUT = 'fanout'; /** - * @var AMQPConnection Connection to amqp compatible server + * @var AMQPStreamConnection Connection to amqp compatible server */ protected $connection; @@ -69,6 +68,11 @@ class AMQPQueue extends Queue implements QueueContract */ private $declareQueues; + /** + * @var int + */ + private $retryAfter; + /** * @param AMQPStreamConnection $connection * @param string $defaultQueueName Default queue name @@ -83,6 +87,7 @@ class AMQPQueue extends Queue implements QueueContract * @param string $exchangeName Exchange name * @param mixed $exchangeType Exchange type * @param mixed $exchangeFlags Exchange flags + * @param mixed $retryAfter Optional timeout for failed jobs */ public function __construct( AMQPStreamConnection $connection, @@ -93,7 +98,8 @@ public function __construct( $defaultChannelId = null, $exchangeName = '', $exchangeType = null, - $exchangeFlags = [] + $exchangeFlags = [], + $retryAfter = 0 ) { $this->connection = $connection; $this->defaultQueueName = $defaultQueueName ?: 'default'; @@ -103,6 +109,7 @@ public function __construct( $this->defaultChannelId = $defaultChannelId; $this->exchangeName = $exchangeName; $this->channel = $connection->channel($this->defaultChannelId); + $this->retryAfter = $retryAfter; if ($exchangeName !== null) { $this->declareExchange($exchangeName, $exchangeType, $exchangeFlags); @@ -135,6 +142,13 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange call_user_func_array([$this->channel, 'exchange_declare'], $flags); } + /** + * @return array + */ + public function getCustomMessageOptions(){ + return ['retryAfter' => $this->retryAfter]; + } + /** * Push a new job onto the queue. * @@ -142,15 +156,15 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange * @param mixed $data Job custom data. Usually array * @param string $queue Queue name, if different from the default one * - * @throws InvalidPayloadException + * @throws \Illuminate\Queue\InvalidPayloadException * @throws AMQPException * @return bool Always true */ public function push($job, $data = '', $queue = null) { $queue = $this->prepareQueue($queue); - $amqpMessage = $this->prepareMessage($job, $data); - $this->channel->basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue)); + $payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties); + $this->channel->basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue)); return true; } @@ -163,14 +177,14 @@ public function push($job, $data = '', $queue = null) * @param string $queue Queue name, if different from the default one * * @return bool - * @throws InvalidPayloadException + * @throws \Illuminate\Queue\InvalidPayloadException * @throws AMQPException */ public function addMessageToBatch($job, $data = '', $queue = null) { $queue = $this->prepareQueue($queue); - $amqpMessage = $this->prepareMessage($job, $data); - $this->channel->batch_basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue)); + $payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties); + $this->channel->batch_basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue)); return true; } @@ -270,9 +284,6 @@ protected function getRoutingKey($queue) */ public function pushRaw($payload, $queue = null, array $options = []) { - // NB: DYNAMIC PRIORITY IS NOT IMPLEMENTED FOR RAW MESSAGES, - // NB: NEED TO SET THE $this->messageProperties['priority'] FIELD - $queue = $this->prepareQueue($queue); $amqpPayload = new AMQPMessage($payload, $this->messageProperties); $this->channel->basic_publish($amqpPayload, $this->exchangeName, $queue); @@ -288,7 +299,7 @@ public function pushRaw($payload, $queue = null, array $options = []) * @param string $queue Queue name, if different from the default one * * @return bool Always true - * @throws InvalidPayloadException + * @throws \Illuminate\Queue\InvalidPayloadException * @throws AMQPException */ public function later($delay, $job, $data = '', $queue = null) @@ -300,8 +311,8 @@ public function later($delay, $job, $data = '', $queue = null) $queue = $this->prepareQueue($queue); $delayedQueueName = $this->declareDelayedQueue($queue, $delay); - $amqpMessage = $this->prepareMessage($job, $data); - $this->channel->basic_publish($amqpMessage, $this->exchangeName, $delayedQueueName); + $payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties); + $this->channel->basic_publish($payload, $this->exchangeName, $delayedQueueName); return true; } @@ -334,7 +345,7 @@ public function declareDelayedQueue($destinationQueueName, $delay) 'arguments' => new AMQPTable([ 'x-dead-letter-exchange' => '', 'x-dead-letter-routing-key' => $destinationQueueName, - 'x-message-ttl' => $delay * 1000, + 'x-message-ttl' => intval($delay * 1000), ]), ]); @@ -414,73 +425,55 @@ private function prepareQueue($queue) return $queue; } - - - // FNX - ADD DYNAMIC PRIORITY TO MESSAGE - /** - * @param string $job - * @param mixed $data + * Create a payload string from the given job and data. + * + * @param string $job + * @param string $queue + * @param mixed $data + * @return string * - * @return AMQPMessage - * @throws InvalidPayloadException + * @throws \Illuminate\Queue\InvalidPayloadException */ - protected function prepareMessage($job, $data) + protected function createPayload($job, $queue, $data = '') { - $payloadJson = $this->createPayload($job, $data); - $arrPayload = json_decode($payloadJson, true); - - // OVERRIDE PRIORITY - // NB: IMPLEMENT QUEUE DEFAULT PRIORITY USING THE FIELD `message_properties['priority']` IN THE CONFIG - $props = $this->messageProperties; - if (!empty($arrPayload['priority']) && $arrPayload['priority'] > 0) - $props['priority'] = $arrPayload['priority']; - - return new AMQPMessage($payloadJson, $props); + $data = is_array($data) ? array_merge($data, $this->getCustomMessageOptions()) : $this->getCustomMessageOptions(); + $payload = json_encode($this->createPayloadArray($job, $queue, $data)); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new InvalidPayloadException( + 'Unable to JSON encode payload. Error code: '.json_last_error() + ); + } + return $payload; } - // FNX - OVERRIDE PAYLOAD GENERATION /** * Create a payload for an object-based queue handler. * * @param mixed $job + * @param string $queue * @return array */ - protected function createObjectPayload($job) + protected function createObjectPayload($job, $queue) { - return [ - 'data' => [ - 'command' => serialize(clone $job), - 'commandName' => get_class($job), - ], + $payload = $this->withCreatePayloadHooks($queue, [ 'displayName' => $this->getDisplayName($job), 'job' => 'Illuminate\Queue\CallQueuedHandler@call', - 'maxTries' => empty($job->tries) ? null : $job->tries, - 'timeout' => empty($job->timeout) ? null : $job->timeout, + 'maxTries' => $job->tries ?? null, + 'timeout' => $job->timeout ?? null, 'timeoutAt' => $this->getJobExpiration($job), - 'priority' => empty($job->priority) ? null : $job->priority, - ]; - } - /** - * Create a typical, string based queue payload array. - * - * @param string $job - * @param mixed $payload - * - * @return array - */ - protected function createStringPayload($job, $payload) - { - return [ - 'displayName' => is_string($job) ? explode('@', $job)[0] : null, - 'job' => $job, - 'maxTries' => $payload['maxTries'], - 'timeout' => $payload['timeout'], - 'data' => $payload['data'], - // BUG-FIX FOR ATTEMPTS HERE (OTHERWISE THIS FUNCTION IS THE SAME AS THE BASE IMPLEMENTATION): - 'attempts' => $payload['attempts'], - 'priority' => empty($payload['priority']) ? null : $payload['priority'], - ]; + 'data' => [ + 'commandName' => $job, + 'command' => $job, + ], + ]); + + return array_merge($payload, [ + 'data' => array_merge([ + 'commandName' => get_class($job), + 'command' => serialize(clone $job), + ],$this->getCustomMessageOptions()), + ]); } } diff --git a/src/Traits/AMQPFailureDetector.php b/src/Traits/AMQPFailureDetector.php new file mode 100644 index 0000000..56c783f --- /dev/null +++ b/src/Traits/AMQPFailureDetector.php @@ -0,0 +1,38 @@ +runningInConsole() && $this->causedByLostConnection($exception) && extension_loaded('posix')){ + posix_kill(getmypid(), SIGTERM); + } + } + + protected function causedByLostConnection(Exception $e) + { + $message = $e->getMessage(); + + return Str::contains($message, [ + 'server has gone away', + 'no connection to the server', + 'Lost connection', + 'is dead or not enabled', + 'Error while sending', + 'decryption failed or bad record mac', + 'server closed the connection unexpectedly', + 'SSL connection has been closed unexpectedly', + 'Error writing data to the connection', + 'Resource deadlock avoided', + 'Transaction() on null', + 'child connection forced to terminate due to client_idle_limit', + 'Channel connection is closed', + "Broken pipe", + ]); + } +} \ No newline at end of file diff --git a/tests/BaseFeaturesTest.php b/tests/BaseFeaturesTest.php index 678620c..3ee65f0 100644 --- a/tests/BaseFeaturesTest.php +++ b/tests/BaseFeaturesTest.php @@ -18,15 +18,6 @@ class BaseFeaturesTest extends LaravelAmqpTestBase */ const TEST_JOB_CLASS = 'Forumhouse\LaravelAmqp\Tests\TestJobHandler'; - /** - * - */ - public function setUp() - { - parent::setUp(); - } - - /** * Test for a job, ending with a delete() call (deleted from queue) */