In this guide you'll learn how to create a message handler.
A handler gets called once the message is dispatched by the handle_messages
middleware. Handlers do the actual processing of the message.
Like most guides, this one builds upon the Plugin base guide, but you don't necessarily need it. It uses an example message, so if you don't know how to add a custom message yet, have a look at our guide about Adding a message to queue. Registering classes or services in the DI container is not explained here either; it's covered in our guide about Dependency injection, so having that open in another tab won't hurt.
First, we have to create a new class, which we will name SmsHandler in this example. Our handler has to extend the Shopware\Core\Framework\MessageQueue\Handler\AbstractMessageHandler class and implement the method handle. To specify which messages a given handler should handle, we implement the static method getHandledMessages and return the message classes our handler is responsible for. We can also define multiple handlers for the same message. To register a handler, we have to tag its service with the messenger.message_handler tag.
{% code title="/src/MessageQueue/Handler/SmsHandler.php" %}
<?php declare(strict_types=1);

namespace Swag\BasicExample\MessageQueue\Handler;

use Shopware\Core\Framework\MessageQueue\Handler\AbstractMessageHandler;
use Swag\BasicExample\MessageQueue\Message\SmsNotification;

class SmsHandler extends AbstractMessageHandler
{
    /**
     * @param SmsNotification $message
     */
    public function handle($message): void
    {
        // send our sms here
    }

    public static function getHandledMessages(): iterable
    {
        return [SmsNotification::class];
    }
}
{% endcode %}
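To make the relationship between getHandledMessages and handle more concrete, here is a minimal, self-contained sketch. All class and function names in it (SketchSmsNotification, SketchHandler, SketchSmsHandler, SketchLogHandler, dispatchToHandlers) are hypothetical stand-ins written for this illustration. They are not Shopware's AbstractMessageHandler or the real dispatching middleware, which do considerably more.

```php
<?php declare(strict_types=1);

// Hypothetical stand-in for a message class such as SmsNotification.
final class SketchSmsNotification
{
    public string $text;

    public function __construct(string $text)
    {
        $this->text = $text;
    }
}

// Hypothetical, simplified base class; NOT Shopware's AbstractMessageHandler.
abstract class SketchHandler
{
    /** @var string[] record of what this handler has processed */
    public array $handled = [];

    /** @return iterable<string> class names of the messages this handler accepts */
    abstract public static function getHandledMessages(): iterable;

    abstract public function handle(object $message): void;
}

final class SketchSmsHandler extends SketchHandler
{
    public static function getHandledMessages(): iterable
    {
        return [SketchSmsNotification::class];
    }

    public function handle(object $message): void
    {
        // A real handler would send the SMS here; the sketch only records it.
        $this->handled[] = 'sms: ' . $message->text;
    }
}

// A second handler for the same message class.
final class SketchLogHandler extends SketchHandler
{
    public static function getHandledMessages(): iterable
    {
        return [SketchSmsNotification::class];
    }

    public function handle(object $message): void
    {
        $this->handled[] = 'log: ' . $message->text;
    }
}

/**
 * Calls every handler whose getHandledMessages() lists the message's class
 * and returns how many handlers were called.
 *
 * @param SketchHandler[] $handlers
 */
function dispatchToHandlers(object $message, array $handlers): int
{
    $called = 0;
    foreach ($handlers as $handler) {
        foreach ($handler::getHandledMessages() as $class) {
            if ($message instanceof $class) {
                $handler->handle($message);
                ++$called;
                break; // call each handler at most once per message
            }
        }
    }

    return $called;
}

// Usage: both handlers receive the same message.
$smsHandler = new SketchSmsHandler();
$logHandler = new SketchLogHandler();
$called = dispatchToHandlers(new SketchSmsNotification('Order shipped'), [$smsHandler, $logHandler]);
```

The point of the sketch is only that the list returned by getHandledMessages decides which handlers see a message, and that several handlers may be called for the same message.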
There is a console command to start a worker that will receive incoming messages from your transport and dispatch them. Simply start the worker with the following command:
{% code title="" %}
bin/console messenger:consume-messages default
{% endcode %}
Where default is the transport you want to consume messages from. There is also an API route that lets you consume messages for a given transport. Just send a POST request to the route /api/_action/message-queue/consume and define the transport from which you want to consume:
{% code title="" %}
{
    "receiver": "default"
}
{% endcode %}
The receiver will consume messages for 2 seconds and then you get the count of the handled messages in the response:
{% code title="" %}
{
    "handledMessages": 15
}
{% endcode %}
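As a rough illustration of the request and response shown above, the following sketch builds the POST request and reads the handledMessages count from a response body. The function names, the base URL parameter, and the use of a bearer token in the Authorization header are assumptions made for this example; check how your installation authenticates API requests before relying on it.

```php
<?php declare(strict_types=1);

/**
 * Builds the URL and stream-context options for the consume request.
 * $baseUrl and $accessToken are hypothetical inputs for this sketch.
 */
function buildConsumeRequest(string $baseUrl, string $accessToken, string $receiver): array
{
    return [
        'url' => rtrim($baseUrl, '/') . '/api/_action/message-queue/consume',
        'options' => [
            'http' => [
                'method' => 'POST',
                'header' => [
                    'Content-Type: application/json',
                    'Authorization: Bearer ' . $accessToken, // assumed auth scheme
                ],
                // The body names the transport to consume from.
                'content' => json_encode(['receiver' => $receiver]),
            ],
        ],
    ];
}

/**
 * Extracts the number of handled messages from the JSON response body.
 * Returns 0 if the field is missing or not an integer.
 */
function parseHandledMessages(string $responseBody): int
{
    $data = json_decode($responseBody, true);

    return \is_array($data) && \is_int($data['handledMessages'] ?? null)
        ? $data['handledMessages']
        : 0;
}
```

With a real shop URL and token, the request could be sent with file_get_contents($request['url'], false, stream_context_create($request['options'])) and the returned body passed to parseHandledMessages.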
By default, there is an admin-worker that periodically pings this endpoint from the administration to consume messages. This feature is intended for development and for hosting environments where a more complex setup is not feasible. In production setups, however, you really should use the cli-worker, because the admin-worker only consumes messages while an administration user is logged in.
The recommended way to consume messages is through the cli command. You can configure the command to run for a certain amount of time or to stop once it exceeds a certain memory limit, for example:
{% code title="" %}
bin/console messenger:consume-messages default --time-limit=60
{% endcode %}
{% code title="" %}
bin/console messenger:consume-messages default --memory-limit=128M
{% endcode %}
For more information about the command and its configuration, use the -h option:
{% code title="" %}
bin/console messenger:consume-messages -h
{% endcode %}
You should use the limit options to restart the worker processes periodically, because long-running PHP processes tend to leak memory. To start the processes again automatically after they have stopped because a limit was exceeded, you can use something like upstart or supervisor. Alternatively, you can configure a cron job that runs the command periodically.
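The effect of such limits can be sketched as a loop that checks the elapsed time and the memory usage after each handled message and exits once either limit is exceeded, leaving the restart to an external process manager. This is a simplified, hypothetical illustration (runWorkerSketch is not part of Symfony or Shopware), not the actual implementation of the consume command.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical worker loop: handles messages until the queue is empty,
 * the time limit has passed, or memory usage exceeds the limit.
 * Returns the reason why the loop stopped.
 */
function runWorkerSketch(
    iterable $messages,
    callable $handle,
    float $timeLimitSeconds,
    int $memoryLimitBytes
): string {
    $startedAt = microtime(true);

    foreach ($messages as $message) {
        $handle($message);

        // Limits are checked after a message is handled, so a message is
        // never interrupted halfway through.
        if (microtime(true) - $startedAt >= $timeLimitSeconds) {
            return 'time-limit';
        }
        if (memory_get_usage(true) > $memoryLimitBytes) {
            return 'memory-limit';
        }
    }

    return 'queue-empty';
}

// Usage: a deliberately tiny memory limit stops the loop after one message.
$handled = [];
$reason = runWorkerSketch(
    ['first', 'second', 'third'],
    function (string $message) use (&$handled): void {
        $handled[] = $message;
    },
    60.0,
    1
);
```

Because the process exits when a limit is hit, something outside the process (supervisor, upstart, or a cron job) has to start it again.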
If you have configured the cli-worker, you can turn off the admin-worker in your shopware.yaml.
{% code title="/src/Core/Framework/Resources/config/packages/shopware.yaml" %}
shopware:
    admin_worker:
        enable_admin_worker: false
{% endcode %}
Note: This will disable the AdminWorker completely and you have to configure the cli-worker for scheduled tasks as well.
The message bus is used to dispatch your messages to your registered handlers. While dispatching a message, it loops through the middleware configured for that bus. The message bus used inside Shopware can be found under the service tag messenger.bus.shopware. It is mandatory to use this message bus if your messages should be handled inside Shopware. However, if you want to send messages to external systems, you can define a custom message bus for that.
You can configure an array of buses and define one default bus in your framework.yaml.
{% code title="/src/Core/Framework/Resources/config/packages/framework.yaml" %}
framework:
    messenger:
        default_bus: messenger.bus.shopware
        buses:
            messenger.bus.shopware:
{% endcode %}
For more information on this check the Symfony docs.
A transport is responsible for communicating with your third-party message broker. You can configure multiple transports and route messages to multiple or different transports. All transports supported either by Symfony itself or by Enqueue can be used. If you don't configure a transport, messages are processed synchronously, like in the Symfony event system.
You can configure an amqp transport directly in your framework.yaml, but since Shopware is integrated with Enqueue, it is best practice to configure your transport in your enqueue.yaml and simply tell Symfony to use your Enqueue transports.
In your enqueue.yaml you configure your transports as follows:
{% code title="/src/Core/Framework/Resources/config/packages/enqueue.yaml" %}
enqueue:
    default: # the name of your transport
        transport: ~ # the transport configuration
        client: ~ # the client configuration
{% endcode %}
In a simple setup you only need to set the transport to a valid DSN like:
{% code title="/src/Core/Framework/Resources/config/packages/enqueue.yaml" %}
enqueue:
    default:
        transport: "file://path/to/my/file"
        client: ~
{% endcode %}
Note that different schemes (e.g. file://, amqp://) require different Enqueue transports. Shopware only ships with the Filesystem transport, so you need to require any other transport you want to use.
For more information on this check the enqueue docs.
To tell Symfony to use your transports from your enqueue.yaml, simply use:
{% code title="/src/Core/Framework/Resources/config/packages/enqueue.yaml" %}
framework:
    messenger:
        transports:
            # the key is the name for the transport used by symfony
            # the value is 'enqueue://' followed by the name of your transport used in 'enqueue.yaml'
            default: enqueue://default
{% endcode %}
You can route messages to different transports. For that, just configure your routing in the framework.yaml.
{% code title="/src/" %}
framework:
    messenger:
        transports:
            default: enqueue://default
            another_transport: enqueue://another_transport
        routing:
            'Swag\BasicExample\MessageQueue\Message\SmsNotification': another_transport
            'Swag\BasicExample\MessageQueue\Message\AnotherExampleNotification': [default, another_transport]
            '*': default
{% endcode %}
You can route messages by their classname and use the asterisk as a fallback for all other messages. If you specify a list of transports the messages will be routed to all of them. For more information on this check the Symfony docs.
The admin-worker can be configured or disabled in the general shopware.yaml configuration. If you want to use the admin-worker, you have to specify each transport that you configured previously. The poll interval is the time in seconds for which the admin-worker polls messages from the queue. Once the poll interval is over, the request terminates and the administration initiates a new request.
{% code title="/src/Core/Framework/Resources/config/packages/shopware.yaml" %}
shopware:
    admin_worker:
        enable_admin_worker: true
        poll_interval: 30
        transports: ["default"]
{% endcode %}
Now that you know how to add a message handler and configure a bus for it, you may want to add a custom middleware for your bus. To do this, head over to our "Add middleware" guide:
{% page-ref page="add-middleware.md" %}