[💡 FEATURE REQUEST]: preserve message publication order after re-queuing #1941
Hey @shieldz80 👋
Hi @rustatian. The RR config:

```yaml
version: '3'

amqp:
  addr: amqp://admin:123@rabbitmq:5672

rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: php consumer.php
  relay: pipes

jobs:
  pool:
    num_workers: ${RR_WORKERS:-4}
    max_jobs: 10
  consume: [ "q1", "q2", "q3", "q4" ]
  pipelines:
    q1:
      driver: amqp
      config:
        prefetch: ${RR_Q1_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q1-rk
        queue: q1
        durable: true
        requeue_on_fail: true
    q2:
      driver: amqp
      config:
        prefetch: ${RR_Q2_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q2-rk
        queue: q2
        durable: true
        requeue_on_fail: true
    q3:
      driver: amqp
      config:
        prefetch: ${RR_Q3_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q3-rk
        queue: q3
        durable: true
        requeue_on_fail: true
    q4:
      driver: amqp
      config:
        prefetch: ${RR_Q4_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q4-rk
        queue: q4
        durable: true
        requeue_on_fail: true
```

Run with `RR_WORKERS=4`. Processing log for one of the queues:
Processing of the messages with ids 2 and 4 threw an exception, and unfortunately the behavior is the same as observed before. Let me know if you want me to test anything else.
Code for the consumer, just in case:

```php
<?php

declare(strict_types=1);

use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Psr\Log\LoggerInterface;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Spiral\RoadRunner\Jobs\Consumer;

require realpath(__DIR__ . '/vendor/autoload.php');

$logDir = realpath(__DIR__ . '/var/log');
$consumer = new Consumer();
$loggers = [
    $q = 'q1' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q2' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q3' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q4' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
];

while ($task = $consumer->waitTask()) {
    try {
        $queue = $task->getQueue();
        /** @var LoggerInterface $logger */
        $logger = $loggers[$queue];
        (new GenericHandler($logger))->process($task);
        $task->complete();
    } catch (Throwable $e) {
        $logger->error($e);
        $task->fail($e, true);
    }
}
```
Thanks, @shieldz80 👍 I guess the problem is in the priority. Are all the messages already in the RabbitMQ queue, or are they pushed one by one after consuming starts?
Also, if possible, could you please show the code with the amqp extension, where everything was in order?
When I test, I bring up all the containers, then I execute the command that publishes the 100 messages to the "main" exchange. So they're produced more or less at once, definitely not one by one.
Sure, here it is:

```php
<?php

declare(strict_types=1);

namespace Shieldz\ConsumerApp\Command;

use AMQPChannel;
use AMQPConnection;
use AMQPEnvelope;
use AMQPQueue;
use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Shieldz\ConsumerApp\Task\Task;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\SignalableCommandInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Throwable;

use function extension_loaded;
use function sprintf;

use const AMQP_REQUEUE;
use const SIGINT;
use const SIGQUIT;
use const SIGTERM;

#[AsCommand(name: 'consume-messages')]
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
{
    private AMQPConnection $connection;
    private int $signal = -1;
    private bool $shouldStop = false;

    public function __construct(private Logger $loggerProto)
    {
        parent::__construct();
        $this->setAmqpConnection();
    }

    protected function configure(): void
    {
        $this->addArgument('queueName', InputArgument::REQUIRED);
        $this->addOption(name: 'max-jobs', mode: InputOption::VALUE_REQUIRED, default: 10);
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $queueName = $input->getArgument('queueName');
        $maxJobs = $input->getOption('max-jobs');
        $logger = $this->loggerProto
            ->withName($queueName)
            ->pushHandler(
                new StreamHandler(sprintf('%s/%s.log', APP_LOG_DIR, $queueName), Level::Debug)
            );
        $queue = new AMQPQueue(new AMQPChannel($this->connection));
        $queue->setName($queueName);
        $executedJobs = 0;
        while (false === $this->shouldStop) {
            $message = $queue->get();
            if (!$message instanceof AMQPEnvelope) {
                continue;
            }
            try {
                (new GenericHandler($logger))
                    ->process(new Task($message->getBody(), $queue->getName()));
                $queue->ack($message->getDeliveryTag());
            } catch (Throwable $e) {
                $queue->nack($message->getDeliveryTag(), AMQP_REQUEUE);
                $logger->error($e);
            }
            $this->shouldStop = ++$executedJobs >= $maxJobs;
        }
        return match ($this->signal) {
            -1 => Command::SUCCESS,
            default => 128 + $this->signal,
        };
    }

    public function getSubscribedSignals(): array
    {
        return extension_loaded('pcntl') ? [SIGINT, SIGTERM, SIGQUIT] : [];
    }

    public function handleSignal(int $signal, false|int $previousExitCode = 0): int|false
    {
        $this->signal = $signal;
        $this->shouldStop = true;
        return false;
    }

    private function setAmqpConnection(): void
    {
        if (isset($this->connection)) {
            return;
        }
        $connection = new AMQPConnection();
        $connection->setHost('rabbitmq');
        $connection->setLogin('admin');
        $connection->setPassword('123');
        $connection->connect();
        $this->connection = $connection;
    }
}
```

The processing code is pretty much the same as in the RR version:

```php
class GenericHandler
{
    public function __construct(private LoggerInterface $logger)
    {
    }

    public function process(Task $task): void
    {
        $queue = $task->getQueue();
        /** @var array{id: int, load: int} $payload */
        $payload = json_decode($task->getPayload(), true);
        $this->logger->info(
            'Processing message with id {id} from queue {queue}',
            ['id' => $payload['id'], 'queue' => $queue]
        );
        $this->doProcessing($payload['load']);
        $this->logger->info(
            'Message with id {id} from queue {queue} was processed successfully',
            ['id' => $payload['id'], 'queue' => $queue]
        );
    }

    private function doProcessing(int $load): void
    {
        // Parentheses are needed here: `??` binds looser than `===`
        $throwRandomExceptions = ($_ENV['THROW_RANDOM_EXCEPTIONS'] ?? '0') === '1';
        if ($throwRandomExceptions && random_int(1, 20) === 5) {
            // 5% chance to throw an exception
            throw new RuntimeException('Processing failed');
        }
        // this emulates processing for $load seconds
        sleep($load);
    }
}
```

Consuming is then started by running
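A side note on the `THROW_RANDOM_EXCEPTIONS` check above: PHP's null coalescing operator `??` has lower precedence than `===`, so an unparenthesized `$_ENV['X'] ?? '0' === '1'` parses as `$_ENV['X'] ?? ('0' === '1')`. It yields the raw env string (or `false` when unset) instead of a boolean, so any truthy string such as `'off'` would enable the flag. A minimal demonstration (the `$env` array and `FLAG` key are illustrative, not from the project):

```php
<?php

// PHP's `??` binds looser than `===`, so
//   $env['FLAG'] ?? '0' === '1'
// parses as
//   $env['FLAG'] ?? ('0' === '1')
// i.e. "the raw env string, or false when unset" -- the value is never
// actually compared against '1'.

$env = ['FLAG' => 'off'];                  // user tried to disable the flag

$buggy = $env['FLAG'] ?? '0' === '1';      // 'off' (raw string, truthy!)
$fixed = ($env['FLAG'] ?? '0') === '1';    // false, as intended

var_dump($buggy ? 'enabled' : 'disabled'); // string(7) "enabled"  <- surprise
var_dump($fixed ? 'enabled' : 'disabled'); // string(8) "disabled"

// When the key is unset, both variants happen to evaluate falsy,
// which is why the bug is easy to miss:
unset($env['FLAG']);
var_dump($env['FLAG'] ?? '0' === '1');     // bool(false)
```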
Hm, yeah, the code is just about the same...
Sure, I'll post the logs soon.
No, all four are used. The container uses s6-overlay to start 4 supervised processes that each consume from one queue. The process tree looks like this in the container that uses plain PHP to consume from RabbitMQ:

```
$ docker top classic-consumer axf o "pid,user,args"
PID      USER     COMMAND
3754558  root     \_ /package/admin/s6/command/s6-svscan -d4 -- /run/service
3754664  root       \_ s6-supervise s6-linux-init-shutdownd
3754666  root       |   \_ /package/admin/s6-linux-init/command/s6-linux-init-shutdownd -d3 -c /run/s6/basedir -g 3000 -C -B
3754672  root       \_ s6-supervise q1-consumer
3754718  shieldz    |   \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q1
3754673  root       \_ s6-supervise s6rc-oneshot-runner
3754687  root       |   \_ /package/admin/s6/command/s6-ipcserverd -1 -- /package/admin/s6/command/s6-ipcserver-access -v0 -E -l0 -i data/rules -- /package/admin/s6/command/s6-sudod -t 30000 -- /package/admin/s6-rc/command/s6-rc-oneshot-run -l ../.. --
3754674  root       \_ s6-supervise q3-consumer
3754717  shieldz    |   \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q3
3754675  root       \_ s6-supervise q4-consumer
3754715  shieldz    |   \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q4
3754676  root       \_ s6-supervise s6rc-fdholder
3754677  root       \_ s6-supervise q2-consumer
3754719  shieldz        \_ php /home/consumer/app/bin/console.php consume-messages --max-jobs=10 q2
```
I mean, for the RR setup. You also used all 4 queues, right?
Here are the logs (this is what the container output during the processing of messages). If you want other logs, let me know ...
Yes, I produce 25 messages for each queue, and all are consumed in my setup. I only showed you processing logs for one queue to avoid cluttering the comments.
Ok, the debug logs helped me figure out what's happening.
So exposing that multiply option could alter this behavior?
Actually, no. We should add a new option to
@msmakouz Here is the updated protocol after our discussion to handle this case:
@shieldz80 Thanks again for the report. We're going to update our internal RR<->PHP protocol to handle raw NACKs. In your case, you should be able to just NACK the message to put it back into the same queue at the same position. Keep in mind, however, that pipelines are consumed in parallel, not synchronously one by one.
Great! Thank you! |
Hey @shieldz80 👋
Hi @rustatian, thanks! Looking forward to testing it. Is there some way to know when the PHP SDK is updated?
Hey @shieldz80,
Hi @rustatian, @msmakouz, I saw that the
Here's also what I changed in my code. The consumer code now looks like this, using the new `ack()`/`nack()` methods:

```php
<?php

declare(strict_types=1);

use Monolog\Handler\StreamHandler;
use Monolog\Level;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use Psr\Log\LoggerInterface;
use Shieldz\ConsumerApp\Task\Handler\GenericHandler;
use Spiral\RoadRunner\Jobs\Consumer;

require realpath(__DIR__ . '/vendor/autoload.php');

$logDir = realpath(__DIR__ . '/var/log');
$consumer = new Consumer();
$loggers = [
    $q = 'q1' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q2' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q3' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
    $q = 'q4' => (new Logger($q))
        ->pushHandler(new StreamHandler(sprintf('%s/%s.log', $logDir, $q), Level::Debug))
        ->pushProcessor(new PsrLogMessageProcessor()),
];

while ($task = $consumer->waitTask()) {
    try {
        $queue = $task->getQueue();
        /** @var LoggerInterface $logger */
        $logger = $loggers[$queue];
        (new GenericHandler($logger))->process($task);
        $task->ack();
    } catch (Throwable $e) {
        $logger->error($e);
        $task->nack($e, redelivery: true);
    }
}
```

And I've updated the

```json
{
    "name": "shieldz/rr-consumer-app",
    "type": "project",
    "autoload": {
        "psr-4": {
            "Shieldz\\ConsumerApp\\": "src/"
        }
    },
    "require": {
        "php": "^8.3",
        "ext-sockets": "*",
        "spiral/roadrunner-jobs": "4.x-dev",
        "monolog/monolog": "^3.6"
    }
}
```

The output I get in my application for one of the queues looks like this
Here, processing of the messages with ids 8 and 23 failed, but I don't see them being reprocessed anywhere in the log, not even at the back like before.
Ah, sorry, my mistake, apparently I also need

```yaml
version: '3'

amqp:
  addr: amqp://admin:123@rabbitmq:5672

rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: php consumer.php
  relay: pipes

jobs:
  pool:
    num_workers: ${RR_WORKERS:-4}
    max_jobs: 10
  consume: [ "q1", "q2", "q3", "q4" ]
  pipelines:
    q1:
      driver: amqp
      config:
        prefetch: ${RR_Q1_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q1-rk
        queue: q1
        durable: true
        requeue_on_fail: true
    q2:
      driver: amqp
      config:
        prefetch: ${RR_Q2_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q2-rk
        queue: q2
        durable: true
        requeue_on_fail: true
    q3:
      driver: amqp
      config:
        prefetch: ${RR_Q3_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q3-rk
        queue: q3
        durable: true
        requeue_on_fail: true
    q4:
      driver: amqp
      config:
        prefetch: ${RR_Q4_PREFETCH:-10}
        consume_all: true
        exchange: main
        exchange_type: direct
        exchange_durable: true
        routing_key: q4-rk
        queue: q4
        durable: true
        requeue_on_fail: true
```

Here, processing of the messages with ids 20 and 23 failed, but they were re-processed in order as expected. Awesome! Thanks a lot!
Hey @shieldz80 👋 Currently, these parameters are not parsed by RR.
Hi, thanks for the feature, but it's kind of weird.
I think:
If you agree, I can make a PR with these fixes, but I use SQS only.
Hey @trin4ik 👋
Yes, now I see
with
Maybe I am wrong and
Nack does not modify headers, only Requeue does. SQS has been significantly reworked and will be updated in 2024.3, so redelivery, timeouts, and almost all SQS functionality is updated (in a non-BC way).
Ok, will wait ) And while waiting I will use
BTW, after the update, you won't need to use
Sounds great. Yes, "nack with timeout (visibility)" is more SQS-like logic.
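For context on the SQS semantics discussed here: a "nack with visibility timeout" does not re-publish the message; the message stays in the queue but is invisible to consumers until the timeout elapses, after which it is redelivered. A toy in-memory model of that idea (purely illustrative; the class and method names are made up, and real SQS tracks visibility server-side):

```php
<?php

// Toy model of SQS-style visibility timeout: a nacked message is hidden
// until its visibility deadline passes, then becomes receivable again.
final class VisibilityQueue
{
    /** @var array<int, array{body: string, visibleAt: int}> */
    private array $messages = [];

    public function send(string $body, int $now): void
    {
        $this->messages[] = ['body' => $body, 'visibleAt' => $now];
    }

    /** Returns the first visible message body, or null if nothing is visible. */
    public function receive(int $now): ?string
    {
        foreach ($this->messages as $i => $m) {
            if ($m['visibleAt'] <= $now) {
                // Hide the message while it is "in flight"
                // (a default visibility of 30s is assumed here).
                $this->messages[$i]['visibleAt'] = $now + 30;
                return $m['body'];
            }
        }
        return null;
    }

    /** Nack: make the message visible again after $delay seconds. */
    public function nack(string $body, int $now, int $delay): void
    {
        foreach ($this->messages as $i => $m) {
            if ($m['body'] === $body) {
                $this->messages[$i]['visibleAt'] = $now + $delay;
                return;
            }
        }
    }
}

$q = new VisibilityQueue();
$q->send('m1', 0);
$q->receive(0);        // 'm1' is delivered and goes in flight
$q->nack('m1', 0, 10); // hidden until t=10
$q->receive(5);        // null: still within the visibility timeout
$q->receive(10);       // 'm1' is redelivered
```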
Plugin: JOBS
Hi,
First off, let me just say thank you for the awesome project!
As discussed in https://github.com/orgs/roadrunner-server/discussions/1932, when using RoadRunner with RabbitMQ, if an error occurs during the processing of a message and the message is re-queued, it ends up at the back of the queue. This breaks the expected behavior of RabbitMQ, which states that (https://www.rabbitmq.com/docs/semantics#ordering)
A sample log of this issue is presented below.
Here we can see that during the processing of the message with id 21 an exception was thrown, and the message was re-queued and processed again last.
If possible, the expected behavior of RabbitMQ should be preserved.
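The difference can be sketched with a toy in-memory queue (plain PHP, purely illustrative; a single consumer and an array stand in for a real broker): `basic.nack` with `requeue=true` returns the message to its original position, while re-publishing it appends it to the tail, producing the out-of-order log above.

```php
<?php

// Toy model contrasting the two requeue strategies. Illustrative only;
// real brokers add nuance (competing consumers, prefetch, etc.).

/** basic.nack(requeue=true): the message goes back to its original position. */
function requeueInPlace(array $queue, int $failedMsg): array
{
    array_unshift($queue, $failedMsg);
    return $queue;
}

/** Re-publishing the failed message: it lands at the back of the queue. */
function requeueAtBack(array $queue, int $failedMsg): array
{
    $queue[] = $failedMsg;
    return $queue;
}

$queue = [21, 22, 23];
$failed = array_shift($queue); // consume message 21; processing throws

$inOrder = requeueInPlace($queue, $failed); // retry order: 21, 22, 23
$outOfOrder = requeueAtBack($queue, $failed); // retry order: 22, 23, 21
```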