Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[💡 FEATURE REQUEST]: preserve message publication order after re-queuing #1941

Closed
shieldz80 opened this issue Jun 11, 2024 · 30 comments · Fixed by roadrunner-server/jobs#127 or roadrunner-php/jobs#66
Assignees
Labels
C-feature-accepted Category: Feature discussed and accepted P-jobs Plugin: Jobs
Milestone

Comments

@shieldz80
Copy link

Plugin

JOBS

I have an idea!

Hi,

First of let me just say thank you for the awesome project!

As discussed in https://github.com/orgs/roadrunner-server/discussions/1932 when using RoadRunner with RabbitMQ, if, during the processing of a message, an error occurs and the message is re-queued it will end up at the back of the queue. This breaks the expected behavior of RabbitMQ which states that (https://www.rabbitmq.com/docs/semantics#ordering)

From RabbitMQ release 2.7.0, messages are always held in the queue in publication order, even in the presence of requeueing or channel closure.

A sample log of this issue presented below

[2024-06-11T05:45:09.336463+00:00] q1.INFO: Processing message with id 1 from queue q1 {"id":1,"queue":"q1"} []
[2024-06-11T05:45:14.337513+00:00] q1.INFO: Message with id 1 from queue q1 was processed successfully {"id":1,"queue":"q1"} []
[2024-06-11T05:45:14.340208+00:00] q1.INFO: Processing message with id 2 from queue q1 {"id":2,"queue":"q1"} []
[2024-06-11T05:45:15.340502+00:00] q1.INFO: Message with id 2 from queue q1 was processed successfully {"id":2,"queue":"q1"} []
[2024-06-11T05:45:15.342380+00:00] q1.INFO: Processing message with id 3 from queue q1 {"id":3,"queue":"q1"} []
[2024-06-11T05:45:16.342790+00:00] q1.INFO: Message with id 3 from queue q1 was processed successfully {"id":3,"queue":"q1"} []
[2024-06-11T05:45:16.344111+00:00] q1.INFO: Processing message with id 4 from queue q1 {"id":4,"queue":"q1"} []
[2024-06-11T05:45:17.344381+00:00] q1.INFO: Message with id 4 from queue q1 was processed successfully {"id":4,"queue":"q1"} []
[2024-06-11T05:45:17.344941+00:00] q1.INFO: Processing message with id 5 from queue q1 {"id":5,"queue":"q1"} []
[2024-06-11T05:45:18.345077+00:00] q1.INFO: Message with id 5 from queue q1 was processed successfully {"id":5,"queue":"q1"} []
[2024-06-11T05:45:18.346084+00:00] q1.INFO: Processing message with id 6 from queue q1 {"id":6,"queue":"q1"} []
[2024-06-11T05:45:21.346338+00:00] q1.INFO: Message with id 6 from queue q1 was processed successfully {"id":6,"queue":"q1"} []
[2024-06-11T05:45:21.348139+00:00] q1.INFO: Processing message with id 7 from queue q1 {"id":7,"queue":"q1"} []
[2024-06-11T05:45:25.348523+00:00] q1.INFO: Message with id 7 from queue q1 was processed successfully {"id":7,"queue":"q1"} []
[2024-06-11T05:45:25.350130+00:00] q1.INFO: Processing message with id 8 from queue q1 {"id":8,"queue":"q1"} []
[2024-06-11T05:45:27.350445+00:00] q1.INFO: Message with id 8 from queue q1 was processed successfully {"id":8,"queue":"q1"} []
[2024-06-11T05:45:27.352355+00:00] q1.INFO: Processing message with id 9 from queue q1 {"id":9,"queue":"q1"} []
[2024-06-11T05:45:28.352569+00:00] q1.INFO: Message with id 9 from queue q1 was processed successfully {"id":9,"queue":"q1"} []
[2024-06-11T05:45:28.353735+00:00] q1.INFO: Processing message with id 10 from queue q1 {"id":10,"queue":"q1"} []
[2024-06-11T05:45:29.353907+00:00] q1.INFO: Message with id 10 from queue q1 was processed successfully {"id":10,"queue":"q1"} []
[2024-06-11T05:45:29.354316+00:00] q1.INFO: Processing message with id 11 from queue q1 {"id":11,"queue":"q1"} []
[2024-06-11T05:45:30.354478+00:00] q1.INFO: Message with id 11 from queue q1 was processed successfully {"id":11,"queue":"q1"} []
[2024-06-11T05:45:30.373054+00:00] q1.INFO: Processing message with id 12 from queue q1 {"id":12,"queue":"q1"} []
[2024-06-11T05:45:32.373611+00:00] q1.INFO: Message with id 12 from queue q1 was processed successfully {"id":12,"queue":"q1"} []
[2024-06-11T05:45:32.375043+00:00] q1.INFO: Processing message with id 13 from queue q1 {"id":13,"queue":"q1"} []
[2024-06-11T05:45:34.375274+00:00] q1.INFO: Message with id 13 from queue q1 was processed successfully {"id":13,"queue":"q1"} []
[2024-06-11T05:45:34.384759+00:00] q1.INFO: Processing message with id 14 from queue q1 {"id":14,"queue":"q1"} []
[2024-06-11T05:45:35.385047+00:00] q1.INFO: Message with id 14 from queue q1 was processed successfully {"id":14,"queue":"q1"} []
[2024-06-11T05:45:35.385643+00:00] q1.INFO: Processing message with id 15 from queue q1 {"id":15,"queue":"q1"} []
[2024-06-11T05:45:38.385851+00:00] q1.INFO: Message with id 15 from queue q1 was processed successfully {"id":15,"queue":"q1"} []
[2024-06-11T05:45:38.387073+00:00] q1.INFO: Processing message with id 16 from queue q1 {"id":16,"queue":"q1"} []
[2024-06-11T05:45:40.387259+00:00] q1.INFO: Message with id 16 from queue q1 was processed successfully {"id":16,"queue":"q1"} []
[2024-06-11T05:45:40.387666+00:00] q1.INFO: Processing message with id 17 from queue q1 {"id":17,"queue":"q1"} []
[2024-06-11T05:45:41.387875+00:00] q1.INFO: Message with id 17 from queue q1 was processed successfully {"id":17,"queue":"q1"} []
[2024-06-11T05:45:41.388831+00:00] q1.INFO: Processing message with id 18 from queue q1 {"id":18,"queue":"q1"} []
[2024-06-11T05:45:42.388996+00:00] q1.INFO: Message with id 18 from queue q1 was processed successfully {"id":18,"queue":"q1"} []
[2024-06-11T05:45:42.389602+00:00] q1.INFO: Processing message with id 19 from queue q1 {"id":19,"queue":"q1"} []
[2024-06-11T05:45:44.389864+00:00] q1.INFO: Message with id 19 from queue q1 was processed successfully {"id":19,"queue":"q1"} []
[2024-06-11T05:45:44.391741+00:00] q1.INFO: Processing message with id 20 from queue q1 {"id":20,"queue":"q1"} []
[2024-06-11T05:45:47.392085+00:00] q1.INFO: Message with id 20 from queue q1 was processed successfully {"id":20,"queue":"q1"} []
[2024-06-11T05:45:47.394067+00:00] q1.INFO: Processing message with id 21 from queue q1 {"id":21,"queue":"q1"} []
[2024-06-11T05:45:47.394269+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-11T05:45:47.414444+00:00] q1.INFO: Processing message with id 22 from queue q1 {"id":22,"queue":"q1"} []
[2024-06-11T05:45:49.415176+00:00] q1.INFO: Message with id 22 from queue q1 was processed successfully {"id":22,"queue":"q1"} []
[2024-06-11T05:45:49.417059+00:00] q1.INFO: Processing message with id 23 from queue q1 {"id":23,"queue":"q1"} []
[2024-06-11T05:45:54.417277+00:00] q1.INFO: Message with id 23 from queue q1 was processed successfully {"id":23,"queue":"q1"} []
[2024-06-11T05:45:54.418509+00:00] q1.INFO: Processing message with id 24 from queue q1 {"id":24,"queue":"q1"} []
[2024-06-11T05:45:56.418626+00:00] q1.INFO: Message with id 24 from queue q1 was processed successfully {"id":24,"queue":"q1"} []
[2024-06-11T05:45:56.419485+00:00] q1.INFO: Processing message with id 25 from queue q1 {"id":25,"queue":"q1"} []
[2024-06-11T05:45:57.419698+00:00] q1.INFO: Message with id 25 from queue q1 was processed successfully {"id":25,"queue":"q1"} []
[2024-06-11T05:45:57.420709+00:00] q1.INFO: Processing message with id 21 from queue q1 {"id":21,"queue":"q1"} []
[2024-06-11T05:45:58.420976+00:00] q1.INFO: Message with id 21 from queue q1 was processed successfully {"id":21,"queue":"q1"} []

Here we can see that during the processing of message with id 21 an exception was thrown and the message is re-queued and processed again last.

If possible, the expected behavior of RabbitMQ should be preserved.

@shieldz80 shieldz80 added the C-feature-request Category: feature requested, but need to be discussed label Jun 11, 2024
@rustatian
Copy link
Member

Hey @shieldz80 👋
I didn't see a requeue_on_fail option set to true in your configuration (in the discussion). Could you please retest with that option?

@shieldz80
Copy link
Author

shieldz80 commented Jun 12, 2024

Hi, @rustatian,

The RR config

version: '3'

amqp:
  addr: amqp://admin:123@rabbitmq:5672

rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: php consumer.php
  relay: pipes

jobs:
  pool:
    num_workers: ${RR_WORKERS:-4}
    max_jobs: 10

  consume: [ "q1", "q2", "q3", "q4" ]
  pipelines:
    q1:
      driver: amqp
      config:
        prefetch: ${RR_Q1_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q1-rk
        queue: q1
        durable: true
        requeue_on_fail: true
    q2:
      driver: amqp
      config:
        prefetch: ${RR_Q2_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q2-rk
        queue: q2
        durable: true
        requeue_on_fail: true
    q3:
      driver: amqp
      config:
        prefetch: ${RR_Q3_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q3-rk
        queue: q3
        durable: true
        requeue_on_fail: true
    q4:
      driver: amqp
      config:
        prefetch: ${RR_Q4_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q4-rk
        queue: q4
        durable: true
        requeue_on_fail: true

RR_WORKERS=4
RR_Q1_PREFETCH=1
RR_Q2_PREFETCH=1
RR_Q3_PREFETCH=1
RR_Q4_PREFETCH=1

Processing log for one of the queues

[2024-06-12T03:46:49.356968+00:00] q1.INFO: Processing message with id 1 from queue q1 {"id":1,"queue":"q1"} []
[2024-06-12T03:46:53.358140+00:00] q1.INFO: Message with id 1 from queue q1 was processed successfully {"id":1,"queue":"q1"} []
[2024-06-12T03:46:53.361934+00:00] q1.INFO: Processing message with id 2 from queue q1 {"id":2,"queue":"q1"} []
[2024-06-12T03:46:53.362141+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(3) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-12T03:46:53.363823+00:00] q1.INFO: Processing message with id 3 from queue q1 {"id":3,"queue":"q1"} []
[2024-06-12T03:46:57.364105+00:00] q1.INFO: Message with id 3 from queue q1 was processed successfully {"id":3,"queue":"q1"} []
[2024-06-12T03:46:57.364897+00:00] q1.INFO: Processing message with id 4 from queue q1 {"id":4,"queue":"q1"} []
[2024-06-12T03:46:57.364948+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(5) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-12T03:46:57.365597+00:00] q1.INFO: Processing message with id 5 from queue q1 {"id":5,"queue":"q1"} []
[2024-06-12T03:46:59.365825+00:00] q1.INFO: Message with id 5 from queue q1 was processed successfully {"id":5,"queue":"q1"} []
[2024-06-12T03:46:59.367722+00:00] q1.INFO: Processing message with id 6 from queue q1 {"id":6,"queue":"q1"} []
[2024-06-12T03:47:02.368020+00:00] q1.INFO: Message with id 6 from queue q1 was processed successfully {"id":6,"queue":"q1"} []
[2024-06-12T03:47:02.368880+00:00] q1.INFO: Processing message with id 7 from queue q1 {"id":7,"queue":"q1"} []
[2024-06-12T03:47:06.369010+00:00] q1.INFO: Message with id 7 from queue q1 was processed successfully {"id":7,"queue":"q1"} []
[2024-06-12T03:47:06.369764+00:00] q1.INFO: Processing message with id 8 from queue q1 {"id":8,"queue":"q1"} []
[2024-06-12T03:47:09.369919+00:00] q1.INFO: Message with id 8 from queue q1 was processed successfully {"id":8,"queue":"q1"} []
[2024-06-12T03:47:09.371341+00:00] q1.INFO: Processing message with id 9 from queue q1 {"id":9,"queue":"q1"} []
[2024-06-12T03:47:14.371596+00:00] q1.INFO: Message with id 9 from queue q1 was processed successfully {"id":9,"queue":"q1"} []
[2024-06-12T03:47:14.373173+00:00] q1.INFO: Processing message with id 10 from queue q1 {"id":10,"queue":"q1"} []
[2024-06-12T03:47:15.373583+00:00] q1.INFO: Message with id 10 from queue q1 was processed successfully {"id":10,"queue":"q1"} []
[2024-06-12T03:47:15.375500+00:00] q1.INFO: Processing message with id 11 from queue q1 {"id":11,"queue":"q1"} []
[2024-06-12T03:47:19.375756+00:00] q1.INFO: Message with id 11 from queue q1 was processed successfully {"id":11,"queue":"q1"} []
[2024-06-12T03:47:19.390277+00:00] q1.INFO: Processing message with id 12 from queue q1 {"id":12,"queue":"q1"} []
[2024-06-12T03:47:21.391060+00:00] q1.INFO: Message with id 12 from queue q1 was processed successfully {"id":12,"queue":"q1"} []
[2024-06-12T03:47:21.393342+00:00] q1.INFO: Processing message with id 13 from queue q1 {"id":13,"queue":"q1"} []
[2024-06-12T03:47:23.393757+00:00] q1.INFO: Message with id 13 from queue q1 was processed successfully {"id":13,"queue":"q1"} []
[2024-06-12T03:47:23.395396+00:00] q1.INFO: Processing message with id 14 from queue q1 {"id":14,"queue":"q1"} []
[2024-06-12T03:47:27.395655+00:00] q1.INFO: Message with id 14 from queue q1 was processed successfully {"id":14,"queue":"q1"} []
[2024-06-12T03:47:27.397606+00:00] q1.INFO: Processing message with id 15 from queue q1 {"id":15,"queue":"q1"} []
[2024-06-12T03:47:29.397886+00:00] q1.INFO: Message with id 15 from queue q1 was processed successfully {"id":15,"queue":"q1"} []
[2024-06-12T03:47:29.399989+00:00] q1.INFO: Processing message with id 16 from queue q1 {"id":16,"queue":"q1"} []
[2024-06-12T03:47:32.400372+00:00] q1.INFO: Message with id 16 from queue q1 was processed successfully {"id":16,"queue":"q1"} []
[2024-06-12T03:47:32.402220+00:00] q1.INFO: Processing message with id 17 from queue q1 {"id":17,"queue":"q1"} []
[2024-06-12T03:47:35.402455+00:00] q1.INFO: Message with id 17 from queue q1 was processed successfully {"id":17,"queue":"q1"} []
[2024-06-12T03:47:35.403190+00:00] q1.INFO: Processing message with id 18 from queue q1 {"id":18,"queue":"q1"} []
[2024-06-12T03:47:37.403457+00:00] q1.INFO: Message with id 18 from queue q1 was processed successfully {"id":18,"queue":"q1"} []
[2024-06-12T03:47:37.405668+00:00] q1.INFO: Processing message with id 19 from queue q1 {"id":19,"queue":"q1"} []
[2024-06-12T03:47:41.405954+00:00] q1.INFO: Message with id 19 from queue q1 was processed successfully {"id":19,"queue":"q1"} []
[2024-06-12T03:47:41.407684+00:00] q1.INFO: Processing message with id 20 from queue q1 {"id":20,"queue":"q1"} []
[2024-06-12T03:47:42.407894+00:00] q1.INFO: Message with id 20 from queue q1 was processed successfully {"id":20,"queue":"q1"} []
[2024-06-12T03:47:42.409138+00:00] q1.INFO: Processing message with id 21 from queue q1 {"id":21,"queue":"q1"} []
[2024-06-12T03:47:43.409408+00:00] q1.INFO: Message with id 21 from queue q1 was processed successfully {"id":21,"queue":"q1"} []
[2024-06-12T03:47:43.428206+00:00] q1.INFO: Processing message with id 22 from queue q1 {"id":22,"queue":"q1"} []
[2024-06-12T03:47:47.428758+00:00] q1.INFO: Message with id 22 from queue q1 was processed successfully {"id":22,"queue":"q1"} []
[2024-06-12T03:47:47.440209+00:00] q1.INFO: Processing message with id 23 from queue q1 {"id":23,"queue":"q1"} []
[2024-06-12T03:47:49.440940+00:00] q1.INFO: Message with id 23 from queue q1 was processed successfully {"id":23,"queue":"q1"} []
[2024-06-12T03:47:49.442779+00:00] q1.INFO: Processing message with id 24 from queue q1 {"id":24,"queue":"q1"} []
[2024-06-12T03:47:50.443154+00:00] q1.INFO: Message with id 24 from queue q1 was processed successfully {"id":24,"queue":"q1"} []
[2024-06-12T03:47:50.446913+00:00] q1.INFO: Processing message with id 25 from queue q1 {"id":25,"queue":"q1"} []
[2024-06-12T03:47:51.447201+00:00] q1.INFO: Message with id 25 from queue q1 was processed successfully {"id":25,"queue":"q1"} []
[2024-06-12T03:47:51.448520+00:00] q1.INFO: Processing message with id 2 from queue q1 {"id":2,"queue":"q1"} []
[2024-06-12T03:47:54.448823+00:00] q1.INFO: Message with id 2 from queue q1 was processed successfully {"id":2,"queue":"q1"} []
[2024-06-12T03:47:54.449815+00:00] q1.INFO: Processing message with id 4 from queue q1 {"id":4,"queue":"q1"} []
[2024-06-12T03:47:59.449968+00:00] q1.INFO: Message with id 4 from queue q1 was processed successfully {"id":4,"queue":"q1"} []

Processing of messages with id 2 and 4 threw an exception and unfortunately the behavior is the same as observed before

Let me know if you want me to test anything else.

@shieldz80
Copy link
Author

Code for the consumer just in case

<?php

declare(strict_types=1);

use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Psr\Log\LoggerInterface;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Spiral\RoadRunner\Jobs\Consumer;

require realpath(__DIR__ . '/vendor/autoload.php');

$logDir = realpath(__DIR__ . '/var/log');

$consumer = new Consumer();
$loggers = [
    $q = 'q1' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q2' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q3' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q4' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
];

while ($task = $consumer->waitTask()) {
    try {
        $queue = $task->getQueue();
        /** @var LoggerInterface $logger */
        $logger = $loggers[$queue];

        (new GenericHandler($logger))->process($task);

        $task->complete();
    } catch (Throwable $e) {
        $logger->error($e);
        $task->fail($e, true);
    }
}

@rustatian
Copy link
Member

Thanks, @shieldz80 👍

I guess that the problem is in the priority. Are all the messages are already in the RabbitMQ queue, or they're pushed one by one after consume?

@rustatian
Copy link
Member

Also, if that's possible, could you please show the code with amqp extension? Where everything was in order.

@shieldz80
Copy link
Author

Are all the messages are already in the RabbitMQ queue, or they're pushed one by one after consume?

When I test, I bring up all the containers, then I execute the command that publishes the 100 messages to the "main" exchange. So they're produced more or less at once, definitely not one by one.

Also, if that's possible, could you please show the code with amqp extension? Where everything was in order.

Sure, here it is

<?php

declare(strict_types=1);

namespace Shieldz\ConsumerApp\Command;

use AMQPChannel;
use AMQPConnection;
use AMQPEnvelope;
use AMQPQueue;
use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Shieldz\ConsumerApp\Task\Task;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\SignalableCommandInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Throwable;

use function extension_loaded;
use function sprintf;

use const AMQP_REQUEUE;
use const SIGINT;
use const SIGQUIT;
use const SIGTERM;

#[AsCommand(name: 'consume-messages')]
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
{
    private AMQPConnection $connection;
    private int $signal = -1;
    private bool $shouldStop = false;

    public function __construct(private Logger $loggerProto)
    {
        parent::__construct();
        $this->setAmqpConnection();
    }

    protected function configure(): void
    {
        $this->addArgument('queueName', InputArgument::REQUIRED);
        $this->addOption(name: 'max-jobs', mode: InputOption::VALUE_REQUIRED, default: 10);
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $queueName = $input->getArgument('queueName');
        $maxJobs = $input->getOption('max-jobs');

        $logger = $this->loggerProto
            ->withName($queueName)
            ->pushHandler(
                new StreamHandler(sprintf('%s/%s.log', APP_LOG_DIR, $queueName), Level::Debug)
            );

        $queue = new AMQPQueue(new AMQPChannel($this->connection));
        $queue->setName($queueName);

        $executedJobs = 0;
        while (false === $this->shouldStop) {
            $message = $queue->get();
            if (!$message instanceof AMQPEnvelope) {
                continue;
            }

            try {
                (new GenericHandler($logger))
                    ->process(new Task($message->getBody(), $queue->getName()));
                $queue->ack($message->getDeliveryTag());
            } catch (Throwable $e) {
                $queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
                $logger->error($e);
            }

            $this->shouldStop = ++$executedJobs >= $maxJobs;
        }

        return match ($this->signal) {
            -1 => Command::SUCCESS,
            default => 128 + $this->signal,
        };
    }

    public function getSubscribedSignals(): array
    {
        return extension_loaded('pcntl') ? [SIGINT, SIGTERM, SIGQUIT] : [];
    }

    public function handleSignal(int $signal, false|int $previousExitCode = 0): int|false
    {
        $this->signal = $signal;
        $this->shouldStop = true;
        return false;
    }

    private function setAmqpConnection(): void
    {
        if (isset($this->connection)) {
            return;
        }

        $connection = new AMQPConnection();
        $connection->setHost('rabbitmq');
        $connection->setLogin('admin');
        $connection->setPassword('123');
        $connection->connect();

        $this->connection = $connection;
    }
}

The processing code is pretty much the same as in the RR version

class GenericHandler
{
    public function __construct(private LoggerInterface $logger)
    {
    }

    public function process(Task $task): void
    {
        $queue = $task->getQueue();
        /** @var array{id: int, load: int} $payload */
        $payload = json_decode($task->getPayload(), true);

        $this->logger->info(
            'Processing message with id {id} from queue {queue}',
            ['id' => $payload['id'], 'queue' => $queue]
        );

        $this->doProcessing($payload['load']);

        $this->logger->info(
            'Message with id {id} from queue {queue} was processed successfully',
            ['id' => $payload['id'], 'queue' => $queue]
        );
    }

    private function doProcessing(int $load): void
    {
        $throwRandomExceptions = $_ENV['THROW_RANDOM_EXCEPTIONS'] ?? '0' === '1';
        if ($throwRandomExceptions && random_int(1, 20) === 5) {
            // 5% chance to throw an exception
            throw new RuntimeException('Processing failed');
        }

        // this emulates processing for $load seconds
        sleep($load);
    }
}

Consuming is then started by running php bin/console.php consume-messages --max-jobs=10 q1

@rustatian
Copy link
Member

Hm, yeah, code is just about the same...
Could you please, use debug logging for RR and show me the debug logs?
Also, if I understand correctly, you're using only 1 queue, but declared 4 of them. So, 3 of them are not used, right?

@shieldz80
Copy link
Author

Could you please, use debug logging for RR and show me the debug logs?

Sure, I'll post the logs soon.

if I understand correctly, you're using only 1 queue, but declared 4 of them. So, 3 of them are not used, right?

No, all four are used, the container uses s6-overlay to start 4 supervised processes that each consume from one queue

Process tree looks like this in the container that uses plain php to consume from rabbitmq

$ docker top classic-consumer axf o "pid,user,args"
PID                 USER                COMMAND
3754558             root                \_ /package/admin/s6/command/s6-svscan -d4 -- /run/service
3754664             root                \_ s6-supervise s6-linux-init-shutdownd
3754666             root                | \_ /package/admin/s6-linux-init/command/s6-linux-init-shutdownd -d3 -c /run/s6/basedir -g 3000 -C -B
3754672             root                \_ s6-supervise q1-consumer
3754718             shieldz              | \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q1
3754673             root                \_ s6-supervise s6rc-oneshot-runner
3754687             root                | \_ /package/admin/s6/command/s6-ipcserverd -1 -- /package/admin/s6/command/s6-ipcserver-access -v0 -E -l0 -i data/rules -- /package/admin/s6/command/s6-sudod -t 30000 -- /package/admin/s6-rc/command/s6-rc-oneshot-run -l ../.. --
3754674             root                \_ s6-supervise q3-consumer
3754717             shieldz              | \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q3
3754675             root                \_ s6-supervise q4-consumer
3754715             shieldz              | \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q4
3754676             root                \_ s6-supervise s6rc-fdholder
3754677             root                \_ s6-supervise q2-consumer
3754719             shieldz              \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q2

@rustatian
Copy link
Member

I mean, for the RR. You also used all 4 queues, right?

@shieldz80
Copy link
Author

Here are logs (this is what the container outputted during the processing of messages)

rr_logs.txt

If you wanted other logs let me know ...

@shieldz80
Copy link
Author

I mean, for the RR. You also used all 4 queues, right?

Yes, I produce 25 messages for each queue, and all are consumed in my setup. I only showed you processing logs for 1 queue to avoid cluttering the comments

@shieldz80
Copy link
Author

Here's the processing logs for all queues for the latest run

q1.log
q2.log
q3.log
q4.log

Let me know, if you need anything else ...

@rustatian
Copy link
Member

Ok, debug logs helped me to figure out what's happening.
As you may see, there is an entry with the message: job was re-queued. That means, that RR failed to process the JOB (message, envelope) and it was re-queued. But actually, re-queue means the following - update headers with new from the PHP worker, then push the job back (same payload, new headers), confirm that, Ack the previous one.
Thus, you see these failed jobs at the end of the queue.

@shieldz80
Copy link
Author

So exposing that multiply option could alter this behavior?

@rustatian
Copy link
Member

Actually, no. We should add a new option to Nack the message, instead of Re-queueing it.

@rustatian
Copy link
Member

@msmakouz Here is the updated protocol after our discussion to handle this case:

  1. Add 3 separate methods to the roadrunner-php/jobs client library: ack, nack(data), requeue(data).
  2. Add 3 new types: ACK, NACK, REQUEUE, after the currently existing NoError and Error.
  3. ACK is the same as NoError, so, RR just ACK's the message. No data expected.
  4. NACK. RR NACK's the message and log the reason (if sent by PHP worker). Json field - message.
  5. REQUEUE - the same as ERROR, but using the new method requeue(data). Data payload is the same.

@shieldz80 Thanks again for the report. We're going to update our internal RR<->PHP protocol to handle raw NACK's. In your case, should be able to just NACK the message to put it back to the same queue at the same position. Keep in mind, however, that pipelines are consumed in parallel, not synchronously 1-by-1.

@shieldz80
Copy link
Author

We're going to update our internal RR<->PHP protocol to handle raw NACK's. In your case, should be able to just NACK the message to put it back to the same queue at the same position. Keep in mind, however, that pipelines are consumed in parallel, not synchronously 1-by-1.

Great! Thank you!

@rustatian
Copy link
Member

Hey @shieldz80 👋
New protocol handlers are added to the RR. We'll need to sync a PHP part and you'll be able to re-test your case using the usual ack/nack instead of requeue.

@rustatian rustatian added C-feature-accepted Category: Feature discussed and accepted and removed C-feature-request Category: feature requested, but need to be discussed labels Jun 20, 2024
@rustatian rustatian added this to the v2024.1.5 milestone Jun 20, 2024
@rustatian rustatian moved this to ✅ Done in Jira 😄 Jun 20, 2024
@rustatian rustatian mentioned this issue Jun 20, 2024
6 tasks
@shieldz80
Copy link
Author

Hi @rustatian,

Thanks! Looking forward to test it. Is there some way to know when the PHP SDK is updated?

@rustatian
Copy link
Member

Hey @shieldz80 ,
This or early next week.

@shieldz80
Copy link
Author

shieldz80 commented Jun 25, 2024

Hi @rustatian , @msmakouz

I saw that the ack/nack functionality was merged into the 4.x branch of the roadrunner-php/jobs project and I decided to test it, but unfortunately something doesn't look right to me. When there's an error even though the message is nacked it seems it just gets lost from what I see in my logs. I attached the RR debug logs.

rr_logs.txt

Here's also what I changed in my code. The consumer code now looks like this to use ack/nack

<?php

declare(strict_types=1);

use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Psr\Log\LoggerInterface;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Spiral\RoadRunner\Jobs\Consumer;

require realpath(__DIR__ . '/vendor/autoload.php');

$logDir = realpath(__DIR__ . '/var/log');

$consumer = new Consumer();
$loggers = [
    $q = 'q1' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q2' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q3' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q4' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
];

while ($task = $consumer->waitTask()) {
    try {
        $queue = $task->getQueue();
        /** @var LoggerInterface $logger */
        $logger = $loggers[$queue];

        (new GenericHandler($logger))->process($task);

        $task->ack();
    } catch (Throwable $e) {
        $logger->error($e);
        $task->nack($e, redelivery: true);
    }
}

And I've updated the composer.json file to use the 4.x branch since no 4.5.0 tag was added yet.

{
    "name": "shieldz/rr-consumer-app",
    "type": "project",
    "autoload": {
        "psr-4": {
            "Shieldz\\ConsumerApp\\": "src/"
        }
    },
    "require": {
        "php": "^8.3",
        "ext-sockets": "*",
        "spiral/roadrunner-jobs": "4.x-dev",
        "monolog/monolog": "^3.6"
    }
}

The output I get in my application for one of the queues, looks like this

[2024-06-25T05:33:19.155856+00:00] q3.INFO: Processing message with id 1 from queue q3 {"id":1,"queue":"q3"} []
[2024-06-25T05:33:20.156814+00:00] q3.INFO: Message with id 1 from queue q3 was processed successfully {"id":1,"queue":"q3"} []
[2024-06-25T05:33:20.157521+00:00] q3.INFO: Processing message with id 2 from queue q3 {"id":2,"queue":"q3"} []
[2024-06-25T05:33:21.157766+00:00] q3.INFO: Message with id 2 from queue q3 was processed successfully {"id":2,"queue":"q3"} []
[2024-06-25T05:33:21.159260+00:00] q3.INFO: Processing message with id 3 from queue q3 {"id":3,"queue":"q3"} []
[2024-06-25T05:33:22.159621+00:00] q3.INFO: Message with id 3 from queue q3 was processed successfully {"id":3,"queue":"q3"} []
[2024-06-25T05:33:22.161287+00:00] q3.INFO: Processing message with id 4 from queue q3 {"id":4,"queue":"q3"} []
[2024-06-25T05:33:23.161657+00:00] q3.INFO: Message with id 4 from queue q3 was processed successfully {"id":4,"queue":"q3"} []
[2024-06-25T05:33:23.163091+00:00] q3.INFO: Processing message with id 5 from queue q3 {"id":5,"queue":"q3"} []
[2024-06-25T05:33:24.163319+00:00] q3.INFO: Message with id 5 from queue q3 was processed successfully {"id":5,"queue":"q3"} []
[2024-06-25T05:33:24.164279+00:00] q3.INFO: Processing message with id 6 from queue q3 {"id":6,"queue":"q3"} []
[2024-06-25T05:33:25.164484+00:00] q3.INFO: Message with id 6 from queue q3 was processed successfully {"id":6,"queue":"q3"} []
[2024-06-25T05:33:25.165324+00:00] q3.INFO: Processing message with id 7 from queue q3 {"id":7,"queue":"q3"} []
[2024-06-25T05:33:26.165612+00:00] q3.INFO: Message with id 7 from queue q3 was processed successfully {"id":7,"queue":"q3"} []
[2024-06-25T05:33:26.167605+00:00] q3.INFO: Processing message with id 8 from queue q3 {"id":8,"queue":"q3"} []
[2024-06-25T05:33:26.167810+00:00] q3.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-25T05:33:26.169076+00:00] q3.INFO: Processing message with id 9 from queue q3 {"id":9,"queue":"q3"} []
[2024-06-25T05:33:27.169272+00:00] q3.INFO: Message with id 9 from queue q3 was processed successfully {"id":9,"queue":"q3"} []
[2024-06-25T05:33:27.169856+00:00] q3.INFO: Processing message with id 10 from queue q3 {"id":10,"queue":"q3"} []
[2024-06-25T05:33:28.170024+00:00] q3.INFO: Message with id 10 from queue q3 was processed successfully {"id":10,"queue":"q3"} []
[2024-06-25T05:33:28.181617+00:00] q3.INFO: Processing message with id 11 from queue q3 {"id":11,"queue":"q3"} []
[2024-06-25T05:33:29.181850+00:00] q3.INFO: Message with id 11 from queue q3 was processed successfully {"id":11,"queue":"q3"} []
[2024-06-25T05:33:29.184378+00:00] q3.INFO: Processing message with id 12 from queue q3 {"id":12,"queue":"q3"} []
[2024-06-25T05:33:30.184946+00:00] q3.INFO: Message with id 12 from queue q3 was processed successfully {"id":12,"queue":"q3"} []
[2024-06-25T05:33:30.185439+00:00] q3.INFO: Processing message with id 13 from queue q3 {"id":13,"queue":"q3"} []
[2024-06-25T05:33:31.185579+00:00] q3.INFO: Message with id 13 from queue q3 was processed successfully {"id":13,"queue":"q3"} []
[2024-06-25T05:33:31.187067+00:00] q3.INFO: Processing message with id 14 from queue q3 {"id":14,"queue":"q3"} []
[2024-06-25T05:33:32.187288+00:00] q3.INFO: Message with id 14 from queue q3 was processed successfully {"id":14,"queue":"q3"} []
[2024-06-25T05:33:32.188656+00:00] q3.INFO: Processing message with id 15 from queue q3 {"id":15,"queue":"q3"} []
[2024-06-25T05:33:33.188872+00:00] q3.INFO: Message with id 15 from queue q3 was processed successfully {"id":15,"queue":"q3"} []
[2024-06-25T05:33:33.189483+00:00] q3.INFO: Processing message with id 16 from queue q3 {"id":16,"queue":"q3"} []
[2024-06-25T05:33:34.189598+00:00] q3.INFO: Message with id 16 from queue q3 was processed successfully {"id":16,"queue":"q3"} []
[2024-06-25T05:33:34.190953+00:00] q3.INFO: Processing message with id 17 from queue q3 {"id":17,"queue":"q3"} []
[2024-06-25T05:33:35.191347+00:00] q3.INFO: Message with id 17 from queue q3 was processed successfully {"id":17,"queue":"q3"} []
[2024-06-25T05:33:35.191998+00:00] q3.INFO: Processing message with id 18 from queue q3 {"id":18,"queue":"q3"} []
[2024-06-25T05:33:36.192140+00:00] q3.INFO: Message with id 18 from queue q3 was processed successfully {"id":18,"queue":"q3"} []
[2024-06-25T05:33:36.192856+00:00] q3.INFO: Processing message with id 19 from queue q3 {"id":19,"queue":"q3"} []
[2024-06-25T05:33:37.193063+00:00] q3.INFO: Message with id 19 from queue q3 was processed successfully {"id":19,"queue":"q3"} []
[2024-06-25T05:33:37.194655+00:00] q3.INFO: Processing message with id 20 from queue q3 {"id":20,"queue":"q3"} []
[2024-06-25T05:33:38.195043+00:00] q3.INFO: Message with id 20 from queue q3 was processed successfully {"id":20,"queue":"q3"} []
[2024-06-25T05:33:38.196647+00:00] q3.INFO: Processing message with id 21 from queue q3 {"id":21,"queue":"q3"} []
[2024-06-25T05:33:39.196991+00:00] q3.INFO: Message with id 21 from queue q3 was processed successfully {"id":21,"queue":"q3"} []
[2024-06-25T05:33:39.200426+00:00] q3.INFO: Processing message with id 22 from queue q3 {"id":22,"queue":"q3"} []
[2024-06-25T05:33:40.200676+00:00] q3.INFO: Message with id 22 from queue q3 was processed successfully {"id":22,"queue":"q3"} []
[2024-06-25T05:33:40.215815+00:00] q3.INFO: Processing message with id 23 from queue q3 {"id":23,"queue":"q3"} []
[2024-06-25T05:33:40.216080+00:00] q3.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-25T05:33:40.217224+00:00] q3.INFO: Processing message with id 24 from queue q3 {"id":24,"queue":"q3"} []
[2024-06-25T05:33:41.217606+00:00] q3.INFO: Message with id 24 from queue q3 was processed successfully {"id":24,"queue":"q3"} []
[2024-06-25T05:33:41.219604+00:00] q3.INFO: Processing message with id 25 from queue q3 {"id":25,"queue":"q3"} []
[2024-06-25T05:33:42.219870+00:00] q3.INFO: Message with id 25 from queue q3 was processed successfully {"id":25,"queue":"q3"} []

Here, processing of messages 8 and 23 failed but I don't see them being reprocessed anywhere in the log, not even at the back like before.

@shieldz80
Copy link
Author

Ah, sorry, my mistake, apparently I also need requeue_on_fail: true in the config, and now it looks good

version: '3'

amqp:
  addr: amqp://admin:123@rabbitmq:5672

rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: php consumer.php
  relay: pipes

jobs:
  pool:
    num_workers: ${RR_WORKERS:-4}
    max_jobs: 10

  consume: [ "q1", "q2", "q3", "q4" ]
  pipelines:
    q1:
      driver: amqp
      config:
        prefetch: ${RR_Q1_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q1-rk
        queue: q1
        durable: true
        requeue_on_fail: true
    q2:
      driver: amqp
      config:
        prefetch: ${RR_Q2_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q2-rk
        queue: q2
        durable: true
        requeue_on_fail: true
    q3:
      driver: amqp
      config:
        prefetch: ${RR_Q3_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q3-rk
        queue: q3
        durable: true
        requeue_on_fail: true
    q4:
      driver: amqp
      config:
        prefetch: ${RR_Q4_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q4-rk
        queue: q4
        durable: true
        requeue_on_fail: true
[2024-06-25T05:54:05.880121+00:00] q1.INFO: Processing message with id 1 from queue q1 {"id":1,"queue":"q1"} []
[2024-06-25T05:54:06.881515+00:00] q1.INFO: Message with id 1 from queue q1 was processed successfully {"id":1,"queue":"q1"} []
[2024-06-25T05:54:06.884101+00:00] q1.INFO: Processing message with id 2 from queue q1 {"id":2,"queue":"q1"} []
[2024-06-25T05:54:07.884481+00:00] q1.INFO: Message with id 2 from queue q1 was processed successfully {"id":2,"queue":"q1"} []
[2024-06-25T05:54:07.886239+00:00] q1.INFO: Processing message with id 3 from queue q1 {"id":3,"queue":"q1"} []
[2024-06-25T05:54:08.886648+00:00] q1.INFO: Message with id 3 from queue q1 was processed successfully {"id":3,"queue":"q1"} []
[2024-06-25T05:54:08.887972+00:00] q1.INFO: Processing message with id 4 from queue q1 {"id":4,"queue":"q1"} []
[2024-06-25T05:54:09.888359+00:00] q1.INFO: Message with id 4 from queue q1 was processed successfully {"id":4,"queue":"q1"} []
[2024-06-25T05:54:09.889725+00:00] q1.INFO: Processing message with id 5 from queue q1 {"id":5,"queue":"q1"} []
[2024-06-25T05:54:10.889894+00:00] q1.INFO: Message with id 5 from queue q1 was processed successfully {"id":5,"queue":"q1"} []
[2024-06-25T05:54:10.890779+00:00] q1.INFO: Processing message with id 6 from queue q1 {"id":6,"queue":"q1"} []
[2024-06-25T05:54:11.890956+00:00] q1.INFO: Message with id 6 from queue q1 was processed successfully {"id":6,"queue":"q1"} []
[2024-06-25T05:54:11.892676+00:00] q1.INFO: Processing message with id 7 from queue q1 {"id":7,"queue":"q1"} []
[2024-06-25T05:54:12.892978+00:00] q1.INFO: Message with id 7 from queue q1 was processed successfully {"id":7,"queue":"q1"} []
[2024-06-25T05:54:12.894760+00:00] q1.INFO: Processing message with id 8 from queue q1 {"id":8,"queue":"q1"} []
[2024-06-25T05:54:13.895215+00:00] q1.INFO: Message with id 8 from queue q1 was processed successfully {"id":8,"queue":"q1"} []
[2024-06-25T05:54:13.897047+00:00] q1.INFO: Processing message with id 9 from queue q1 {"id":9,"queue":"q1"} []
[2024-06-25T05:54:14.897424+00:00] q1.INFO: Message with id 9 from queue q1 was processed successfully {"id":9,"queue":"q1"} []
[2024-06-25T05:54:14.899369+00:00] q1.INFO: Processing message with id 10 from queue q1 {"id":10,"queue":"q1"} []
[2024-06-25T05:54:15.899739+00:00] q1.INFO: Message with id 10 from queue q1 was processed successfully {"id":10,"queue":"q1"} []
[2024-06-25T05:54:15.920888+00:00] q1.INFO: Processing message with id 11 from queue q1 {"id":11,"queue":"q1"} []
[2024-06-25T05:54:16.921557+00:00] q1.INFO: Message with id 11 from queue q1 was processed successfully {"id":11,"queue":"q1"} []
[2024-06-25T05:54:16.922120+00:00] q1.INFO: Processing message with id 12 from queue q1 {"id":12,"queue":"q1"} []
[2024-06-25T05:54:17.922297+00:00] q1.INFO: Message with id 12 from queue q1 was processed successfully {"id":12,"queue":"q1"} []
[2024-06-25T05:54:17.923652+00:00] q1.INFO: Processing message with id 13 from queue q1 {"id":13,"queue":"q1"} []
[2024-06-25T05:54:18.923892+00:00] q1.INFO: Message with id 13 from queue q1 was processed successfully {"id":13,"queue":"q1"} []
[2024-06-25T05:54:18.925309+00:00] q1.INFO: Processing message with id 14 from queue q1 {"id":14,"queue":"q1"} []
[2024-06-25T05:54:19.925642+00:00] q1.INFO: Message with id 14 from queue q1 was processed successfully {"id":14,"queue":"q1"} []
[2024-06-25T05:54:19.927307+00:00] q1.INFO: Processing message with id 15 from queue q1 {"id":15,"queue":"q1"} []
[2024-06-25T05:54:20.927726+00:00] q1.INFO: Message with id 15 from queue q1 was processed successfully {"id":15,"queue":"q1"} []
[2024-06-25T05:54:20.929539+00:00] q1.INFO: Processing message with id 16 from queue q1 {"id":16,"queue":"q1"} []
[2024-06-25T05:54:21.930034+00:00] q1.INFO: Message with id 16 from queue q1 was processed successfully {"id":16,"queue":"q1"} []
[2024-06-25T05:54:21.931781+00:00] q1.INFO: Processing message with id 17 from queue q1 {"id":17,"queue":"q1"} []
[2024-06-25T05:54:22.932127+00:00] q1.INFO: Message with id 17 from queue q1 was processed successfully {"id":17,"queue":"q1"} []
[2024-06-25T05:54:22.933836+00:00] q1.INFO: Processing message with id 18 from queue q1 {"id":18,"queue":"q1"} []
[2024-06-25T05:54:23.934196+00:00] q1.INFO: Message with id 18 from queue q1 was processed successfully {"id":18,"queue":"q1"} []
[2024-06-25T05:54:23.938761+00:00] q1.INFO: Processing message with id 19 from queue q1 {"id":19,"queue":"q1"} []
[2024-06-25T05:54:24.939162+00:00] q1.INFO: Message with id 19 from queue q1 was processed successfully {"id":19,"queue":"q1"} []
[2024-06-25T05:54:24.959832+00:00] q1.INFO: Processing message with id 20 from queue q1 {"id":20,"queue":"q1"} []
[2024-06-25T05:54:24.960439+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-25T05:54:24.961387+00:00] q1.INFO: Processing message with id 20 from queue q1 {"id":20,"queue":"q1"} []
[2024-06-25T05:54:25.961620+00:00] q1.INFO: Message with id 20 from queue q1 was processed successfully {"id":20,"queue":"q1"} []
[2024-06-25T05:54:25.963436+00:00] q1.INFO: Processing message with id 21 from queue q1 {"id":21,"queue":"q1"} []
[2024-06-25T05:54:26.964049+00:00] q1.INFO: Message with id 21 from queue q1 was processed successfully {"id":21,"queue":"q1"} []
[2024-06-25T05:54:26.965626+00:00] q1.INFO: Processing message with id 22 from queue q1 {"id":22,"queue":"q1"} []
[2024-06-25T05:54:27.965937+00:00] q1.INFO: Message with id 22 from queue q1 was processed successfully {"id":22,"queue":"q1"} []
[2024-06-25T05:54:27.967046+00:00] q1.INFO: Processing message with id 23 from queue q1 {"id":23,"queue":"q1"} []
[2024-06-25T05:54:27.967294+00:00] q1.ERROR: RuntimeException: Processing failed in /home/consumer/app/src/Task/Handler/GenericHandler.php:45 Stack trace: #0 /home/consumer/app/src/Task/Handler/GenericHandler.php(32): Shieldz\ConsumerApp\Task\Handler\GenericHandler->doProcessing(1) #1 /home/consumer/app/consumer.php(39): Shieldz\ConsumerApp\Task\Handler\GenericHandler->process(Object(Spiral\RoadRunner\Jobs\Task\ReceivedTask)) #2 {main} [] []
[2024-06-25T05:54:27.968419+00:00] q1.INFO: Processing message with id 23 from queue q1 {"id":23,"queue":"q1"} []
[2024-06-25T05:54:28.968653+00:00] q1.INFO: Message with id 23 from queue q1 was processed successfully {"id":23,"queue":"q1"} []
[2024-06-25T05:54:28.970063+00:00] q1.INFO: Processing message with id 24 from queue q1 {"id":24,"queue":"q1"} []
[2024-06-25T05:54:29.970398+00:00] q1.INFO: Message with id 24 from queue q1 was processed successfully {"id":24,"queue":"q1"} []
[2024-06-25T05:54:29.971685+00:00] q1.INFO: Processing message with id 25 from queue q1 {"id":25,"queue":"q1"} []
[2024-06-25T05:54:30.972017+00:00] q1.INFO: Message with id 25 from queue q1 was processed successfully {"id":25,"queue":"q1"} []

Here, processing of messages 20 and 23 failed, but they were re-processed in-order as expected.

Awesome! Thanks a lot!

@rustatian
Copy link
Member

rustatian commented Jun 25, 2024

Hey @shieldz80 👋
You won't be needed to set requeue_on_fail after I merge additional patches. Because you will be able to set redelivery if failed per message (from PHP, NACK has additional parameters).

Currently, these parameters are not parsed by RR.

@trin4ik
Copy link

trin4ik commented Nov 21, 2024

hi, thanks for feature, but it's kind of weird.

  1. i use ack instead of complete, ok
  2. i use nack instead of fail, but message not requeued and i dont see ERROR log. i use nack($e, redelivery: true), but message not requeued. i dont understand why (logically). and as i see, nack ignoring modify headers
  3. i use requeue instead of fail, ok, but i still not see ERROR log and see INFO log:
INFO jobs job was re-queued {"message": "ololo", "delay": 10, "requeue": false}

requeue works as i expect it, but "requeue": false in log makes me worry ))

i think:

  1. nack with redelivery: true needs to requeue automatic (or with requeue_on_fail in config)
  2. nack needs to push ERROR log to stdout
  3. nack needs ability to modify headers
  4. requeue needs to set requeue: true in class to avoid confusion

if you agree, i can make PR with this fixes, but i use sqs only.

@rustatian
Copy link
Member

Hey @trin4ik 👋
Please update the PHP jobs package, we recently found a bug with the requeue key mapping.

@trin4ik
Copy link

trin4ik commented Nov 21, 2024

Hey @trin4ik 👋 Please update the PHP jobs package, we recently found a bug with the requeue key mapping.

Yes, now i see
roadrunner-php/jobs#71
this PR requeue message on nack, yes, but:

  • no ERROR message in log. with requeue at least i had
INFO jobs job was re-queued {"message": "ololo", "delay": 10, "requeue": false}
  • ignoring headers modify, like:
try {
  ...
} catch (Exception $e) {
  $task->withHeader('attempts', (int)$task->getHeaderLine('attempts') - 1)
            ->withDelay(10)
            ->nack($e, true);
}

with fail or requeue modify headers works, u can see it:
fail:
https://github.com/roadrunner-php/jobs/blob/9402267cdb8add1a13b66c3f6fab953f13ec9a53/src/Task/ReceivedTask.php#L85
requeue:
https://github.com/roadrunner-php/jobs/blob/9402267cdb8add1a13b66c3f6fab953f13ec9a53/src/Task/ReceivedTask.php#L117
nack:
https://github.com/roadrunner-php/jobs/blob/9402267cdb8add1a13b66c3f6fab953f13ec9a53/src/Task/ReceivedTask.php#L103

maybe iam wrong and nack should not be able to affect headers and for my example i should use requeue while there are attempts and use nack only when they run out ¯\(ツ)

@rustatian
Copy link
Member

Nack is not modifying headers, only Requeue. SQS is significantly reworked and will be updated in the 2024.3, so redelivery, timeouts, almost all SQS functionality is updated (in non BC way).

@trin4ik
Copy link

trin4ik commented Nov 21, 2024

Nack is not modifying headers, only Requeue. SQS is significantly reworked and will be updated in the 2024.3, so redelivery, timeouts, almost all SQS functionality is updated (in non BC way).

ok, will wait ) and while waiting i will use complete/fail, they more obvious (in my case)

@rustatian
Copy link
Member

BTW, after the update, you won't need to use requeue anymore. In SQS, you may use error visibility timeout (new option) with Nack. So, after that timeout, the message would be available in the SQS queue automatically. In general, SQS behavior will be much more, lets say, standard and familiar for those, who use SQS.

@trin4ik
Copy link

trin4ik commented Nov 21, 2024

BTW, after the update, you won't need to use requeue anymore. In SQS, you may use error visibility timeout (new option) with Nack. So, after that timeout, the message would be available in the SQS queue automatically. In general, SQS behavior will be much more, lets say, standard and familiar for those, who use SQS.

sounds great. yes, "nack with timeout (visibility)" its more sqs-like logic.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
C-feature-accepted Category: Feature discussed and accepted P-jobs Plugin: Jobs
Projects
Status: ✅ Done
3 participants