Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add projection option constants to trigger pcntl_signal_dispatch #293

Merged
merged 8 commits into from
Jul 1, 2017
16 changes: 16 additions & 0 deletions docs/projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,22 @@ OPTION_LOCK_TIMEOUT_MS = 'lock_timeout_ms'
Indicates the time (in microseconds) the projector is locked. During this time no other projector with the same name can
be started. A running projector will update the lock timeout on every loop.

OPTION_PCNTL_DISPATCH = 'false'

Enable dispatching of process signals to registered [signal handlers](http://php.net/manual/en/function.pcntl-signal.php) while
the projection is running. You must still register your own signal handler and take according action.
For example to gracefully stop the projection you could do
```
$projection = $projectionManager->createProjection(
'test_projection',
[ Projector::OPTION_PCNTL_DISPATCH => true, ]
);
pcntl_signal(SIGQUIT, function () use ($projection) {
$projection->stop();
});
$projection->run();
```

## Read Model Projections

Projections can also be used to create read models. A read model has to implement `Prooph\EventStore\Projection\ReadModel`.
Expand Down
24 changes: 22 additions & 2 deletions src/Projection/InMemoryEventStoreProjector.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ final class InMemoryEventStoreProjector implements Projector
*/
private $sleep;

/**
* @var bool
*/
private $triggerPcntlSignalDispatch;

/**
* @var array|null
*/
Expand All @@ -101,8 +106,13 @@ final class InMemoryEventStoreProjector implements Projector
*/
private $streamCreated = false;

public function __construct(EventStore $eventStore, string $name, int $cacheSize, int $sleep)
{
public function __construct(
EventStore $eventStore,
string $name,
int $cacheSize,
int $sleep,
bool $triggerPcntlSignalDispatch = false
) {
if ($cacheSize < 1) {
throw new Exception\InvalidArgumentException('cache size must be a positive integer');
}
Expand All @@ -116,6 +126,7 @@ public function __construct(EventStore $eventStore, string $name, int $cacheSize
$this->cachedStreamNames = new ArrayCache($cacheSize);
$this->sleep = $sleep;
$this->status = ProjectionStatus::IDLE();
$this->triggerPcntlSignalDispatch = $triggerPcntlSignalDispatch && extension_loaded('pcntl');

while ($eventStore instanceof EventStoreDecorator) {
$eventStore = $eventStore->getInnerEventStore();
Expand Down Expand Up @@ -345,6 +356,8 @@ public function run(bool $keepRunning = true): void
if (0 === $eventCounter) {
usleep($this->sleep);
}

$this->triggerPcntlSignalDispatch();
} while ($keepRunning && ! $this->isStopped);

$this->status = ProjectionStatus::IDLE();
Expand Down Expand Up @@ -494,4 +507,11 @@ private function prepareStreamPositions(): void

$this->streamPositions = array_merge($streamPositions, $this->streamPositions);
}

private function triggerPcntlSignalDispatch(): void
{
if ($this->triggerPcntlSignalDispatch) {
pcntl_signal_dispatch();
}
}
}
18 changes: 17 additions & 1 deletion src/Projection/InMemoryEventStoreReadModelProjector.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ final class InMemoryEventStoreReadModelProjector implements ReadModelProjector
*/
private $sleep;

/**
* @var bool
*/
private $triggerPcntlSignalDispatch;

/**
* @var array|null
*/
Expand All @@ -115,7 +120,8 @@ public function __construct(
ReadModel $readModel,
int $cacheSize,
int $persistBlockSize,
int $sleep
int $sleep,
bool $triggerPcntlSignalDispatch = false
) {
if ($cacheSize < 1) {
throw new Exception\InvalidArgumentException('cache size must be a positive integer');
Expand All @@ -136,6 +142,7 @@ public function __construct(
$this->readModel = $readModel;
$this->sleep = $sleep;
$this->status = ProjectionStatus::IDLE();
$this->triggerPcntlSignalDispatch = $triggerPcntlSignalDispatch && extension_loaded('pcntl');

while ($eventStore instanceof EventStoreDecorator) {
$eventStore = $eventStore->getInnerEventStore();
Expand Down Expand Up @@ -317,6 +324,8 @@ public function run(bool $keepRunning = true): void
if (0 === $eventCounter) {
usleep($this->sleep);
}

$this->triggerPcntlSignalDispatch();
} while ($keepRunning && ! $this->isStopped);

$this->status = ProjectionStatus::IDLE();
Expand Down Expand Up @@ -495,4 +504,11 @@ private function prepareStreamPositions(): void

$this->streamPositions = array_merge($streamPositions, $this->streamPositions);
}

private function triggerPcntlSignalDispatch(): void
{
if ($this->triggerPcntlSignalDispatch) {
pcntl_signal_dispatch();
}
}
}
6 changes: 4 additions & 2 deletions src/Projection/InMemoryProjectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public function createProjection(
$this->eventStore,
$name,
$options[Projector::OPTION_CACHE_SIZE] ?? Projector::DEFAULT_CACHE_SIZE,
$options[Projector::OPTION_SLEEP] ?? Projector::DEFAULT_SLEEP
$options[Projector::OPTION_SLEEP] ?? Projector::DEFAULT_SLEEP,
$options[Projector::OPTION_PCNTL_DISPATCH] ?? Projector::DEFAULT_PCNTL_DISPATCH
);

if (! isset($this->projectors[$name])) {
Expand All @@ -79,7 +80,8 @@ public function createReadModelProjection(
$readModel,
$options[ReadModelProjector::OPTION_CACHE_SIZE] ?? ReadModelProjector::DEFAULT_CACHE_SIZE,
$options[ReadModelProjector::OPTION_PERSIST_BLOCK_SIZE] ?? ReadModelProjector::DEFAULT_PERSIST_BLOCK_SIZE,
$options[ReadModelProjector::OPTION_SLEEP] ?? ReadModelProjector::DEFAULT_SLEEP
$options[ReadModelProjector::OPTION_SLEEP] ?? ReadModelProjector::DEFAULT_SLEEP,
$options[ReadModelProjector::OPTION_PCNTL_DISPATCH] ?? ReadModelProjector::DEFAULT_PCNTL_DISPATCH
);

if (! isset($this->projectors[$name])) {
Expand Down
2 changes: 2 additions & 0 deletions src/Projection/Projector.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ interface Projector
public const OPTION_SLEEP = 'sleep';
public const OPTION_PERSIST_BLOCK_SIZE = 'persist_block_size';
public const OPTION_LOCK_TIMEOUT_MS = 'lock_timeout_ms';
public const OPTION_PCNTL_DISPATCH = 'trigger_pcntl_dispatch';

public const DEFAULT_CACHE_SIZE = 1000;
public const DEFAULT_SLEEP = 100000;
public const DEFAULT_PERSIST_BLOCK_SIZE = 1000;
public const DEFAULT_LOCK_TIMEOUT_MS = 1000;
public const DEFAULT_PCNTL_DISPATCH = false;

/**
* The callback has to return an array
Expand Down
2 changes: 2 additions & 0 deletions src/Projection/ReadModelProjector.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ interface ReadModelProjector
public const OPTION_SLEEP = 'sleep';
public const OPTION_PERSIST_BLOCK_SIZE = 'persist_block_size';
public const OPTION_LOCK_TIMEOUT_MS = 'lock_timeout_ms';
public const OPTION_PCNTL_DISPATCH = 'trigger_pcntl_dispatch';

public const DEFAULT_CACHE_SIZE = 1000;
public const DEFAULT_SLEEP = 100000;
public const DEFAULT_PERSIST_BLOCK_SIZE = 1000;
public const DEFAULT_LOCK_TIMEOUT_MS = 1000;
public const DEFAULT_PCNTL_DISPATCH = false;

/**
* The callback has to return an array
Expand Down
36 changes: 35 additions & 1 deletion tests/Projection/InMemoryEventStoreProjectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public function it_throws_exception_when_unknown_event_store_instance_passed():

$eventStore = $this->prophesize(EventStore::class);

new InMemoryEventStoreProjector($eventStore->reveal(), 'test_projection', 10, 10, 2000);
new InMemoryEventStoreProjector($eventStore->reveal(), 'test_projection', 10, 10);
}

/**
Expand Down Expand Up @@ -154,4 +154,38 @@ public function it_throws_exception_when_invalid_sleep_given(): void

new InMemoryEventStoreProjector($this->eventStore, 'test_projection', 1, -1);
}

/**
* @test
*/
public function it_dispatches_pcntl_signals_when_enabled(): void
{
if (! extension_loaded('pcntl')) {
$this->markTestSkipped('The PCNTL extension is not available.');

return;
}

$command = 'php ' . realpath(__DIR__) . '/isolated-projection.php';
$descriptorSpec = [
0 => ['pipe', 'r'],
1 => ['pipe', 'w'],
2 => ['pipe', 'w'],
];
/**
* Created process inherits env variables from this process.
* Script returns with non-standard code SIGUSR1 from the handler and -1 else
*/
$projectionProcess = proc_open($command, $descriptorSpec, $pipes);
$processDetails = proc_get_status($projectionProcess);
sleep(2);
posix_kill($processDetails['pid'], SIGQUIT);
sleep(2);

$processDetails = proc_get_status($projectionProcess);
$this->assertEquals(
SIGUSR1,
$processDetails['exitcode']
);
}
}
34 changes: 34 additions & 0 deletions tests/Projection/InMemoryEventStoreReadModelProjectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,38 @@ public function it_throws_exception_when_invalid_wrapped_event_store_instance_pa

new InMemoryEventStoreReadModelProjector($wrappedEventStore->reveal(), 'test_projection', new ReadModelMock(), 1, 1, 1);
}

/**
* @test
*/
public function it_dispatches_pcntl_signals_when_enabled(): void
{
if (! extension_loaded('pcntl')) {
$this->markTestSkipped('The PCNTL extension is not available.');

return;
}

$command = 'php ' . realpath(__DIR__) . '/isolated-read-model-projection.php';
$descriptorSpec = [
0 => ['pipe', 'r'],
1 => ['pipe', 'w'],
2 => ['pipe', 'w'],
];
/**
* Created process inherits env variables from this process.
* Script returns with non-standard code SIGUSR1 from the handler and -1 else
*/
$projectionProcess = proc_open($command, $descriptorSpec, $pipes);
$processDetails = proc_get_status($projectionProcess);
sleep(2);
posix_kill($processDetails['pid'], SIGQUIT);
sleep(2);

$processDetails = proc_get_status($projectionProcess);
$this->assertEquals(
SIGUSR1,
$processDetails['exitcode']
);
}
}
31 changes: 31 additions & 0 deletions tests/Projection/isolated-projection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

use Prooph\EventStore\InMemoryEventStore;
use Prooph\EventStore\Projection\InMemoryProjectionManager;
use Prooph\EventStore\Projection\Projector;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamName;

require __DIR__ . '/../../vendor/autoload.php';

$eventStore = new InMemoryEventStore();
$eventStore->create(new Stream(new StreamName('user-123'), new ArrayIterator([])));

$projectionManager = new InMemoryProjectionManager($eventStore);
$projection = $projectionManager->createProjection(
'test_projection',
[
Projector::OPTION_PCNTL_DISPATCH => true,
]
);
pcntl_signal(SIGQUIT, function () use ($projection) {
$projection->stop();
exit(SIGUSR1);
});
$projection
->fromStream('user-123')
->whenAny(function () {
})
->run();
60 changes: 60 additions & 0 deletions tests/Projection/isolated-read-model-projection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

declare(strict_types=1);

use Prooph\EventStore\InMemoryEventStore;
use Prooph\EventStore\Projection\InMemoryProjectionManager;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjector;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamName;

require __DIR__ . '/../../vendor/autoload.php';

$readModel = new class() implements ReadModel {
public function init(): void
{
}

public function isInitialized(): bool
{
return true;
}

public function reset(): void
{
}

public function delete(): void
{
}

public function stack(string $operation, ...$args): void
{
}

public function persist(): void
{
}
};

$eventStore = new InMemoryEventStore();
$eventStore->create(new Stream(new StreamName('user-123'), new ArrayIterator([])));

$projectionManager = new InMemoryProjectionManager($eventStore);
$projection = $projectionManager->createReadModelProjection(
'test_projection',
$readModel,
[
ReadModelProjector::OPTION_PCNTL_DISPATCH => true,
]
);
pcntl_signal(SIGQUIT, function () use ($projection) {
$projection->stop();
exit(SIGUSR1);
});
$projection
->fromStream('user-123')
->whenAny(function () {
})
->run();