From 5d45370dbdefbc54a234bb6dd51b36de1caa3f27 Mon Sep 17 00:00:00 2001 From: Seb Date: Tue, 19 Sep 2023 17:08:33 -0700 Subject: [PATCH 01/15] Allow placing a batch on a chain --- src/Illuminate/Bus/Dispatcher.php | 13 +- tests/Integration/Queue/JobChainingTest.php | 131 ++++++++++++++++++++ 2 files changed, 139 insertions(+), 5 deletions(-) diff --git a/src/Illuminate/Bus/Dispatcher.php b/src/Illuminate/Bus/Dispatcher.php index 8ed3a21b7c4f..c131bdf71aad 100644 --- a/src/Illuminate/Bus/Dispatcher.php +++ b/src/Illuminate/Bus/Dispatcher.php @@ -54,8 +54,6 @@ class Dispatcher implements QueueingDispatcher /** * Create a new command dispatcher instance. * - * @param \Illuminate\Contracts\Container\Container $container - * @param \Closure|null $queueResolver * @return void */ public function __construct(Container $container, Closure $queueResolver = null) @@ -135,7 +133,6 @@ public function dispatchNow($command, $handler = null) /** * Attempt to find the batch with the given ID. * - * @param string $batchId * @return \Illuminate\Bus\Batch|null */ public function findBatch(string $batchId) @@ -164,6 +161,14 @@ public function chain($jobs) { $jobs = Collection::wrap($jobs); + $jobs = $jobs->map(function ($job) { + if ($job instanceof PendingBatch) { + return new ChainedBatch($job); + } + + return $job; + }); + return new PendingChain($jobs->shift(), $jobs->toArray()); } @@ -270,7 +275,6 @@ public function dispatchAfterResponse($command, $handler = null) /** * Set the pipes through which commands should be piped before dispatching. * - * @param array $pipes * @return $this */ public function pipeThrough(array $pipes) @@ -283,7 +287,6 @@ public function pipeThrough(array $pipes) /** * Map a command to a handler. * - * @param array $map * @return $this */ public function map(array $map) diff --git a/tests/Integration/Queue/JobChainingTest.php b/tests/Integration/Queue/JobChainingTest.php index b1d7b955fcc6..4b5c8ddbcbda 100644 --- a/tests/Integration/Queue/JobChainingTest.php +++ b/tests/Integration/Queue/JobChainingTest.php @@ -2,13 +2,16 @@ namespace Illuminate\Tests\Integration\Queue; +use Illuminate\Bus\Batchable; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; +use Illuminate\Database\Schema\Blueprint; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Bus; use Illuminate\Support\Facades\Queue; +use Illuminate\Support\Facades\Schema; use Orchestra\Testbench\TestCase; class JobChainingTest extends TestCase @@ -26,6 +29,26 @@ protected function getEnvironmentSetUp($app) ]); } + protected function setUp(): void + { + parent::setUp(); + + Schema::create('job_batches', function (Blueprint $table) { + $table->string('id')->primary(); + $table->string('name'); + $table->integer('total_jobs'); + $table->integer('pending_jobs'); + $table->integer('failed_jobs'); + $table->longText('failed_job_ids'); + $table->mediumText('options')->nullable(); + $table->integer('cancelled_at')->nullable(); + $table->integer('created_at'); + $table->integer('finished_at')->nullable(); + }); + + JobRunRecorder::reset(); + } + protected function tearDown(): void { JobChainingTestFirstJob::$ran = false; @@ -244,6 +267,37 @@ public function testChainJobsCanBeAppendedWithoutExistingChain() $this->assertNotNull(JobChainAddingAddedJob::$ranAt); } + + public function testBatchCanBeAddedToChain() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestBatchedJob('b1'), + new JobChainingTestBatchedJob('b2'), + new JobChainingTestBatchedJob('b3'), + new JobChainingTestBatchedJob('b4'), + ]), + new JobChainingNamedTestJob('c3'), + ])->dispatch(); + + $this->assertEquals(['c1', 'c2', 'b1', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); + } + + public function testBatchCatchCallbacks() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestFailingBatchedJob('fb1'), + ])->catch(fn () => JobRunRecorder::recordFailure('batch failed')), + ])->catch(fn () => JobRunRecorder::recordFailure('chain failed'))->dispatch(); + + $this->assertEquals(['c1', 'c2'], JobRunRecorder::$results); + $this->assertEquals(['batch failed', 'chain failed'], JobRunRecorder::$failures); + } } class JobChainingTestFirstJob implements ShouldQueue @@ -251,7 +305,9 @@ class JobChainingTestFirstJob implements ShouldQueue use Dispatchable, Queueable; public static $ran = false; + public static $usedQueue = null; + public static $usedConnection = null; public function handle() @@ -267,7 +323,9 @@ class JobChainingTestSecondJob implements ShouldQueue use Dispatchable, Queueable; public static $ran = false; + public static $usedQueue = null; + public static $usedConnection = null; public function handle() @@ -283,7 +341,9 @@ class JobChainingTestThirdJob implements ShouldQueue use Dispatchable, Queueable; public static $ran = false; + public static $usedQueue = null; + public static $usedConnection = null; public function handle() @@ -382,3 +442,74 @@ public function handle() throw new \Exception(); } } + +class JobChainingNamedTestJob implements ShouldQueue +{ + use Dispatchable, InteractsWithQueue, Queueable; + + public static $results = []; + + public string $id; + + public function __construct(string $id) + { + $this->id = $id; + } + + public function handle() + { + JobRunRecorder::record($this->id); + } +} + +class JobChainingTestBatchedJob implements ShouldQueue +{ + use Batchable, Dispatchable, InteractsWithQueue, Queueable; + + public string $id; + + public function __construct(string $id) + { + $this->id = $id; + } + + public function handle() + { + JobRunRecorder::record($this->id); + } +} + +class JobChainingTestFailingBatchedJob implements ShouldQueue +{ + use Batchable, Dispatchable, InteractsWithQueue, Queueable; + + public function handle() + { + $this->fail(); + } +} + +class JobRunRecorder +{ + public static $results = []; + + public static $failures = []; + + public static function record(string $id) + { + self::$results[] = $id; + } + + public static function recordFailure(string $message) + { + self::$failures[] = $message; + + return $message; + } + + public static function reset() + { + self::$results = []; + self::$failures = []; + } +} From 56f1fb81f2003d72af575052b8c4e9ff754cb121 Mon Sep 17 00:00:00 2001 From: Seb Date: Wed, 20 Sep 2023 11:44:39 -0700 Subject: [PATCH 02/15] missing file --- src/Illuminate/Bus/ChainedBatch.php | 69 +++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 src/Illuminate/Bus/ChainedBatch.php diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php new file mode 100644 index 000000000000..2542c6f5e09e --- /dev/null +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -0,0 +1,69 @@ +jobs = $batch->jobs; + $this->options = $batch->options; + $this->name = $batch->name; + } + + public function handle(Container $container) + { + $batch = new PendingBatch($container, $this->jobs); + $batch->name = $this->name; + $batch->options = $this->options; + + $this->hijackChain($batch); + + if ($this->queue) { + $batch->onQueue($this->queue); + } + + if ($this->connection) { + $batch->onConnection($this->connection); + } + + foreach ($this->chainCatchCallbacks as $cb) { + $batch->catch($cb); + } + + $batch->dispatch(); + } + + protected function hijackChain(PendingBatch $batch) + { + if (property_exists($this, 'chained') && ! empty($this->chained)) { + $next = unserialize(array_shift($this->chained)); + $next->chained = $this->chained; + + $next->onConnection($next->connection ?: $this->chainConnection); + $next->onQueue($next->queue ?: $this->chainQueue); + + $next->chainConnection = $this->chainConnection; + $next->chainQueue = $this->chainQueue; + $next->chainCatchCallbacks = $this->chainCatchCallbacks; + + $batch->then(fn () => dispatch($next)); + + $this->chained = []; + } + } +} From cca9b284c342d9ce40c114cc757b190804eb1e93 Mon Sep 17 00:00:00 2001 From: Seb Date: Thu, 21 Sep 2023 13:46:21 -0700 Subject: [PATCH 03/15] Chains of batches of chains of batches --- src/Illuminate/Bus/ChainedBatch.php | 22 +++++- src/Illuminate/Bus/Dispatcher.php | 9 +-- tests/Integration/Queue/JobChainingTest.php | 78 ++++++++++++++++++++- 3 files changed, 97 insertions(+), 12 deletions(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index 2542c6f5e09e..4e09aece18af 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -20,7 +20,7 @@ class ChainedBatch implements ShouldQueue public function __construct(PendingBatch $batch) { - $this->jobs = $batch->jobs; + $this->jobs = static::prepareNestedBatches($batch->jobs); $this->options = $batch->options; $this->name = $batch->name; } @@ -41,7 +41,7 @@ public function handle(Container $container) $batch->onConnection($this->connection); } - foreach ($this->chainCatchCallbacks as $cb) { + foreach ($this->chainCatchCallbacks ?? [] as $cb) { $batch->catch($cb); } @@ -66,4 +66,22 @@ protected function hijackChain(PendingBatch $batch) $this->chained = []; } } + + public static function prepareNestedBatches(Collection $jobs): Collection + { + foreach ($jobs as $k => $job) { + if (is_array($job)) { + $jobs[$k] = static::prepareNestedBatches(collect($job))->all(); + } + if ($job instanceof Collection) { + $jobs[$k] = static::prepareNestedBatches($job); + } + + if ($job instanceof PendingBatch) { + $jobs[$k] = new ChainedBatch($job); + } + } + + return $jobs; + } } diff --git a/src/Illuminate/Bus/Dispatcher.php b/src/Illuminate/Bus/Dispatcher.php index c131bdf71aad..7d86eec79882 100644 --- a/src/Illuminate/Bus/Dispatcher.php +++ b/src/Illuminate/Bus/Dispatcher.php @@ -160,14 +160,7 @@ public function batch($jobs) public function chain($jobs) { $jobs = Collection::wrap($jobs); - - $jobs = $jobs->map(function ($job) { - if ($job instanceof PendingBatch) { - return new ChainedBatch($job); - } - - return $job; - }); + $jobs = ChainedBatch::prepareNestedBatches($jobs); return new PendingChain($jobs->shift(), $jobs->toArray()); } diff --git a/tests/Integration/Queue/JobChainingTest.php b/tests/Integration/Queue/JobChainingTest.php index 4b5c8ddbcbda..160fcc754d29 100644 --- a/tests/Integration/Queue/JobChainingTest.php +++ b/tests/Integration/Queue/JobChainingTest.php @@ -285,6 +285,74 @@ public function testBatchCanBeAddedToChain() $this->assertEquals(['c1', 'c2', 'b1', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); } + public function testDynamicBatchCanBeAddedToChain() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestBatchedJob('b1'), + new JobChainingTestBatchedJob('b2', times: 4), + new JobChainingTestBatchedJob('b3'), + new JobChainingTestBatchedJob('b4'), + ]), + new JobChainingNamedTestJob('c3'), + ])->dispatch(); + + $this->assertEquals(['c1', 'c2', 'b1', 'b2-0', 'b2-1', 'b2-2', 'b2-3', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); + } + + public function testChainBatchChain() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + [ + new JobChainingNamedTestJob('bc1'), + new JobChainingNamedTestJob('bc2'), + ], + new JobChainingTestBatchedJob('b1'), + new JobChainingTestBatchedJob('b2', times: 4), + new JobChainingTestBatchedJob('b3'), + new JobChainingTestBatchedJob('b4'), + ]), + new JobChainingNamedTestJob('c3'), + ])->dispatch(); + + $this->assertEquals(['c1', 'c2', 'bc1', 'bc2', 'b1', 'b2-0', 'b2-1', 'b2-2', 'b2-3', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); + } + + /** + * @group debug + * + * @return void + */ + public function testChainBatchChainBatch() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + [ + new JobChainingNamedTestJob('bc1'), + new JobChainingNamedTestJob('bc2'), + Bus::batch([ + new JobChainingTestBatchedJob('bb1'), + new JobChainingTestBatchedJob('bb2'), + ]), + ], + new JobChainingTestBatchedJob('b1'), + new JobChainingTestBatchedJob('b2', times: 4), + new JobChainingTestBatchedJob('b3'), + new JobChainingTestBatchedJob('b4'), + ]), + new JobChainingNamedTestJob('c3'), + ])->dispatch(); + + $this->assertEquals(['c1', 'c2', 'bc1', 'bc2', 'bb1', 'bb2', 'b1', 'b2-0', 'b2-1', 'b2-2', 'b2-3', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); + } + public function testBatchCatchCallbacks() { Bus::chain([ @@ -445,7 +513,7 @@ public function handle() class JobChainingNamedTestJob implements ShouldQueue { - use Dispatchable, InteractsWithQueue, Queueable; + use Batchable, Dispatchable, InteractsWithQueue, Queueable; public static $results = []; @@ -468,13 +536,19 @@ class JobChainingTestBatchedJob implements ShouldQueue public string $id; - public function __construct(string $id) + public int $times; + + public function __construct(string $id, int $times = 0) { $this->id = $id; + $this->times = $times; } public function handle() { + for ($i = 0; $i < $this->times; $i++) { + $this->batch()->add(new JobChainingTestBatchedJob($this->id.'-'.$i)); + } JobRunRecorder::record($this->id); } } From d572e569c54a168dd635456833dd4e25e10fcafd Mon Sep 17 00:00:00 2001 From: Seb Date: Fri, 22 Sep 2023 09:51:58 -0700 Subject: [PATCH 04/15] remove auto-format --- src/Illuminate/Bus/Dispatcher.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Illuminate/Bus/Dispatcher.php b/src/Illuminate/Bus/Dispatcher.php index 7d86eec79882..8ed3a21b7c4f 100644 --- a/src/Illuminate/Bus/Dispatcher.php +++ b/src/Illuminate/Bus/Dispatcher.php @@ -54,6 +54,8 @@ class Dispatcher implements QueueingDispatcher /** * Create a new command dispatcher instance. * + * @param \Illuminate\Contracts\Container\Container $container + * @param \Closure|null $queueResolver * @return void */ public function __construct(Container $container, Closure $queueResolver = null) @@ -133,6 +135,7 @@ public function dispatchNow($command, $handler = null) /** * Attempt to find the batch with the given ID. * + * @param string $batchId * @return \Illuminate\Bus\Batch|null */ public function findBatch(string $batchId) @@ -160,7 +163,6 @@ public function batch($jobs) public function chain($jobs) { $jobs = Collection::wrap($jobs); - $jobs = ChainedBatch::prepareNestedBatches($jobs); return new PendingChain($jobs->shift(), $jobs->toArray()); } @@ -268,6 +270,7 @@ public function dispatchAfterResponse($command, $handler = null) /** * Set the pipes through which commands should be piped before dispatching. * + * @param array $pipes * @return $this */ public function pipeThrough(array $pipes) @@ -280,6 +283,7 @@ public function pipeThrough(array $pipes) /** * Map a command to a handler. * + * @param array $map * @return $this */ public function map(array $map) From 02ccd5f2031957de1ecbd38575b3206d04875232 Mon Sep 17 00:00:00 2001 From: Seb Date: Fri, 22 Sep 2023 09:52:36 -0700 Subject: [PATCH 05/15] remove auto-format --- src/Illuminate/Bus/Dispatcher.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Illuminate/Bus/Dispatcher.php b/src/Illuminate/Bus/Dispatcher.php index 8ed3a21b7c4f..86b19dfea394 100644 --- a/src/Illuminate/Bus/Dispatcher.php +++ b/src/Illuminate/Bus/Dispatcher.php @@ -163,6 +163,7 @@ public function batch($jobs) public function chain($jobs) { $jobs = Collection::wrap($jobs); + $jobs = ChainedBatch::prepareNestedBatches($jobs); return new PendingChain($jobs->shift(), $jobs->toArray()); } From ad65b7ebe485aa9d8da8adda0d6cf83b0e58c840 Mon Sep 17 00:00:00 2001 From: Seb Date: Fri, 22 Sep 2023 09:53:50 -0700 Subject: [PATCH 06/15] just collection --- src/Illuminate/Bus/ChainedBatch.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index 4e09aece18af..b4a64cfd7d2b 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -12,7 +12,7 @@ class ChainedBatch implements ShouldQueue { use Batchable, Dispatchable, InteractsWithQueue, Queueable; - public array|Collection $jobs; + public Collection $jobs; public array $options; From 2d0c72d7af02191e8813568450a6acc5e475d365 Mon Sep 17 00:00:00 2001 From: Seb Date: Fri, 22 Sep 2023 09:55:40 -0700 Subject: [PATCH 07/15] chained comes from queueable --- src/Illuminate/Bus/ChainedBatch.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index b4a64cfd7d2b..5dd574c4014e 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -50,7 +50,7 @@ public function handle(Container $container) protected function hijackChain(PendingBatch $batch) { - if (property_exists($this, 'chained') && ! empty($this->chained)) { + if (! empty($this->chained)) { $next = unserialize(array_shift($this->chained)); $next->chained = $this->chained; From 2cb4ff66eded4c78c7b76f9fb486465302cd56b6 Mon Sep 17 00:00:00 2001 From: Seb Date: Mon, 25 Sep 2023 16:07:06 -0700 Subject: [PATCH 08/15] better function name --- src/Illuminate/Bus/ChainedBatch.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index 5dd574c4014e..ec35bd5c8b0b 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -31,7 +31,7 @@ public function handle(Container $container) $batch->name = $this->name; $batch->options = $this->options; - $this->hijackChain($batch); + $this->moveChainToEndOfBatch($batch); if ($this->queue) { $batch->onQueue($this->queue); @@ -48,7 +48,7 @@ public function handle(Container $container) $batch->dispatch(); } - protected function hijackChain(PendingBatch $batch) + protected function moveChainToEndOfBatch(PendingBatch $batch) { if (! empty($this->chained)) { $next = unserialize(array_shift($this->chained)); @@ -80,8 +80,8 @@ public static function prepareNestedBatches(Collection $jobs): Collection if ($job instanceof PendingBatch) { $jobs[$k] = new ChainedBatch($job); } - } +} return $jobs; } } From 36bf937e105358f98662ff39fbb2f64f7cc651a4 Mon Sep 17 00:00:00 2001 From: Seb Date: Mon, 25 Sep 2023 17:46:40 -0700 Subject: [PATCH 09/15] reflop From 9e93e0faa3b34e7862a752d2f4703490b3f22968 Mon Sep 17 00:00:00 2001 From: Seb Date: Wed, 4 Oct 2023 16:05:43 -0700 Subject: [PATCH 10/15] messed up indentation --- src/Illuminate/Bus/ChainedBatch.php | 3 ++- tests/Integration/Queue/JobChainingTest.php | 6 +----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index ec35bd5c8b0b..0ba53d98a9d6 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -81,7 +81,8 @@ public static function prepareNestedBatches(Collection $jobs): Collection $jobs[$k] = new ChainedBatch($job); } -} + } + return $jobs; } } diff --git a/tests/Integration/Queue/JobChainingTest.php b/tests/Integration/Queue/JobChainingTest.php index 160fcc754d29..a18ec9e68676 100644 --- a/tests/Integration/Queue/JobChainingTest.php +++ b/tests/Integration/Queue/JobChainingTest.php @@ -323,11 +323,6 @@ public function testChainBatchChain() $this->assertEquals(['c1', 'c2', 'bc1', 'bc2', 'b1', 'b2-0', 'b2-1', 'b2-2', 'b2-3', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); } - /** - * @group debug - * - * @return void - */ public function testChainBatchChainBatch() { Bus::chain([ @@ -361,6 +356,7 @@ public function testBatchCatchCallbacks() Bus::batch([ new JobChainingTestFailingBatchedJob('fb1'), ])->catch(fn () => JobRunRecorder::recordFailure('batch failed')), + new JobChainingNamedTestJob('c3'), ])->catch(fn () => JobRunRecorder::recordFailure('chain failed'))->dispatch(); $this->assertEquals(['c1', 'c2'], JobRunRecorder::$results); From 905159033f7462addb27580fa1f7558c65f9b76b Mon Sep 17 00:00:00 2001 From: Seb Date: Wed, 4 Oct 2023 16:10:54 -0700 Subject: [PATCH 11/15] styleci --- src/Illuminate/Bus/ChainedBatch.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index 0ba53d98a9d6..385fbc418f89 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -80,7 +80,6 @@ public static function prepareNestedBatches(Collection $jobs): Collection if ($job instanceof PendingBatch) { $jobs[$k] = new ChainedBatch($job); } - } return $jobs; From bbfb795b939cf9bece49082600d24fea1d50f9af Mon Sep 17 00:00:00 2001 From: Seb Date: Wed, 11 Oct 2023 12:49:49 -0700 Subject: [PATCH 12/15] allow batch to fail but chain to succeed --- src/Illuminate/Bus/ChainedBatch.php | 17 ++++++++-- tests/Integration/Queue/JobChainingTest.php | 35 +++++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index 385fbc418f89..9b519745c1df 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -7,6 +7,7 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Support\Collection; +use Throwable; class ChainedBatch implements ShouldQueue { @@ -42,7 +43,13 @@ public function handle(Container $container) } foreach ($this->chainCatchCallbacks ?? [] as $cb) { - $batch->catch($cb); + $batch->catch(function (Batch $batch, ?Throwable $exception) use ($cb) { + if ($batch->allowsFailures()) { + return; + } + + $cb($exception); + }); } $batch->dispatch(); @@ -61,7 +68,13 @@ protected function moveChainToEndOfBatch(PendingBatch $batch) $next->chainQueue = $this->chainQueue; $next->chainCatchCallbacks = $this->chainCatchCallbacks; - $batch->then(fn () => dispatch($next)); + $batch->finally(function (Batch $batch) use ($next) { + if ($batch->canceled()) { + return; + } + + dispatch($next); + }); $this->chained = []; } diff --git a/tests/Integration/Queue/JobChainingTest.php b/tests/Integration/Queue/JobChainingTest.php index a18ec9e68676..9226d3bd6dae 100644 --- a/tests/Integration/Queue/JobChainingTest.php +++ b/tests/Integration/Queue/JobChainingTest.php @@ -362,6 +362,41 @@ public function testBatchCatchCallbacks() $this->assertEquals(['c1', 'c2'], JobRunRecorder::$results); $this->assertEquals(['batch failed', 'chain failed'], JobRunRecorder::$failures); } + + public function testChainBatchFailureAllowed() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestBatchedJob('b1'), + new JobChainingTestFailingBatchedJob('b2'), + new JobChainingTestBatchedJob('b3'), + ])->allowFailures()->catch(fn () => JobRunRecorder::recordFailure('batch failed')), + new JobChainingNamedTestJob('c3'), + ])->catch(fn () => JobRunRecorder::recordFailure('chain failed'))->dispatch(); + + $this->assertEquals(['c1', 'c2', 'b1', 'b3', 'c3'], JobRunRecorder::$results); + // Only the batch failed, but the chain should keep going since the batch allows failures + $this->assertEquals(['batch failed'], JobRunRecorder::$failures); + } + + public function testChainBatchFailureNotAllowed() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestBatchedJob('b1'), + new JobChainingTestFailingBatchedJob('b2'), + new JobChainingTestBatchedJob('b3'), + ])->allowFailures(false)->catch(fn () => JobRunRecorder::recordFailure('batch failed')), + new JobChainingNamedTestJob('c3'), + ])->catch(fn () => JobRunRecorder::recordFailure('chain failed'))->dispatch(); + + $this->assertEquals(['c1', 'c2', 'b1', 'b3'], JobRunRecorder::$results); + $this->assertEquals(['batch failed', 'chain failed'], JobRunRecorder::$failures); + } } class JobChainingTestFirstJob implements ShouldQueue From 696202e80c05b7cda087764f35d31b6ce84d6203 Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Wed, 1 Nov 2023 15:12:49 -0500 Subject: [PATCH 13/15] formatting --- src/Illuminate/Bus/ChainedBatch.php | 87 +++++++++++++++++++---------- 1 file changed, 58 insertions(+), 29 deletions(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index 9b519745c1df..561f706f9d97 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -13,26 +13,54 @@ class ChainedBatch implements ShouldQueue { use Batchable, Dispatchable, InteractsWithQueue, Queueable; + /** + * The collection of batched jobs. + * + * @var \Illuminate\Support\Collection + */ public Collection $jobs; - public array $options; - + /** + * The name of the batch. + * + * @var string + */ public string $name; + /** + * The batch options. + * + * @var array + */ + public array $options; + + /** + * Create a new chained batch instance. + * + * @param \Illuminate\Bus\PendingBatch $batch + * @return void + */ public function __construct(PendingBatch $batch) { $this->jobs = static::prepareNestedBatches($batch->jobs); - $this->options = $batch->options; + $this->name = $batch->name; + $this->options = $batch->options; } + /** + * Handle the job. + * + * @param \Illuminate\Container\Container $container + */ public function handle(Container $container) { $batch = new PendingBatch($container, $this->jobs); + $batch->name = $this->name; $batch->options = $this->options; - $this->moveChainToEndOfBatch($batch); + $this->dispatchRemainderOfChainAfterBatch($batch); if ($this->queue) { $batch->onQueue($this->queue); @@ -42,23 +70,28 @@ public function handle(Container $container) $batch->onConnection($this->connection); } - foreach ($this->chainCatchCallbacks ?? [] as $cb) { - $batch->catch(function (Batch $batch, ?Throwable $exception) use ($cb) { - if ($batch->allowsFailures()) { - return; + foreach ($this->chainCatchCallbacks ?? [] as $callback) { + $batch->catch(function (Batch $batch, ?Throwable $exception) use ($callback) { + if (! $batch->allowsFailures()) { + $callback($exception); } - - $cb($exception); }); } $batch->dispatch(); } - protected function moveChainToEndOfBatch(PendingBatch $batch) + /** + * Move the remainder of the chain to a "finally" batch callback. + * + * @param \Illuminate\Bus\PendingBatch $batch + * @return + */ + protected function dispatchRemainderOfChainAfterBatch(PendingBatch $batch) { if (! empty($this->chained)) { $next = unserialize(array_shift($this->chained)); + $next->chained = $this->chained; $next->onConnection($next->connection ?: $this->chainConnection); @@ -69,32 +102,28 @@ protected function moveChainToEndOfBatch(PendingBatch $batch) $next->chainCatchCallbacks = $this->chainCatchCallbacks; $batch->finally(function (Batch $batch) use ($next) { - if ($batch->canceled()) { - return; + if (! $batch->canceled()) { + dispatch($next); } - - dispatch($next); }); $this->chained = []; } } + /** + * Prepare any nested batches within the given collection of jobs. + * + * @param \Illuminate\Support\Collection $jobs + * @return \Illuminate\Support\Collection + */ public static function prepareNestedBatches(Collection $jobs): Collection { - foreach ($jobs as $k => $job) { - if (is_array($job)) { - $jobs[$k] = static::prepareNestedBatches(collect($job))->all(); - } - if ($job instanceof Collection) { - $jobs[$k] = static::prepareNestedBatches($job); - } - - if ($job instanceof PendingBatch) { - $jobs[$k] = new ChainedBatch($job); - } - } - - return $jobs; + return $jobs->map(fn ($job) => match (true) { + is_array($job) => static::prepareNestedBatches(collect($job))->all(), + $job instanceof Collection => static::prepareNestedBatches($job), + $job instanceof PendingBatch => new ChainedBatch($job), + default => $job, + }); } } From bd9f4415f5f019dd9cafbd032e0e715ae7c9a4b4 Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Wed, 1 Nov 2023 15:16:09 -0500 Subject: [PATCH 14/15] formatting --- src/Illuminate/Bus/ChainedBatch.php | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index 561f706f9d97..cf97013e5606 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -2,7 +2,8 @@ namespace Illuminate\Bus; -use Illuminate\Contracts\Container\Container; +use Illuminate\Container\Container; +use Illuminate\Contracts\Bus\Dispatcher; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; @@ -51,11 +52,11 @@ public function __construct(PendingBatch $batch) /** * Handle the job. * - * @param \Illuminate\Container\Container $container + * @return void */ - public function handle(Container $container) + public function handle() { - $batch = new PendingBatch($container, $this->jobs); + $batch = new PendingBatch(Container::getInstance(), $this->jobs); $batch->name = $this->name; $batch->options = $this->options; @@ -103,7 +104,7 @@ protected function dispatchRemainderOfChainAfterBatch(PendingBatch $batch) $batch->finally(function (Batch $batch) use ($next) { if (! $batch->canceled()) { - dispatch($next); + Container::getInstance()->make(Dispatcher::class)->dispatch($next); } }); From e35803b7e24daa8a33f90ba3eb1357f9396270c0 Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Thu, 2 Nov 2023 13:25:27 -0500 Subject: [PATCH 15/15] formatting --- src/Illuminate/Bus/ChainedBatch.php | 38 ++++++++++++++--------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php index cf97013e5606..6d4f92107400 100644 --- a/src/Illuminate/Bus/ChainedBatch.php +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -49,6 +49,22 @@ public function __construct(PendingBatch $batch) $this->options = $batch->options; } + /** + * Prepare any nested batches within the given collection of jobs. + * + * @param \Illuminate\Support\Collection $jobs + * @return \Illuminate\Support\Collection + */ + public static function prepareNestedBatches(Collection $jobs): Collection + { + return $jobs->map(fn ($job) => match (true) { + is_array($job) => static::prepareNestedBatches(collect($job))->all(), + $job instanceof Collection => static::prepareNestedBatches($job), + $job instanceof PendingBatch => new ChainedBatch($job), + default => $job, + }); + } + /** * Handle the job. * @@ -61,8 +77,6 @@ public function handle() $batch->name = $this->name; $batch->options = $this->options; - $this->dispatchRemainderOfChainAfterBatch($batch); - if ($this->queue) { $batch->onQueue($this->queue); } @@ -71,6 +85,8 @@ public function handle() $batch->onConnection($this->connection); } + $this->dispatchRemainderOfChainAfterBatch($batch); + foreach ($this->chainCatchCallbacks ?? [] as $callback) { $batch->catch(function (Batch $batch, ?Throwable $exception) use ($callback) { if (! $batch->allowsFailures()) { @@ -103,7 +119,7 @@ protected function dispatchRemainderOfChainAfterBatch(PendingBatch $batch) $next->chainCatchCallbacks = $this->chainCatchCallbacks; $batch->finally(function (Batch $batch) use ($next) { - if (! $batch->canceled()) { + if (! $batch->cancelled()) { Container::getInstance()->make(Dispatcher::class)->dispatch($next); } }); @@ -111,20 +127,4 @@ protected function dispatchRemainderOfChainAfterBatch(PendingBatch $batch) $this->chained = []; } } - - /** - * Prepare any nested batches within the given collection of jobs. - * - * @param \Illuminate\Support\Collection $jobs - * @return \Illuminate\Support\Collection - */ - public static function prepareNestedBatches(Collection $jobs): Collection - { - return $jobs->map(fn ($job) => match (true) { - is_array($job) => static::prepareNestedBatches(collect($job))->all(), - $job instanceof Collection => static::prepareNestedBatches($job), - $job instanceof PendingBatch => new ChainedBatch($job), - default => $job, - }); - } }