From a041fb5ec9fc775d1a3efb6b647604da2b02b866 Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Thu, 29 Dec 2016 14:16:03 -0600 Subject: [PATCH] Refactor database and beanstalk queues. --- src/Illuminate/Queue/CalculatesDelays.php | 13 ++ src/Illuminate/Queue/DatabaseQueue.php | 145 +++++++++--------- src/Illuminate/Queue/Jobs/BeanstalkdJob.php | 20 +-- src/Illuminate/Queue/Jobs/DatabaseJob.php | 52 ++----- .../Queue/Jobs/DatabaseJobRecord.php | 63 ++++++++ tests/Queue/QueueDatabaseQueueUnitTest.php | 4 +- 6 files changed, 168 insertions(+), 129 deletions(-) create mode 100644 src/Illuminate/Queue/Jobs/DatabaseJobRecord.php diff --git a/src/Illuminate/Queue/CalculatesDelays.php b/src/Illuminate/Queue/CalculatesDelays.php index 5d05135833f2..dc4c3146adde 100644 --- a/src/Illuminate/Queue/CalculatesDelays.php +++ b/src/Illuminate/Queue/CalculatesDelays.php @@ -20,6 +20,19 @@ protected function secondsUntil($delay) : (int) $delay; } + /** + * Get the "available at" UNIX timestamp. + * + * @param \DateTimeInterface|int $delay + * @return int + */ + protected function availableAt($delay = 0) + { + return $delay instanceof DateTimeInterface + ? $delay->getTimestamp() + : Carbon::now()->addSeconds($delay)->getTimestamp(); + } + /** * Get the current system time as a UNIX timestamp. * diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index b516b50c7dac..8792db9ea08a 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -6,6 +6,7 @@ use Carbon\Carbon; use Illuminate\Database\Connection; use Illuminate\Queue\Jobs\DatabaseJob; +use Illuminate\Queue\Jobs\DatabaseJobRecord; use Illuminate\Contracts\Queue\Queue as QueueContract; class DatabaseQueue extends Queue implements QueueContract @@ -36,7 +37,7 @@ class DatabaseQueue extends Queue implements QueueContract * * @var int|null */ - protected $expire = 60; + protected $retryAfter = 60; /** * Create a new database queue instance. @@ -44,15 +45,15 @@ class DatabaseQueue extends Queue implements QueueContract * @param \Illuminate\Database\Connection $database * @param string $table * @param string $default - * @param int $expire + * @param int $retryAfter * @return void */ - public function __construct(Connection $database, $table, $default = 'default', $expire = 60) + public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60) { $this->table = $table; - $this->expire = $expire; $this->default = $default; $this->database = $database; + $this->retryAfter = $retryAfter; } /** @@ -78,7 +79,7 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data)); + return $this->pushToDatabase($queue, $this->createPayload($job, $data)); } /** @@ -91,7 +92,7 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->pushToDatabase(0, $queue, $payload); + return $this->pushToDatabase($queue, $payload); } /** @@ -105,7 +106,7 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data)); + return $this->pushToDatabase($queue, $this->createPayload($job, $data), $delay); } /** @@ -120,46 +121,63 @@ public function bulk($jobs, $data = '', $queue = null) { $queue = $this->getQueue($queue); - $availableAt = $this->getAvailableAt(0); + $availableAt = $this->availableAt(); - $records = array_map(function ($job) use ($queue, $data, $availableAt) { - return $this->buildDatabaseRecord( - $queue, $this->createPayload($job, $data), $availableAt - ); - }, (array) $jobs); - - return $this->database->table($this->table)->insert($records); + return $this->database->table($this->table)->insert(collect((array) $jobs)->map( + function ($job) use ($queue, $data, $availableAt) { + return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt); + } + )->all()); } /** * Release a reserved job back onto the queue. * * @param string $queue - * @param \StdClass $job + * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job * @param int $delay * @return mixed */ public function release($queue, $job, $delay) { - return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts); + return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts); } /** * Push a raw payload to the database with a given delay. * - * @param \DateTime|int $delay * @param string|null $queue * @param string $payload + * @param \DateTime|int $delay * @param int $attempts * @return mixed */ - protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) + protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0) { - $attributes = $this->buildDatabaseRecord( - $this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts - ); + return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord( + $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts + )); + } - return $this->database->table($this->table)->insertGetId($attributes); + /** + * Create an array to insert for the given job. + * + * @param string|null $queue + * @param string $payload + * @param int $availableAt + * @param int $attempts + * @return array + */ + protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0) + { + return [ + 'queue' => $queue, + 'payload' => $payload, + 'attempts' => $attempts, + 'reserved_at' => null, + 'available_at' => $availableAt, + 'created_at' => $this->currentTime(), + ]; } /** @@ -175,13 +193,7 @@ public function pop($queue = null) $this->database->beginTransaction(); if ($job = $this->getNextAvailableJob($queue)) { - $job = $this->markJobAsReserved($job); - - $this->database->commit(); - - return new DatabaseJob( - $this->container, $this, $job, $queue - ); + return $this->marshalJob($queue, $job); } $this->database->commit(); @@ -191,7 +203,7 @@ public function pop($queue = null) * Get the next available job for the queue. * * @param string|null $queue - * @return \StdClass|null + * @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null */ protected function getNextAvailableJob($queue) { @@ -205,7 +217,7 @@ protected function getNextAvailableJob($queue) ->orderBy('id', 'asc') ->first(); - return $job ? (object) $job : null; + return $job ? new DatabaseJobRecord((object) $job) : null; } /** @@ -217,8 +229,8 @@ protected function getNextAvailableJob($queue) protected function isAvailable($query) { $query->where(function ($query) { - $query->whereNull('reserved_at'); - $query->where('available_at', '<=', $this->currentTime()); + $query->whereNull('reserved_at') + ->where('available_at', '<=', $this->currentTime()); }); } @@ -230,27 +242,42 @@ protected function isAvailable($query) */ protected function isReservedButExpired($query) { - $expiration = Carbon::now()->subSeconds($this->expire)->getTimestamp(); + $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp(); $query->orWhere(function ($query) use ($expiration) { $query->where('reserved_at', '<=', $expiration); }); } + /** + * Marshal the reserved job into a DatabaseJob instance. + * + * @param string $queue + * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job + * @return \Illuminate\Queue\Jobs\DatabaseJob + */ + protected function marshalJob($queue, $job) + { + $job = $this->markJobAsReserved($job); + + $this->database->commit(); + + return new DatabaseJob( + $this->container, $this, $job, $queue + ); + } + /** * Mark the given job ID as reserved. * - * @param \stdClass $job - * @return \stdClass + * @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job + * @return \Illuminate\Queue\Jobs\DatabaseJobRecord */ protected function markJobAsReserved($job) { - $job->attempts = $job->attempts + 1; - $job->reserved_at = $this->currentTime(); - $this->database->table($this->table)->where('id', $job->id)->update([ - 'reserved_at' => $job->reserved_at, - 'attempts' => $job->attempts, + 'reserved_at' => $job->touch(), + 'attempts' => $job->increment(), ]); return $job; @@ -274,40 +301,6 @@ public function deleteReserved($queue, $id) $this->database->commit(); } - /** - * Get the "available at" UNIX timestamp. - * - * @param \DateTime|int $delay - * @return int - */ - protected function getAvailableAt($delay) - { - $availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay); - - return $availableAt->getTimestamp(); - } - - /** - * Create an array to insert for the given job. - * - * @param string|null $queue - * @param string $payload - * @param int $availableAt - * @param int $attempts - * @return array - */ - protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0) - { - return [ - 'queue' => $queue, - 'attempts' => $attempts, - 'reserved_at' => null, - 'available_at' => $availableAt, - 'created_at' => $this->currentTime(), - 'payload' => $payload, - ]; - } - /** * Get the queue or return the default. * diff --git a/src/Illuminate/Queue/Jobs/BeanstalkdJob.php b/src/Illuminate/Queue/Jobs/BeanstalkdJob.php index 198f2fb4ec84..7e65035085f0 100755 --- a/src/Illuminate/Queue/Jobs/BeanstalkdJob.php +++ b/src/Illuminate/Queue/Jobs/BeanstalkdJob.php @@ -40,16 +40,6 @@ public function __construct(Container $container, Pheanstalk $pheanstalk, Pheans $this->pheanstalk = $pheanstalk; } - /** - * Get the raw body string for the job. - * - * @return string - */ - public function getRawBody() - { - return $this->job->getData(); - } - /** * Release the job back into the queue. * @@ -111,6 +101,16 @@ public function getJobId() return $this->job->getId(); } + /** + * Get the raw body string for the job. + * + * @return string + */ + public function getRawBody() + { + return $this->job->getData(); + } + /** * Get the underlying Pheanstalk instance. * diff --git a/src/Illuminate/Queue/Jobs/DatabaseJob.php b/src/Illuminate/Queue/Jobs/DatabaseJob.php index 836355802874..69fd7cc01f35 100644 --- a/src/Illuminate/Queue/Jobs/DatabaseJob.php +++ b/src/Illuminate/Queue/Jobs/DatabaseJob.php @@ -40,30 +40,30 @@ public function __construct(Container $container, DatabaseQueue $database, $job, } /** - * Delete the job from the queue. + * Release the job back into the queue. * + * @param int $delay * @return void */ - public function delete() + public function release($delay = 0) { - parent::delete(); + parent::release($delay); - $this->database->deleteReserved($this->queue, $this->job->id); + $this->delete(); + + $this->database->release($this->queue, $this->job, $delay); } /** - * Release the job back into the queue. + * Delete the job from the queue. * - * @param int $delay * @return void */ - public function release($delay = 0) + public function delete() { - parent::release($delay); - - $this->delete(); + parent::delete(); - $this->database->release($this->queue, $this->job, $delay); + $this->database->deleteReserved($this->queue, $this->job->id); } /** @@ -95,34 +95,4 @@ public function getRawBody() { return $this->job->payload; } - - /** - * Get the IoC container instance. - * - * @return \Illuminate\Container\Container - */ - public function getContainer() - { - return $this->container; - } - - /** - * Get the underlying queue driver instance. - * - * @return \Illuminate\Queue\DatabaseQueue - */ - public function getDatabaseQueue() - { - return $this->database; - } - - /** - * Get the underlying database job. - * - * @return \StdClass - */ - public function getDatabaseJob() - { - return $this->job; - } } diff --git a/src/Illuminate/Queue/Jobs/DatabaseJobRecord.php b/src/Illuminate/Queue/Jobs/DatabaseJobRecord.php new file mode 100644 index 000000000000..b33a87159cfa --- /dev/null +++ b/src/Illuminate/Queue/Jobs/DatabaseJobRecord.php @@ -0,0 +1,63 @@ +record = $record; + } + + /** + * Increment the number of times the job has been attempted. + * + * @return int + */ + public function increment() + { + $this->record->attempts++; + + return $this->record->attempts; + } + + /** + * Update the "reserved at" timestamp of the job. + * + * @return int + */ + public function touch() + { + $this->record->reserved_at = $this->currentTime(); + + return $this->record->reserved_at; + } + + /** + * Dynamically access the underlying job information. + * + * @param string $key + * @return mixed + */ + public function __get($key) + { + return $this->record->{$key}; + } +} diff --git a/tests/Queue/QueueDatabaseQueueUnitTest.php b/tests/Queue/QueueDatabaseQueueUnitTest.php index 1846dec55208..c720291c718f 100644 --- a/tests/Queue/QueueDatabaseQueueUnitTest.php +++ b/tests/Queue/QueueDatabaseQueueUnitTest.php @@ -79,9 +79,9 @@ public function testFailureToCreatePayloadFromArray() public function testBulkBatchPushesOntoDatabase() { $database = m::mock('Illuminate\Database\Connection'); - $queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['currentTime', 'getAvailableAt'])->setConstructorArgs([$database, 'table', 'default'])->getMock(); + $queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['currentTime', 'availableAt'])->setConstructorArgs([$database, 'table', 'default'])->getMock(); $queue->expects($this->any())->method('currentTime')->will($this->returnValue('created')); - $queue->expects($this->any())->method('getAvailableAt')->will($this->returnValue('available')); + $queue->expects($this->any())->method('availableAt')->will($this->returnValue('available')); $database->shouldReceive('table')->with('table')->andReturn($query = m::mock('StdClass')); $query->shouldReceive('insert')->once()->andReturnUsing(function ($records) { $this->assertEquals([[