Skip to content
This repository has been archived by the owner on Jul 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #20 from juriansluiman/sqs-batch-delete
Browse files Browse the repository at this point in the history
Allow to splice batch delete
  • Loading branch information
bakura10 committed Oct 2, 2014
2 parents d4e3318 + 3a98618 commit 93c0058
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# 0.4.0

- [BC] Bump SlmQueue dependency to 0.4. Please read SlmQueue CHANGELOG for further information
- `batchDelete` now also automatically splice jobs if you submit more than 10 jobs.

# 0.3.2

Expand Down
4 changes: 2 additions & 2 deletions src/SlmQueueSqs/Controller/SqsWorkerController.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public function processAction()
'wait_time_seconds' => isset($params['waitTime']) ? $params['waitTime'] : null
);

$queue = $options['queue'];
$queue = $this->queuePluginManager->get($options['queue']);

try {
$result = $this->worker->processQueue($queue, array_filter($options));
Expand All @@ -37,7 +37,7 @@ public function processAction()

return sprintf(
"Finished worker for queue '%s' with %s jobs\n",
$queue,
$options['queue'],
$result
);
}
Expand Down
10 changes: 10 additions & 0 deletions src/SlmQueueSqs/Queue/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ public function batchPop(array $options = array())
*/
public function batchDelete(array $jobs)
{
// SQS can only handle up to 10 jobs, so if we have more jobs, we handle them in slices
if (count($jobs) > 10) {
do {
$splicedJobs = array_splice($jobs, 0, 10);
$this->batchDelete($splicedJobs);
} while (count($splicedJobs) >= 10);

return;
}

$parameters = array(
'QueueUrl' => $this->queueOptions->getQueueUrl(),
'Entries' => array()
Expand Down
14 changes: 11 additions & 3 deletions tests/SlmQueueSqsTest/Controller/SqsWorkerControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@ class SqsWorkerControllerTest extends TestCase
{
public function testCorrectlyCountJobs()
{
$worker = $this->getMock('SlmQueue\Worker\WorkerInterface');
$controller = new SqsWorkerController($worker);
$queue = $this->getMock('SlmQueue\Queue\QueueInterface');
$worker = $this->getMock('SlmQueue\Worker\WorkerInterface');
$pluginManager = $this->getMock('SlmQueue\Queue\QueuePluginManager', array(), array(), '', false);

$pluginManager->expects($this->once())
->method('get')
->with('newsletter')
->will($this->returnValue($queue));

$controller = new SqsWorkerController($worker, $pluginManager);

$routeMatch = new RouteMatch(array('queue' => 'newsletter'));
$controller->getEvent()->setRouteMatch($routeMatch);

$worker->expects($this->once())
->method('processQueue')
->with('newsletter')
->with($queue)
->will($this->returnValue(1));

$result = $controller->processAction();
Expand Down
89 changes: 76 additions & 13 deletions tests/SlmQueueSqsTest/Queue/SqsQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public function setUp()
{
$this->sqsClient = $this->getMock(
'Aws\Sqs\SqsClient',
array('getQueueUrl', 'sendMessage', 'sendMessageBatch', 'receiveMessage'),
array('getQueueUrl', 'sendMessage', 'sendMessageBatch', 'deleteMessageBatch', 'receiveMessage'),
array(),
'',
false
Expand Down Expand Up @@ -153,20 +153,20 @@ public function testSetMetadataWhenMultipleJobsArePushed()
$this->assertEquals(md5('baz'), $jobs[1]->getMetadata('md5'));
}

public function testCanSpliceJobsIfLimitIsExceeded()
public function testCanPushSpliceJobsIfLimitIsExceeded()
{
$jobs = array(
new Asset\SimpleJob(array('pos' => 1)),
new Asset\SimpleJob(array('pos' => 2)),
new Asset\SimpleJob(array('pos' => 3)),
new Asset\SimpleJob(array('pos' => 4)),
new Asset\SimpleJob(array('pos' => 5)),
new Asset\SimpleJob(array('pos' => 6)),
new Asset\SimpleJob(array('pos' => 7)),
new Asset\SimpleJob(array('pos' => 8)),
new Asset\SimpleJob(array('pos' => 9)),
new Asset\SimpleJob(array('pos' => 10)),
new Asset\SimpleJob(array('pos' => 11))
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob()
);

$firstSuccessful = array();
Expand Down Expand Up @@ -216,6 +216,69 @@ public function testCanSpliceJobsIfLimitIsExceeded()
$this->assertCount(11, $jobs);
}

public function testCanDeleteSpliceJobsIfLimitIsExceeded()
{
$jobs = array(
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob(),
new Asset\SimpleJob()
);

$firstSuccessful = array();

for ($i = 0 ; $i != 10 ; ++$i) {
$firstSuccessful[] = array(
'Id' => $i,
'MessageId' => $i + 1,
'MD5OfMessageBody' => md5('foo')
);
}

$firstResult = new ResourceModel(array(
'Successful' => $firstSuccessful
));

$secondResult = new ResourceModel(array(
'Successful' => array(
0 => array(
'Id' => 0,
'MessageId' => 1,
'MD5OfMessageBody' => md5 ('fpp')
)
)
));

$self = $this;

$this->sqsClient->expects($this->at(0))
->method('deleteMessageBatch')
->with($this->callback(function($parameters) use ($self) {
$self->assertCount(10, $parameters['Entries']);
return true;
}))
->will($this->returnValue($firstResult));

$this->sqsClient->expects($this->at(1))
->method('deleteMessageBatch')
->with($this->callback(function($parameters) use ($self) {
$self->assertCount(1, $parameters['Entries']);
return true;
}))
->will($this->returnValue($secondResult));

$this->sqsQueue->batchDelete($jobs);

$this->assertCount(11, $jobs);
}

public function testMetadataIsPopped()
{
$this->sqsClient->expects($this->once())
Expand Down
5 changes: 1 addition & 4 deletions tests/SlmQueueSqsTest/Worker/SqsWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ public function testAssertJobIsDeletedIfNoExceptionIsThrown()
$job->expects($this->once())->method('execute');
$queue->expects($this->once())->method('delete')->with($job);

$worker = new SqsWorker(
$this->getMock('SlmQueue\Queue\QueuePluginManager'),
new WorkerOptions()
);
$worker = new SqsWorker(new WorkerOptions());

$worker->processJob($job, $queue);
}
Expand Down

0 comments on commit 93c0058

Please sign in to comment.