Skip to content

Commit

Permalink
Implement updatePassword method to allow long-running connections to …
Browse files Browse the repository at this point in the history
…reauthenticate
  • Loading branch information
kratkyzobak committed Feb 9, 2024
1 parent 804dd61 commit bb7d914
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 0 deletions.
11 changes: 11 additions & 0 deletions PhpAmqpLib/Connection/AbstractConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,17 @@ public function reconnect()
$this->connect();
}

/**
* Sets new password for this connection.
* Should be used for RabbitMQ's OAuth2 authentication backend to update current token.
* @param string $password
*/
public function updatePassword($password): void
{
// send new secret to broker
$this->x_update_secret($password);
}

/**
* Cloning will use the old properties to make a new connection to the same server
*/
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ please refer to the [official RabbitMQ tutorials](http://www.rabbitmq.com/tutori
- `amqp_consumer_fanout_{1,2}.php` and `amqp_publisher_fanout.php`: demos fanout exchanges with named queues.
- `amqp_consumer_pcntl_heartbeat.php`: demos signal-based heartbeat sender usage.
- `basic_get.php`: demos obtaining messages from the queues by using the _basic get_ AMQP call.
- `oauth2_authorization.php`: demo for use of OAuth 2.0 authorization using the _update-secret_ AMQP call.

## Multiple hosts connections ##

Expand Down
44 changes: 44 additions & 0 deletions demo/oauth2_authorization.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

include(__DIR__ . '/config.php');

use PhpAmqpLib\Connection\AMQPStreamConnection;


require_once __DIR__ . '/../tests/generate_jwt_keys.php';

// you should implement your own logic to provide tokens to authorize against RabbitMQ broker from your OAuth server
function getNextOauth2Token() {
static $tokens = [
JWT_TOKEN_1, // connect using this token
JWT_TOKEN_2, // upgrade on first refresh
];

if (empty($tokens)) {
return "invalidToken"; // fail on 2nd refresh
}

return array_shift($tokens);
}

$connection = new AMQPStreamConnection(HOST, PORT, posix_getpid(), getNextOauth2Token(), VHOST);

// for workers running longer than the token expiration time, YOU HAVE TO REFRESH TOKEN proactively
// we will use pcntl alarm for this purpose - we are refreshing every 5 seconds

pcntl_async_signals(true);
pcntl_signal(SIGALRM, function () use ($connection) {
echo "Refreshing token...\n";
$connection->updatePassword(getNextOauth2Token()); // this will fail on 2nd attempt - see getNextOauth2Token
pcntl_alarm(5);
}, true);
pcntl_alarm(5);

register_shutdown_function(function () use ($connection) {
$connection->close();
});

while (true) {
echo "Connection is ", ($connection->isConnected() ? "connected" : "not connected"), "\n";
sleep(1);
}
40 changes: 40 additions & 0 deletions tests/Functional/Connection/OAuth2ConnectionTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

namespace Functional\Connection;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Tests\Functional\AbstractConnectionTest;

/**
* @group connection
*/
class OAuth2ConnectionTest extends AbstractConnectionTest
{
/**
* @test
* @covers \PhpAmqpLib\Connection\AbstractConnection::connection_update_secret_ok()
* @covers \PhpAmqpLib\Connection\AbstractConnection::x_update_secret()
* @covers \PhpAmqpLib\Connection\AbstractConnection::updatePassword()
*/
public function update_password()
{
$conn = new AMQPStreamConnection(HOST, PORT, 'oauth', JWT_TOKEN_1, '/', false, 'PLAIN', null, 'en_US', 1);
$conn->updatePassword(JWT_TOKEN_2);
self::assertTrue($conn->isConnected());
}

/**
* @test
* @covers \PhpAmqpLib\Connection\AbstractConnection::x_update_secret()
* @covers \PhpAmqpLib\Connection\AbstractConnection::updatePassword()
*/
public function update_password_invalid()
{
$this->expectException(AMQPConnectionClosedException::class);
$this->expectExceptionMessage('New secret was refused');

$conn = new AMQPStreamConnection(HOST, PORT, 'oauth', JWT_TOKEN_1, '/', false, 'PLAIN', null, 'en_US', 1);
$conn->updatePassword('invalidJwt');
}
}

0 comments on commit bb7d914

Please sign in to comment.