From dbc50e8061cd5ee9b4079e93bef9d08d23670dc5 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Thu, 19 Oct 2023 00:11:48 +0400 Subject: [PATCH 1/3] Add interceptors example with: - request logger - activity attributed options applier --- README.md | 2 + app/.rr.yaml | 8 ++- app/src/FileProcessing/README.md | 2 +- app/src/Interceptors/.rr.yaml | 14 ++++ app/src/Interceptors/Activity/Sleeper.php | 36 ++++++++++ .../Interceptors/Attribute/ActivityOption.php | 16 +++++ .../Attribute/StartToCloseTimeout.php | 23 ++++++ app/src/Interceptors/ExecuteCommand.php | 51 +++++++++++++ .../ActivityAttributesInterceptor.php | 62 ++++++++++++++++ .../Interceptor/RequestLoggerInterceptor.php | 38 ++++++++++ app/src/Interceptors/README.md | 11 +++ .../TestActivityAttributesInterceptor.php | 72 +++++++++++++++++++ app/src/Interceptors/worker.php | 32 +++++++++ 13 files changed, 364 insertions(+), 3 deletions(-) create mode 100644 app/src/Interceptors/.rr.yaml create mode 100644 app/src/Interceptors/Activity/Sleeper.php create mode 100644 app/src/Interceptors/Attribute/ActivityOption.php create mode 100644 app/src/Interceptors/Attribute/StartToCloseTimeout.php create mode 100644 app/src/Interceptors/ExecuteCommand.php create mode 100644 app/src/Interceptors/Interceptor/ActivityAttributesInterceptor.php create mode 100644 app/src/Interceptors/Interceptor/RequestLoggerInterceptor.php create mode 100644 app/src/Interceptors/README.md create mode 100644 app/src/Interceptors/Workflow/TestActivityAttributesInterceptor.php create mode 100644 app/src/Interceptors/worker.php diff --git a/README.md b/README.md index 7ed6f53..fee453a 100644 --- a/README.md +++ b/README.md @@ -161,4 +161,6 @@ The following samples demonstrate some of the more complex aspects associated wi - **[Subscription](https://github.com/temporalio/samples-php/tree/master/app/src/Subscription)**: Demonstrates a long-running process associated with a user ID. The process charges the user once every 30 days after a one month free trial period. +- **[Interceptors](https://github.com/temporalio/samples-php/tree/master/app/src/Interceptors)**: Demonstrates how to use Workflow and Activity interceptors to implement custom logic. + diff --git a/app/.rr.yaml b/app/.rr.yaml index ccfbe97..d0d9c56 100644 --- a/app/.rr.yaml +++ b/app/.rr.yaml @@ -1,4 +1,4 @@ -version: "2.7" +version: "3" rpc: listen: tcp://127.0.0.1:6001 @@ -11,6 +11,10 @@ temporal: activities: num_workers: 4 +service: + interceptors: + command: "./rr serve -c ./src/Interceptors/.rr.yaml" + logs: - level: debug + level: info mode: development diff --git a/app/src/FileProcessing/README.md b/app/src/FileProcessing/README.md index 8667c46..53730cf 100644 --- a/app/src/FileProcessing/README.md +++ b/app/src/FileProcessing/README.md @@ -1,6 +1,6 @@ # FileProcessing sample -This sample demonstrates how to how to use Task routing. +This sample demonstrates how to use Task routing. This Workflow downloads a file, processes it, and uploads the result to a destination. Any worker can pick up the first Activity. diff --git a/app/src/Interceptors/.rr.yaml b/app/src/Interceptors/.rr.yaml new file mode 100644 index 0000000..30a9f61 --- /dev/null +++ b/app/src/Interceptors/.rr.yaml @@ -0,0 +1,14 @@ +version: "3" + +server: + command: "php worker.php" + env: + - TEMPORAL_CLI_ADDRESS: temporal:7233 + +temporal: + address: "temporal:7233" + activities: + num_workers: 2 + +logs: + level: info diff --git a/app/src/Interceptors/Activity/Sleeper.php b/app/src/Interceptors/Activity/Sleeper.php new file mode 100644 index 0000000..8bdc796 --- /dev/null +++ b/app/src/Interceptors/Activity/Sleeper.php @@ -0,0 +1,36 @@ +workflowClient->newWorkflowStub( + TestActivityAttributesInterceptor::class, + WorkflowOptions::new() + ->withTaskQueue('interceptors') + ->withWorkflowExecutionTimeout(CarbonInterval::minute()) + ); + + $output->writeln("Starting Interceptors.TestActivityAttributesInterceptor... "); + + $run = $this->workflowClient->start($workflow); + + $output->writeln( + \sprintf( + 'Started: WorkflowID=%s, RunID=%s', + $run->getExecution()->getID(), + $run->getExecution()->getRunID(), + ), + ); + + $output->writeln(\sprintf("Result:\n%s", \print_r($run->getResult(), true))); + + return self::SUCCESS; + } +} \ No newline at end of file diff --git a/app/src/Interceptors/Interceptor/ActivityAttributesInterceptor.php b/app/src/Interceptors/Interceptor/ActivityAttributesInterceptor.php new file mode 100644 index 0000000..4e188d1 --- /dev/null +++ b/app/src/Interceptors/Interceptor/ActivityAttributesInterceptor.php @@ -0,0 +1,62 @@ +method === null) { + return $next($input); + } + + $options = $input->options; + + foreach ($this->iterateOptions($input->method) as $attribute) { + if ($attribute instanceof Attribute\StartToCloseTimeout) { + \error_log(\sprintf('Redeclare timeout of %s to %s', $input->type, $attribute->timeout)); + $options = $options->withStartToCloseTimeout($attribute->timeout); + } + } + + return $next($input->with(options: $options)); + } + + /** + * @return iterable + */ + private function iterateOptions(\ReflectionMethod $method): iterable + { + $class = $method->getDeclaringClass(); + foreach ($class->getAttributes(Attribute\ActivityOption::class, ReflectionAttribute::IS_INSTANCEOF) as $attr) { + yield $attr->newInstance(); + } + + foreach ($method->getAttributes(Attribute\ActivityOption::class, ReflectionAttribute::IS_INSTANCEOF) as $attr) { + yield $attr->newInstance(); + } + } +} diff --git a/app/src/Interceptors/Interceptor/RequestLoggerInterceptor.php b/app/src/Interceptors/Interceptor/RequestLoggerInterceptor.php new file mode 100644 index 0000000..e1fe42e --- /dev/null +++ b/app/src/Interceptors/Interceptor/RequestLoggerInterceptor.php @@ -0,0 +1,38 @@ +logger->info( + \sprintf('Sending request %s', $request->getName()), + [ + 'headers' => \iterator_to_array($request->getHeader()), + 'options' => $request->getOptions(), + ] + ); + + return $next($request); + } +} diff --git a/app/src/Interceptors/README.md b/app/src/Interceptors/README.md new file mode 100644 index 0000000..47a4820 --- /dev/null +++ b/app/src/Interceptors/README.md @@ -0,0 +1,11 @@ +# Interceptors sample + +This sample demonstrates how you can use Interceptors. + +There are few interceptors: +- RequestLoggerInterceptor - logs all internal requests from the Workflow into the RoadRunner log. +- ActivityAttributesInterceptor - reads [ActivityOption](./Attribute/ActivityOption.php) attributes and applies them to the Activity options. + +```bash +php ./app/app.php interceptors +``` diff --git a/app/src/Interceptors/Workflow/TestActivityAttributesInterceptor.php b/app/src/Interceptors/Workflow/TestActivityAttributesInterceptor.php new file mode 100644 index 0000000..9e31ce6 --- /dev/null +++ b/app/src/Interceptors/Workflow/TestActivityAttributesInterceptor.php @@ -0,0 +1,72 @@ +sleeper = Workflow::newActivityStub( + Sleeper::class, + ActivityOptions::new() + ->withStartToCloseTimeout(10) + ->withRetryOptions( + RetryOptions::new()->withMaximumAttempts(1) + ) + ); + } + + #[Workflow\WorkflowMethod('Interceptors.TestActivityAttributesInterceptor')] + public function execute() + { + // Sleep for 1 second. + yield $this->sleeper->sleep(1); + + /** + * Sleep for 7 seconds. Should fail if the {@see ActivityAttributesInterceptor} is working because there + * is {@see StartToCloseTimeout} attribute on the {@see Sleeper} class with 5-seconds timeout. + */ + try { + yield $this->sleeper->sleep(7); + throw new ApplicationFailure("ActivityAttributesInterceptor doesn't work on the class", 'TEST', true); + } catch (ActivityFailure) { + // OK + } + + + /** + * Sleep for 3 seconds. Should fail if the {@see ActivityAttributesInterceptor} is working because there + * is {@see StartToCloseTimeout} attribute on the {@see Sleeper::sleep2()} method with 2-seconds timeout. + */ + try { + yield $this->sleeper->sleep2(3); + throw new ApplicationFailure("ActivityAttributesInterceptor doesn't work on the method", 'TEST', true); + } catch (ActivityFailure) { + // OK + } + + return "OK"; + } +} diff --git a/app/src/Interceptors/worker.php b/app/src/Interceptors/worker.php new file mode 100644 index 0000000..15e3acc --- /dev/null +++ b/app/src/Interceptors/worker.php @@ -0,0 +1,32 @@ +newWorker(taskQueue: 'interceptors', interceptorProvider: new SimplePipelineProvider([ + new RequestLoggerInterceptor(new \Temporal\SampleUtils\Logger()), + new ActivityAttributesInterceptor() +])) + ->registerWorkflowTypes(TestActivityAttributesInterceptor::class) + ->registerActivityImplementations(new Sleeper()); + +$factory->run(); + From 71522ee93ec2fd68d98c720707eb4f6a9cdbe84c Mon Sep 17 00:00:00 2001 From: Aleksei Gagarin Date: Wed, 15 Nov 2023 23:55:24 +0400 Subject: [PATCH 2/3] Update log message for StartToCloseTimeout attribute Co-authored-by: Spencer Judge --- .../Interceptors/Interceptor/ActivityAttributesInterceptor.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/Interceptors/Interceptor/ActivityAttributesInterceptor.php b/app/src/Interceptors/Interceptor/ActivityAttributesInterceptor.php index 4e188d1..c0c894c 100644 --- a/app/src/Interceptors/Interceptor/ActivityAttributesInterceptor.php +++ b/app/src/Interceptors/Interceptor/ActivityAttributesInterceptor.php @@ -37,7 +37,7 @@ public function executeActivity(ExecuteActivityInput $input, callable $next): Pr foreach ($this->iterateOptions($input->method) as $attribute) { if ($attribute instanceof Attribute\StartToCloseTimeout) { - \error_log(\sprintf('Redeclare timeout of %s to %s', $input->type, $attribute->timeout)); + \error_log(\sprintf('Redeclare start_to_close timeout of %s to %s', $input->type, $attribute->timeout)); $options = $options->withStartToCloseTimeout($attribute->timeout); } } From 69e5262214bfe4df09e1ed7da6f16c5c6631bc63 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Thu, 16 Nov 2023 00:03:49 +0400 Subject: [PATCH 3/3] Update composer.json: up temporal/sdk version to 2.7 --- app/composer.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/composer.json b/app/composer.json index afa1a22..46bdc90 100644 --- a/app/composer.json +++ b/app/composer.json @@ -1,7 +1,8 @@ { "minimum-stability": "beta", + "prefer-stable": true, "require": { - "temporal/sdk": ">=2.6.0", + "temporal/sdk": ">=2.7.0", "spiral/tokenizer": ">=2.7" }, "autoload": {