Skip to content

Commit

Permalink
allow to mock workflows without a running server and worker
Browse files Browse the repository at this point in the history
  • Loading branch information
cappuc committed Oct 13, 2022
1 parent d9090f1 commit a62f420
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 9 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Changelog

# v0.4
## Unreleased
- Allow to mock workflows without a running temporal server and worker

## v0.4
- Added support for `Eloquent` models serialization/deserialization
- Updated configuration file with eloquent serialization options

Expand Down
62 changes: 54 additions & 8 deletions src/Testing/TemporalMockerCache.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

use Closure;
use Illuminate\Support\Arr;
use Illuminate\Support\Collection;
use Spiral\Goridge\Exception\RelayException;
use Spiral\Goridge\RPC\RPC;
use Spiral\RoadRunner\KeyValue\Factory;
use Spiral\RoadRunner\KeyValue\StorageInterface;
Expand All @@ -17,9 +19,14 @@ final class TemporalMockerCache

private readonly StorageInterface $cache;

private readonly Collection $localCache;

private bool $localOnly = false;

public function __construct(string $host, string $cacheName)
{
$this->cache = (new Factory(RPC::create($host)))->select($cacheName);
$this->localCache = Collection::make();
}

public static function create(): self
Expand All @@ -32,20 +39,33 @@ public static function create(): self

public function clear(): void
{
$this->cache->clear();
$this->cacheProxy(fn () => $this->cache->clear());
}

public function saveWorkflowMock(string $workflowName, mixed $value, ?string $taskQueue = null): void
{
$this->cache->set(sprintf('workflow::%s', $workflowName), [
$key = sprintf('workflow::%s', $workflowName);

$payload = [
'mock' => $value ?? 'null',
'taskQueue' => $taskQueue,
]);
];

$this->cacheProxy(
fn () => $this->cache->set($key, $payload),
fn () => $this->localCache->put($key, $payload)
);
}

public function getWorkflowMock(string $workflowName, string $taskQueue): ?Closure
{
$value = $this->cache->get(sprintf('workflow::%s', $workflowName));
$key = sprintf('workflow::%s', $workflowName);

$value = $this->cacheProxy(
fn () => $this->cache->get($key),
fn () => $this->localCache->get($key)
);

if (! is_array($value)) {
return null;
}
Expand Down Expand Up @@ -97,22 +117,33 @@ public function getActivityMock(string $activityName, string $taskQueue): ?Closu

public function recordWorkflowDispatch(string $workflowName, string $taskQueue, array $args): void
{
$cacheKey = sprintf('workflow_dispatch::%s', $workflowName);
$key = sprintf('workflow_dispatch::%s', $workflowName);

/** @var array $dispatches */
$dispatches = $this->cache->get($cacheKey, []);
$dispatches = $this->cacheProxy(
fn () => $this->cache->get($key, []),
fn () => $this->localCache->get($key, [])
);

$dispatches[] = [
'taskQueue' => $taskQueue,
'args' => $args,
];

$this->cache->set($cacheKey, $dispatches);
$this->cacheProxy(
fn () => $this->cache->set($key, $dispatches),
fn () => $this->localCache->put($key, $dispatches)
);
}

public function getWorkflowDispatches(string $workflowName): array
{
return $this->cache->get(sprintf('workflow_dispatch::%s', $workflowName), []);
$key = sprintf('workflow_dispatch::%s', $workflowName);

return $this->cacheProxy(
fn () => $this->cache->get($key, []),
fn () => $this->localCache->get($key, [])
);
}

public function recordActivityDispatch(string $activityName, string $taskQueue, array $args): void
Expand All @@ -134,4 +165,19 @@ public function getActivityDispatches(string $activityName): array
{
return $this->cache->get(sprintf('activity_dispatch::%s', $activityName), []);
}

private function cacheProxy(Closure $action, ?Closure $fallback = null): mixed
{
if ($this->localOnly) {
return $fallback?->__invoke();
}

try {
return retry(3, $action);
} catch (RelayException) {
$this->localOnly = true;

return $fallback?->__invoke();
}
}
}

0 comments on commit a62f420

Please sign in to comment.