diff --git a/README.md b/README.md
index cc8b0441..4d59d255 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Ruby worker for Temporal
+# Ruby SDK for Temporal
[![Coverage Status](https://coveralls.io/repos/github/coinbase/temporal-ruby/badge.svg?branch=master)](https://coveralls.io/github/coinbase/temporal-ruby?branch=master)
@@ -6,7 +6,7 @@
A pure Ruby library for defining and running Temporal workflows and activities.
-To find more about Temporal please visit .
+To find more about Temporal itself please visit .
## Getting Started
diff --git a/examples/.env b/examples/.env
new file mode 100644
index 00000000..d15cca49
--- /dev/null
+++ b/examples/.env
@@ -0,0 +1 @@
+COMPOSE_PROJECT_NAME=temporal-ruby-examples
diff --git a/examples/bin/worker b/examples/bin/worker
index 65828dfa..14752ede 100755
--- a/examples/bin/worker
+++ b/examples/bin/worker
@@ -31,6 +31,7 @@ worker.register_workflow(HelloWorldWorkflow)
worker.register_workflow(LocalHelloWorldWorkflow)
worker.register_workflow(LongWorkflow)
worker.register_workflow(LoopWorkflow)
+worker.register_workflow(MetadataWorkflow)
worker.register_workflow(ParentWorkflow)
worker.register_workflow(ProcessFileWorkflow)
worker.register_workflow(QuickTimeoutWorkflow)
@@ -39,9 +40,11 @@ worker.register_workflow(ReleaseWorkflow)
worker.register_workflow(ResultWorkflow)
worker.register_workflow(SerialHelloWorldWorkflow)
worker.register_workflow(SideEffectWorkflow)
+worker.register_workflow(SignalWithStartWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
+worker.register_workflow(WaitForWorkflow)
worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
diff --git a/examples/spec/integration/activity_cancellation_spec.rb b/examples/spec/integration/activity_cancellation_spec.rb
new file mode 100644
index 00000000..0d551a19
--- /dev/null
+++ b/examples/spec/integration/activity_cancellation_spec.rb
@@ -0,0 +1,39 @@
+require 'workflows/long_workflow'
+
+describe 'Activity cancellation' do
+ let(:workflow_id) { SecureRandom.uuid }
+
+ it 'cancels a running activity' do
+ run_id = Temporal.start_workflow(LongWorkflow, options: { workflow_id: workflow_id })
+
+ # Signal workflow after starting, allowing it to schedule the first activity
+ sleep 0.5
+ Temporal.signal_workflow(LongWorkflow, :CANCEL, workflow_id, run_id)
+
+ result = Temporal.await_workflow_result(
+ LongWorkflow,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+
+ expect(result).to be_a(LongRunningActivity::Canceled)
+ expect(result.message).to eq('cancel activity request received')
+ end
+
+ it 'cancels a non-started activity' do
+ # Workflow is started with a signal which will cancel an activity before it has started
+ run_id = Temporal.start_workflow(LongWorkflow, options: {
+ workflow_id: workflow_id,
+ signal_name: :CANCEL
+ })
+
+ result = Temporal.await_workflow_result(
+ LongWorkflow,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+
+ expect(result).to be_a(Temporal::ActivityCanceled)
+ expect(result.message).to eq('ACTIVITY_ID_NOT_STARTED')
+ end
+end
diff --git a/examples/spec/integration/await_workflow_result_spec.rb b/examples/spec/integration/await_workflow_result_spec.rb
index 3b4f1e77..bf48b7b1 100644
--- a/examples/spec/integration/await_workflow_result_spec.rb
+++ b/examples/spec/integration/await_workflow_result_spec.rb
@@ -95,7 +95,9 @@
run_id = Temporal.start_workflow(
LoopWorkflow,
2, # it continues as new if this arg is > 1
- { options: { workflow_id: workflow_id } },
+ options: {
+ workflow_id: workflow_id,
+ },
)
expect do
diff --git a/examples/spec/integration/continue_as_new_spec.rb b/examples/spec/integration/continue_as_new_spec.rb
new file mode 100644
index 00000000..74b0771e
--- /dev/null
+++ b/examples/spec/integration/continue_as_new_spec.rb
@@ -0,0 +1,49 @@
+require 'workflows/loop_workflow'
+
+describe LoopWorkflow do
+ it 'workflow continues as new into a new run' do
+ workflow_id = SecureRandom.uuid
+ memo = {
+ 'my-memo' => 'foo',
+ }
+ headers = {
+ 'my-header' => 'bar',
+ }
+ run_id = Temporal.start_workflow(
+ LoopWorkflow,
+ 2, # it continues as new if this arg is > 1
+ options: {
+ workflow_id: workflow_id,
+ memo: memo,
+ headers: headers,
+ },
+ )
+
+ # First run will throw because it continued as new
+ next_run_id = nil
+ expect do
+ Temporal.await_workflow_result(
+ LoopWorkflow,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+ end.to raise_error(Temporal::WorkflowRunContinuedAsNew) do |error|
+ next_run_id = error.new_run_id
+ end
+
+ expect(next_run_id).to_not eq(nil)
+
+ # Second run will not throw because it returns rather than continues as new.
+ final_result = Temporal.await_workflow_result(
+ LoopWorkflow,
+ workflow_id: workflow_id,
+ run_id: next_run_id,
+ )
+
+ expect(final_result[:count]).to eq(1)
+
+ # memo and headers should be copied to the next run automatically
+ expect(final_result[:memo]).to eq(memo)
+ expect(final_result[:headers]).to eq(headers)
+ end
+end
diff --git a/examples/spec/integration/describe_namespace_spec.rb b/examples/spec/integration/describe_namespace_spec.rb
new file mode 100644
index 00000000..86501f7b
--- /dev/null
+++ b/examples/spec/integration/describe_namespace_spec.rb
@@ -0,0 +1,16 @@
+require 'temporal/errors'
+
+describe 'Temporal.describe_namespace' do
+ it 'returns a value' do
+ description = 'Namespace for temporal-ruby integration test'
+ begin
+ Temporal.register_namespace('a_test_namespace', description)
+ rescue Temporal::NamespaceAlreadyExistsFailure
+ end
+ result = Temporal.describe_namespace('a_test_namespace')
+ expect(result).to be_an_instance_of(Temporal::Api::WorkflowService::V1::DescribeNamespaceResponse)
+ expect(result.namespace_info.name).to eq('a_test_namespace')
+ expect(result.namespace_info.state).to eq(:NAMESPACE_STATE_REGISTERED)
+ expect(result.namespace_info.description).to eq(description)
+ end
+end
diff --git a/examples/spec/integration/metadata_workflow_spec.rb b/examples/spec/integration/metadata_workflow_spec.rb
new file mode 100644
index 00000000..ac828e01
--- /dev/null
+++ b/examples/spec/integration/metadata_workflow_spec.rb
@@ -0,0 +1,91 @@
+require 'workflows/metadata_workflow'
+
+describe MetadataWorkflow do
+ subject { described_class }
+
+ it 'gets task queue from running workflow' do
+ workflow_id = 'task-queue-' + SecureRandom.uuid
+ run_id = Temporal.start_workflow(
+ subject,
+ options: { workflow_id: workflow_id }
+ )
+
+ actual_result = Temporal.await_workflow_result(
+ subject,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+
+ expect(actual_result.task_queue).to eq(Temporal.configuration.task_queue)
+ end
+
+ it 'workflow can retrieve its headers' do
+ workflow_id = 'header_test_wf-' + SecureRandom.uuid
+
+ run_id = Temporal.start_workflow(
+ MetadataWorkflow,
+ options: {
+ workflow_id: workflow_id,
+ headers: { 'foo' => 'bar' },
+ }
+ )
+
+ actual_result = Temporal.await_workflow_result(
+ MetadataWorkflow,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+ expect(actual_result.headers).to eq({ 'foo' => 'bar' })
+ end
+
+ it 'workflow can retrieve its run started at' do
+ workflow_id = 'started_at_test_wf-' + SecureRandom.uuid
+
+ run_id = Temporal.start_workflow(
+ MetadataWorkflow,
+ options: { workflow_id: workflow_id }
+ )
+
+ actual_result = Temporal.await_workflow_result(
+ MetadataWorkflow,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+ expect(Time.now - actual_result.run_started_at).to be_between(0, 30)
+ end
+
+ it 'gets memo from workflow execution info' do
+ workflow_id = 'memo_execution_test_wf-' + SecureRandom.uuid
+ run_id = Temporal.start_workflow(subject, options: { workflow_id: workflow_id, memo: { 'foo' => 'bar' } })
+
+ actual_result = Temporal.await_workflow_result(
+ subject,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+ expect(actual_result.memo['foo']).to eq('bar')
+
+ expect(Temporal.fetch_workflow_execution_info(
+ 'ruby-samples', workflow_id, nil
+ ).memo).to eq({ 'foo' => 'bar' })
+ end
+
+ it 'gets memo from workflow context with no memo' do
+ workflow_id = 'memo_context_no_memo_test_wf-' + SecureRandom.uuid
+
+ run_id = Temporal.start_workflow(
+ subject,
+ options: { workflow_id: workflow_id }
+ )
+
+ actual_result = Temporal.await_workflow_result(
+ subject,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+ expect(actual_result.memo).to eq({})
+ expect(Temporal.fetch_workflow_execution_info(
+ 'ruby-samples', workflow_id, nil
+ ).memo).to eq({})
+ end
+end
diff --git a/examples/spec/integration/signal_with_start_spec.rb b/examples/spec/integration/signal_with_start_spec.rb
new file mode 100644
index 00000000..4af4f6f1
--- /dev/null
+++ b/examples/spec/integration/signal_with_start_spec.rb
@@ -0,0 +1,75 @@
+require 'workflows/signal_with_start_workflow'
+
+describe 'signal with start' do
+
+ it 'signals at workflow start time' do
+ workflow_id = SecureRandom.uuid
+ run_id = Temporal.start_workflow(
+ SignalWithStartWorkflow,
+ 'signal_name',
+ 0.1,
+ options: {
+ workflow_id: workflow_id,
+ signal_name: 'signal_name',
+ signal_input: 'expected value',
+ }
+ )
+
+ result = Temporal.await_workflow_result(
+ SignalWithStartWorkflow,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+
+ expect(result).to eq('expected value') # the workflow should return the signal value
+ end
+
+ it 'signals at workflow start time with name only' do
+ workflow_id = SecureRandom.uuid
+ run_id = Temporal.start_workflow(
+ SignalWithStartWorkflow,
+ 'signal_name',
+ 0.1,
+ options: {
+ workflow_id: workflow_id,
+ signal_name: 'signal_name',
+ }
+ )
+
+ result = Temporal.await_workflow_result(
+ SignalWithStartWorkflow,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+
+ expect(result).to eq(nil) # the workflow should return the signal value
+ end
+
+ it 'does not launch a new workflow when signaling a running workflow through signal_with_start' do
+ workflow_id = SecureRandom.uuid
+ run_id = Temporal.start_workflow(
+ SignalWithStartWorkflow,
+ 'signal_name',
+ 10,
+ options: {
+ workflow_id: workflow_id,
+ signal_name: 'signal_name',
+ signal_input: 'expected value',
+ }
+ )
+
+ second_run_id = Temporal.start_workflow(
+ SignalWithStartWorkflow,
+ 'signal_name',
+ 0.1,
+ options: {
+ workflow_id: workflow_id,
+ signal_name: 'signal_name',
+ signal_input: 'expected value',
+ }
+ )
+
+ # If the run ids are the same, then we didn't start a new workflow
+ expect(second_run_id).to eq(run_id)
+ end
+end
diff --git a/examples/spec/integration/wait_for_workflow_spec.rb b/examples/spec/integration/wait_for_workflow_spec.rb
new file mode 100644
index 00000000..d5feeee6
--- /dev/null
+++ b/examples/spec/integration/wait_for_workflow_spec.rb
@@ -0,0 +1,28 @@
+require 'workflows/wait_for_workflow'
+
+describe WaitForWorkflow do
+
+ it 'signals at workflow start time' do
+ workflow_id = SecureRandom.uuid
+ run_id = Temporal.start_workflow(
+ WaitForWorkflow,
+ 10, # number of echo activities to run
+ 2, # max activity parallelism
+ 'signal_name',
+ options: { workflow_id: workflow_id }
+ )
+
+ Temporal.signal_workflow(WaitForWorkflow, 'signal_name', workflow_id, run_id)
+
+ result = Temporal.await_workflow_result(
+ WaitForWorkflow,
+ workflow_id: workflow_id,
+ run_id: run_id,
+ )
+
+ expect(result.length).to eq(3)
+ expect(result[:signal]).to eq(true)
+ expect(result[:timer]).to eq(true)
+ expect(result[:activity]).to eq(true)
+ end
+end
\ No newline at end of file
diff --git a/examples/workflows/long_workflow.rb b/examples/workflows/long_workflow.rb
index 3682fad9..c4b4682f 100644
--- a/examples/workflows/long_workflow.rb
+++ b/examples/workflows/long_workflow.rb
@@ -9,6 +9,6 @@ def execute(cycles = 10, interval = 1)
future.cancel
end
- future.wait
+ future.get
end
end
diff --git a/examples/workflows/loop_workflow.rb b/examples/workflows/loop_workflow.rb
index 5b1f5bfd..b99408f4 100644
--- a/examples/workflows/loop_workflow.rb
+++ b/examples/workflows/loop_workflow.rb
@@ -8,6 +8,10 @@ def execute(count)
return workflow.continue_as_new(count - 1)
end
- return count
+ return {
+ count: count,
+ memo: workflow.metadata.memo,
+ headers: workflow.metadata.headers,
+ }
end
end
diff --git a/examples/workflows/metadata_workflow.rb b/examples/workflows/metadata_workflow.rb
new file mode 100644
index 00000000..62f61703
--- /dev/null
+++ b/examples/workflows/metadata_workflow.rb
@@ -0,0 +1,5 @@
+class MetadataWorkflow < Temporal::Workflow
+ def execute
+ workflow.metadata
+ end
+end
diff --git a/examples/workflows/signal_with_start_workflow.rb b/examples/workflows/signal_with_start_workflow.rb
new file mode 100644
index 00000000..dbcb186a
--- /dev/null
+++ b/examples/workflows/signal_with_start_workflow.rb
@@ -0,0 +1,16 @@
+class SignalWithStartWorkflow < Temporal::Workflow
+
+ def execute(expected_signal, sleep_for)
+ received = 'no signal received'
+
+ workflow.on_signal do |signal, input|
+ if signal == expected_signal
+ received = input
+ end
+ end
+
+ # Do something to get descheduled so the signal handler has a chance to run
+ workflow.sleep(sleep_for)
+ received
+ end
+end
diff --git a/examples/workflows/wait_for_workflow.rb b/examples/workflows/wait_for_workflow.rb
new file mode 100644
index 00000000..226ac9c4
--- /dev/null
+++ b/examples/workflows/wait_for_workflow.rb
@@ -0,0 +1,80 @@
+require 'activities/echo_activity'
+require 'activities/long_running_activity'
+
+# This example workflow exercises all three conditions that can change state that is being
+# awaited upon: activity completion, sleep completion, signal receieved.
+class WaitForWorkflow < Temporal::Workflow
+ def execute(total_echos, max_echos_at_once, expected_signal)
+ signals_received = {}
+
+ workflow.on_signal do |signal, input|
+ signals_received[signal] = input
+ end
+
+ workflow.wait_for do
+ workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
+ signals_received.key?(expected_signal)
+ end
+
+ # Run an activity but with a max time limit by starting a timer. This activity
+ # will not complete before the timer, which may result in a failed activity task after the
+ # workflow is completed.
+ long_running_future = LongRunningActivity.execute(15, 0.1)
+ timeout_timer = workflow.start_timer(1)
+ workflow.wait_for(timeout_timer, long_running_future)
+
+ timer_beat_activity = timeout_timer.finished? && !long_running_future.finished?
+
+ # This should not wait further. The first future has already finished, and therefore
+ # the second one should not be awaited upon.
+ long_timeout_timer = workflow.start_timer(15)
+ workflow.wait_for(timeout_timer, long_timeout_timer)
+ raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?
+
+ block_called = false
+ workflow.wait_for(timeout_timer) do
+ # This should never be called because the timeout_timer future was already
+ # finished before the wait was even called.
+ block_called = true
+ end
+ raise 'Block should not have been called' if block_called
+
+ workflow.wait_for(long_timeout_timer) do
+ # This condition will immediately be true and not result in any waiting or dispatching
+ true
+ end
+ raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?
+
+ activity_futures = {}
+ echos_completed = 0
+
+ total_echos.times do |i|
+ workflow.wait_for do
+ workflow.logger.info("Activities in flight #{activity_futures.length}")
+ # Pause workflow until the number of active activity futures is less than 2. This
+ # will throttle new activities from being started, guaranteeing that only two of these
+ # activities are running at once.
+ activity_futures.length < max_echos_at_once
+ end
+
+ future = EchoActivity.execute("hi #{i}")
+ activity_futures[i] = future
+
+ future.done do
+ activity_futures.delete(i)
+ echos_completed += 1
+ end
+ end
+
+ workflow.wait_for do
+ workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}")
+ activity_futures.empty?
+ end
+
+ {
+ signal: signals_received.key?(expected_signal),
+ timer: timer_beat_activity,
+ activity: echos_completed == total_echos
+ }
+ end
+end
diff --git a/lib/temporal.rb b/lib/temporal.rb
index e580192f..f02d6ec9 100644
--- a/lib/temporal.rb
+++ b/lib/temporal.rb
@@ -17,6 +17,7 @@ module Temporal
:start_workflow,
:schedule_workflow,
:register_namespace,
+ :describe_namespace,
:signal_workflow,
:await_workflow_result,
:reset_workflow,
diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb
index c07eba7e..4157f0f3 100644
--- a/lib/temporal/activity/poller.rb
+++ b/lib/temporal/activity/poller.rb
@@ -78,7 +78,7 @@ def poll_for_task
rescue StandardError => error
Temporal.logger.error("Unable to poll activity task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect })
- Temporal::ErrorHandler.handle(error)
+ Temporal::ErrorHandler.handle(error, config)
nil
end
diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb
index 173a2bc9..6ad9cefd 100644
--- a/lib/temporal/activity/task_processor.rb
+++ b/lib/temporal/activity/task_processor.rb
@@ -14,7 +14,7 @@ class TaskProcessor
def initialize(task, namespace, activity_lookup, middleware_chain, config)
@task = task
@namespace = namespace
- @metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace)
+ @metadata = Metadata.generate_activity_metadata(task, namespace)
@task_token = task.task_token
@activity_name = task.activity_type.name
@activity_class = activity_lookup.find(activity_name)
@@ -41,7 +41,7 @@ def process
# Do not complete asynchronous activities, these should be completed manually
respond_completed(result) unless context.async?
rescue StandardError, ScriptError => error
- Temporal::ErrorHandler.handle(error, metadata: metadata)
+ Temporal::ErrorHandler.handle(error, config, metadata: metadata)
respond_failed(error)
ensure
@@ -76,7 +76,7 @@ def respond_completed(result)
rescue StandardError => error
Temporal.logger.error("Unable to complete Activity", metadata.to_h.merge(error: error.inspect))
- Temporal::ErrorHandler.handle(error, metadata: metadata)
+ Temporal::ErrorHandler.handle(error, config, metadata: metadata)
end
def respond_failed(error)
@@ -90,7 +90,7 @@ def respond_failed(error)
rescue StandardError => error
Temporal.logger.error("Unable to fail Activity task", metadata.to_h.merge(error: error.inspect))
- Temporal::ErrorHandler.handle(error, metadata: metadata)
+ Temporal::ErrorHandler.handle(error, config, metadata: metadata)
end
end
end
diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb
index d2380855..2fcea794 100644
--- a/lib/temporal/client.rb
+++ b/lib/temporal/client.rb
@@ -13,32 +13,95 @@ def initialize(config)
@config = config
end
- def start_workflow(workflow, *input, **args)
- options = args.delete(:options) || {}
+ # Start a workflow with an optional signal
+ #
+ # If options[:signal_name] is specified, Temporal will atomically start a new workflow and
+ # signal it or signal a running workflow (matching a specified options[:workflow_id])
+ #
+ # @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class
+ # is passed, its config (namespace, task_queue, timeouts, etc) will be used
+ # @param input [any] arguments to be passed to workflow's #execute method
+ # @param args [Hash] keyword arguments to be passed to workflow's #execute method
+ # @param options [Hash, nil] optional overrides
+ # @option options [String] :workflow_id
+ # @option options [Symbol] :workflow_id_reuse_policy check Temporal::Connection::GRPC::WORKFLOW_ID_REUSE_POLICY
+ # @option options [String] :name workflow name
+ # @option options [String] :namespace
+ # @option options [String] :task_queue
+ # @option options [String] :signal_name corresponds to the 'signal' argument to signal_workflow. Required if
+ # options[:signal_input] is specified.
+ # @option options [String, Array, nil] :signal_input corresponds to the 'input' argument to signal_workflow
+ # @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options
+ # @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
+ # @option options [Hash] :headers
+ #
+ # @return [String] workflow's run ID
+ def start_workflow(workflow, *input, options: {}, **args)
input << args unless args.empty?
+ signal_name = options.delete(:signal_name)
+ signal_input = options.delete(:signal_input)
+
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
workflow_id = options[:workflow_id] || SecureRandom.uuid
- response = connection.start_workflow_execution(
- namespace: execution_options.namespace,
- workflow_id: workflow_id,
- workflow_name: execution_options.name,
- task_queue: execution_options.task_queue,
- input: input,
- execution_timeout: execution_options.timeouts[:execution],
- # If unspecified, individual runs should have the full time for the execution (which includes retries).
- run_timeout: execution_options.timeouts[:run] || execution_options.timeouts[:execution],
- task_timeout: execution_options.timeouts[:task],
- workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
- headers: execution_options.headers
- )
+ if signal_name.nil? && signal_input.nil?
+ response = connection.start_workflow_execution(
+ namespace: execution_options.namespace,
+ workflow_id: workflow_id,
+ workflow_name: execution_options.name,
+ task_queue: execution_options.task_queue,
+ input: input,
+ execution_timeout: execution_options.timeouts[:execution],
+ # If unspecified, individual runs should have the full time for the execution (which includes retries).
+ run_timeout: compute_run_timeout(execution_options),
+ task_timeout: execution_options.timeouts[:task],
+ workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
+ headers: execution_options.headers,
+ memo: execution_options.memo,
+ )
+ else
+ raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?
+
+ response = connection.signal_with_start_workflow_execution(
+ namespace: execution_options.namespace,
+ workflow_id: workflow_id,
+ workflow_name: execution_options.name,
+ task_queue: execution_options.task_queue,
+ input: input,
+ execution_timeout: execution_options.timeouts[:execution],
+ run_timeout: compute_run_timeout(execution_options),
+ task_timeout: execution_options.timeouts[:task],
+ workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
+ headers: execution_options.headers,
+ memo: execution_options.memo,
+ signal_name: signal_name,
+ signal_input: signal_input
+ )
+ end
response.run_id
end
- def schedule_workflow(workflow, cron_schedule, *input, **args)
- options = args.delete(:options) || {}
+ # Schedule a workflow for a periodic cron-like execution
+ #
+ # @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class
+ # is passed, its config (namespace, task_queue, timeouts, etc) will be used
+ # @param cron_schedule [String] a cron-style schedule string
+ # @param input [any] arguments to be passed to workflow's #execute method
+ # @param args [Hash] keyword arguments to be pass to workflow's #execute method
+ # @param options [Hash, nil] optional overrides
+ # @option options [String] :workflow_id
+ # @option options [Symbol] :workflow_id_reuse_policy check Temporal::Connection::GRPC::WORKFLOW_ID_REUSE_POLICY
+ # @option options [String] :name workflow name
+ # @option options [String] :namespace
+ # @option options [String] :task_queue
+ # @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options
+ # @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
+ # @option options [Hash] :headers
+ #
+ # @return [String] workflow's run ID
+ def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args)
input << args unless args.empty?
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
@@ -54,20 +117,41 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
# Execution timeout is across all scheduled jobs, whereas run is for an individual run.
# This default is here for backward compatibility. Certainly, the run timeout shouldn't be higher
# than the execution timeout.
- run_timeout: execution_options.timeouts[:run] || execution_options.timeouts[:execution],
+ run_timeout: compute_run_timeout(execution_options),
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers,
- cron_schedule: cron_schedule
+ cron_schedule: cron_schedule,
+ memo: execution_options.memo
)
response.run_id
end
+ # Register a new Temporal namespace
+ #
+ # @param name [String] name of the new namespace
+ # @param description [String] optional namespace description
def register_namespace(name, description = nil)
connection.register_namespace(name: name, description: description)
end
+ # Fetches metadata for a namespace.
+ # @param name [String] name of the namespace
+ # @return [Hash] info deserialized from Temporal::Api::WorkflowService::V1::DescribeNamespaceResponse
+ def describe_namespace(name)
+ connection.describe_namespace(name: name)
+ end
+
+ # Send a signal to a running workflow
+ #
+ # @param workflow [Temporal::Workflow, nil] workflow class or nil
+ # @param signal [String] name of the signal to send
+ # @param workflow_id [String]
+ # @param run_id [String]
+ # @param input [String, Array, nil] optional arguments for the signal
+ # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
+ # global default
def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespace: nil)
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)
@@ -80,15 +164,22 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac
)
end
- # Long polls for a workflow to be completed and returns whatever the execute function
- # returned. This function times out after 30 seconds and throws Temporal::TimeoutError,
- # not to be confused with Temporal::WorkflowTimedOut which reports that the workflow
- # itself timed out.
- # run_id of nil: await the entire workflow completion. This can span multiple runs
- # in the case where the workflow uses continue-as-new.
- # timeout: seconds to wait for the result. This cannot be longer than 30 seconds because
- # that is the maximum the server supports.
- # namespace: if nil, choose the one declared on the Workflow, or the global default
+ # Long polls for a workflow to be completed and returns workflow's return value.
+ #
+ # @note This function times out after 30 seconds and throws Temporal::TimeoutError,
+ # not to be confused with `Temporal::WorkflowTimedOut` which reports that the workflow
+ # itself timed out.
+ #
+ # @param workflow [Temporal::Workflow, nil] workflow class or nil
+ # @param workflow_id [String]
+ # @param run_id [String, nil] awaits the entire workflow completion when nil. This can span
+ # multiple runs in the case where the workflow uses continue-as-new.
+ # @param timeout [Integer, nil] seconds to wait for the result. This cannot be longer than 30
+ # seconds because that is the maximum the server supports.
+ # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
+ # global default
+ #
+ # @return workflow's return value
def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, namespace: nil)
options = namespace ? {namespace: namespace} : {}
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
@@ -135,6 +226,21 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
end
end
+ # Reset a workflow
+ #
+ # @note More on resetting a workflow here —
+ # https://docs.temporal.io/docs/system-tools/tctl/#restart-reset-workflow
+ #
+ # @param namespace [String]
+ # @param workflow_id [String]
+ # @param run_id [String]
+ # @param strategy [Symbol, nil] one of the Temporal::ResetStrategy values or `nil` when
+ # passing a workflow_task_id
+ # @param workflow_task_id [Integer, nil] A specific event ID to reset to. The event has to
+ # be of a type WorkflowTaskCompleted, WorkflowTaskFailed or WorkflowTaskTimedOut
+ # @param reason [String] a reset reason to be recorded in workflow's history for reference
+ #
+ # @return [String] run_id of the new workflow execution
def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_id: nil, reason: 'manual reset')
# Pick default strategy for backwards-compatibility
strategy ||= :last_workflow_task unless workflow_task_id
@@ -157,6 +263,14 @@ def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_
response.run_id
end
+ # Terminate a running workflow
+ #
+ # @param workflow_id [String]
+ # @param namespace [String, nil] use a default namespace when `nil`
+ # @param run_id [String, nil]
+ # @param reason [String, nil] a termination reason to be recorded in workflow's history
+ # for reference
+ # @param details [String, Array, nil] optional details to be stored in history
def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil)
namespace ||= Temporal.configuration.namespace
@@ -169,6 +283,13 @@ def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, de
)
end
+ # Fetch workflow's execution info
+ #
+ # @param namespace [String]
+ # @param workflow_id [String]
+ # @param run_id [String]
+ #
+ # @return [Temporal::Workflow::ExecutionInfo] an object containing workflow status and other info
def fetch_workflow_execution_info(namespace, workflow_id, run_id)
response = connection.describe_workflow_execution(
namespace: namespace,
@@ -179,6 +300,11 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id)
Workflow::ExecutionInfo.generate_from(response.workflow_execution_info)
end
+ # Manually complete an activity
+ #
+ # @param async_token [String] an encoded Temporal::Activity::AsyncToken
+ # @param result [String, Array, nil] activity's return value to be stored in history and
+ # passed back to a workflow
def complete_activity(async_token, result = nil)
details = Activity::AsyncToken.decode(async_token)
@@ -191,6 +317,11 @@ def complete_activity(async_token, result = nil)
)
end
+ # Manually fail an activity
+ #
+ # @param async_token [String] an encoded Temporal::Activity::AsyncToken
+ # @param exception [Exception] activity's failure exception to be stored in history and
+ # raised in a workflow
def fail_activity(async_token, exception)
details = Activity::AsyncToken.decode(async_token)
@@ -203,6 +334,13 @@ def fail_activity(async_token, exception)
)
end
+ # Fetch workflow's execution history
+ #
+ # @param namespace [String]
+ # @param workflow_id [String]
+ # @param run_id [String]
+ #
+ # @return [Temporal::Workflow::History] workflow's execution history
def get_workflow_history(namespace:, workflow_id:, run_id:)
history_response = connection.get_workflow_execution_history(
namespace: namespace,
@@ -226,6 +364,10 @@ def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end
+ def compute_run_timeout(execution_options)
+ execution_options.timeouts[:run] || execution_options.timeouts[:execution]
+ end
+
def find_workflow_task(namespace, workflow_id, run_id, strategy)
history = get_workflow_history(
namespace: namespace,
diff --git a/lib/temporal/concerns/payloads.rb b/lib/temporal/concerns/payloads.rb
index 49af8f3f..3be276d2 100644
--- a/lib/temporal/concerns/payloads.rb
+++ b/lib/temporal/concerns/payloads.rb
@@ -21,6 +21,10 @@ def from_signal_payloads(payloads)
from_payloads(payloads)&.first
end
+ def from_payload_map(payload_map)
+ payload_map.map { |key, value| [key, from_payload(value)] }.to_h
+ end
+
def to_payloads(data)
payload_converter.to_payloads(data)
end
@@ -41,6 +45,10 @@ def to_signal_payloads(data)
to_payloads([data])
end
+ def to_payload_map(data)
+ data.transform_values(&method(:to_payload))
+ end
+
private
def payload_converter
diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb
index 828c561f..8d36615c 100644
--- a/lib/temporal/configuration.rb
+++ b/lib/temporal/configuration.rb
@@ -14,6 +14,7 @@ class Configuration
attr_writer :converter
attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers
+ # See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
# We choose an 10-year execution timeout because that's the maximum the cassandra DB supports,
# matching the go SDK, see https://github.com/temporalio/sdk-go/blob/d96130dad3d2bc189bc7626543bd5911cc07ff6d/internal/internal_workflow_testsuite.go#L68
@@ -21,9 +22,12 @@ class Configuration
execution: 86_400 * 365 * 10, # End-to-end workflow time, including all recurrences if it's scheduled.
# Time for a single run, excluding retries. Server defaults to execution timeout; we default here as well to be explicit.
run: 86_400 * 365 * 10,
- task: 10, # Workflow task processing time
+ # Workflow task processing time. Workflows should not use the network and should execute very quickly.
+ task: 10,
schedule_to_close: nil, # End-to-end activity time (default: schedule_to_start + start_to_close)
- schedule_to_start: 10, # Queue time for an activity
+ # Max queue time for an activity. Default: none. This is dangerous; most teams don't use.
+ # See # https://docs.temporal.io/blog/activity-timeouts/#schedule-to-start-timeout
+ schedule_to_start: nil,
start_to_close: 30, # Time spent processing an activity
heartbeat: nil # Max time between heartbeats (off by default)
}.freeze
diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb
index 848d2de0..b2ce0350 100644
--- a/lib/temporal/connection/grpc.rb
+++ b/lib/temporal/connection/grpc.rb
@@ -81,7 +81,8 @@ def start_workflow_execution(
task_timeout:,
workflow_id_reuse_policy: nil,
headers: nil,
- cron_schedule: nil
+ cron_schedule: nil,
+ memo: nil
)
request = Temporal::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
identity: identity,
@@ -99,9 +100,12 @@ def start_workflow_execution(
workflow_task_timeout: task_timeout,
request_id: SecureRandom.uuid,
header: Temporal::Api::Common::V1::Header.new(
- fields: headers
+ fields: to_payload_map(headers || {})
),
- cron_schedule: cron_schedule
+ cron_schedule: cron_schedule,
+ memo: Temporal::Api::Common::V1::Memo.new(
+ fields: to_payload_map(memo || {})
+ )
)
if workflow_id_reuse_policy
@@ -292,8 +296,66 @@ def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input:
client.signal_workflow_execution(request)
end
- def signal_with_start_workflow_execution
- raise NotImplementedError
+ def signal_with_start_workflow_execution(
+ namespace:,
+ workflow_id:,
+ workflow_name:,
+ task_queue:,
+ input: nil,
+ execution_timeout:,
+ run_timeout:,
+ task_timeout:,
+ workflow_id_reuse_policy: nil,
+ headers: nil,
+ cron_schedule: nil,
+ signal_name:,
+ signal_input:,
+ memo: nil
+ )
+ proto_header_fields = if headers.nil?
+ to_payload_map({})
+ elsif headers.class == Hash
+ to_payload_map(headers)
+ else
+ # Preserve backward compatability for headers specified using proto objects
+ warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects'
+ headers
+ end
+
+ request = Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new(
+ identity: identity,
+ namespace: namespace,
+ workflow_type: Temporal::Api::Common::V1::WorkflowType.new(
+ name: workflow_name
+ ),
+ workflow_id: workflow_id,
+ task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(
+ name: task_queue
+ ),
+ input: to_payloads(input),
+ workflow_execution_timeout: execution_timeout,
+ workflow_run_timeout: run_timeout,
+ workflow_task_timeout: task_timeout,
+ request_id: SecureRandom.uuid,
+ header: Temporal::Api::Common::V1::Header.new(
+ fields: proto_header_fields,
+ ),
+ cron_schedule: cron_schedule,
+ signal_name: signal_name,
+ signal_input: to_signal_payloads(signal_input),
+ memo: Temporal::Api::Common::V1::Memo.new(
+ fields: to_payload_map(memo || {})
+ ),
+ )
+
+ if workflow_id_reuse_policy
+ policy = WORKFLOW_ID_REUSE_POLICY[workflow_id_reuse_policy]
+ raise Client::ArgumentError, 'Unknown workflow_id_reuse_policy specified' unless policy
+
+ request.workflow_id_reuse_policy = policy
+ end
+
+ client.signal_with_start_workflow_execution(request)
end
def reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:)
diff --git a/lib/temporal/connection/retryer.rb b/lib/temporal/connection/retryer.rb
index d70ba3b4..2948f05f 100644
--- a/lib/temporal/connection/retryer.rb
+++ b/lib/temporal/connection/retryer.rb
@@ -1,3 +1,5 @@
+require 'grpc/errors'
+
module Temporal
module Connection
module Retryer
@@ -11,15 +13,15 @@ module Retryer
# No amount of retrying will help in these cases.
def self.do_not_retry_errors
[
- GRPC::AlreadyExists,
- GRPC::Cancelled,
- GRPC::FailedPrecondition,
- GRPC::InvalidArgument,
+ ::GRPC::AlreadyExists,
+ ::GRPC::Cancelled,
+ ::GRPC::FailedPrecondition,
+ ::GRPC::InvalidArgument,
# If the activity has timed out, the server will return this and will never accept a retry
- GRPC::NotFound,
- GRPC::PermissionDenied,
- GRPC::Unauthenticated,
- GRPC::Unimplemented,
+ ::GRPC::NotFound,
+ ::GRPC::PermissionDenied,
+ ::GRPC::Unauthenticated,
+ ::GRPC::Unimplemented,
]
end
diff --git a/lib/temporal/connection/serializer/continue_as_new.rb b/lib/temporal/connection/serializer/continue_as_new.rb
index 2d1e588c..357f0008 100644
--- a/lib/temporal/connection/serializer/continue_as_new.rb
+++ b/lib/temporal/connection/serializer/continue_as_new.rb
@@ -19,7 +19,8 @@ def to_proto
workflow_run_timeout: object.timeouts[:execution],
workflow_task_timeout: object.timeouts[:task],
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto,
- header: serialize_headers(object.headers)
+ header: serialize_headers(object.headers),
+ memo: serialize_memo(object.memo)
)
)
end
@@ -29,7 +30,13 @@ def to_proto
def serialize_headers(headers)
return unless headers
- Temporal::Api::Common::V1::Header.new(fields: object.headers)
+ Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers))
+ end
+
+ def serialize_memo(memo)
+ return unless memo
+
+ Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo))
end
end
end
diff --git a/lib/temporal/connection/serializer/start_child_workflow.rb b/lib/temporal/connection/serializer/start_child_workflow.rb
index 55312e50..5f6b350d 100644
--- a/lib/temporal/connection/serializer/start_child_workflow.rb
+++ b/lib/temporal/connection/serializer/start_child_workflow.rb
@@ -22,7 +22,8 @@ def to_proto
workflow_run_timeout: object.timeouts[:run],
workflow_task_timeout: object.timeouts[:task],
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto,
- header: serialize_headers(object.headers)
+ header: serialize_headers(object.headers),
+ memo: serialize_memo(object.memo),
)
)
end
@@ -32,7 +33,13 @@ def to_proto
def serialize_headers(headers)
return unless headers
- Temporal::Api::Common::V1::Header.new(fields: object.headers)
+ Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers))
+ end
+
+ def serialize_memo(memo)
+ return unless memo
+
+ Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo))
end
end
end
diff --git a/lib/temporal/error_handler.rb b/lib/temporal/error_handler.rb
index 9702edcb..98fca99f 100644
--- a/lib/temporal/error_handler.rb
+++ b/lib/temporal/error_handler.rb
@@ -1,7 +1,7 @@
module Temporal
module ErrorHandler
- def self.handle(error, metadata: nil)
- Temporal.configuration.error_handlers.each do |handler|
+ def self.handle(error, configuration, metadata: nil)
+ configuration.error_handlers.each do |handler|
handler.call(error, metadata: metadata)
rescue StandardError => e
Temporal.logger.error("Error handler failed", { error: e.inspect })
diff --git a/lib/temporal/errors.rb b/lib/temporal/errors.rb
index 5730224c..2e7f4ebe 100644
--- a/lib/temporal/errors.rb
+++ b/lib/temporal/errors.rb
@@ -19,6 +19,9 @@ class TimeoutError < ClientError; end
# with the intent to propagate to a workflow
class ActivityException < ClientError; end
+ # Represents cancellation of a non-started activity
+ class ActivityCanceled < ActivityException; end
+
class ActivityNotRegistered < ClientError; end
class WorkflowNotRegistered < ClientError; end
diff --git a/lib/temporal/execution_options.rb b/lib/temporal/execution_options.rb
index 6fcaf371..182c4079 100644
--- a/lib/temporal/execution_options.rb
+++ b/lib/temporal/execution_options.rb
@@ -3,7 +3,7 @@
module Temporal
class ExecutionOptions
- attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers
+ attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo
def initialize(object, options, defaults = nil)
# Options are treated as overrides and take precedence
@@ -13,6 +13,7 @@ def initialize(object, options, defaults = nil)
@retry_policy = options[:retry_policy] || {}
@timeouts = options[:timeouts] || {}
@headers = options[:headers] || {}
+ @memo = options[:memo] || {}
# For Temporal::Workflow and Temporal::Activity use defined values as the next option
if has_executable_concern?(object)
diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb
index e39f8da9..df7165ea 100644
--- a/lib/temporal/metadata.rb
+++ b/lib/temporal/metadata.rb
@@ -6,37 +6,11 @@
module Temporal
module Metadata
- ACTIVITY_TYPE = :activity
- WORKFLOW_TASK_TYPE = :workflow_task
- WORKFLOW_TYPE = :workflow
class << self
include Concerns::Payloads
- def generate(type, data, namespace = nil)
- case type
- when ACTIVITY_TYPE
- activity_metadata_from(data, namespace)
- when WORKFLOW_TASK_TYPE
- workflow_task_metadata_from(data, namespace)
- when WORKFLOW_TYPE
- workflow_metadata_from(data)
- else
- raise InternalError, 'Unsupported metadata type'
- end
- end
-
- private
-
- def headers(fields)
- result = {}
- fields.each do |field, payload|
- result[field] = from_payload(payload)
- end
- result
- end
-
- def activity_metadata_from(task, namespace)
+ def generate_activity_metadata(task, namespace)
Metadata::Activity.new(
namespace: namespace,
id: task.activity_id,
@@ -46,12 +20,12 @@ def activity_metadata_from(task, namespace)
workflow_run_id: task.workflow_execution.run_id,
workflow_id: task.workflow_execution.workflow_id,
workflow_name: task.workflow_type.name,
- headers: headers(task.header&.fields),
+ headers: from_payload_map(task.header&.fields || {}),
heartbeat_details: from_details_payloads(task.heartbeat_details)
)
end
- def workflow_task_metadata_from(task, namespace)
+ def generate_workflow_task_metadata(task, namespace)
Metadata::WorkflowTask.new(
namespace: namespace,
id: task.started_event_id,
@@ -63,12 +37,20 @@ def workflow_task_metadata_from(task, namespace)
)
end
- def workflow_metadata_from(event)
+ # @param event [Temporal::Workflow::History::Event] Workflow started history event
+ # @param event [WorkflowExecutionStartedEventAttributes] :attributes
+ # @param task_metadata [Temporal::Metadata::WorkflowTask] workflow task metadata
+ def generate_workflow_metadata(event, task_metadata)
Metadata::Workflow.new(
- name: event.workflow_type.name,
- run_id: event.original_execution_run_id,
- attempt: event.attempt,
- headers: headers(event.header&.fields)
+ name: event.attributes.workflow_type.name,
+ id: task_metadata.workflow_id,
+ run_id: event.attributes.original_execution_run_id,
+ attempt: event.attributes.attempt,
+ namespace: task_metadata.namespace,
+ task_queue: event.attributes.task_queue.name,
+ headers: from_payload_map(event.attributes.header&.fields || {}),
+ run_started_at: event.timestamp,
+ memo: from_payload_map(event.attributes.memo&.fields || {}),
)
end
end
diff --git a/lib/temporal/metadata/workflow.rb b/lib/temporal/metadata/workflow.rb
index 86d14de4..f4715dde 100644
--- a/lib/temporal/metadata/workflow.rb
+++ b/lib/temporal/metadata/workflow.rb
@@ -3,13 +3,18 @@
module Temporal
module Metadata
class Workflow < Base
- attr_reader :name, :run_id, :attempt, :headers
+ attr_reader :namespace, :id, :name, :run_id, :attempt, :task_queue, :headers, :run_started_at, :memo
- def initialize(name:, run_id:, attempt:, headers: {})
+ def initialize(namespace:, id:, name:, run_id:, attempt:, task_queue:, headers:, run_started_at:, memo:)
+ @namespace = namespace
+ @id = id
@name = name
@run_id = run_id
@attempt = attempt
+ @task_queue = task_queue
@headers = headers
+ @run_started_at = run_started_at
+ @memo = memo
freeze
end
@@ -20,9 +25,14 @@ def workflow?
def to_h
{
+ 'namespace' => namespace,
+ 'workflow_id' => id,
'workflow_name' => name,
'workflow_run_id' => run_id,
- 'attempt' => attempt
+ 'attempt' => attempt,
+ 'task_queue' => task_queue,
+ 'run_started_at' => run_started_at.to_f,
+ 'memo' => memo,
}
end
end
diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb
index 3642a7d3..5543452d 100644
--- a/lib/temporal/testing/local_workflow_context.rb
+++ b/lib/temporal/testing/local_workflow_context.rb
@@ -9,7 +9,7 @@
module Temporal
module Testing
class LocalWorkflowContext
- attr_reader :metadata
+ attr_reader :metadata, :config
def initialize(execution, workflow_id, run_id, disabled_releases, metadata, config = Temporal.configuration)
@last_event_id = 0
@@ -165,9 +165,16 @@ def wait_for_all(*futures)
return
end
- def wait_for(future)
- # Point of communication
- Fiber.yield while !future.finished?
+ def wait_for(*futures, &unblock_condition)
+ if futures.empty? && unblock_condition.nil?
+ raise 'You must pass either a future or an unblock condition block to wait_for'
+ end
+
+ while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call)
+ Fiber.yield
+ end
+
+ return
end
def now
@@ -175,20 +182,20 @@ def now
end
def on_signal(&block)
- raise NotImplementedError, 'not yet available for testing'
+ raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on'
end
def cancel_activity(activity_id)
- raise NotImplementedError, 'not yet available for testing'
+ raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on'
end
def cancel(target, cancelation_id)
- raise NotImplementedError, 'not yet available for testing'
+ raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on'
end
private
- attr_reader :execution, :run_id, :workflow_id, :disabled_releases, :config
+ attr_reader :execution, :run_id, :workflow_id, :disabled_releases
def completed!
@completed = true
diff --git a/lib/temporal/testing/temporal_override.rb b/lib/temporal/testing/temporal_override.rb
index de61b591..d44da24f 100644
--- a/lib/temporal/testing/temporal_override.rb
+++ b/lib/temporal/testing/temporal_override.rb
@@ -73,9 +73,15 @@ def start_locally(workflow, schedule, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
+ # signals aren't supported at all, so let's prohibit start_workflow calls that try to signal
+ signal_name = options.delete(:signal_name)
+ signal_input = options.delete(:signal_input)
+ raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' if signal_name || signal_input
+
reuse_policy = options[:workflow_id_reuse_policy] || :allow_failed
workflow_id = options[:workflow_id] || SecureRandom.uuid
run_id = SecureRandom.uuid
+ memo = options[:memo] || {}
if !allowed?(workflow_id, reuse_policy)
raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(
@@ -89,7 +95,15 @@ def start_locally(workflow, schedule, *input, **args)
execution_options = ExecutionOptions.new(workflow, options)
metadata = Metadata::Workflow.new(
- name: workflow_id, run_id: run_id, attempt: 1, headers: execution_options.headers
+ namespace: execution_options.namespace,
+ id: workflow_id,
+ name: execution_options.name,
+ run_id: run_id,
+ attempt: 1,
+ task_queue: execution_options.task_queue,
+ run_started_at: Time.now,
+ memo: memo,
+ headers: execution_options.headers
)
context = Temporal::Testing::LocalWorkflowContext.new(
execution, workflow_id, run_id, workflow.disabled_releases, metadata
diff --git a/lib/temporal/testing/workflow_override.rb b/lib/temporal/testing/workflow_override.rb
index a36e843e..9d43f953 100644
--- a/lib/temporal/testing/workflow_override.rb
+++ b/lib/temporal/testing/workflow_override.rb
@@ -28,7 +28,15 @@ def execute_locally(*input)
run_id = SecureRandom.uuid
execution = WorkflowExecution.new
metadata = Temporal::Metadata::Workflow.new(
- name: workflow_id, run_id: run_id, attempt: 1
+ namespace: nil,
+ id: workflow_id,
+ name: name, # Workflow class name
+ run_id: run_id,
+ attempt: 1,
+ task_queue: 'unit-test-task-queue',
+ headers: {},
+ run_started_at: Time.now,
+ memo: {},
)
context = Temporal::Testing::LocalWorkflowContext.new(
execution, workflow_id, run_id, disabled_releases, metadata
diff --git a/lib/temporal/workflow.rb b/lib/temporal/workflow.rb
index 06bf2b80..3b5dcfe6 100644
--- a/lib/temporal/workflow.rb
+++ b/lib/temporal/workflow.rb
@@ -20,7 +20,7 @@ def self.execute_in_context(context, input)
Temporal.logger.error("Workflow execution failed", context.metadata.to_h.merge(error: error.inspect))
Temporal.logger.debug(error.backtrace.join("\n"))
- Temporal::ErrorHandler.handle(error, metadata: context.metadata)
+ Temporal::ErrorHandler.handle(error, context.config, metadata: context.metadata)
context.fail(error)
ensure
diff --git a/lib/temporal/workflow/command.rb b/lib/temporal/workflow/command.rb
index 0f0d6ed9..8297abe9 100644
--- a/lib/temporal/workflow/command.rb
+++ b/lib/temporal/workflow/command.rb
@@ -3,8 +3,8 @@ class Workflow
module Command
# TODO: Move these classes into their own directories under workflow/command/*
ScheduleActivity = Struct.new(:activity_type, :activity_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, keyword_init: true)
- StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, keyword_init: true)
- ContinueAsNew = Struct.new(:workflow_type, :task_queue, :input, :timeouts, :retry_policy, :headers, keyword_init: true)
+ StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, keyword_init: true)
+ ContinueAsNew = Struct.new(:workflow_type, :task_queue, :input, :timeouts, :retry_policy, :headers, :memo, keyword_init: true)
RequestActivityCancellation = Struct.new(:activity_id, keyword_init: true)
RecordMarker = Struct.new(:name, :details, keyword_init: true)
StartTimer = Struct.new(:timeout, :timer_id, keyword_init: true)
diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb
index d4541840..d88d8f30 100644
--- a/lib/temporal/workflow/context.rb
+++ b/lib/temporal/workflow/context.rb
@@ -15,7 +15,7 @@
module Temporal
class Workflow
class Context
- attr_reader :metadata
+ attr_reader :metadata, :config
def initialize(state_manager, dispatcher, workflow_class, metadata, config)
@state_manager = state_manager
@@ -195,6 +195,13 @@ def continue_as_new(*input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
+ # If memo or headers are not overridden, use those from the current run
+ options_from_metadata = {
+ memo: metadata.memo,
+ headers: metadata.headers,
+ }
+ options = options_from_metadata.merge(options)
+
execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options)
command = Command::ContinueAsNew.new(
@@ -203,7 +210,8 @@ def continue_as_new(*input, **args)
input: input,
timeouts: execution_options.timeouts,
retry_policy: execution_options.retry_policy,
- headers: execution_options.headers
+ headers: execution_options.headers,
+ memo: execution_options.memo,
)
schedule_command(command)
completed!
@@ -215,14 +223,54 @@ def wait_for_all(*futures)
return
end
- def wait_for(future)
+ # Block workflow progress until any future is finished or any unblock_condition
+ # block evaluates to true.
+ def wait_for(*futures, &unblock_condition)
+ if futures.empty? && unblock_condition.nil?
+ raise 'You must pass either a future or an unblock condition block to wait_for'
+ end
+
fiber = Fiber.current
+ should_yield = false
+ blocked = true
+
+ if futures.any?
+ if futures.any?(&:finished?)
+ blocked = false
+ else
+ should_yield = true
+ futures.each do |future|
+ dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
+ if blocked && future.finished?
+ # Because this block can run for any dispatch, ensure the fiber is only
+ # resumed one time by checking if it's already been unblocked.
+ blocked = false
+ fiber.resume
+ end
+ end
+ end
+ end
+ end
- dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
- fiber.resume if future.finished?
+ if blocked && unblock_condition
+ if unblock_condition.call
+ blocked = false
+ should_yield = false
+ else
+ should_yield = true
+
+ dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
+ # Because this block can run for any dispatch, ensure the fiber is only
+ # resumed one time by checking if it's already been unblocked.
+ if blocked && unblock_condition.call
+ blocked = false
+ fiber.resume
+ end
+ end
+ end
end
- Fiber.yield
+ Fiber.yield if should_yield
return
end
@@ -258,7 +306,7 @@ def cancel(target, cancelation_id)
private
- attr_reader :state_manager, :dispatcher, :workflow_class, :config
+ attr_reader :state_manager, :dispatcher, :workflow_class
def completed!
@completed = true
diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb
index 55c581fb..2a768e54 100644
--- a/lib/temporal/workflow/dispatcher.rb
+++ b/lib/temporal/workflow/dispatcher.rb
@@ -2,6 +2,7 @@ module Temporal
class Workflow
class Dispatcher
WILDCARD = '*'.freeze
+ TARGET_WILDCARD = '*'.freeze
def initialize
@handlers = Hash.new { |hash, key| hash[key] = [] }
@@ -23,6 +24,7 @@ def dispatch(target, event_name, args = nil)
def handlers_for(target, event_name)
handlers[target]
+ .concat(handlers[TARGET_WILDCARD])
.select { |(name, _)| name == event_name || name == WILDCARD }
.map(&:last)
end
diff --git a/lib/temporal/workflow/execution_info.rb b/lib/temporal/workflow/execution_info.rb
index 67cad260..46b8e4cb 100644
--- a/lib/temporal/workflow/execution_info.rb
+++ b/lib/temporal/workflow/execution_info.rb
@@ -1,6 +1,10 @@
+require 'temporal/concerns/payloads'
+
module Temporal
class Workflow
- class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, :history_length, keyword_init: true)
+ class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, :history_length, :memo, keyword_init: true)
+ extend Concerns::Payloads
+
RUNNING_STATUS = :RUNNING
COMPLETED_STATUS = :COMPLETED
FAILED_STATUS = :FAILED
@@ -38,6 +42,7 @@ def self.generate_from(response)
close_time: response.close_time&.to_time,
status: API_STATUS_MAP.fetch(response.status),
history_length: response.history_length,
+ memo: self.from_payload_map(response.memo.fields),
).freeze
end
diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb
index c81703cb..78feb61b 100644
--- a/lib/temporal/workflow/executor.rb
+++ b/lib/temporal/workflow/executor.rb
@@ -4,15 +4,21 @@
require 'temporal/workflow/state_manager'
require 'temporal/workflow/context'
require 'temporal/workflow/history/event_target'
+require 'temporal/metadata'
module Temporal
class Workflow
class Executor
- def initialize(workflow_class, history, config)
+ # @param workflow_class [Class]
+ # @param history [Workflow::History]
+ # @param task_metadata [Metadata::WorkflowTask]
+ # @param config [Configuration]
+ def initialize(workflow_class, history, task_metadata, config)
@workflow_class = workflow_class
@dispatcher = Dispatcher.new
@state_manager = StateManager.new(dispatcher)
@history = history
+ @task_metadata = task_metadata
@config = config
end
@@ -32,9 +38,10 @@ def run
private
- attr_reader :workflow_class, :dispatcher, :state_manager, :history, :config
+ attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config
- def execute_workflow(input, metadata)
+ def execute_workflow(input, workflow_started_event)
+ metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata)
context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config)
Fiber.new do
diff --git a/lib/temporal/workflow/history/event_target.rb b/lib/temporal/workflow/history/event_target.rb
index a54dab55..8f605a01 100644
--- a/lib/temporal/workflow/history/event_target.rb
+++ b/lib/temporal/workflow/history/event_target.rb
@@ -19,7 +19,7 @@ class UnexpectedEventType < InternalError; end
# NOTE: The order is important, first prefix match wins (will be a longer match)
TARGET_TYPES = {
- 'ACTIVITY_TASK_CANCEL' => CANCEL_ACTIVITY_REQUEST_TYPE,
+ 'ACTIVITY_TASK_CANCEL_REQUESTED' => CANCEL_ACTIVITY_REQUEST_TYPE,
'ACTIVITY_TASK' => ACTIVITY_TYPE,
'REQUEST_CANCEL_ACTIVITY_TASK' => CANCEL_ACTIVITY_REQUEST_TYPE,
'TIMER_CANCELED' => CANCEL_TIMER_REQUEST_TYPE,
@@ -61,7 +61,7 @@ def initialize(id, type)
end
def ==(other)
- id == other.id && type == other.type
+ self.class == other.class && id == other.id && type == other.type
end
def eql?(other)
diff --git a/lib/temporal/workflow/poller.rb b/lib/temporal/workflow/poller.rb
index f312d0f9..c0e7b950 100644
--- a/lib/temporal/workflow/poller.rb
+++ b/lib/temporal/workflow/poller.rb
@@ -77,7 +77,7 @@ def poll_for_task
connection.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue)
rescue StandardError => error
Temporal.logger.error("Unable to poll Workflow task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect })
- Temporal::ErrorHandler.handle(error)
+ Temporal::ErrorHandler.handle(error, config)
nil
end
diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb
index 693f1305..ac4c1846 100644
--- a/lib/temporal/workflow/state_manager.rb
+++ b/lib/temporal/workflow/state_manager.rb
@@ -3,7 +3,6 @@
require 'temporal/workflow/command'
require 'temporal/workflow/command_state_machine'
require 'temporal/workflow/history/event_target'
-require 'temporal/metadata'
require 'temporal/concerns/payloads'
require 'temporal/workflow/errors'
@@ -106,7 +105,7 @@ def apply_event(event)
History::EventTarget.workflow,
'started',
from_payloads(event.attributes.input),
- Metadata.generate(Metadata::WORKFLOW_TYPE, event.attributes)
+ event,
)
when 'WORKFLOW_EXECUTION_COMPLETED'
@@ -163,7 +162,7 @@ def apply_event(event)
when 'ACTIVITY_TASK_CANCELED'
state_machine.cancel
- dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure))
+ dispatch(target, 'failed', Temporal::ActivityCanceled.new(from_details_payloads(event.attributes.details)))
when 'TIMER_STARTED'
state_machine.start
diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb
index c22e2c7d..63b4c76e 100644
--- a/lib/temporal/workflow/task_processor.rb
+++ b/lib/temporal/workflow/task_processor.rb
@@ -12,7 +12,7 @@ class TaskProcessor
def initialize(task, namespace, workflow_lookup, middleware_chain, config)
@task = task
@namespace = namespace
- @metadata = Metadata.generate(Metadata::WORKFLOW_TASK_TYPE, task, namespace)
+ @metadata = Metadata.generate_workflow_task_metadata(task, namespace)
@task_token = task.task_token
@workflow_name = task.workflow_type.name
@workflow_class = workflow_lookup.find(workflow_name)
@@ -32,7 +32,7 @@ def process
history = fetch_full_history
# TODO: For sticky workflows we need to cache the Executor instance
- executor = Workflow::Executor.new(workflow_class, history, config)
+ executor = Workflow::Executor.new(workflow_class, history, metadata, config)
commands = middleware_chain.invoke(metadata) do
executor.run
@@ -40,7 +40,7 @@ def process
complete_task(commands)
rescue StandardError => error
- Temporal::ErrorHandler.handle(error, metadata: metadata)
+ Temporal::ErrorHandler.handle(error, config, metadata: metadata)
fail_task(error)
ensure
@@ -110,7 +110,7 @@ def fail_task(error)
rescue StandardError => error
Temporal.logger.error("Unable to fail Workflow task", metadata.to_h.merge(error: error.inspect))
- Temporal::ErrorHandler.handle(error, metadata: metadata)
+ Temporal::ErrorHandler.handle(error, config, metadata: metadata)
end
end
end
diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb
index 0d7e9e48..9e8538eb 100644
--- a/spec/fabricators/grpc/history_event_fabricator.rb
+++ b/spec/fabricators/grpc/history_event_fabricator.rb
@@ -1,13 +1,24 @@
require 'securerandom'
+class TestSerializer
+ extend Temporal::Concerns::Payloads
+end
+
Fabricator(:api_history_event, from: Temporal::Api::History::V1::HistoryEvent) do
event_id { 1 }
event_time { Time.now }
end
Fabricator(:api_workflow_execution_started_event, from: :api_history_event) do
+ transient :headers
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED }
- workflow_execution_started_event_attributes do
+ event_time { Time.now }
+ workflow_execution_started_event_attributes do |attrs|
+ header_fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h|
+ h[field] = Temporal.configuration.converter.to_payload(value)
+ end
+ header = Temporal::Api::Common::V1::Header.new(fields: header_fields)
+
Temporal::Api::History::V1::WorkflowExecutionStartedEventAttributes.new(
workflow_type: Fabricate(:api_workflow_type),
task_queue: Fabricate(:api_task_queue),
@@ -19,7 +30,7 @@
first_execution_run_id: SecureRandom.uuid,
retry_policy: nil,
attempt: 0,
- header: Fabricate(:api_header)
+ header: header,
)
end
end
@@ -115,6 +126,28 @@
end
end
+Fabricator(:api_activity_task_canceled_event, from: :api_history_event) do
+ event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_CANCELED }
+ activity_task_canceled_event_attributes do |attrs|
+ Temporal::Api::History::V1::ActivityTaskCanceledEventAttributes.new(
+ details: TestSerializer.to_details_payloads('ACTIVITY_ID_NOT_STARTED'),
+ scheduled_event_id: attrs[:event_id] - 2,
+ started_event_id: nil,
+ identity: 'test-worker@test-host'
+ )
+ end
+end
+
+Fabricator(:api_activity_task_cancel_requested_event, from: :api_history_event) do
+ event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED }
+ activity_task_cancel_requested_event_attributes do |attrs|
+ Temporal::Api::History::V1::ActivityTaskCancelRequestedEventAttributes.new(
+ scheduled_event_id: attrs[:event_id] - 1,
+ workflow_task_completed_event_id: attrs[:event_id] - 2,
+ )
+ end
+end
+
Fabricator(:api_timer_started_event, from: :api_history_event) do
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_TIMER_STARTED }
timer_started_event_attributes do |attrs|
diff --git a/spec/fabricators/grpc/memo_fabricator.rb b/spec/fabricators/grpc/memo_fabricator.rb
new file mode 100644
index 00000000..6c9fd726
--- /dev/null
+++ b/spec/fabricators/grpc/memo_fabricator.rb
@@ -0,0 +1,7 @@
+Fabricator(:memo, from: Temporal::Api::Common::V1::Memo) do
+ fields do
+ Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload).tap do |m|
+ m['foo'] = Temporal.configuration.converter.to_payload('bar')
+ end
+ end
+end
diff --git a/spec/fabricators/grpc/workflow_execution_info_fabricator.rb b/spec/fabricators/grpc/workflow_execution_info_fabricator.rb
index 296bde87..4f1d577e 100644
--- a/spec/fabricators/grpc/workflow_execution_info_fabricator.rb
+++ b/spec/fabricators/grpc/workflow_execution_info_fabricator.rb
@@ -5,4 +5,5 @@
close_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
status { Temporal::Api::Enums::V1::WorkflowExecutionStatus::WORKFLOW_EXECUTION_STATUS_COMPLETED }
history_length { rand(100) }
+ memo { Fabricate(:memo) }
end
diff --git a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb
index a3e72609..0cc19e16 100644
--- a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb
+++ b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb
@@ -9,6 +9,7 @@
workflow_type { Fabricate(:api_workflow_type) }
original_execution_run_id { SecureRandom.uuid }
attempt 1
+ task_queue { Fabricate(:api_task_queue) }
header do |attrs|
fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h|
h[field] = Temporal.configuration.converter.to_payload(value)
diff --git a/spec/fabricators/workflow_metadata_fabricator.rb b/spec/fabricators/workflow_metadata_fabricator.rb
index 5a609bb8..c32fd3e1 100644
--- a/spec/fabricators/workflow_metadata_fabricator.rb
+++ b/spec/fabricators/workflow_metadata_fabricator.rb
@@ -1,8 +1,13 @@
require 'securerandom'
Fabricator(:workflow_metadata, from: :open_struct) do
+ namespace 'test-namespace'
+ id { SecureRandom.uuid }
name 'TestWorkflow'
run_id { SecureRandom.uuid }
attempt 1
+ task_queue { Fabricate(:api_task_queue) }
+ run_started_at { Time.now }
+ memo { {} }
headers { {} }
end
diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb
index 91e1eccf..4cc18a5e 100644
--- a/spec/unit/lib/temporal/activity/task_processor_spec.rb
+++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb
@@ -11,10 +11,10 @@
Fabricate(
:api_activity_task,
activity_name: activity_name,
- input: Temporal.configuration.converter.to_payloads(input)
+ input: config.converter.to_payloads(input)
)
end
- let(:metadata) { Temporal::Metadata.generate(Temporal::Metadata::ACTIVITY_TYPE, task) }
+ let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace) }
let(:activity_name) { 'TestActivity' }
let(:connection) { instance_double('Temporal::Connection::GRPC') }
let(:middleware_chain) { Temporal::Middleware::Chain.new }
@@ -30,8 +30,8 @@
.with(config.for_connection)
.and_return(connection)
allow(Temporal::Metadata)
- .to receive(:generate)
- .with(Temporal::Metadata::ACTIVITY_TYPE, task, namespace)
+ .to receive(:generate_activity_metadata)
+ .with(task, namespace)
.and_return(metadata)
allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata).and_return(context)
@@ -70,7 +70,7 @@
reported_error = nil
reported_metadata = nil
- Temporal.configuration.on_error do |error, metadata: nil|
+ config.on_error do |error, metadata: nil|
reported_error = error
reported_metadata = metadata.to_h
end
@@ -187,7 +187,7 @@
reported_error = nil
reported_metadata = nil
- Temporal.configuration.on_error do |error, metadata: nil|
+ config.on_error do |error, metadata: nil|
reported_error = error
reported_metadata = metadata
end
diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb
index 92ec094b..68e5076a 100644
--- a/spec/unit/lib/temporal/client_spec.rb
+++ b/spec/unit/lib/temporal/client_spec.rb
@@ -61,7 +61,8 @@ class TestStartWorkflow < Temporal::Workflow
run_timeout: Temporal.configuration.timeouts[:run],
execution_timeout: Temporal.configuration.timeouts[:execution],
workflow_id_reuse_policy: nil,
- headers: {}
+ headers: {},
+ memo: {}
)
end
@@ -73,7 +74,8 @@ class TestStartWorkflow < Temporal::Workflow
name: 'test-workflow',
namespace: 'test-namespace',
task_queue: 'test-task-queue',
- headers: { 'Foo' => 'Bar' }
+ headers: { 'Foo' => 'Bar' },
+ memo: { 'MemoKey1' => 'MemoValue1' }
}
)
@@ -89,7 +91,8 @@ class TestStartWorkflow < Temporal::Workflow
run_timeout: Temporal.configuration.timeouts[:run],
execution_timeout: Temporal.configuration.timeouts[:execution],
workflow_id_reuse_policy: nil,
- headers: { 'Foo' => 'Bar' }
+ headers: { 'Foo' => 'Bar' },
+ memo: { 'MemoKey1' => 'MemoValue1' }
)
end
@@ -114,7 +117,8 @@ class TestStartWorkflow < Temporal::Workflow
run_timeout: Temporal.configuration.timeouts[:run],
execution_timeout: Temporal.configuration.timeouts[:execution],
workflow_id_reuse_policy: nil,
- headers: {}
+ headers: {},
+ memo: {}
)
end
@@ -133,7 +137,8 @@ class TestStartWorkflow < Temporal::Workflow
run_timeout: Temporal.configuration.timeouts[:run],
execution_timeout: Temporal.configuration.timeouts[:execution],
workflow_id_reuse_policy: nil,
- headers: {}
+ headers: {},
+ memo: {}
)
end
@@ -154,7 +159,8 @@ class TestStartWorkflow < Temporal::Workflow
run_timeout: Temporal.configuration.timeouts[:run],
execution_timeout: Temporal.configuration.timeouts[:execution],
workflow_id_reuse_policy: :allow,
- headers: {}
+ headers: {},
+ memo: {}
)
end
end
@@ -179,12 +185,91 @@ class TestStartWorkflow < Temporal::Workflow
run_timeout: Temporal.configuration.timeouts[:run],
execution_timeout: Temporal.configuration.timeouts[:execution],
workflow_id_reuse_policy: nil,
- headers: {}
+ headers: {},
+ memo: {}
)
end
end
end
+ describe '#start_workflow with a signal' do
+ let(:temporal_response) do
+ Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx')
+ end
+
+ before { allow(connection).to receive(:signal_with_start_workflow_execution).and_return(temporal_response) }
+
+ def expect_signal_with_start(expected_arguments, expected_signal_argument)
+ expect(connection)
+ .to have_received(:signal_with_start_workflow_execution)
+ .with(
+ namespace: 'default-test-namespace',
+ workflow_id: an_instance_of(String),
+ workflow_name: 'TestStartWorkflow',
+ task_queue: 'default-test-task-queue',
+ input: expected_arguments,
+ task_timeout: Temporal.configuration.timeouts[:task],
+ run_timeout: Temporal.configuration.timeouts[:run],
+ execution_timeout: Temporal.configuration.timeouts[:execution],
+ workflow_id_reuse_policy: nil,
+ headers: {},
+ memo: {},
+ signal_name: 'the question',
+ signal_input: expected_signal_argument,
+ )
+ end
+
+ it 'starts a workflow with a signal and no arguments' do
+ subject.start_workflow(
+ TestStartWorkflow,
+ options: { signal_name: 'the question' }
+ )
+
+ expect_signal_with_start([], nil)
+ end
+
+ it 'starts a workflow with a signal and one scalar argument' do
+ signal_input = 'what do you get if you multiply six by nine?'
+ subject.start_workflow(
+ TestStartWorkflow,
+ 42,
+ options: {
+ signal_name: 'the question',
+ signal_input: signal_input,
+ }
+ )
+
+ expect_signal_with_start([42], signal_input)
+ end
+
+ it 'starts a workflow with a signal and multiple arguments and signal_inputs' do
+ signal_input = ['what do you get', 'if you multiply six by nine?']
+ subject.start_workflow(
+ TestStartWorkflow,
+ 42,
+ 43,
+ options: {
+ signal_name: 'the question',
+ # signals can't have multiple scalar args, but you can pass an array
+ signal_input: signal_input
+ }
+ )
+
+ expect_signal_with_start([42, 43], signal_input)
+ end
+
+ it 'raises when signal_input is given but signal_name is not' do
+ expect do
+ subject.start_workflow(
+ TestStartWorkflow,
+ [42, 54],
+ [43, 55],
+ options: { signal_input: 'what do you get if you multiply six by nine?', }
+ )
+ end.to raise_error(ArgumentError)
+ end
+ end
+
describe '#schedule_workflow' do
let(:temporal_response) do
Temporal::Api::WorkflowService::V1::StartWorkflowExecutionResponse.new(run_id: 'xxx')
@@ -208,7 +293,8 @@ class TestStartWorkflow < Temporal::Workflow
run_timeout: Temporal.configuration.timeouts[:run],
execution_timeout: Temporal.configuration.timeouts[:execution],
workflow_id_reuse_policy: nil,
- headers: {}
+ memo: {},
+ headers: {},
)
end
end
@@ -233,6 +319,17 @@ class TestStartWorkflow < Temporal::Workflow
end
end
+ describe '#describe_namespace' do
+ before { allow(connection).to receive(:describe_namespace).and_return(Temporal::Api::WorkflowService::V1::DescribeNamespaceResponse.new) }
+
+ it 'passes the namespace to the connection' do
+ result = subject.describe_namespace('new-namespace')
+
+ expect(connection)
+ .to have_received(:describe_namespace)
+ .with(name: 'new-namespace')
+ end
+ end
describe '#signal_workflow' do
before { allow(connection).to receive(:signal_workflow_execution).and_return(nil) }
diff --git a/spec/unit/lib/temporal/configuration_spec.rb b/spec/unit/lib/temporal/configuration_spec.rb
new file mode 100644
index 00000000..7e083429
--- /dev/null
+++ b/spec/unit/lib/temporal/configuration_spec.rb
@@ -0,0 +1,29 @@
+require 'temporal/configuration'
+
+describe Temporal::Configuration do
+ describe '#initialize' do
+ it 'initializes proper default workflow timeouts' do
+ timeouts = subject.timeouts
+
+ # By default, we don't ever want to timeout workflows, because workflows "always succeed" and
+ # they may be long-running
+ expect(timeouts[:execution]).to be >= 86_400 * 365 * 10
+ expect(timeouts[:run]).to eq(timeouts[:execution])
+ expect(timeouts[:task]).to eq(10)
+ end
+
+ it 'initializes proper default activity timeouts' do
+ timeouts = subject.timeouts
+
+ # Schedule to start timeouts are dangerous because there is no retry.
+ # https://docs.temporal.io/blog/activity-timeouts/#schedule-to-start-timeout recommends to use them rarely
+ expect(timeouts[:schedule_to_start]).to be(nil)
+ # We keep retrying until the workflow times out, by default
+ expect(timeouts[:schedule_to_close]).to be(nil)
+ # Activity invocations should be short-lived by default so they can be retried relatively quickly
+ expect(timeouts[:start_to_close]).to eq(30)
+ # No heartbeating for a default (short-lived) activity
+ expect(timeouts[:heartbeat]).to be(nil)
+ end
+ end
+end
\ No newline at end of file
diff --git a/spec/unit/lib/temporal/connection/retryer.rb b/spec/unit/lib/temporal/connection/retryer_spec.rb
similarity index 100%
rename from spec/unit/lib/temporal/connection/retryer.rb
rename to spec/unit/lib/temporal/connection/retryer_spec.rb
diff --git a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb
index fbb00623..18de4355 100644
--- a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb
+++ b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb
@@ -5,10 +5,12 @@
describe 'to_proto' do
it 'produces a protobuf' do
command = Temporal::Workflow::Command::ContinueAsNew.new(
- workflow_type: 'Test',
- task_queue: 'Test',
+ workflow_type: 'my-workflow-type',
+ task_queue: 'my-task-queue',
input: ['one', 'two'],
- timeouts: Temporal.configuration.timeouts
+ timeouts: Temporal.configuration.timeouts,
+ headers: {'foo-header': 'bar'},
+ memo: {'foo-memo': 'baz'},
)
result = described_class.new(command).to_proto
@@ -17,6 +19,18 @@
expect(result.command_type).to eql(
:COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION
)
+ expect(result.continue_as_new_workflow_execution_command_attributes).not_to be_nil
+ attribs = result.continue_as_new_workflow_execution_command_attributes
+
+ expect(attribs.workflow_type.name).to eq('my-workflow-type')
+
+ expect(attribs.task_queue.name).to eq('my-task-queue')
+
+ expect(attribs.input.payloads[0].data).to eq('"one"')
+ expect(attribs.input.payloads[1].data).to eq('"two"')
+
+ expect(attribs.header.fields['foo-header'].data).to eq('"bar"')
+ expect(attribs.memo.fields['foo-memo'].data).to eq('"baz"')
end
end
end
diff --git a/spec/unit/lib/temporal/grpc_client_spec.rb b/spec/unit/lib/temporal/grpc_client_spec.rb
index bcc0458d..a168c359 100644
--- a/spec/unit/lib/temporal/grpc_client_spec.rb
+++ b/spec/unit/lib/temporal/grpc_client_spec.rb
@@ -27,13 +27,51 @@
task_queue: 'test',
execution_timeout: 0,
run_timeout: 0,
- task_timeout: 0
+ task_timeout: 0,
+ memo: {}
)
end.to raise_error(Temporal::WorkflowExecutionAlreadyStartedFailure) do |e|
expect(e.run_id).to eql('baaf1d86-4459-4ecd-a288-47aeae55245d')
end
end
end
+
+ describe '#signal_with_start_workflow' do
+ let(:temporal_response) do
+ Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx')
+ end
+
+ before { allow(grpc_stub).to receive(:signal_with_start_workflow_execution).and_return(temporal_response) }
+
+ it 'starts a workflow with a signal with scalar arguments' do
+ subject.signal_with_start_workflow_execution(
+ namespace: namespace,
+ workflow_id: workflow_id,
+ workflow_name: 'workflow_name',
+ task_queue: 'task_queue',
+ input: ['foo'],
+ execution_timeout: 1,
+ run_timeout: 2,
+ task_timeout: 3,
+ signal_name: 'the question',
+ signal_input: 'what do you get if you multiply six by nine?'
+ )
+
+ expect(grpc_stub).to have_received(:signal_with_start_workflow_execution) do |request|
+ expect(request).to be_an_instance_of(Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest)
+ expect(request.namespace).to eq(namespace)
+ expect(request.workflow_id).to eq(workflow_id)
+ expect(request.workflow_type.name).to eq('workflow_name')
+ expect(request.task_queue.name).to eq('task_queue')
+ expect(request.input.payloads[0].data).to eq('"foo"')
+ expect(request.workflow_execution_timeout.seconds).to eq(1)
+ expect(request.workflow_run_timeout.seconds).to eq(2)
+ expect(request.workflow_task_timeout.seconds).to eq(3)
+ expect(request.signal_name).to eq('the question')
+ expect(request.signal_input.payloads[0].data).to eq('"what do you get if you multiply six by nine?"')
+ end
+ end
+ end
describe '#get_workflow_execution_history' do
let(:response) do
diff --git a/spec/unit/lib/temporal/metadata/workflow_spec.rb b/spec/unit/lib/temporal/metadata/workflow_spec.rb
index 6bbe3af6..be3f50b9 100644
--- a/spec/unit/lib/temporal/metadata/workflow_spec.rb
+++ b/spec/unit/lib/temporal/metadata/workflow_spec.rb
@@ -6,6 +6,8 @@
let(:args) { Fabricate(:workflow_metadata) }
it 'sets the attributes' do
+ expect(subject.namespace).to eq(args.namespace)
+ expect(subject.id).to eq(args.id)
expect(subject.name).to eq(args.name)
expect(subject.run_id).to eq(args.run_id)
expect(subject.attempt).to eq(args.attempt)
@@ -25,9 +27,14 @@
it 'returns a hash' do
expect(subject.to_h).to eq({
+ 'namespace' => subject.namespace,
+ 'workflow_id' => subject.id,
'attempt' => subject.attempt,
'workflow_name' => subject.name,
- 'workflow_run_id' => subject.run_id
+ 'workflow_run_id' => subject.run_id,
+ 'task_queue' => subject.task_queue,
+ 'run_started_at' => subject.run_started_at.to_f,
+ 'memo' => subject.memo,
})
end
end
diff --git a/spec/unit/lib/temporal/metadata_spec.rb b/spec/unit/lib/temporal/metadata_spec.rb
index 18134e26..cd21fb76 100644
--- a/spec/unit/lib/temporal/metadata_spec.rb
+++ b/spec/unit/lib/temporal/metadata_spec.rb
@@ -1,80 +1,76 @@
require 'temporal/metadata'
describe Temporal::Metadata do
- describe '.generate' do
- subject { described_class.generate(type, data, namespace) }
+ describe '.generate_activity_metadata' do
+ subject { described_class.generate_activity_metadata(data, namespace) }
- context 'with activity type' do
- let(:type) { described_class::ACTIVITY_TYPE }
- let(:data) { Fabricate(:api_activity_task) }
- let(:namespace) { 'test-namespace' }
+ let(:data) { Fabricate(:api_activity_task) }
+ let(:namespace) { 'test-namespace' }
- it 'generates metadata' do
- expect(subject.namespace).to eq(namespace)
- expect(subject.id).to eq(data.activity_id)
- expect(subject.name).to eq(data.activity_type.name)
- expect(subject.task_token).to eq(data.task_token)
- expect(subject.attempt).to eq(data.attempt)
- expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id)
- expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id)
- expect(subject.workflow_name).to eq(data.workflow_type.name)
- expect(subject.headers).to eq({})
- end
-
- context 'with headers' do
- let(:data) { Fabricate(:api_activity_task, headers: { 'Foo' => 'Bar' }) }
-
- it 'assigns headers' do
- expect(subject.headers).to eq('Foo' => 'Bar')
- end
- end
+ it 'generates metadata' do
+ expect(subject.namespace).to eq(namespace)
+ expect(subject.id).to eq(data.activity_id)
+ expect(subject.name).to eq(data.activity_type.name)
+ expect(subject.task_token).to eq(data.task_token)
+ expect(subject.attempt).to eq(data.attempt)
+ expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id)
+ expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id)
+ expect(subject.workflow_name).to eq(data.workflow_type.name)
+ expect(subject.headers).to eq({})
end
- context 'with workflow task type' do
- let(:type) { described_class::WORKFLOW_TASK_TYPE }
- let(:data) { Fabricate(:api_workflow_task) }
- let(:namespace) { 'test-namespace' }
+ context 'with headers' do
+ let(:data) { Fabricate(:api_activity_task, headers: { 'Foo' => 'Bar' }) }
- it 'generates metadata' do
- expect(subject.namespace).to eq(namespace)
- expect(subject.id).to eq(data.started_event_id)
- expect(subject.task_token).to eq(data.task_token)
- expect(subject.attempt).to eq(data.attempt)
- expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id)
- expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id)
- expect(subject.workflow_name).to eq(data.workflow_type.name)
+ it 'assigns headers' do
+ expect(subject.headers).to eq('Foo' => 'Bar')
end
end
+ end
- context 'with workflow type' do
- let(:type) { described_class::WORKFLOW_TYPE }
- let(:data) { Fabricate(:api_workflow_execution_started_event_attributes) }
- let(:namespace) { nil }
+ describe '.generate_workflow_task_metadata' do
+ subject { described_class.generate_workflow_task_metadata(data, namespace) }
- it 'generates metadata' do
- expect(subject.run_id).to eq(data.original_execution_run_id)
- expect(subject.attempt).to eq(data.attempt)
- expect(subject.headers).to eq({})
- end
+ let(:data) { Fabricate(:api_workflow_task) }
+ let(:namespace) { 'test-namespace' }
- context 'with headers' do
- let(:data) do
- Fabricate(:api_workflow_execution_started_event_attributes, headers: { 'Foo' => 'Bar' })
- end
+ it 'generates metadata' do
+ expect(subject.namespace).to eq(namespace)
+ expect(subject.id).to eq(data.started_event_id)
+ expect(subject.task_token).to eq(data.task_token)
+ expect(subject.attempt).to eq(data.attempt)
+ expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id)
+ expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id)
+ expect(subject.workflow_name).to eq(data.workflow_type.name)
+ end
+ end
- it 'assigns headers' do
- expect(subject.headers).to eq('Foo' => 'Bar')
- end
- end
+ context '.generate_workflow_metadata' do
+ subject { described_class.generate_workflow_metadata(event, task_metadata) }
+ let(:event) { Temporal::Workflow::History::Event.new(Fabricate(:api_workflow_execution_started_event)) }
+ let(:task_metadata) { Fabricate(:workflow_task_metadata) }
+ let(:namespace) { nil }
+
+ it 'generates metadata' do
+ expect(subject.run_id).to eq(event.attributes.original_execution_run_id)
+ expect(subject.id).to eq(task_metadata.workflow_id)
+ expect(subject.attempt).to eq(event.attributes.attempt)
+ expect(subject.headers).to eq({})
+ expect(subject.memo).to eq({})
+ expect(subject.namespace).to eq(task_metadata.namespace)
+ expect(subject.task_queue).to eq(event.attributes.task_queue.name)
+ expect(subject.run_started_at).to eq(event.timestamp)
end
- context 'with unknown type' do
- let(:type) { :unknown }
- let(:data) { nil }
- let(:namespace) { nil }
+ context 'with headers' do
+ let(:event) do
+ Temporal::Workflow::History::Event.new(
+ Fabricate(:api_workflow_execution_started_event, headers: { 'Foo' => 'Bar' })
+ )
+ end
- it 'raises' do
- expect { subject }.to raise_error(Temporal::InternalError, 'Unsupported metadata type')
+ it 'assigns headers' do
+ expect(subject.headers).to eq('Foo' => 'Bar')
end
end
end
diff --git a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb
index 29b4b2a3..9397ef88 100644
--- a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb
+++ b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb
@@ -6,13 +6,24 @@
let(:workflow_id) { 'workflow_id_1' }
let(:run_id) { 'run_id_1' }
let(:execution) { Temporal::Testing::WorkflowExecution.new }
+ let(:task_queue) { 'my_test_queue' }
let(:workflow_context) do
Temporal::Testing::LocalWorkflowContext.new(
execution,
workflow_id,
run_id,
[],
- Temporal::Metadata::Workflow.new(name: workflow_id, run_id: run_id, attempt: 1)
+ Temporal::Metadata::Workflow.new(
+ namespace: 'ruby-samples',
+ id: workflow_id,
+ name: 'HelloWorldWorkflow',
+ run_id: run_id,
+ attempt: 1,
+ task_queue: task_queue,
+ headers: {},
+ run_started_at: Time.now,
+ memo: {},
+ )
)
end
let(:async_token) do
@@ -120,10 +131,69 @@ def execute
result = workflow_context.execute_activity!(TestActivity)
expect(result).to eq('ok')
end
+
+ it 'can heartbeat' do
+ # Heartbeat doesn't do anything in local mode, but at least it can be called.
+ workflow_context.execute_activity!(TestHeartbeatingActivity)
+ end
end
- it 'can heartbeat' do
- # Heartbeat doesn't do anything in local mode, but at least it can be called.
- workflow_context.execute_activity!(TestHeartbeatingActivity)
+ describe '#wait_for' do
+ it 'await unblocks once condition changes' do
+ can_continue = false
+ exited = false
+ fiber = Fiber.new do
+ workflow_context.wait_for do
+ can_continue
+ end
+
+ exited = true
+ end
+
+ fiber.resume # start running
+ expect(exited).to eq(false)
+
+ can_continue = true # change condition
+ fiber.resume # resume running after the Fiber.yield done in context.await
+ expect(exited).to eq(true)
+ end
+
+ it 'condition or future unblocks' do
+ exited = false
+
+ future = workflow_context.execute_activity(TestAsyncActivity)
+
+ fiber = Fiber.new do
+ workflow_context.wait_for(future) do
+ false
+ end
+
+ exited = true
+ end
+
+ fiber.resume # start running
+ expect(exited).to eq(false)
+
+ execution.complete_activity(async_token, 'async_ok')
+
+ fiber.resume # resume running after the Fiber.yield done in context.await
+ expect(exited).to eq(true)
+ end
+
+ it 'any future unblocks' do
+ exited = false
+
+ async_future = workflow_context.execute_activity(TestAsyncActivity)
+ future = workflow_context.execute_activity(TestActivity)
+ future.wait
+
+ fiber = Fiber.new do
+ workflow_context.wait_for(future, async_future)
+ exited = true
+ end
+
+ fiber.resume # start running
+ expect(exited).to eq(true)
+ end
end
end
diff --git a/spec/unit/lib/temporal/testing/temporal_override_spec.rb b/spec/unit/lib/temporal/testing/temporal_override_spec.rb
index 09e4eeee..42d72a9c 100644
--- a/spec/unit/lib/temporal/testing/temporal_override_spec.rb
+++ b/spec/unit/lib/temporal/testing/temporal_override_spec.rb
@@ -136,6 +136,14 @@ def execute
.with(an_instance_of(Temporal::Testing::LocalWorkflowContext))
end
+ it 'explicitly does not support staring a workflow with a signal' do
+ expect {
+ client.start_workflow(TestTemporalOverrideWorkflow, options: { signal_name: 'breakme' })
+ }.to raise_error(NotImplementedError) do |e|
+ expect(e.message).to eql("Signals are not available when Temporal::Testing.local! is on")
+ end
+ end
+
describe 'execution control' do
subject do
client.start_workflow(
diff --git a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb
index d5e008f8..43ccc8fc 100644
--- a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb
+++ b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb
@@ -1,13 +1,17 @@
require 'temporal/workflow/dispatcher'
+require 'temporal/workflow/history/event_target'
describe Temporal::Workflow::Dispatcher do
+ let(:target) { Temporal::Workflow::History::EventTarget.new(1, Temporal::Workflow::History::EventTarget::ACTIVITY_TYPE) }
+ let(:other_target) { Temporal::Workflow::History::EventTarget.new(2, Temporal::Workflow::History::EventTarget::TIMER_TYPE) }
+
describe '#register_handler' do
it 'stores a given handler against the target' do
block = -> { 'handler body' }
- subject.register_handler('target', 'signaled', &block)
+ subject.register_handler(target, 'signaled', &block)
- expect(subject.send(:handlers)).to include('target' => [['signaled', block]])
+ expect(subject.send(:handlers)).to include(target => [['signaled', block]])
end
end
@@ -22,14 +26,14 @@
allow(handler).to receive(:call)
end
- subject.register_handler('target', 'completed', &handler_1)
- subject.register_handler('other_target', 'completed', &handler_2)
- subject.register_handler('target', 'failed', &handler_3)
- subject.register_handler('target', 'completed', &handler_4)
+ subject.register_handler(target, 'completed', &handler_1)
+ subject.register_handler(other_target, 'completed', &handler_2)
+ subject.register_handler(target, 'failed', &handler_3)
+ subject.register_handler(target, 'completed', &handler_4)
end
it 'calls all matching handlers in the original order' do
- subject.dispatch('target', 'completed')
+ subject.dispatch(target, 'completed')
expect(handler_1).to have_received(:call).ordered
expect(handler_4).to have_received(:call).ordered
@@ -39,7 +43,7 @@
end
it 'passes given arguments to the handlers' do
- subject.dispatch('target', 'failed', ['TIME_OUT', 'Exceeded execution time'])
+ subject.dispatch(target, 'failed', ['TIME_OUT', 'Exceeded execution time'])
expect(handler_3).to have_received(:call).with('TIME_OUT', 'Exceeded execution time')
@@ -54,15 +58,36 @@
before do
allow(handler_5).to receive(:call)
- subject.register_handler('target', described_class::WILDCARD, &handler_5)
+ subject.register_handler(target, described_class::WILDCARD, &handler_5)
end
it 'calls the handler' do
- subject.dispatch('target', 'completed')
+ subject.dispatch(target, 'completed')
expect(handler_5).to have_received(:call)
end
+ end
+
+ context 'with TARGET_WILDCARD target handler' do
+ let(:handler_6) { -> { 'sixth block' } }
+ before do
+ allow(handler_6).to receive(:call)
+
+ subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6)
+ end
+ it 'calls the handler' do
+ subject.dispatch(target, 'completed')
+
+ # Target handlers still invoked
+ expect(handler_1).to have_received(:call).ordered
+ expect(handler_4).to have_received(:call).ordered
+ expect(handler_6).to have_received(:call).ordered
+ end
+
+ it 'TARGET_WILDCARD can be compared to an EventTarget object' do
+ expect(target.eql?(described_class::TARGET_WILDCARD)).to be(false)
+ end
end
end
end
diff --git a/spec/unit/lib/temporal/workflow/execution_info_spec.rb b/spec/unit/lib/temporal/workflow/execution_info_spec.rb
index fbac5ee0..a064e8ba 100644
--- a/spec/unit/lib/temporal/workflow/execution_info_spec.rb
+++ b/spec/unit/lib/temporal/workflow/execution_info_spec.rb
@@ -14,6 +14,7 @@
expect(subject.close_time).to be_a(Time)
expect(subject.status).to eq(:COMPLETED)
expect(subject.history_length).to eq(api_info.history_length)
+ expect(subject.memo).to eq({ 'foo' => 'bar' })
end
it 'freezes the info' do
diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb
new file mode 100644
index 00000000..a74c3387
--- /dev/null
+++ b/spec/unit/lib/temporal/workflow/executor_spec.rb
@@ -0,0 +1,81 @@
+require 'temporal/workflow/executor'
+require 'temporal/workflow/history'
+require 'temporal/workflow'
+
+describe Temporal::Workflow::Executor do
+ subject { described_class.new(workflow, history, workflow_metadata, config) }
+
+ let(:workflow_started_event) { Fabricate(:api_workflow_execution_started_event, event_id: 1) }
+ let(:history) do
+ Temporal::Workflow::History.new([
+ workflow_started_event,
+ Fabricate(:api_workflow_task_scheduled_event, event_id: 2),
+ Fabricate(:api_workflow_task_started_event, event_id: 3),
+ Fabricate(:api_workflow_task_completed_event, event_id: 4)
+ ])
+ end
+ let(:workflow) { TestWorkflow }
+ let(:workflow_metadata) { Fabricate(:workflow_metadata) }
+ let(:config) { Temporal::Configuration.new }
+
+ class TestWorkflow < Temporal::Workflow
+ def execute
+ 'test'
+ end
+ end
+
+ describe '#run' do
+ it 'runs a workflow' do
+ allow(workflow).to receive(:execute_in_context).and_call_original
+
+ subject.run
+
+ expect(workflow)
+ .to have_received(:execute_in_context)
+ .with(
+ an_instance_of(Temporal::Workflow::Context),
+ nil
+ )
+ end
+
+ it 'returns a complete workflow decision' do
+ decisions = subject.run
+
+ expect(decisions.length).to eq(1)
+
+ decision_id, decision = decisions.first
+ expect(decision_id).to eq(history.events.length + 1)
+ expect(decision).to be_an_instance_of(Temporal::Workflow::Command::CompleteWorkflow)
+ expect(decision.result).to eq('test')
+ end
+
+ it 'generates workflow metadata' do
+ allow(Temporal::Metadata::Workflow).to receive(:new).and_call_original
+ payload = Temporal::Api::Common::V1::Payload.new(
+ metadata: { 'encoding' => 'json/plain' },
+ data: '"bar"'.b
+ )
+ header =
+ Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload, { 'Foo' => payload })
+ workflow_started_event.workflow_execution_started_event_attributes.header =
+ Fabricate(:api_header, fields: header)
+
+ subject.run
+
+ event_attributes = workflow_started_event.workflow_execution_started_event_attributes
+ expect(Temporal::Metadata::Workflow)
+ .to have_received(:new)
+ .with(
+ namespace: workflow_metadata.namespace,
+ id: workflow_metadata.workflow_id,
+ name: event_attributes.workflow_type.name,
+ run_id: event_attributes.original_execution_run_id,
+ attempt: event_attributes.attempt,
+ task_queue: event_attributes.task_queue.name,
+ run_started_at: workflow_started_event.event_time.to_time,
+ memo: {},
+ headers: {'Foo' => 'bar'}
+ )
+ end
+ end
+end
\ No newline at end of file
diff --git a/spec/unit/lib/temporal/workflow/history/event_target_spec.rb b/spec/unit/lib/temporal/workflow/history/event_target_spec.rb
index 717c4572..2f2c80bd 100644
--- a/spec/unit/lib/temporal/workflow/history/event_target_spec.rb
+++ b/spec/unit/lib/temporal/workflow/history/event_target_spec.rb
@@ -21,5 +21,21 @@
expect(subject.type).to eq(described_class::CANCEL_TIMER_REQUEST_TYPE)
end
end
+
+ context 'when event is ACTIVITY_CANCELED' do
+ let(:raw_event) { Fabricate(:api_activity_task_canceled_event) }
+
+ it 'sets type to activity' do
+ expect(subject.type).to eq(described_class::ACTIVITY_TYPE)
+ end
+ end
+
+ context 'when event is ACTIVITY_TASK_CANCEL_REQUESTED' do
+ let(:raw_event) { Fabricate(:api_activity_task_cancel_requested_event) }
+
+ it 'sets type to cancel_activity_request' do
+ expect(subject.type).to eq(described_class::CANCEL_ACTIVITY_REQUEST_TYPE)
+ end
+ end
end
end
diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb
index d1537bcc..695b3da6 100644
--- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb
+++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb
@@ -44,7 +44,7 @@
reported_error = nil
reported_metadata = nil
- Temporal.configuration.on_error do |error, metadata: nil|
+ config.on_error do |error, metadata: nil|
reported_error = error
reported_metadata = metadata
end
@@ -154,7 +154,7 @@
reported_error = nil
reported_metadata = nil
- Temporal.configuration.on_error do |error, metadata: nil|
+ config.on_error do |error, metadata: nil|
reported_error = error
reported_metadata = metadata
end
diff --git a/spec/unit/lib/temporal_spec.rb b/spec/unit/lib/temporal_spec.rb
index 0d9daefd..47ccd73d 100644
--- a/spec/unit/lib/temporal_spec.rb
+++ b/spec/unit/lib/temporal_spec.rb
@@ -28,6 +28,10 @@
describe '.register_namespace' do
it_behaves_like 'a forwarded method', :register_namespace, 'test-namespace', 'This is a test namespace'
end
+
+ describe '.describe_namespace' do
+ it_behaves_like 'a forwarded method', :describe_namespace, 'test-namespace'
+ end
describe '.signal_workflow' do
it_behaves_like 'a forwarded method', :signal_workflow, 'TestWorkflow', 'TST_SIGNAL', 'x', 'y'
diff --git a/temporal.gemspec b/temporal.gemspec
index 59ef1192..c8589c9e 100644
--- a/temporal.gemspec
+++ b/temporal.gemspec
@@ -21,4 +21,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rspec'
spec.add_development_dependency 'fabrication'
spec.add_development_dependency 'grpc-tools'
+ spec.add_development_dependency 'yard'
end