Skip to content

Commit

Permalink
DP-460 Upgrade to PHP 8.1 / Laravel 9
Browse files Browse the repository at this point in the history
  • Loading branch information
daniilly committed Feb 27, 2023
1 parent 7d8a25d commit 4072aae
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 40 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
],
"require": {
"dreamfactory/df-pubsub": "~0.2",
"php-amqplib/php-amqplib": "2.7.*"
"php-amqplib/php-amqplib": "^3.1.1"
},
"autoload": {
"psr-4": {
Expand Down
65 changes: 33 additions & 32 deletions src/Components/AMQPClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use ServiceManager;
use Cache;
use Log;
use \Illuminate\Support\Arr;

class AMQPClient implements MessageQueueInterface
{
Expand Down Expand Up @@ -110,31 +111,31 @@ public function subscribe(array $data)
{
try {
$channel = $this->setupChannel($data, static::MODE_SUB);
$consumerTag = array_get($data, 'consumer_tag', '');
$noLocal = array_get($data, 'no_local', false);
$noAck = array_get($data, 'no_ack', false);
$exclusive = array_get($data, 'exclusive', false);
$consumerTag = Arr::get($data, 'consumer_tag', '');
$noLocal = Arr::get($data, 'no_local', false);
$noAck = Arr::get($data, 'no_ack', false);
$exclusive = Arr::get($data, 'exclusive', false);
$noWait = array_get_or($data, ['nowait', 'no_wait'], false);
$callback = function ($msg) use ($data){
Log::debug("[AMQP] Message received: " . $msg->body);
$service = array_get($data, 'service');
$service = Arr::get($data, 'service');

if (is_array($service) && $msg->body !== Subscribe::TERMINATOR) {
Log::debug('[AMQP] Triggering service: ' . json_encode($service, JSON_UNESCAPED_SLASHES));
// Retrieve service information
$endpoint = trim(array_get($service, 'endpoint', ''), '/');
$endpoint = trim(Arr::get($service, 'endpoint', ''), '/');
if (empty($endpoint)) {
throw new AMQPInvalidArgumentException('No service endpoint provided for consumer task.');
}
$endpoint = str_replace('api/v2/', '', $endpoint);
$endpointArray = explode('/', $endpoint);
$serviceName = array_get($endpointArray, 0);
$serviceName = Arr::get($endpointArray, 0);
array_shift($endpointArray);
$resource = implode('/', $endpointArray);
$verb = strtoupper(array_get($service, 'verb', array_get($service, 'method', Verbs::POST)));
$params = array_get($service, 'parameter', array_get($service, 'parameters', []));
$header = array_get($service, 'header', array_get($service, 'headers', []));
$payload = array_get($service, 'payload', []);
$verb = strtoupper(Arr::get($service, 'verb', Arr::get($service, 'method', Verbs::POST)));
$params = Arr::get($service, 'parameter', Arr::get($service, 'parameters', []));
$header = Arr::get($service, 'header', Arr::get($service, 'headers', []));
$payload = Arr::get($service, 'payload', []);
$payload['message'] = $msg->body;

/** @var \DreamFactory\Core\Utility\ServiceResponse $rs */
Expand All @@ -154,8 +155,8 @@ public function subscribe(array $data)
$msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
}
};
$ticket = array_get($data, 'ticket');
$arguments = array_get($data, 'arguments', []);
$ticket = Arr::get($data, 'ticket');
$arguments = Arr::get($data, 'arguments', []);

$channel->basic_consume(
$this->queueName,
Expand Down Expand Up @@ -198,7 +199,7 @@ public function subscribe(array $data)
protected function setupChannel(array $data, $mode)
{
$channel = $this->getChannel(array_get_or($data, ['channel', 'channel_id']));
$exchange = array_get($data, 'exchange', '');
$exchange = Arr::get($data, 'exchange', '');
if (!empty($exchange)) {
$this->exchangeName = $this->declareExchange($channel, $exchange);
}
Expand All @@ -213,7 +214,7 @@ protected function setupChannel(array $data, $mode)
$this->queueName = $queue;
if (empty($this->queueName)) {
if (is_array($routingKey)) {
$routingKey = array_get($routingKey, 0);
$routingKey = Arr::get($routingKey, 0);
}
if (empty($routingKey)) {
throw new InternalServerErrorException('No routing key provided for exchange that is NOT of type ' .
Expand All @@ -236,11 +237,11 @@ protected function setupChannel(array $data, $mode)
$channel->queue_bind($this->queueName, $this->exchangeName, $routingKey);
}
}
$qos = array_get($data, 'qos');
$qos = Arr::get($data, 'qos');
if (!empty($qos) && is_array($qos)) {
$prefetchSize = array_get($qos, 'prefetch_size');
$prefetchCount = array_get($qos, 'prefetch_count');
$aGlobal = array_get($qos, 'a_global');
$prefetchSize = Arr::get($qos, 'prefetch_size');
$prefetchCount = Arr::get($qos, 'prefetch_count');
$aGlobal = Arr::get($qos, 'a_global');
$channel->basic_qos($prefetchSize, $prefetchCount, $aGlobal);
}
}
Expand Down Expand Up @@ -270,14 +271,14 @@ protected function declareQueue(&$channel, $queue)
$ticket = null;

if (is_array($queue)) {
$name = array_get($queue, 'name', '');
$passive = array_get($queue, 'passive', $passive);
$durable = array_get($queue, 'durable', $durable);
$exclusive = array_get($queue, 'exclusive', $exclusive);
$autoDelete = array_get($queue, 'auto_delete', $autoDelete);
$name = Arr::get($queue, 'name', '');
$passive = Arr::get($queue, 'passive', $passive);
$durable = Arr::get($queue, 'durable', $durable);
$exclusive = Arr::get($queue, 'exclusive', $exclusive);
$autoDelete = Arr::get($queue, 'auto_delete', $autoDelete);
$noWait = array_get_or($queue, ['nowait', 'no_wait'], $noWait);
$arguments = array_get_or($queue, ['argument', 'arguments'], $arguments);
$ticket = array_get($queue, 'ticket', $ticket);
$ticket = Arr::get($queue, 'ticket', $ticket);

if (empty($name)) {
throw new InternalServerErrorException('No queue name found in queue data.');
Expand Down Expand Up @@ -357,15 +358,15 @@ protected function declareExchange(&$channel, $exchange)
$ticket = null;

if (is_array($exchange)) {
$name = array_get($exchange, 'name');
$type = array_get($exchange, 'type', $type);
$passive = array_get($exchange, 'passive', $passive);
$durable = array_get($exchange, 'durable', $durable);
$autoDelete = array_get($exchange, 'auto_delete', $autoDelete);
$internal = array_get($exchange, 'internal', $internal);
$name = Arr::get($exchange, 'name');
$type = Arr::get($exchange, 'type', $type);
$passive = Arr::get($exchange, 'passive', $passive);
$durable = Arr::get($exchange, 'durable', $durable);
$autoDelete = Arr::get($exchange, 'auto_delete', $autoDelete);
$internal = Arr::get($exchange, 'internal', $internal);
$noWait = array_get_or($exchange, ['nowait', 'no_wait'], $noWait);
$arguments = array_get_or($exchange, ['argument', 'arguments'], $arguments);
$ticket = array_get($exchange, 'ticket', $ticket);
$ticket = Arr::get($exchange, 'ticket', $ticket);

if (empty($name) || empty($type)) {
throw new InternalServerErrorException('No exchange name and/or type found in exchange data.');
Expand Down
4 changes: 2 additions & 2 deletions src/Resources/Sub.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ protected function handleDELETE()
if ($job->attempts === 0) {
DB::table('jobs')->delete($job->id);
} else {
$obj = unserialize(array_get(json_decode($job->payload, true), 'data.command'));
$obj = unserialize(Arr::get(json_decode($job->payload, true), 'data.command'));
$payload = $obj->getPayload();
$channel = array_get_or($payload, ['channel', 'channel_id']);
$exchange = array_get($payload, 'exchange');
$exchange = Arr::get($payload, 'exchange');
$queue = array_get_or($payload, ['queue', 'topic']);
$routingKey = array_get_or(
$payload,
Expand Down
11 changes: 6 additions & 5 deletions src/Services/AMQP.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use DreamFactory\Core\AMQP\Resources\Pub;
use DreamFactory\Core\AMQP\Resources\Sub;
use DreamFactory\Core\PubSub\Services\PubSub;
use \Illuminate\Support\Arr;

class AMQP extends PubSub
{
Expand Down Expand Up @@ -38,11 +39,11 @@ protected function setClient($config)
throw new InternalServerErrorException('No service configuration found for AMQP service.');
}

$host = array_get($config, 'host');
$port = array_get($config, 'port', 5672);
$username = array_get($config, 'username');
$password = array_get($config, 'password');
$vhost = array_get($config, 'vhost');
$host = Arr::get($config, 'host');
$port = Arr::get($config, 'port', 5672);
$username = Arr::get($config, 'username');
$password = Arr::get($config, 'password');
$vhost = Arr::get($config, 'vhost');

$this->client = new AMQPClient($host, $username, $password, $port, $vhost);
}
Expand Down

0 comments on commit 4072aae

Please sign in to comment.