From e5e67c4b85ac2b46e67657b6f6a9ff2b9578f3a8 Mon Sep 17 00:00:00 2001 From: Sebastien Armand Date: Fri, 3 Nov 2023 06:47:52 -0700 Subject: [PATCH] [10.x] Allow placing a batch on a chain (#48633) * Allow placing a batch on a chain * missing file * Chains of batches of chains of batches * remove auto-format * remove auto-format * just collection * chained comes from queueable * better function name * reflop * messed up indentation * styleci * allow batch to fail but chain to succeed * formatting * formatting * formatting --------- Co-authored-by: Taylor Otwell --- src/Illuminate/Bus/ChainedBatch.php | 130 +++++++++++ src/Illuminate/Bus/Dispatcher.php | 1 + tests/Integration/Queue/JobChainingTest.php | 236 ++++++++++++++++++++ 3 files changed, 367 insertions(+) create mode 100644 src/Illuminate/Bus/ChainedBatch.php diff --git a/src/Illuminate/Bus/ChainedBatch.php b/src/Illuminate/Bus/ChainedBatch.php new file mode 100644 index 000000000000..6d4f92107400 --- /dev/null +++ b/src/Illuminate/Bus/ChainedBatch.php @@ -0,0 +1,130 @@ +jobs = static::prepareNestedBatches($batch->jobs); + + $this->name = $batch->name; + $this->options = $batch->options; + } + + /** + * Prepare any nested batches within the given collection of jobs. + * + * @param \Illuminate\Support\Collection $jobs + * @return \Illuminate\Support\Collection + */ + public static function prepareNestedBatches(Collection $jobs): Collection + { + return $jobs->map(fn ($job) => match (true) { + is_array($job) => static::prepareNestedBatches(collect($job))->all(), + $job instanceof Collection => static::prepareNestedBatches($job), + $job instanceof PendingBatch => new ChainedBatch($job), + default => $job, + }); + } + + /** + * Handle the job. + * + * @return void + */ + public function handle() + { + $batch = new PendingBatch(Container::getInstance(), $this->jobs); + + $batch->name = $this->name; + $batch->options = $this->options; + + if ($this->queue) { + $batch->onQueue($this->queue); + } + + if ($this->connection) { + $batch->onConnection($this->connection); + } + + $this->dispatchRemainderOfChainAfterBatch($batch); + + foreach ($this->chainCatchCallbacks ?? [] as $callback) { + $batch->catch(function (Batch $batch, ?Throwable $exception) use ($callback) { + if (! $batch->allowsFailures()) { + $callback($exception); + } + }); + } + + $batch->dispatch(); + } + + /** + * Move the remainder of the chain to a "finally" batch callback. + * + * @param \Illuminate\Bus\PendingBatch $batch + * @return + */ + protected function dispatchRemainderOfChainAfterBatch(PendingBatch $batch) + { + if (! empty($this->chained)) { + $next = unserialize(array_shift($this->chained)); + + $next->chained = $this->chained; + + $next->onConnection($next->connection ?: $this->chainConnection); + $next->onQueue($next->queue ?: $this->chainQueue); + + $next->chainConnection = $this->chainConnection; + $next->chainQueue = $this->chainQueue; + $next->chainCatchCallbacks = $this->chainCatchCallbacks; + + $batch->finally(function (Batch $batch) use ($next) { + if (! $batch->cancelled()) { + Container::getInstance()->make(Dispatcher::class)->dispatch($next); + } + }); + + $this->chained = []; + } + } +} diff --git a/src/Illuminate/Bus/Dispatcher.php b/src/Illuminate/Bus/Dispatcher.php index 8ed3a21b7c4f..86b19dfea394 100644 --- a/src/Illuminate/Bus/Dispatcher.php +++ b/src/Illuminate/Bus/Dispatcher.php @@ -163,6 +163,7 @@ public function batch($jobs) public function chain($jobs) { $jobs = Collection::wrap($jobs); + $jobs = ChainedBatch::prepareNestedBatches($jobs); return new PendingChain($jobs->shift(), $jobs->toArray()); } diff --git a/tests/Integration/Queue/JobChainingTest.php b/tests/Integration/Queue/JobChainingTest.php index b1d7b955fcc6..9226d3bd6dae 100644 --- a/tests/Integration/Queue/JobChainingTest.php +++ b/tests/Integration/Queue/JobChainingTest.php @@ -2,13 +2,16 @@ namespace Illuminate\Tests\Integration\Queue; +use Illuminate\Bus\Batchable; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; +use Illuminate\Database\Schema\Blueprint; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Bus; use Illuminate\Support\Facades\Queue; +use Illuminate\Support\Facades\Schema; use Orchestra\Testbench\TestCase; class JobChainingTest extends TestCase @@ -26,6 +29,26 @@ protected function getEnvironmentSetUp($app) ]); } + protected function setUp(): void + { + parent::setUp(); + + Schema::create('job_batches', function (Blueprint $table) { + $table->string('id')->primary(); + $table->string('name'); + $table->integer('total_jobs'); + $table->integer('pending_jobs'); + $table->integer('failed_jobs'); + $table->longText('failed_job_ids'); + $table->mediumText('options')->nullable(); + $table->integer('cancelled_at')->nullable(); + $table->integer('created_at'); + $table->integer('finished_at')->nullable(); + }); + + JobRunRecorder::reset(); + } + protected function tearDown(): void { JobChainingTestFirstJob::$ran = false; @@ -244,6 +267,136 @@ public function testChainJobsCanBeAppendedWithoutExistingChain() $this->assertNotNull(JobChainAddingAddedJob::$ranAt); } + + public function testBatchCanBeAddedToChain() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestBatchedJob('b1'), + new JobChainingTestBatchedJob('b2'), + new JobChainingTestBatchedJob('b3'), + new JobChainingTestBatchedJob('b4'), + ]), + new JobChainingNamedTestJob('c3'), + ])->dispatch(); + + $this->assertEquals(['c1', 'c2', 'b1', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); + } + + public function testDynamicBatchCanBeAddedToChain() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestBatchedJob('b1'), + new JobChainingTestBatchedJob('b2', times: 4), + new JobChainingTestBatchedJob('b3'), + new JobChainingTestBatchedJob('b4'), + ]), + new JobChainingNamedTestJob('c3'), + ])->dispatch(); + + $this->assertEquals(['c1', 'c2', 'b1', 'b2-0', 'b2-1', 'b2-2', 'b2-3', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); + } + + public function testChainBatchChain() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + [ + new JobChainingNamedTestJob('bc1'), + new JobChainingNamedTestJob('bc2'), + ], + new JobChainingTestBatchedJob('b1'), + new JobChainingTestBatchedJob('b2', times: 4), + new JobChainingTestBatchedJob('b3'), + new JobChainingTestBatchedJob('b4'), + ]), + new JobChainingNamedTestJob('c3'), + ])->dispatch(); + + $this->assertEquals(['c1', 'c2', 'bc1', 'bc2', 'b1', 'b2-0', 'b2-1', 'b2-2', 'b2-3', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); + } + + public function testChainBatchChainBatch() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + [ + new JobChainingNamedTestJob('bc1'), + new JobChainingNamedTestJob('bc2'), + Bus::batch([ + new JobChainingTestBatchedJob('bb1'), + new JobChainingTestBatchedJob('bb2'), + ]), + ], + new JobChainingTestBatchedJob('b1'), + new JobChainingTestBatchedJob('b2', times: 4), + new JobChainingTestBatchedJob('b3'), + new JobChainingTestBatchedJob('b4'), + ]), + new JobChainingNamedTestJob('c3'), + ])->dispatch(); + + $this->assertEquals(['c1', 'c2', 'bc1', 'bc2', 'bb1', 'bb2', 'b1', 'b2-0', 'b2-1', 'b2-2', 'b2-3', 'b2', 'b3', 'b4', 'c3'], JobRunRecorder::$results); + } + + public function testBatchCatchCallbacks() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestFailingBatchedJob('fb1'), + ])->catch(fn () => JobRunRecorder::recordFailure('batch failed')), + new JobChainingNamedTestJob('c3'), + ])->catch(fn () => JobRunRecorder::recordFailure('chain failed'))->dispatch(); + + $this->assertEquals(['c1', 'c2'], JobRunRecorder::$results); + $this->assertEquals(['batch failed', 'chain failed'], JobRunRecorder::$failures); + } + + public function testChainBatchFailureAllowed() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestBatchedJob('b1'), + new JobChainingTestFailingBatchedJob('b2'), + new JobChainingTestBatchedJob('b3'), + ])->allowFailures()->catch(fn () => JobRunRecorder::recordFailure('batch failed')), + new JobChainingNamedTestJob('c3'), + ])->catch(fn () => JobRunRecorder::recordFailure('chain failed'))->dispatch(); + + $this->assertEquals(['c1', 'c2', 'b1', 'b3', 'c3'], JobRunRecorder::$results); + // Only the batch failed, but the chain should keep going since the batch allows failures + $this->assertEquals(['batch failed'], JobRunRecorder::$failures); + } + + public function testChainBatchFailureNotAllowed() + { + Bus::chain([ + new JobChainingNamedTestJob('c1'), + new JobChainingNamedTestJob('c2'), + Bus::batch([ + new JobChainingTestBatchedJob('b1'), + new JobChainingTestFailingBatchedJob('b2'), + new JobChainingTestBatchedJob('b3'), + ])->allowFailures(false)->catch(fn () => JobRunRecorder::recordFailure('batch failed')), + new JobChainingNamedTestJob('c3'), + ])->catch(fn () => JobRunRecorder::recordFailure('chain failed'))->dispatch(); + + $this->assertEquals(['c1', 'c2', 'b1', 'b3'], JobRunRecorder::$results); + $this->assertEquals(['batch failed', 'chain failed'], JobRunRecorder::$failures); + } } class JobChainingTestFirstJob implements ShouldQueue @@ -251,7 +404,9 @@ class JobChainingTestFirstJob implements ShouldQueue use Dispatchable, Queueable; public static $ran = false; + public static $usedQueue = null; + public static $usedConnection = null; public function handle() @@ -267,7 +422,9 @@ class JobChainingTestSecondJob implements ShouldQueue use Dispatchable, Queueable; public static $ran = false; + public static $usedQueue = null; + public static $usedConnection = null; public function handle() @@ -283,7 +440,9 @@ class JobChainingTestThirdJob implements ShouldQueue use Dispatchable, Queueable; public static $ran = false; + public static $usedQueue = null; + public static $usedConnection = null; public function handle() @@ -382,3 +541,80 @@ public function handle() throw new \Exception(); } } + +class JobChainingNamedTestJob implements ShouldQueue +{ + use Batchable, Dispatchable, InteractsWithQueue, Queueable; + + public static $results = []; + + public string $id; + + public function __construct(string $id) + { + $this->id = $id; + } + + public function handle() + { + JobRunRecorder::record($this->id); + } +} + +class JobChainingTestBatchedJob implements ShouldQueue +{ + use Batchable, Dispatchable, InteractsWithQueue, Queueable; + + public string $id; + + public int $times; + + public function __construct(string $id, int $times = 0) + { + $this->id = $id; + $this->times = $times; + } + + public function handle() + { + for ($i = 0; $i < $this->times; $i++) { + $this->batch()->add(new JobChainingTestBatchedJob($this->id.'-'.$i)); + } + JobRunRecorder::record($this->id); + } +} + +class JobChainingTestFailingBatchedJob implements ShouldQueue +{ + use Batchable, Dispatchable, InteractsWithQueue, Queueable; + + public function handle() + { + $this->fail(); + } +} + +class JobRunRecorder +{ + public static $results = []; + + public static $failures = []; + + public static function record(string $id) + { + self::$results[] = $id; + } + + public static function recordFailure(string $message) + { + self::$failures[] = $message; + + return $message; + } + + public static function reset() + { + self::$results = []; + self::$failures = []; + } +}