diff --git a/README.md b/README.md index cc8b0441..4d59d255 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Ruby worker for Temporal +# Ruby SDK for Temporal [![Coverage Status](https://coveralls.io/repos/github/coinbase/temporal-ruby/badge.svg?branch=master)](https://coveralls.io/github/coinbase/temporal-ruby?branch=master) @@ -6,7 +6,7 @@ A pure Ruby library for defining and running Temporal workflows and activities. -To find more about Temporal please visit . +To find more about Temporal itself please visit . ## Getting Started diff --git a/examples/.env b/examples/.env new file mode 100644 index 00000000..d15cca49 --- /dev/null +++ b/examples/.env @@ -0,0 +1 @@ +COMPOSE_PROJECT_NAME=temporal-ruby-examples diff --git a/examples/bin/worker b/examples/bin/worker index 65828dfa..14752ede 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -31,6 +31,7 @@ worker.register_workflow(HelloWorldWorkflow) worker.register_workflow(LocalHelloWorldWorkflow) worker.register_workflow(LongWorkflow) worker.register_workflow(LoopWorkflow) +worker.register_workflow(MetadataWorkflow) worker.register_workflow(ParentWorkflow) worker.register_workflow(ProcessFileWorkflow) worker.register_workflow(QuickTimeoutWorkflow) @@ -39,9 +40,11 @@ worker.register_workflow(ReleaseWorkflow) worker.register_workflow(ResultWorkflow) worker.register_workflow(SerialHelloWorldWorkflow) worker.register_workflow(SideEffectWorkflow) +worker.register_workflow(SignalWithStartWorkflow) worker.register_workflow(SimpleTimerWorkflow) worker.register_workflow(TimeoutWorkflow) worker.register_workflow(TripBookingWorkflow) +worker.register_workflow(WaitForWorkflow) worker.register_activity(AsyncActivity) worker.register_activity(EchoActivity) diff --git a/examples/spec/integration/activity_cancellation_spec.rb b/examples/spec/integration/activity_cancellation_spec.rb new file mode 100644 index 00000000..0d551a19 --- /dev/null +++ b/examples/spec/integration/activity_cancellation_spec.rb @@ -0,0 +1,39 @@ +require 'workflows/long_workflow' + +describe 'Activity cancellation' do + let(:workflow_id) { SecureRandom.uuid } + + it 'cancels a running activity' do + run_id = Temporal.start_workflow(LongWorkflow, options: { workflow_id: workflow_id }) + + # Signal workflow after starting, allowing it to schedule the first activity + sleep 0.5 + Temporal.signal_workflow(LongWorkflow, :CANCEL, workflow_id, run_id) + + result = Temporal.await_workflow_result( + LongWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result).to be_a(LongRunningActivity::Canceled) + expect(result.message).to eq('cancel activity request received') + end + + it 'cancels a non-started activity' do + # Workflow is started with a signal which will cancel an activity before it has started + run_id = Temporal.start_workflow(LongWorkflow, options: { + workflow_id: workflow_id, + signal_name: :CANCEL + }) + + result = Temporal.await_workflow_result( + LongWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result).to be_a(Temporal::ActivityCanceled) + expect(result.message).to eq('ACTIVITY_ID_NOT_STARTED') + end +end diff --git a/examples/spec/integration/await_workflow_result_spec.rb b/examples/spec/integration/await_workflow_result_spec.rb index 3b4f1e77..bf48b7b1 100644 --- a/examples/spec/integration/await_workflow_result_spec.rb +++ b/examples/spec/integration/await_workflow_result_spec.rb @@ -95,7 +95,9 @@ run_id = Temporal.start_workflow( LoopWorkflow, 2, # it continues as new if this arg is > 1 - { options: { workflow_id: workflow_id } }, + options: { + workflow_id: workflow_id, + }, ) expect do diff --git a/examples/spec/integration/continue_as_new_spec.rb b/examples/spec/integration/continue_as_new_spec.rb new file mode 100644 index 00000000..74b0771e --- /dev/null +++ b/examples/spec/integration/continue_as_new_spec.rb @@ -0,0 +1,49 @@ +require 'workflows/loop_workflow' + +describe LoopWorkflow do + it 'workflow continues as new into a new run' do + workflow_id = SecureRandom.uuid + memo = { + 'my-memo' => 'foo', + } + headers = { + 'my-header' => 'bar', + } + run_id = Temporal.start_workflow( + LoopWorkflow, + 2, # it continues as new if this arg is > 1 + options: { + workflow_id: workflow_id, + memo: memo, + headers: headers, + }, + ) + + # First run will throw because it continued as new + next_run_id = nil + expect do + Temporal.await_workflow_result( + LoopWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + end.to raise_error(Temporal::WorkflowRunContinuedAsNew) do |error| + next_run_id = error.new_run_id + end + + expect(next_run_id).to_not eq(nil) + + # Second run will not throw because it returns rather than continues as new. + final_result = Temporal.await_workflow_result( + LoopWorkflow, + workflow_id: workflow_id, + run_id: next_run_id, + ) + + expect(final_result[:count]).to eq(1) + + # memo and headers should be copied to the next run automatically + expect(final_result[:memo]).to eq(memo) + expect(final_result[:headers]).to eq(headers) + end +end diff --git a/examples/spec/integration/describe_namespace_spec.rb b/examples/spec/integration/describe_namespace_spec.rb new file mode 100644 index 00000000..86501f7b --- /dev/null +++ b/examples/spec/integration/describe_namespace_spec.rb @@ -0,0 +1,16 @@ +require 'temporal/errors' + +describe 'Temporal.describe_namespace' do + it 'returns a value' do + description = 'Namespace for temporal-ruby integration test' + begin + Temporal.register_namespace('a_test_namespace', description) + rescue Temporal::NamespaceAlreadyExistsFailure + end + result = Temporal.describe_namespace('a_test_namespace') + expect(result).to be_an_instance_of(Temporal::Api::WorkflowService::V1::DescribeNamespaceResponse) + expect(result.namespace_info.name).to eq('a_test_namespace') + expect(result.namespace_info.state).to eq(:NAMESPACE_STATE_REGISTERED) + expect(result.namespace_info.description).to eq(description) + end +end diff --git a/examples/spec/integration/metadata_workflow_spec.rb b/examples/spec/integration/metadata_workflow_spec.rb new file mode 100644 index 00000000..ac828e01 --- /dev/null +++ b/examples/spec/integration/metadata_workflow_spec.rb @@ -0,0 +1,91 @@ +require 'workflows/metadata_workflow' + +describe MetadataWorkflow do + subject { described_class } + + it 'gets task queue from running workflow' do + workflow_id = 'task-queue-' + SecureRandom.uuid + run_id = Temporal.start_workflow( + subject, + options: { workflow_id: workflow_id } + ) + + actual_result = Temporal.await_workflow_result( + subject, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(actual_result.task_queue).to eq(Temporal.configuration.task_queue) + end + + it 'workflow can retrieve its headers' do + workflow_id = 'header_test_wf-' + SecureRandom.uuid + + run_id = Temporal.start_workflow( + MetadataWorkflow, + options: { + workflow_id: workflow_id, + headers: { 'foo' => 'bar' }, + } + ) + + actual_result = Temporal.await_workflow_result( + MetadataWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + expect(actual_result.headers).to eq({ 'foo' => 'bar' }) + end + + it 'workflow can retrieve its run started at' do + workflow_id = 'started_at_test_wf-' + SecureRandom.uuid + + run_id = Temporal.start_workflow( + MetadataWorkflow, + options: { workflow_id: workflow_id } + ) + + actual_result = Temporal.await_workflow_result( + MetadataWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + expect(Time.now - actual_result.run_started_at).to be_between(0, 30) + end + + it 'gets memo from workflow execution info' do + workflow_id = 'memo_execution_test_wf-' + SecureRandom.uuid + run_id = Temporal.start_workflow(subject, options: { workflow_id: workflow_id, memo: { 'foo' => 'bar' } }) + + actual_result = Temporal.await_workflow_result( + subject, + workflow_id: workflow_id, + run_id: run_id, + ) + expect(actual_result.memo['foo']).to eq('bar') + + expect(Temporal.fetch_workflow_execution_info( + 'ruby-samples', workflow_id, nil + ).memo).to eq({ 'foo' => 'bar' }) + end + + it 'gets memo from workflow context with no memo' do + workflow_id = 'memo_context_no_memo_test_wf-' + SecureRandom.uuid + + run_id = Temporal.start_workflow( + subject, + options: { workflow_id: workflow_id } + ) + + actual_result = Temporal.await_workflow_result( + subject, + workflow_id: workflow_id, + run_id: run_id, + ) + expect(actual_result.memo).to eq({}) + expect(Temporal.fetch_workflow_execution_info( + 'ruby-samples', workflow_id, nil + ).memo).to eq({}) + end +end diff --git a/examples/spec/integration/signal_with_start_spec.rb b/examples/spec/integration/signal_with_start_spec.rb new file mode 100644 index 00000000..4af4f6f1 --- /dev/null +++ b/examples/spec/integration/signal_with_start_spec.rb @@ -0,0 +1,75 @@ +require 'workflows/signal_with_start_workflow' + +describe 'signal with start' do + + it 'signals at workflow start time' do + workflow_id = SecureRandom.uuid + run_id = Temporal.start_workflow( + SignalWithStartWorkflow, + 'signal_name', + 0.1, + options: { + workflow_id: workflow_id, + signal_name: 'signal_name', + signal_input: 'expected value', + } + ) + + result = Temporal.await_workflow_result( + SignalWithStartWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result).to eq('expected value') # the workflow should return the signal value + end + + it 'signals at workflow start time with name only' do + workflow_id = SecureRandom.uuid + run_id = Temporal.start_workflow( + SignalWithStartWorkflow, + 'signal_name', + 0.1, + options: { + workflow_id: workflow_id, + signal_name: 'signal_name', + } + ) + + result = Temporal.await_workflow_result( + SignalWithStartWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result).to eq(nil) # the workflow should return the signal value + end + + it 'does not launch a new workflow when signaling a running workflow through signal_with_start' do + workflow_id = SecureRandom.uuid + run_id = Temporal.start_workflow( + SignalWithStartWorkflow, + 'signal_name', + 10, + options: { + workflow_id: workflow_id, + signal_name: 'signal_name', + signal_input: 'expected value', + } + ) + + second_run_id = Temporal.start_workflow( + SignalWithStartWorkflow, + 'signal_name', + 0.1, + options: { + workflow_id: workflow_id, + signal_name: 'signal_name', + signal_input: 'expected value', + } + ) + + # If the run ids are the same, then we didn't start a new workflow + expect(second_run_id).to eq(run_id) + end +end diff --git a/examples/spec/integration/wait_for_workflow_spec.rb b/examples/spec/integration/wait_for_workflow_spec.rb new file mode 100644 index 00000000..d5feeee6 --- /dev/null +++ b/examples/spec/integration/wait_for_workflow_spec.rb @@ -0,0 +1,28 @@ +require 'workflows/wait_for_workflow' + +describe WaitForWorkflow do + + it 'signals at workflow start time' do + workflow_id = SecureRandom.uuid + run_id = Temporal.start_workflow( + WaitForWorkflow, + 10, # number of echo activities to run + 2, # max activity parallelism + 'signal_name', + options: { workflow_id: workflow_id } + ) + + Temporal.signal_workflow(WaitForWorkflow, 'signal_name', workflow_id, run_id) + + result = Temporal.await_workflow_result( + WaitForWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result.length).to eq(3) + expect(result[:signal]).to eq(true) + expect(result[:timer]).to eq(true) + expect(result[:activity]).to eq(true) + end +end \ No newline at end of file diff --git a/examples/workflows/long_workflow.rb b/examples/workflows/long_workflow.rb index 3682fad9..c4b4682f 100644 --- a/examples/workflows/long_workflow.rb +++ b/examples/workflows/long_workflow.rb @@ -9,6 +9,6 @@ def execute(cycles = 10, interval = 1) future.cancel end - future.wait + future.get end end diff --git a/examples/workflows/loop_workflow.rb b/examples/workflows/loop_workflow.rb index 5b1f5bfd..b99408f4 100644 --- a/examples/workflows/loop_workflow.rb +++ b/examples/workflows/loop_workflow.rb @@ -8,6 +8,10 @@ def execute(count) return workflow.continue_as_new(count - 1) end - return count + return { + count: count, + memo: workflow.metadata.memo, + headers: workflow.metadata.headers, + } end end diff --git a/examples/workflows/metadata_workflow.rb b/examples/workflows/metadata_workflow.rb new file mode 100644 index 00000000..62f61703 --- /dev/null +++ b/examples/workflows/metadata_workflow.rb @@ -0,0 +1,5 @@ +class MetadataWorkflow < Temporal::Workflow + def execute + workflow.metadata + end +end diff --git a/examples/workflows/signal_with_start_workflow.rb b/examples/workflows/signal_with_start_workflow.rb new file mode 100644 index 00000000..dbcb186a --- /dev/null +++ b/examples/workflows/signal_with_start_workflow.rb @@ -0,0 +1,16 @@ +class SignalWithStartWorkflow < Temporal::Workflow + + def execute(expected_signal, sleep_for) + received = 'no signal received' + + workflow.on_signal do |signal, input| + if signal == expected_signal + received = input + end + end + + # Do something to get descheduled so the signal handler has a chance to run + workflow.sleep(sleep_for) + received + end +end diff --git a/examples/workflows/wait_for_workflow.rb b/examples/workflows/wait_for_workflow.rb new file mode 100644 index 00000000..226ac9c4 --- /dev/null +++ b/examples/workflows/wait_for_workflow.rb @@ -0,0 +1,80 @@ +require 'activities/echo_activity' +require 'activities/long_running_activity' + +# This example workflow exercises all three conditions that can change state that is being +# awaited upon: activity completion, sleep completion, signal receieved. +class WaitForWorkflow < Temporal::Workflow + def execute(total_echos, max_echos_at_once, expected_signal) + signals_received = {} + + workflow.on_signal do |signal, input| + signals_received[signal] = input + end + + workflow.wait_for do + workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}") + signals_received.key?(expected_signal) + end + + # Run an activity but with a max time limit by starting a timer. This activity + # will not complete before the timer, which may result in a failed activity task after the + # workflow is completed. + long_running_future = LongRunningActivity.execute(15, 0.1) + timeout_timer = workflow.start_timer(1) + workflow.wait_for(timeout_timer, long_running_future) + + timer_beat_activity = timeout_timer.finished? && !long_running_future.finished? + + # This should not wait further. The first future has already finished, and therefore + # the second one should not be awaited upon. + long_timeout_timer = workflow.start_timer(15) + workflow.wait_for(timeout_timer, long_timeout_timer) + raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished? + + block_called = false + workflow.wait_for(timeout_timer) do + # This should never be called because the timeout_timer future was already + # finished before the wait was even called. + block_called = true + end + raise 'Block should not have been called' if block_called + + workflow.wait_for(long_timeout_timer) do + # This condition will immediately be true and not result in any waiting or dispatching + true + end + raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished? + + activity_futures = {} + echos_completed = 0 + + total_echos.times do |i| + workflow.wait_for do + workflow.logger.info("Activities in flight #{activity_futures.length}") + # Pause workflow until the number of active activity futures is less than 2. This + # will throttle new activities from being started, guaranteeing that only two of these + # activities are running at once. + activity_futures.length < max_echos_at_once + end + + future = EchoActivity.execute("hi #{i}") + activity_futures[i] = future + + future.done do + activity_futures.delete(i) + echos_completed += 1 + end + end + + workflow.wait_for do + workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}") + activity_futures.empty? + end + + { + signal: signals_received.key?(expected_signal), + timer: timer_beat_activity, + activity: echos_completed == total_echos + } + end +end diff --git a/lib/temporal.rb b/lib/temporal.rb index e580192f..f02d6ec9 100644 --- a/lib/temporal.rb +++ b/lib/temporal.rb @@ -17,6 +17,7 @@ module Temporal :start_workflow, :schedule_workflow, :register_namespace, + :describe_namespace, :signal_workflow, :await_workflow_result, :reset_workflow, diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index c07eba7e..4157f0f3 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -78,7 +78,7 @@ def poll_for_task rescue StandardError => error Temporal.logger.error("Unable to poll activity task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect }) - Temporal::ErrorHandler.handle(error) + Temporal::ErrorHandler.handle(error, config) nil end diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 173a2bc9..6ad9cefd 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -14,7 +14,7 @@ class TaskProcessor def initialize(task, namespace, activity_lookup, middleware_chain, config) @task = task @namespace = namespace - @metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace) + @metadata = Metadata.generate_activity_metadata(task, namespace) @task_token = task.task_token @activity_name = task.activity_type.name @activity_class = activity_lookup.find(activity_name) @@ -41,7 +41,7 @@ def process # Do not complete asynchronous activities, these should be completed manually respond_completed(result) unless context.async? rescue StandardError, ScriptError => error - Temporal::ErrorHandler.handle(error, metadata: metadata) + Temporal::ErrorHandler.handle(error, config, metadata: metadata) respond_failed(error) ensure @@ -76,7 +76,7 @@ def respond_completed(result) rescue StandardError => error Temporal.logger.error("Unable to complete Activity", metadata.to_h.merge(error: error.inspect)) - Temporal::ErrorHandler.handle(error, metadata: metadata) + Temporal::ErrorHandler.handle(error, config, metadata: metadata) end def respond_failed(error) @@ -90,7 +90,7 @@ def respond_failed(error) rescue StandardError => error Temporal.logger.error("Unable to fail Activity task", metadata.to_h.merge(error: error.inspect)) - Temporal::ErrorHandler.handle(error, metadata: metadata) + Temporal::ErrorHandler.handle(error, config, metadata: metadata) end end end diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index d2380855..2fcea794 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -13,32 +13,95 @@ def initialize(config) @config = config end - def start_workflow(workflow, *input, **args) - options = args.delete(:options) || {} + # Start a workflow with an optional signal + # + # If options[:signal_name] is specified, Temporal will atomically start a new workflow and + # signal it or signal a running workflow (matching a specified options[:workflow_id]) + # + # @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class + # is passed, its config (namespace, task_queue, timeouts, etc) will be used + # @param input [any] arguments to be passed to workflow's #execute method + # @param args [Hash] keyword arguments to be passed to workflow's #execute method + # @param options [Hash, nil] optional overrides + # @option options [String] :workflow_id + # @option options [Symbol] :workflow_id_reuse_policy check Temporal::Connection::GRPC::WORKFLOW_ID_REUSE_POLICY + # @option options [String] :name workflow name + # @option options [String] :namespace + # @option options [String] :task_queue + # @option options [String] :signal_name corresponds to the 'signal' argument to signal_workflow. Required if + # options[:signal_input] is specified. + # @option options [String, Array, nil] :signal_input corresponds to the 'input' argument to signal_workflow + # @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options + # @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS + # @option options [Hash] :headers + # + # @return [String] workflow's run ID + def start_workflow(workflow, *input, options: {}, **args) input << args unless args.empty? + signal_name = options.delete(:signal_name) + signal_input = options.delete(:signal_input) + execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) workflow_id = options[:workflow_id] || SecureRandom.uuid - response = connection.start_workflow_execution( - namespace: execution_options.namespace, - workflow_id: workflow_id, - workflow_name: execution_options.name, - task_queue: execution_options.task_queue, - input: input, - execution_timeout: execution_options.timeouts[:execution], - # If unspecified, individual runs should have the full time for the execution (which includes retries). - run_timeout: execution_options.timeouts[:run] || execution_options.timeouts[:execution], - task_timeout: execution_options.timeouts[:task], - workflow_id_reuse_policy: options[:workflow_id_reuse_policy], - headers: execution_options.headers - ) + if signal_name.nil? && signal_input.nil? + response = connection.start_workflow_execution( + namespace: execution_options.namespace, + workflow_id: workflow_id, + workflow_name: execution_options.name, + task_queue: execution_options.task_queue, + input: input, + execution_timeout: execution_options.timeouts[:execution], + # If unspecified, individual runs should have the full time for the execution (which includes retries). + run_timeout: compute_run_timeout(execution_options), + task_timeout: execution_options.timeouts[:task], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + headers: execution_options.headers, + memo: execution_options.memo, + ) + else + raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil? + + response = connection.signal_with_start_workflow_execution( + namespace: execution_options.namespace, + workflow_id: workflow_id, + workflow_name: execution_options.name, + task_queue: execution_options.task_queue, + input: input, + execution_timeout: execution_options.timeouts[:execution], + run_timeout: compute_run_timeout(execution_options), + task_timeout: execution_options.timeouts[:task], + workflow_id_reuse_policy: options[:workflow_id_reuse_policy], + headers: execution_options.headers, + memo: execution_options.memo, + signal_name: signal_name, + signal_input: signal_input + ) + end response.run_id end - def schedule_workflow(workflow, cron_schedule, *input, **args) - options = args.delete(:options) || {} + # Schedule a workflow for a periodic cron-like execution + # + # @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class + # is passed, its config (namespace, task_queue, timeouts, etc) will be used + # @param cron_schedule [String] a cron-style schedule string + # @param input [any] arguments to be passed to workflow's #execute method + # @param args [Hash] keyword arguments to be pass to workflow's #execute method + # @param options [Hash, nil] optional overrides + # @option options [String] :workflow_id + # @option options [Symbol] :workflow_id_reuse_policy check Temporal::Connection::GRPC::WORKFLOW_ID_REUSE_POLICY + # @option options [String] :name workflow name + # @option options [String] :namespace + # @option options [String] :task_queue + # @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options + # @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS + # @option options [Hash] :headers + # + # @return [String] workflow's run ID + def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args) input << args unless args.empty? execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) @@ -54,20 +117,41 @@ def schedule_workflow(workflow, cron_schedule, *input, **args) # Execution timeout is across all scheduled jobs, whereas run is for an individual run. # This default is here for backward compatibility. Certainly, the run timeout shouldn't be higher # than the execution timeout. - run_timeout: execution_options.timeouts[:run] || execution_options.timeouts[:execution], + run_timeout: compute_run_timeout(execution_options), task_timeout: execution_options.timeouts[:task], workflow_id_reuse_policy: options[:workflow_id_reuse_policy], headers: execution_options.headers, - cron_schedule: cron_schedule + cron_schedule: cron_schedule, + memo: execution_options.memo ) response.run_id end + # Register a new Temporal namespace + # + # @param name [String] name of the new namespace + # @param description [String] optional namespace description def register_namespace(name, description = nil) connection.register_namespace(name: name, description: description) end + # Fetches metadata for a namespace. + # @param name [String] name of the namespace + # @return [Hash] info deserialized from Temporal::Api::WorkflowService::V1::DescribeNamespaceResponse + def describe_namespace(name) + connection.describe_namespace(name: name) + end + + # Send a signal to a running workflow + # + # @param workflow [Temporal::Workflow, nil] workflow class or nil + # @param signal [String] name of the signal to send + # @param workflow_id [String] + # @param run_id [String] + # @param input [String, Array, nil] optional arguments for the signal + # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the + # global default def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespace: nil) execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) @@ -80,15 +164,22 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac ) end - # Long polls for a workflow to be completed and returns whatever the execute function - # returned. This function times out after 30 seconds and throws Temporal::TimeoutError, - # not to be confused with Temporal::WorkflowTimedOut which reports that the workflow - # itself timed out. - # run_id of nil: await the entire workflow completion. This can span multiple runs - # in the case where the workflow uses continue-as-new. - # timeout: seconds to wait for the result. This cannot be longer than 30 seconds because - # that is the maximum the server supports. - # namespace: if nil, choose the one declared on the Workflow, or the global default + # Long polls for a workflow to be completed and returns workflow's return value. + # + # @note This function times out after 30 seconds and throws Temporal::TimeoutError, + # not to be confused with `Temporal::WorkflowTimedOut` which reports that the workflow + # itself timed out. + # + # @param workflow [Temporal::Workflow, nil] workflow class or nil + # @param workflow_id [String] + # @param run_id [String, nil] awaits the entire workflow completion when nil. This can span + # multiple runs in the case where the workflow uses continue-as-new. + # @param timeout [Integer, nil] seconds to wait for the result. This cannot be longer than 30 + # seconds because that is the maximum the server supports. + # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the + # global default + # + # @return workflow's return value def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, namespace: nil) options = namespace ? {namespace: namespace} : {} execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) @@ -135,6 +226,21 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam end end + # Reset a workflow + # + # @note More on resetting a workflow here — + # https://docs.temporal.io/docs/system-tools/tctl/#restart-reset-workflow + # + # @param namespace [String] + # @param workflow_id [String] + # @param run_id [String] + # @param strategy [Symbol, nil] one of the Temporal::ResetStrategy values or `nil` when + # passing a workflow_task_id + # @param workflow_task_id [Integer, nil] A specific event ID to reset to. The event has to + # be of a type WorkflowTaskCompleted, WorkflowTaskFailed or WorkflowTaskTimedOut + # @param reason [String] a reset reason to be recorded in workflow's history for reference + # + # @return [String] run_id of the new workflow execution def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_id: nil, reason: 'manual reset') # Pick default strategy for backwards-compatibility strategy ||= :last_workflow_task unless workflow_task_id @@ -157,6 +263,14 @@ def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_ response.run_id end + # Terminate a running workflow + # + # @param workflow_id [String] + # @param namespace [String, nil] use a default namespace when `nil` + # @param run_id [String, nil] + # @param reason [String, nil] a termination reason to be recorded in workflow's history + # for reference + # @param details [String, Array, nil] optional details to be stored in history def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil) namespace ||= Temporal.configuration.namespace @@ -169,6 +283,13 @@ def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, de ) end + # Fetch workflow's execution info + # + # @param namespace [String] + # @param workflow_id [String] + # @param run_id [String] + # + # @return [Temporal::Workflow::ExecutionInfo] an object containing workflow status and other info def fetch_workflow_execution_info(namespace, workflow_id, run_id) response = connection.describe_workflow_execution( namespace: namespace, @@ -179,6 +300,11 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id) Workflow::ExecutionInfo.generate_from(response.workflow_execution_info) end + # Manually complete an activity + # + # @param async_token [String] an encoded Temporal::Activity::AsyncToken + # @param result [String, Array, nil] activity's return value to be stored in history and + # passed back to a workflow def complete_activity(async_token, result = nil) details = Activity::AsyncToken.decode(async_token) @@ -191,6 +317,11 @@ def complete_activity(async_token, result = nil) ) end + # Manually fail an activity + # + # @param async_token [String] an encoded Temporal::Activity::AsyncToken + # @param exception [Exception] activity's failure exception to be stored in history and + # raised in a workflow def fail_activity(async_token, exception) details = Activity::AsyncToken.decode(async_token) @@ -203,6 +334,13 @@ def fail_activity(async_token, exception) ) end + # Fetch workflow's execution history + # + # @param namespace [String] + # @param workflow_id [String] + # @param run_id [String] + # + # @return [Temporal::Workflow::History] workflow's execution history def get_workflow_history(namespace:, workflow_id:, run_id:) history_response = connection.get_workflow_execution_history( namespace: namespace, @@ -226,6 +364,10 @@ def connection @connection ||= Temporal::Connection.generate(config.for_connection) end + def compute_run_timeout(execution_options) + execution_options.timeouts[:run] || execution_options.timeouts[:execution] + end + def find_workflow_task(namespace, workflow_id, run_id, strategy) history = get_workflow_history( namespace: namespace, diff --git a/lib/temporal/concerns/payloads.rb b/lib/temporal/concerns/payloads.rb index 49af8f3f..3be276d2 100644 --- a/lib/temporal/concerns/payloads.rb +++ b/lib/temporal/concerns/payloads.rb @@ -21,6 +21,10 @@ def from_signal_payloads(payloads) from_payloads(payloads)&.first end + def from_payload_map(payload_map) + payload_map.map { |key, value| [key, from_payload(value)] }.to_h + end + def to_payloads(data) payload_converter.to_payloads(data) end @@ -41,6 +45,10 @@ def to_signal_payloads(data) to_payloads([data]) end + def to_payload_map(data) + data.transform_values(&method(:to_payload)) + end + private def payload_converter diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 828c561f..8d36615c 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -14,6 +14,7 @@ class Configuration attr_writer :converter attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers + # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. # We choose an 10-year execution timeout because that's the maximum the cassandra DB supports, # matching the go SDK, see https://github.com/temporalio/sdk-go/blob/d96130dad3d2bc189bc7626543bd5911cc07ff6d/internal/internal_workflow_testsuite.go#L68 @@ -21,9 +22,12 @@ class Configuration execution: 86_400 * 365 * 10, # End-to-end workflow time, including all recurrences if it's scheduled. # Time for a single run, excluding retries. Server defaults to execution timeout; we default here as well to be explicit. run: 86_400 * 365 * 10, - task: 10, # Workflow task processing time + # Workflow task processing time. Workflows should not use the network and should execute very quickly. + task: 10, schedule_to_close: nil, # End-to-end activity time (default: schedule_to_start + start_to_close) - schedule_to_start: 10, # Queue time for an activity + # Max queue time for an activity. Default: none. This is dangerous; most teams don't use. + # See # https://docs.temporal.io/blog/activity-timeouts/#schedule-to-start-timeout + schedule_to_start: nil, start_to_close: 30, # Time spent processing an activity heartbeat: nil # Max time between heartbeats (off by default) }.freeze diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 848d2de0..b2ce0350 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -81,7 +81,8 @@ def start_workflow_execution( task_timeout:, workflow_id_reuse_policy: nil, headers: nil, - cron_schedule: nil + cron_schedule: nil, + memo: nil ) request = Temporal::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new( identity: identity, @@ -99,9 +100,12 @@ def start_workflow_execution( workflow_task_timeout: task_timeout, request_id: SecureRandom.uuid, header: Temporal::Api::Common::V1::Header.new( - fields: headers + fields: to_payload_map(headers || {}) ), - cron_schedule: cron_schedule + cron_schedule: cron_schedule, + memo: Temporal::Api::Common::V1::Memo.new( + fields: to_payload_map(memo || {}) + ) ) if workflow_id_reuse_policy @@ -292,8 +296,66 @@ def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: client.signal_workflow_execution(request) end - def signal_with_start_workflow_execution - raise NotImplementedError + def signal_with_start_workflow_execution( + namespace:, + workflow_id:, + workflow_name:, + task_queue:, + input: nil, + execution_timeout:, + run_timeout:, + task_timeout:, + workflow_id_reuse_policy: nil, + headers: nil, + cron_schedule: nil, + signal_name:, + signal_input:, + memo: nil + ) + proto_header_fields = if headers.nil? + to_payload_map({}) + elsif headers.class == Hash + to_payload_map(headers) + else + # Preserve backward compatability for headers specified using proto objects + warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects' + headers + end + + request = Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new( + identity: identity, + namespace: namespace, + workflow_type: Temporal::Api::Common::V1::WorkflowType.new( + name: workflow_name + ), + workflow_id: workflow_id, + task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( + name: task_queue + ), + input: to_payloads(input), + workflow_execution_timeout: execution_timeout, + workflow_run_timeout: run_timeout, + workflow_task_timeout: task_timeout, + request_id: SecureRandom.uuid, + header: Temporal::Api::Common::V1::Header.new( + fields: proto_header_fields, + ), + cron_schedule: cron_schedule, + signal_name: signal_name, + signal_input: to_signal_payloads(signal_input), + memo: Temporal::Api::Common::V1::Memo.new( + fields: to_payload_map(memo || {}) + ), + ) + + if workflow_id_reuse_policy + policy = WORKFLOW_ID_REUSE_POLICY[workflow_id_reuse_policy] + raise Client::ArgumentError, 'Unknown workflow_id_reuse_policy specified' unless policy + + request.workflow_id_reuse_policy = policy + end + + client.signal_with_start_workflow_execution(request) end def reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:) diff --git a/lib/temporal/connection/retryer.rb b/lib/temporal/connection/retryer.rb index d70ba3b4..2948f05f 100644 --- a/lib/temporal/connection/retryer.rb +++ b/lib/temporal/connection/retryer.rb @@ -1,3 +1,5 @@ +require 'grpc/errors' + module Temporal module Connection module Retryer @@ -11,15 +13,15 @@ module Retryer # No amount of retrying will help in these cases. def self.do_not_retry_errors [ - GRPC::AlreadyExists, - GRPC::Cancelled, - GRPC::FailedPrecondition, - GRPC::InvalidArgument, + ::GRPC::AlreadyExists, + ::GRPC::Cancelled, + ::GRPC::FailedPrecondition, + ::GRPC::InvalidArgument, # If the activity has timed out, the server will return this and will never accept a retry - GRPC::NotFound, - GRPC::PermissionDenied, - GRPC::Unauthenticated, - GRPC::Unimplemented, + ::GRPC::NotFound, + ::GRPC::PermissionDenied, + ::GRPC::Unauthenticated, + ::GRPC::Unimplemented, ] end diff --git a/lib/temporal/connection/serializer/continue_as_new.rb b/lib/temporal/connection/serializer/continue_as_new.rb index 2d1e588c..357f0008 100644 --- a/lib/temporal/connection/serializer/continue_as_new.rb +++ b/lib/temporal/connection/serializer/continue_as_new.rb @@ -19,7 +19,8 @@ def to_proto workflow_run_timeout: object.timeouts[:execution], workflow_task_timeout: object.timeouts[:task], retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, - header: serialize_headers(object.headers) + header: serialize_headers(object.headers), + memo: serialize_memo(object.memo) ) ) end @@ -29,7 +30,13 @@ def to_proto def serialize_headers(headers) return unless headers - Temporal::Api::Common::V1::Header.new(fields: object.headers) + Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + end + + def serialize_memo(memo) + return unless memo + + Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo)) end end end diff --git a/lib/temporal/connection/serializer/start_child_workflow.rb b/lib/temporal/connection/serializer/start_child_workflow.rb index 55312e50..5f6b350d 100644 --- a/lib/temporal/connection/serializer/start_child_workflow.rb +++ b/lib/temporal/connection/serializer/start_child_workflow.rb @@ -22,7 +22,8 @@ def to_proto workflow_run_timeout: object.timeouts[:run], workflow_task_timeout: object.timeouts[:task], retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, - header: serialize_headers(object.headers) + header: serialize_headers(object.headers), + memo: serialize_memo(object.memo), ) ) end @@ -32,7 +33,13 @@ def to_proto def serialize_headers(headers) return unless headers - Temporal::Api::Common::V1::Header.new(fields: object.headers) + Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + end + + def serialize_memo(memo) + return unless memo + + Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo)) end end end diff --git a/lib/temporal/error_handler.rb b/lib/temporal/error_handler.rb index 9702edcb..98fca99f 100644 --- a/lib/temporal/error_handler.rb +++ b/lib/temporal/error_handler.rb @@ -1,7 +1,7 @@ module Temporal module ErrorHandler - def self.handle(error, metadata: nil) - Temporal.configuration.error_handlers.each do |handler| + def self.handle(error, configuration, metadata: nil) + configuration.error_handlers.each do |handler| handler.call(error, metadata: metadata) rescue StandardError => e Temporal.logger.error("Error handler failed", { error: e.inspect }) diff --git a/lib/temporal/errors.rb b/lib/temporal/errors.rb index 5730224c..2e7f4ebe 100644 --- a/lib/temporal/errors.rb +++ b/lib/temporal/errors.rb @@ -19,6 +19,9 @@ class TimeoutError < ClientError; end # with the intent to propagate to a workflow class ActivityException < ClientError; end + # Represents cancellation of a non-started activity + class ActivityCanceled < ActivityException; end + class ActivityNotRegistered < ClientError; end class WorkflowNotRegistered < ClientError; end diff --git a/lib/temporal/execution_options.rb b/lib/temporal/execution_options.rb index 6fcaf371..182c4079 100644 --- a/lib/temporal/execution_options.rb +++ b/lib/temporal/execution_options.rb @@ -3,7 +3,7 @@ module Temporal class ExecutionOptions - attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers + attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo def initialize(object, options, defaults = nil) # Options are treated as overrides and take precedence @@ -13,6 +13,7 @@ def initialize(object, options, defaults = nil) @retry_policy = options[:retry_policy] || {} @timeouts = options[:timeouts] || {} @headers = options[:headers] || {} + @memo = options[:memo] || {} # For Temporal::Workflow and Temporal::Activity use defined values as the next option if has_executable_concern?(object) diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index e39f8da9..df7165ea 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -6,37 +6,11 @@ module Temporal module Metadata - ACTIVITY_TYPE = :activity - WORKFLOW_TASK_TYPE = :workflow_task - WORKFLOW_TYPE = :workflow class << self include Concerns::Payloads - def generate(type, data, namespace = nil) - case type - when ACTIVITY_TYPE - activity_metadata_from(data, namespace) - when WORKFLOW_TASK_TYPE - workflow_task_metadata_from(data, namespace) - when WORKFLOW_TYPE - workflow_metadata_from(data) - else - raise InternalError, 'Unsupported metadata type' - end - end - - private - - def headers(fields) - result = {} - fields.each do |field, payload| - result[field] = from_payload(payload) - end - result - end - - def activity_metadata_from(task, namespace) + def generate_activity_metadata(task, namespace) Metadata::Activity.new( namespace: namespace, id: task.activity_id, @@ -46,12 +20,12 @@ def activity_metadata_from(task, namespace) workflow_run_id: task.workflow_execution.run_id, workflow_id: task.workflow_execution.workflow_id, workflow_name: task.workflow_type.name, - headers: headers(task.header&.fields), + headers: from_payload_map(task.header&.fields || {}), heartbeat_details: from_details_payloads(task.heartbeat_details) ) end - def workflow_task_metadata_from(task, namespace) + def generate_workflow_task_metadata(task, namespace) Metadata::WorkflowTask.new( namespace: namespace, id: task.started_event_id, @@ -63,12 +37,20 @@ def workflow_task_metadata_from(task, namespace) ) end - def workflow_metadata_from(event) + # @param event [Temporal::Workflow::History::Event] Workflow started history event + # @param event [WorkflowExecutionStartedEventAttributes] :attributes + # @param task_metadata [Temporal::Metadata::WorkflowTask] workflow task metadata + def generate_workflow_metadata(event, task_metadata) Metadata::Workflow.new( - name: event.workflow_type.name, - run_id: event.original_execution_run_id, - attempt: event.attempt, - headers: headers(event.header&.fields) + name: event.attributes.workflow_type.name, + id: task_metadata.workflow_id, + run_id: event.attributes.original_execution_run_id, + attempt: event.attributes.attempt, + namespace: task_metadata.namespace, + task_queue: event.attributes.task_queue.name, + headers: from_payload_map(event.attributes.header&.fields || {}), + run_started_at: event.timestamp, + memo: from_payload_map(event.attributes.memo&.fields || {}), ) end end diff --git a/lib/temporal/metadata/workflow.rb b/lib/temporal/metadata/workflow.rb index 86d14de4..f4715dde 100644 --- a/lib/temporal/metadata/workflow.rb +++ b/lib/temporal/metadata/workflow.rb @@ -3,13 +3,18 @@ module Temporal module Metadata class Workflow < Base - attr_reader :name, :run_id, :attempt, :headers + attr_reader :namespace, :id, :name, :run_id, :attempt, :task_queue, :headers, :run_started_at, :memo - def initialize(name:, run_id:, attempt:, headers: {}) + def initialize(namespace:, id:, name:, run_id:, attempt:, task_queue:, headers:, run_started_at:, memo:) + @namespace = namespace + @id = id @name = name @run_id = run_id @attempt = attempt + @task_queue = task_queue @headers = headers + @run_started_at = run_started_at + @memo = memo freeze end @@ -20,9 +25,14 @@ def workflow? def to_h { + 'namespace' => namespace, + 'workflow_id' => id, 'workflow_name' => name, 'workflow_run_id' => run_id, - 'attempt' => attempt + 'attempt' => attempt, + 'task_queue' => task_queue, + 'run_started_at' => run_started_at.to_f, + 'memo' => memo, } end end diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index 3642a7d3..5543452d 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -9,7 +9,7 @@ module Temporal module Testing class LocalWorkflowContext - attr_reader :metadata + attr_reader :metadata, :config def initialize(execution, workflow_id, run_id, disabled_releases, metadata, config = Temporal.configuration) @last_event_id = 0 @@ -165,9 +165,16 @@ def wait_for_all(*futures) return end - def wait_for(future) - # Point of communication - Fiber.yield while !future.finished? + def wait_for(*futures, &unblock_condition) + if futures.empty? && unblock_condition.nil? + raise 'You must pass either a future or an unblock condition block to wait_for' + end + + while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call) + Fiber.yield + end + + return end def now @@ -175,20 +182,20 @@ def now end def on_signal(&block) - raise NotImplementedError, 'not yet available for testing' + raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' end def cancel_activity(activity_id) - raise NotImplementedError, 'not yet available for testing' + raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on' end def cancel(target, cancelation_id) - raise NotImplementedError, 'not yet available for testing' + raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on' end private - attr_reader :execution, :run_id, :workflow_id, :disabled_releases, :config + attr_reader :execution, :run_id, :workflow_id, :disabled_releases def completed! @completed = true diff --git a/lib/temporal/testing/temporal_override.rb b/lib/temporal/testing/temporal_override.rb index de61b591..d44da24f 100644 --- a/lib/temporal/testing/temporal_override.rb +++ b/lib/temporal/testing/temporal_override.rb @@ -73,9 +73,15 @@ def start_locally(workflow, schedule, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? + # signals aren't supported at all, so let's prohibit start_workflow calls that try to signal + signal_name = options.delete(:signal_name) + signal_input = options.delete(:signal_input) + raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' if signal_name || signal_input + reuse_policy = options[:workflow_id_reuse_policy] || :allow_failed workflow_id = options[:workflow_id] || SecureRandom.uuid run_id = SecureRandom.uuid + memo = options[:memo] || {} if !allowed?(workflow_id, reuse_policy) raise Temporal::WorkflowExecutionAlreadyStartedFailure.new( @@ -89,7 +95,15 @@ def start_locally(workflow, schedule, *input, **args) execution_options = ExecutionOptions.new(workflow, options) metadata = Metadata::Workflow.new( - name: workflow_id, run_id: run_id, attempt: 1, headers: execution_options.headers + namespace: execution_options.namespace, + id: workflow_id, + name: execution_options.name, + run_id: run_id, + attempt: 1, + task_queue: execution_options.task_queue, + run_started_at: Time.now, + memo: memo, + headers: execution_options.headers ) context = Temporal::Testing::LocalWorkflowContext.new( execution, workflow_id, run_id, workflow.disabled_releases, metadata diff --git a/lib/temporal/testing/workflow_override.rb b/lib/temporal/testing/workflow_override.rb index a36e843e..9d43f953 100644 --- a/lib/temporal/testing/workflow_override.rb +++ b/lib/temporal/testing/workflow_override.rb @@ -28,7 +28,15 @@ def execute_locally(*input) run_id = SecureRandom.uuid execution = WorkflowExecution.new metadata = Temporal::Metadata::Workflow.new( - name: workflow_id, run_id: run_id, attempt: 1 + namespace: nil, + id: workflow_id, + name: name, # Workflow class name + run_id: run_id, + attempt: 1, + task_queue: 'unit-test-task-queue', + headers: {}, + run_started_at: Time.now, + memo: {}, ) context = Temporal::Testing::LocalWorkflowContext.new( execution, workflow_id, run_id, disabled_releases, metadata diff --git a/lib/temporal/workflow.rb b/lib/temporal/workflow.rb index 06bf2b80..3b5dcfe6 100644 --- a/lib/temporal/workflow.rb +++ b/lib/temporal/workflow.rb @@ -20,7 +20,7 @@ def self.execute_in_context(context, input) Temporal.logger.error("Workflow execution failed", context.metadata.to_h.merge(error: error.inspect)) Temporal.logger.debug(error.backtrace.join("\n")) - Temporal::ErrorHandler.handle(error, metadata: context.metadata) + Temporal::ErrorHandler.handle(error, context.config, metadata: context.metadata) context.fail(error) ensure diff --git a/lib/temporal/workflow/command.rb b/lib/temporal/workflow/command.rb index 0f0d6ed9..8297abe9 100644 --- a/lib/temporal/workflow/command.rb +++ b/lib/temporal/workflow/command.rb @@ -3,8 +3,8 @@ class Workflow module Command # TODO: Move these classes into their own directories under workflow/command/* ScheduleActivity = Struct.new(:activity_type, :activity_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, keyword_init: true) - StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, keyword_init: true) - ContinueAsNew = Struct.new(:workflow_type, :task_queue, :input, :timeouts, :retry_policy, :headers, keyword_init: true) + StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, keyword_init: true) + ContinueAsNew = Struct.new(:workflow_type, :task_queue, :input, :timeouts, :retry_policy, :headers, :memo, keyword_init: true) RequestActivityCancellation = Struct.new(:activity_id, keyword_init: true) RecordMarker = Struct.new(:name, :details, keyword_init: true) StartTimer = Struct.new(:timeout, :timer_id, keyword_init: true) diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index d4541840..d88d8f30 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -15,7 +15,7 @@ module Temporal class Workflow class Context - attr_reader :metadata + attr_reader :metadata, :config def initialize(state_manager, dispatcher, workflow_class, metadata, config) @state_manager = state_manager @@ -195,6 +195,13 @@ def continue_as_new(*input, **args) options = args.delete(:options) || {} input << args unless args.empty? + # If memo or headers are not overridden, use those from the current run + options_from_metadata = { + memo: metadata.memo, + headers: metadata.headers, + } + options = options_from_metadata.merge(options) + execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) command = Command::ContinueAsNew.new( @@ -203,7 +210,8 @@ def continue_as_new(*input, **args) input: input, timeouts: execution_options.timeouts, retry_policy: execution_options.retry_policy, - headers: execution_options.headers + headers: execution_options.headers, + memo: execution_options.memo, ) schedule_command(command) completed! @@ -215,14 +223,54 @@ def wait_for_all(*futures) return end - def wait_for(future) + # Block workflow progress until any future is finished or any unblock_condition + # block evaluates to true. + def wait_for(*futures, &unblock_condition) + if futures.empty? && unblock_condition.nil? + raise 'You must pass either a future or an unblock condition block to wait_for' + end + fiber = Fiber.current + should_yield = false + blocked = true + + if futures.any? + if futures.any?(&:finished?) + blocked = false + else + should_yield = true + futures.each do |future| + dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do + if blocked && future.finished? + # Because this block can run for any dispatch, ensure the fiber is only + # resumed one time by checking if it's already been unblocked. + blocked = false + fiber.resume + end + end + end + end + end - dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do - fiber.resume if future.finished? + if blocked && unblock_condition + if unblock_condition.call + blocked = false + should_yield = false + else + should_yield = true + + dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do + # Because this block can run for any dispatch, ensure the fiber is only + # resumed one time by checking if it's already been unblocked. + if blocked && unblock_condition.call + blocked = false + fiber.resume + end + end + end end - Fiber.yield + Fiber.yield if should_yield return end @@ -258,7 +306,7 @@ def cancel(target, cancelation_id) private - attr_reader :state_manager, :dispatcher, :workflow_class, :config + attr_reader :state_manager, :dispatcher, :workflow_class def completed! @completed = true diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index 55c581fb..2a768e54 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -2,6 +2,7 @@ module Temporal class Workflow class Dispatcher WILDCARD = '*'.freeze + TARGET_WILDCARD = '*'.freeze def initialize @handlers = Hash.new { |hash, key| hash[key] = [] } @@ -23,6 +24,7 @@ def dispatch(target, event_name, args = nil) def handlers_for(target, event_name) handlers[target] + .concat(handlers[TARGET_WILDCARD]) .select { |(name, _)| name == event_name || name == WILDCARD } .map(&:last) end diff --git a/lib/temporal/workflow/execution_info.rb b/lib/temporal/workflow/execution_info.rb index 67cad260..46b8e4cb 100644 --- a/lib/temporal/workflow/execution_info.rb +++ b/lib/temporal/workflow/execution_info.rb @@ -1,6 +1,10 @@ +require 'temporal/concerns/payloads' + module Temporal class Workflow - class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, :history_length, keyword_init: true) + class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, :history_length, :memo, keyword_init: true) + extend Concerns::Payloads + RUNNING_STATUS = :RUNNING COMPLETED_STATUS = :COMPLETED FAILED_STATUS = :FAILED @@ -38,6 +42,7 @@ def self.generate_from(response) close_time: response.close_time&.to_time, status: API_STATUS_MAP.fetch(response.status), history_length: response.history_length, + memo: self.from_payload_map(response.memo.fields), ).freeze end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index c81703cb..78feb61b 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -4,15 +4,21 @@ require 'temporal/workflow/state_manager' require 'temporal/workflow/context' require 'temporal/workflow/history/event_target' +require 'temporal/metadata' module Temporal class Workflow class Executor - def initialize(workflow_class, history, config) + # @param workflow_class [Class] + # @param history [Workflow::History] + # @param task_metadata [Metadata::WorkflowTask] + # @param config [Configuration] + def initialize(workflow_class, history, task_metadata, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new @state_manager = StateManager.new(dispatcher) @history = history + @task_metadata = task_metadata @config = config end @@ -32,9 +38,10 @@ def run private - attr_reader :workflow_class, :dispatcher, :state_manager, :history, :config + attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config - def execute_workflow(input, metadata) + def execute_workflow(input, workflow_started_event) + metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) Fiber.new do diff --git a/lib/temporal/workflow/history/event_target.rb b/lib/temporal/workflow/history/event_target.rb index a54dab55..8f605a01 100644 --- a/lib/temporal/workflow/history/event_target.rb +++ b/lib/temporal/workflow/history/event_target.rb @@ -19,7 +19,7 @@ class UnexpectedEventType < InternalError; end # NOTE: The order is important, first prefix match wins (will be a longer match) TARGET_TYPES = { - 'ACTIVITY_TASK_CANCEL' => CANCEL_ACTIVITY_REQUEST_TYPE, + 'ACTIVITY_TASK_CANCEL_REQUESTED' => CANCEL_ACTIVITY_REQUEST_TYPE, 'ACTIVITY_TASK' => ACTIVITY_TYPE, 'REQUEST_CANCEL_ACTIVITY_TASK' => CANCEL_ACTIVITY_REQUEST_TYPE, 'TIMER_CANCELED' => CANCEL_TIMER_REQUEST_TYPE, @@ -61,7 +61,7 @@ def initialize(id, type) end def ==(other) - id == other.id && type == other.type + self.class == other.class && id == other.id && type == other.type end def eql?(other) diff --git a/lib/temporal/workflow/poller.rb b/lib/temporal/workflow/poller.rb index f312d0f9..c0e7b950 100644 --- a/lib/temporal/workflow/poller.rb +++ b/lib/temporal/workflow/poller.rb @@ -77,7 +77,7 @@ def poll_for_task connection.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue) rescue StandardError => error Temporal.logger.error("Unable to poll Workflow task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect }) - Temporal::ErrorHandler.handle(error) + Temporal::ErrorHandler.handle(error, config) nil end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 693f1305..ac4c1846 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -3,7 +3,6 @@ require 'temporal/workflow/command' require 'temporal/workflow/command_state_machine' require 'temporal/workflow/history/event_target' -require 'temporal/metadata' require 'temporal/concerns/payloads' require 'temporal/workflow/errors' @@ -106,7 +105,7 @@ def apply_event(event) History::EventTarget.workflow, 'started', from_payloads(event.attributes.input), - Metadata.generate(Metadata::WORKFLOW_TYPE, event.attributes) + event, ) when 'WORKFLOW_EXECUTION_COMPLETED' @@ -163,7 +162,7 @@ def apply_event(event) when 'ACTIVITY_TASK_CANCELED' state_machine.cancel - dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(target, 'failed', Temporal::ActivityCanceled.new(from_details_payloads(event.attributes.details))) when 'TIMER_STARTED' state_machine.start diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index c22e2c7d..63b4c76e 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -12,7 +12,7 @@ class TaskProcessor def initialize(task, namespace, workflow_lookup, middleware_chain, config) @task = task @namespace = namespace - @metadata = Metadata.generate(Metadata::WORKFLOW_TASK_TYPE, task, namespace) + @metadata = Metadata.generate_workflow_task_metadata(task, namespace) @task_token = task.task_token @workflow_name = task.workflow_type.name @workflow_class = workflow_lookup.find(workflow_name) @@ -32,7 +32,7 @@ def process history = fetch_full_history # TODO: For sticky workflows we need to cache the Executor instance - executor = Workflow::Executor.new(workflow_class, history, config) + executor = Workflow::Executor.new(workflow_class, history, metadata, config) commands = middleware_chain.invoke(metadata) do executor.run @@ -40,7 +40,7 @@ def process complete_task(commands) rescue StandardError => error - Temporal::ErrorHandler.handle(error, metadata: metadata) + Temporal::ErrorHandler.handle(error, config, metadata: metadata) fail_task(error) ensure @@ -110,7 +110,7 @@ def fail_task(error) rescue StandardError => error Temporal.logger.error("Unable to fail Workflow task", metadata.to_h.merge(error: error.inspect)) - Temporal::ErrorHandler.handle(error, metadata: metadata) + Temporal::ErrorHandler.handle(error, config, metadata: metadata) end end end diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index 0d7e9e48..9e8538eb 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -1,13 +1,24 @@ require 'securerandom' +class TestSerializer + extend Temporal::Concerns::Payloads +end + Fabricator(:api_history_event, from: Temporal::Api::History::V1::HistoryEvent) do event_id { 1 } event_time { Time.now } end Fabricator(:api_workflow_execution_started_event, from: :api_history_event) do + transient :headers event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED } - workflow_execution_started_event_attributes do + event_time { Time.now } + workflow_execution_started_event_attributes do |attrs| + header_fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| + h[field] = Temporal.configuration.converter.to_payload(value) + end + header = Temporal::Api::Common::V1::Header.new(fields: header_fields) + Temporal::Api::History::V1::WorkflowExecutionStartedEventAttributes.new( workflow_type: Fabricate(:api_workflow_type), task_queue: Fabricate(:api_task_queue), @@ -19,7 +30,7 @@ first_execution_run_id: SecureRandom.uuid, retry_policy: nil, attempt: 0, - header: Fabricate(:api_header) + header: header, ) end end @@ -115,6 +126,28 @@ end end +Fabricator(:api_activity_task_canceled_event, from: :api_history_event) do + event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_CANCELED } + activity_task_canceled_event_attributes do |attrs| + Temporal::Api::History::V1::ActivityTaskCanceledEventAttributes.new( + details: TestSerializer.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), + scheduled_event_id: attrs[:event_id] - 2, + started_event_id: nil, + identity: 'test-worker@test-host' + ) + end +end + +Fabricator(:api_activity_task_cancel_requested_event, from: :api_history_event) do + event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED } + activity_task_cancel_requested_event_attributes do |attrs| + Temporal::Api::History::V1::ActivityTaskCancelRequestedEventAttributes.new( + scheduled_event_id: attrs[:event_id] - 1, + workflow_task_completed_event_id: attrs[:event_id] - 2, + ) + end +end + Fabricator(:api_timer_started_event, from: :api_history_event) do event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_TIMER_STARTED } timer_started_event_attributes do |attrs| diff --git a/spec/fabricators/grpc/memo_fabricator.rb b/spec/fabricators/grpc/memo_fabricator.rb new file mode 100644 index 00000000..6c9fd726 --- /dev/null +++ b/spec/fabricators/grpc/memo_fabricator.rb @@ -0,0 +1,7 @@ +Fabricator(:memo, from: Temporal::Api::Common::V1::Memo) do + fields do + Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload).tap do |m| + m['foo'] = Temporal.configuration.converter.to_payload('bar') + end + end +end diff --git a/spec/fabricators/grpc/workflow_execution_info_fabricator.rb b/spec/fabricators/grpc/workflow_execution_info_fabricator.rb index 296bde87..4f1d577e 100644 --- a/spec/fabricators/grpc/workflow_execution_info_fabricator.rb +++ b/spec/fabricators/grpc/workflow_execution_info_fabricator.rb @@ -5,4 +5,5 @@ close_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } status { Temporal::Api::Enums::V1::WorkflowExecutionStatus::WORKFLOW_EXECUTION_STATUS_COMPLETED } history_length { rand(100) } + memo { Fabricate(:memo) } end diff --git a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb index a3e72609..0cc19e16 100644 --- a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb +++ b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb @@ -9,6 +9,7 @@ workflow_type { Fabricate(:api_workflow_type) } original_execution_run_id { SecureRandom.uuid } attempt 1 + task_queue { Fabricate(:api_task_queue) } header do |attrs| fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| h[field] = Temporal.configuration.converter.to_payload(value) diff --git a/spec/fabricators/workflow_metadata_fabricator.rb b/spec/fabricators/workflow_metadata_fabricator.rb index 5a609bb8..c32fd3e1 100644 --- a/spec/fabricators/workflow_metadata_fabricator.rb +++ b/spec/fabricators/workflow_metadata_fabricator.rb @@ -1,8 +1,13 @@ require 'securerandom' Fabricator(:workflow_metadata, from: :open_struct) do + namespace 'test-namespace' + id { SecureRandom.uuid } name 'TestWorkflow' run_id { SecureRandom.uuid } attempt 1 + task_queue { Fabricate(:api_task_queue) } + run_started_at { Time.now } + memo { {} } headers { {} } end diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index 91e1eccf..4cc18a5e 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -11,10 +11,10 @@ Fabricate( :api_activity_task, activity_name: activity_name, - input: Temporal.configuration.converter.to_payloads(input) + input: config.converter.to_payloads(input) ) end - let(:metadata) { Temporal::Metadata.generate(Temporal::Metadata::ACTIVITY_TYPE, task) } + let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace) } let(:activity_name) { 'TestActivity' } let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:middleware_chain) { Temporal::Middleware::Chain.new } @@ -30,8 +30,8 @@ .with(config.for_connection) .and_return(connection) allow(Temporal::Metadata) - .to receive(:generate) - .with(Temporal::Metadata::ACTIVITY_TYPE, task, namespace) + .to receive(:generate_activity_metadata) + .with(task, namespace) .and_return(metadata) allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata).and_return(context) @@ -70,7 +70,7 @@ reported_error = nil reported_metadata = nil - Temporal.configuration.on_error do |error, metadata: nil| + config.on_error do |error, metadata: nil| reported_error = error reported_metadata = metadata.to_h end @@ -187,7 +187,7 @@ reported_error = nil reported_metadata = nil - Temporal.configuration.on_error do |error, metadata: nil| + config.on_error do |error, metadata: nil| reported_error = error reported_metadata = metadata end diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index 92ec094b..68e5076a 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ b/spec/unit/lib/temporal/client_spec.rb @@ -61,7 +61,8 @@ class TestStartWorkflow < Temporal::Workflow run_timeout: Temporal.configuration.timeouts[:run], execution_timeout: Temporal.configuration.timeouts[:execution], workflow_id_reuse_policy: nil, - headers: {} + headers: {}, + memo: {} ) end @@ -73,7 +74,8 @@ class TestStartWorkflow < Temporal::Workflow name: 'test-workflow', namespace: 'test-namespace', task_queue: 'test-task-queue', - headers: { 'Foo' => 'Bar' } + headers: { 'Foo' => 'Bar' }, + memo: { 'MemoKey1' => 'MemoValue1' } } ) @@ -89,7 +91,8 @@ class TestStartWorkflow < Temporal::Workflow run_timeout: Temporal.configuration.timeouts[:run], execution_timeout: Temporal.configuration.timeouts[:execution], workflow_id_reuse_policy: nil, - headers: { 'Foo' => 'Bar' } + headers: { 'Foo' => 'Bar' }, + memo: { 'MemoKey1' => 'MemoValue1' } ) end @@ -114,7 +117,8 @@ class TestStartWorkflow < Temporal::Workflow run_timeout: Temporal.configuration.timeouts[:run], execution_timeout: Temporal.configuration.timeouts[:execution], workflow_id_reuse_policy: nil, - headers: {} + headers: {}, + memo: {} ) end @@ -133,7 +137,8 @@ class TestStartWorkflow < Temporal::Workflow run_timeout: Temporal.configuration.timeouts[:run], execution_timeout: Temporal.configuration.timeouts[:execution], workflow_id_reuse_policy: nil, - headers: {} + headers: {}, + memo: {} ) end @@ -154,7 +159,8 @@ class TestStartWorkflow < Temporal::Workflow run_timeout: Temporal.configuration.timeouts[:run], execution_timeout: Temporal.configuration.timeouts[:execution], workflow_id_reuse_policy: :allow, - headers: {} + headers: {}, + memo: {} ) end end @@ -179,12 +185,91 @@ class TestStartWorkflow < Temporal::Workflow run_timeout: Temporal.configuration.timeouts[:run], execution_timeout: Temporal.configuration.timeouts[:execution], workflow_id_reuse_policy: nil, - headers: {} + headers: {}, + memo: {} ) end end end + describe '#start_workflow with a signal' do + let(:temporal_response) do + Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx') + end + + before { allow(connection).to receive(:signal_with_start_workflow_execution).and_return(temporal_response) } + + def expect_signal_with_start(expected_arguments, expected_signal_argument) + expect(connection) + .to have_received(:signal_with_start_workflow_execution) + .with( + namespace: 'default-test-namespace', + workflow_id: an_instance_of(String), + workflow_name: 'TestStartWorkflow', + task_queue: 'default-test-task-queue', + input: expected_arguments, + task_timeout: Temporal.configuration.timeouts[:task], + run_timeout: Temporal.configuration.timeouts[:run], + execution_timeout: Temporal.configuration.timeouts[:execution], + workflow_id_reuse_policy: nil, + headers: {}, + memo: {}, + signal_name: 'the question', + signal_input: expected_signal_argument, + ) + end + + it 'starts a workflow with a signal and no arguments' do + subject.start_workflow( + TestStartWorkflow, + options: { signal_name: 'the question' } + ) + + expect_signal_with_start([], nil) + end + + it 'starts a workflow with a signal and one scalar argument' do + signal_input = 'what do you get if you multiply six by nine?' + subject.start_workflow( + TestStartWorkflow, + 42, + options: { + signal_name: 'the question', + signal_input: signal_input, + } + ) + + expect_signal_with_start([42], signal_input) + end + + it 'starts a workflow with a signal and multiple arguments and signal_inputs' do + signal_input = ['what do you get', 'if you multiply six by nine?'] + subject.start_workflow( + TestStartWorkflow, + 42, + 43, + options: { + signal_name: 'the question', + # signals can't have multiple scalar args, but you can pass an array + signal_input: signal_input + } + ) + + expect_signal_with_start([42, 43], signal_input) + end + + it 'raises when signal_input is given but signal_name is not' do + expect do + subject.start_workflow( + TestStartWorkflow, + [42, 54], + [43, 55], + options: { signal_input: 'what do you get if you multiply six by nine?', } + ) + end.to raise_error(ArgumentError) + end + end + describe '#schedule_workflow' do let(:temporal_response) do Temporal::Api::WorkflowService::V1::StartWorkflowExecutionResponse.new(run_id: 'xxx') @@ -208,7 +293,8 @@ class TestStartWorkflow < Temporal::Workflow run_timeout: Temporal.configuration.timeouts[:run], execution_timeout: Temporal.configuration.timeouts[:execution], workflow_id_reuse_policy: nil, - headers: {} + memo: {}, + headers: {}, ) end end @@ -233,6 +319,17 @@ class TestStartWorkflow < Temporal::Workflow end end + describe '#describe_namespace' do + before { allow(connection).to receive(:describe_namespace).and_return(Temporal::Api::WorkflowService::V1::DescribeNamespaceResponse.new) } + + it 'passes the namespace to the connection' do + result = subject.describe_namespace('new-namespace') + + expect(connection) + .to have_received(:describe_namespace) + .with(name: 'new-namespace') + end + end describe '#signal_workflow' do before { allow(connection).to receive(:signal_workflow_execution).and_return(nil) } diff --git a/spec/unit/lib/temporal/configuration_spec.rb b/spec/unit/lib/temporal/configuration_spec.rb new file mode 100644 index 00000000..7e083429 --- /dev/null +++ b/spec/unit/lib/temporal/configuration_spec.rb @@ -0,0 +1,29 @@ +require 'temporal/configuration' + +describe Temporal::Configuration do + describe '#initialize' do + it 'initializes proper default workflow timeouts' do + timeouts = subject.timeouts + + # By default, we don't ever want to timeout workflows, because workflows "always succeed" and + # they may be long-running + expect(timeouts[:execution]).to be >= 86_400 * 365 * 10 + expect(timeouts[:run]).to eq(timeouts[:execution]) + expect(timeouts[:task]).to eq(10) + end + + it 'initializes proper default activity timeouts' do + timeouts = subject.timeouts + + # Schedule to start timeouts are dangerous because there is no retry. + # https://docs.temporal.io/blog/activity-timeouts/#schedule-to-start-timeout recommends to use them rarely + expect(timeouts[:schedule_to_start]).to be(nil) + # We keep retrying until the workflow times out, by default + expect(timeouts[:schedule_to_close]).to be(nil) + # Activity invocations should be short-lived by default so they can be retried relatively quickly + expect(timeouts[:start_to_close]).to eq(30) + # No heartbeating for a default (short-lived) activity + expect(timeouts[:heartbeat]).to be(nil) + end + end +end \ No newline at end of file diff --git a/spec/unit/lib/temporal/connection/retryer.rb b/spec/unit/lib/temporal/connection/retryer_spec.rb similarity index 100% rename from spec/unit/lib/temporal/connection/retryer.rb rename to spec/unit/lib/temporal/connection/retryer_spec.rb diff --git a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb index fbb00623..18de4355 100644 --- a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb @@ -5,10 +5,12 @@ describe 'to_proto' do it 'produces a protobuf' do command = Temporal::Workflow::Command::ContinueAsNew.new( - workflow_type: 'Test', - task_queue: 'Test', + workflow_type: 'my-workflow-type', + task_queue: 'my-task-queue', input: ['one', 'two'], - timeouts: Temporal.configuration.timeouts + timeouts: Temporal.configuration.timeouts, + headers: {'foo-header': 'bar'}, + memo: {'foo-memo': 'baz'}, ) result = described_class.new(command).to_proto @@ -17,6 +19,18 @@ expect(result.command_type).to eql( :COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION ) + expect(result.continue_as_new_workflow_execution_command_attributes).not_to be_nil + attribs = result.continue_as_new_workflow_execution_command_attributes + + expect(attribs.workflow_type.name).to eq('my-workflow-type') + + expect(attribs.task_queue.name).to eq('my-task-queue') + + expect(attribs.input.payloads[0].data).to eq('"one"') + expect(attribs.input.payloads[1].data).to eq('"two"') + + expect(attribs.header.fields['foo-header'].data).to eq('"bar"') + expect(attribs.memo.fields['foo-memo'].data).to eq('"baz"') end end end diff --git a/spec/unit/lib/temporal/grpc_client_spec.rb b/spec/unit/lib/temporal/grpc_client_spec.rb index bcc0458d..a168c359 100644 --- a/spec/unit/lib/temporal/grpc_client_spec.rb +++ b/spec/unit/lib/temporal/grpc_client_spec.rb @@ -27,13 +27,51 @@ task_queue: 'test', execution_timeout: 0, run_timeout: 0, - task_timeout: 0 + task_timeout: 0, + memo: {} ) end.to raise_error(Temporal::WorkflowExecutionAlreadyStartedFailure) do |e| expect(e.run_id).to eql('baaf1d86-4459-4ecd-a288-47aeae55245d') end end end + + describe '#signal_with_start_workflow' do + let(:temporal_response) do + Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx') + end + + before { allow(grpc_stub).to receive(:signal_with_start_workflow_execution).and_return(temporal_response) } + + it 'starts a workflow with a signal with scalar arguments' do + subject.signal_with_start_workflow_execution( + namespace: namespace, + workflow_id: workflow_id, + workflow_name: 'workflow_name', + task_queue: 'task_queue', + input: ['foo'], + execution_timeout: 1, + run_timeout: 2, + task_timeout: 3, + signal_name: 'the question', + signal_input: 'what do you get if you multiply six by nine?' + ) + + expect(grpc_stub).to have_received(:signal_with_start_workflow_execution) do |request| + expect(request).to be_an_instance_of(Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest) + expect(request.namespace).to eq(namespace) + expect(request.workflow_id).to eq(workflow_id) + expect(request.workflow_type.name).to eq('workflow_name') + expect(request.task_queue.name).to eq('task_queue') + expect(request.input.payloads[0].data).to eq('"foo"') + expect(request.workflow_execution_timeout.seconds).to eq(1) + expect(request.workflow_run_timeout.seconds).to eq(2) + expect(request.workflow_task_timeout.seconds).to eq(3) + expect(request.signal_name).to eq('the question') + expect(request.signal_input.payloads[0].data).to eq('"what do you get if you multiply six by nine?"') + end + end + end describe '#get_workflow_execution_history' do let(:response) do diff --git a/spec/unit/lib/temporal/metadata/workflow_spec.rb b/spec/unit/lib/temporal/metadata/workflow_spec.rb index 6bbe3af6..be3f50b9 100644 --- a/spec/unit/lib/temporal/metadata/workflow_spec.rb +++ b/spec/unit/lib/temporal/metadata/workflow_spec.rb @@ -6,6 +6,8 @@ let(:args) { Fabricate(:workflow_metadata) } it 'sets the attributes' do + expect(subject.namespace).to eq(args.namespace) + expect(subject.id).to eq(args.id) expect(subject.name).to eq(args.name) expect(subject.run_id).to eq(args.run_id) expect(subject.attempt).to eq(args.attempt) @@ -25,9 +27,14 @@ it 'returns a hash' do expect(subject.to_h).to eq({ + 'namespace' => subject.namespace, + 'workflow_id' => subject.id, 'attempt' => subject.attempt, 'workflow_name' => subject.name, - 'workflow_run_id' => subject.run_id + 'workflow_run_id' => subject.run_id, + 'task_queue' => subject.task_queue, + 'run_started_at' => subject.run_started_at.to_f, + 'memo' => subject.memo, }) end end diff --git a/spec/unit/lib/temporal/metadata_spec.rb b/spec/unit/lib/temporal/metadata_spec.rb index 18134e26..cd21fb76 100644 --- a/spec/unit/lib/temporal/metadata_spec.rb +++ b/spec/unit/lib/temporal/metadata_spec.rb @@ -1,80 +1,76 @@ require 'temporal/metadata' describe Temporal::Metadata do - describe '.generate' do - subject { described_class.generate(type, data, namespace) } + describe '.generate_activity_metadata' do + subject { described_class.generate_activity_metadata(data, namespace) } - context 'with activity type' do - let(:type) { described_class::ACTIVITY_TYPE } - let(:data) { Fabricate(:api_activity_task) } - let(:namespace) { 'test-namespace' } + let(:data) { Fabricate(:api_activity_task) } + let(:namespace) { 'test-namespace' } - it 'generates metadata' do - expect(subject.namespace).to eq(namespace) - expect(subject.id).to eq(data.activity_id) - expect(subject.name).to eq(data.activity_type.name) - expect(subject.task_token).to eq(data.task_token) - expect(subject.attempt).to eq(data.attempt) - expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id) - expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id) - expect(subject.workflow_name).to eq(data.workflow_type.name) - expect(subject.headers).to eq({}) - end - - context 'with headers' do - let(:data) { Fabricate(:api_activity_task, headers: { 'Foo' => 'Bar' }) } - - it 'assigns headers' do - expect(subject.headers).to eq('Foo' => 'Bar') - end - end + it 'generates metadata' do + expect(subject.namespace).to eq(namespace) + expect(subject.id).to eq(data.activity_id) + expect(subject.name).to eq(data.activity_type.name) + expect(subject.task_token).to eq(data.task_token) + expect(subject.attempt).to eq(data.attempt) + expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id) + expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id) + expect(subject.workflow_name).to eq(data.workflow_type.name) + expect(subject.headers).to eq({}) end - context 'with workflow task type' do - let(:type) { described_class::WORKFLOW_TASK_TYPE } - let(:data) { Fabricate(:api_workflow_task) } - let(:namespace) { 'test-namespace' } + context 'with headers' do + let(:data) { Fabricate(:api_activity_task, headers: { 'Foo' => 'Bar' }) } - it 'generates metadata' do - expect(subject.namespace).to eq(namespace) - expect(subject.id).to eq(data.started_event_id) - expect(subject.task_token).to eq(data.task_token) - expect(subject.attempt).to eq(data.attempt) - expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id) - expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id) - expect(subject.workflow_name).to eq(data.workflow_type.name) + it 'assigns headers' do + expect(subject.headers).to eq('Foo' => 'Bar') end end + end - context 'with workflow type' do - let(:type) { described_class::WORKFLOW_TYPE } - let(:data) { Fabricate(:api_workflow_execution_started_event_attributes) } - let(:namespace) { nil } + describe '.generate_workflow_task_metadata' do + subject { described_class.generate_workflow_task_metadata(data, namespace) } - it 'generates metadata' do - expect(subject.run_id).to eq(data.original_execution_run_id) - expect(subject.attempt).to eq(data.attempt) - expect(subject.headers).to eq({}) - end + let(:data) { Fabricate(:api_workflow_task) } + let(:namespace) { 'test-namespace' } - context 'with headers' do - let(:data) do - Fabricate(:api_workflow_execution_started_event_attributes, headers: { 'Foo' => 'Bar' }) - end + it 'generates metadata' do + expect(subject.namespace).to eq(namespace) + expect(subject.id).to eq(data.started_event_id) + expect(subject.task_token).to eq(data.task_token) + expect(subject.attempt).to eq(data.attempt) + expect(subject.workflow_run_id).to eq(data.workflow_execution.run_id) + expect(subject.workflow_id).to eq(data.workflow_execution.workflow_id) + expect(subject.workflow_name).to eq(data.workflow_type.name) + end + end - it 'assigns headers' do - expect(subject.headers).to eq('Foo' => 'Bar') - end - end + context '.generate_workflow_metadata' do + subject { described_class.generate_workflow_metadata(event, task_metadata) } + let(:event) { Temporal::Workflow::History::Event.new(Fabricate(:api_workflow_execution_started_event)) } + let(:task_metadata) { Fabricate(:workflow_task_metadata) } + let(:namespace) { nil } + + it 'generates metadata' do + expect(subject.run_id).to eq(event.attributes.original_execution_run_id) + expect(subject.id).to eq(task_metadata.workflow_id) + expect(subject.attempt).to eq(event.attributes.attempt) + expect(subject.headers).to eq({}) + expect(subject.memo).to eq({}) + expect(subject.namespace).to eq(task_metadata.namespace) + expect(subject.task_queue).to eq(event.attributes.task_queue.name) + expect(subject.run_started_at).to eq(event.timestamp) end - context 'with unknown type' do - let(:type) { :unknown } - let(:data) { nil } - let(:namespace) { nil } + context 'with headers' do + let(:event) do + Temporal::Workflow::History::Event.new( + Fabricate(:api_workflow_execution_started_event, headers: { 'Foo' => 'Bar' }) + ) + end - it 'raises' do - expect { subject }.to raise_error(Temporal::InternalError, 'Unsupported metadata type') + it 'assigns headers' do + expect(subject.headers).to eq('Foo' => 'Bar') end end end diff --git a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb index 29b4b2a3..9397ef88 100644 --- a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb +++ b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb @@ -6,13 +6,24 @@ let(:workflow_id) { 'workflow_id_1' } let(:run_id) { 'run_id_1' } let(:execution) { Temporal::Testing::WorkflowExecution.new } + let(:task_queue) { 'my_test_queue' } let(:workflow_context) do Temporal::Testing::LocalWorkflowContext.new( execution, workflow_id, run_id, [], - Temporal::Metadata::Workflow.new(name: workflow_id, run_id: run_id, attempt: 1) + Temporal::Metadata::Workflow.new( + namespace: 'ruby-samples', + id: workflow_id, + name: 'HelloWorldWorkflow', + run_id: run_id, + attempt: 1, + task_queue: task_queue, + headers: {}, + run_started_at: Time.now, + memo: {}, + ) ) end let(:async_token) do @@ -120,10 +131,69 @@ def execute result = workflow_context.execute_activity!(TestActivity) expect(result).to eq('ok') end + + it 'can heartbeat' do + # Heartbeat doesn't do anything in local mode, but at least it can be called. + workflow_context.execute_activity!(TestHeartbeatingActivity) + end end - it 'can heartbeat' do - # Heartbeat doesn't do anything in local mode, but at least it can be called. - workflow_context.execute_activity!(TestHeartbeatingActivity) + describe '#wait_for' do + it 'await unblocks once condition changes' do + can_continue = false + exited = false + fiber = Fiber.new do + workflow_context.wait_for do + can_continue + end + + exited = true + end + + fiber.resume # start running + expect(exited).to eq(false) + + can_continue = true # change condition + fiber.resume # resume running after the Fiber.yield done in context.await + expect(exited).to eq(true) + end + + it 'condition or future unblocks' do + exited = false + + future = workflow_context.execute_activity(TestAsyncActivity) + + fiber = Fiber.new do + workflow_context.wait_for(future) do + false + end + + exited = true + end + + fiber.resume # start running + expect(exited).to eq(false) + + execution.complete_activity(async_token, 'async_ok') + + fiber.resume # resume running after the Fiber.yield done in context.await + expect(exited).to eq(true) + end + + it 'any future unblocks' do + exited = false + + async_future = workflow_context.execute_activity(TestAsyncActivity) + future = workflow_context.execute_activity(TestActivity) + future.wait + + fiber = Fiber.new do + workflow_context.wait_for(future, async_future) + exited = true + end + + fiber.resume # start running + expect(exited).to eq(true) + end end end diff --git a/spec/unit/lib/temporal/testing/temporal_override_spec.rb b/spec/unit/lib/temporal/testing/temporal_override_spec.rb index 09e4eeee..42d72a9c 100644 --- a/spec/unit/lib/temporal/testing/temporal_override_spec.rb +++ b/spec/unit/lib/temporal/testing/temporal_override_spec.rb @@ -136,6 +136,14 @@ def execute .with(an_instance_of(Temporal::Testing::LocalWorkflowContext)) end + it 'explicitly does not support staring a workflow with a signal' do + expect { + client.start_workflow(TestTemporalOverrideWorkflow, options: { signal_name: 'breakme' }) + }.to raise_error(NotImplementedError) do |e| + expect(e.message).to eql("Signals are not available when Temporal::Testing.local! is on") + end + end + describe 'execution control' do subject do client.start_workflow( diff --git a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb index d5e008f8..43ccc8fc 100644 --- a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb +++ b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb @@ -1,13 +1,17 @@ require 'temporal/workflow/dispatcher' +require 'temporal/workflow/history/event_target' describe Temporal::Workflow::Dispatcher do + let(:target) { Temporal::Workflow::History::EventTarget.new(1, Temporal::Workflow::History::EventTarget::ACTIVITY_TYPE) } + let(:other_target) { Temporal::Workflow::History::EventTarget.new(2, Temporal::Workflow::History::EventTarget::TIMER_TYPE) } + describe '#register_handler' do it 'stores a given handler against the target' do block = -> { 'handler body' } - subject.register_handler('target', 'signaled', &block) + subject.register_handler(target, 'signaled', &block) - expect(subject.send(:handlers)).to include('target' => [['signaled', block]]) + expect(subject.send(:handlers)).to include(target => [['signaled', block]]) end end @@ -22,14 +26,14 @@ allow(handler).to receive(:call) end - subject.register_handler('target', 'completed', &handler_1) - subject.register_handler('other_target', 'completed', &handler_2) - subject.register_handler('target', 'failed', &handler_3) - subject.register_handler('target', 'completed', &handler_4) + subject.register_handler(target, 'completed', &handler_1) + subject.register_handler(other_target, 'completed', &handler_2) + subject.register_handler(target, 'failed', &handler_3) + subject.register_handler(target, 'completed', &handler_4) end it 'calls all matching handlers in the original order' do - subject.dispatch('target', 'completed') + subject.dispatch(target, 'completed') expect(handler_1).to have_received(:call).ordered expect(handler_4).to have_received(:call).ordered @@ -39,7 +43,7 @@ end it 'passes given arguments to the handlers' do - subject.dispatch('target', 'failed', ['TIME_OUT', 'Exceeded execution time']) + subject.dispatch(target, 'failed', ['TIME_OUT', 'Exceeded execution time']) expect(handler_3).to have_received(:call).with('TIME_OUT', 'Exceeded execution time') @@ -54,15 +58,36 @@ before do allow(handler_5).to receive(:call) - subject.register_handler('target', described_class::WILDCARD, &handler_5) + subject.register_handler(target, described_class::WILDCARD, &handler_5) end it 'calls the handler' do - subject.dispatch('target', 'completed') + subject.dispatch(target, 'completed') expect(handler_5).to have_received(:call) end + end + + context 'with TARGET_WILDCARD target handler' do + let(:handler_6) { -> { 'sixth block' } } + before do + allow(handler_6).to receive(:call) + + subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6) + end + it 'calls the handler' do + subject.dispatch(target, 'completed') + + # Target handlers still invoked + expect(handler_1).to have_received(:call).ordered + expect(handler_4).to have_received(:call).ordered + expect(handler_6).to have_received(:call).ordered + end + + it 'TARGET_WILDCARD can be compared to an EventTarget object' do + expect(target.eql?(described_class::TARGET_WILDCARD)).to be(false) + end end end end diff --git a/spec/unit/lib/temporal/workflow/execution_info_spec.rb b/spec/unit/lib/temporal/workflow/execution_info_spec.rb index fbac5ee0..a064e8ba 100644 --- a/spec/unit/lib/temporal/workflow/execution_info_spec.rb +++ b/spec/unit/lib/temporal/workflow/execution_info_spec.rb @@ -14,6 +14,7 @@ expect(subject.close_time).to be_a(Time) expect(subject.status).to eq(:COMPLETED) expect(subject.history_length).to eq(api_info.history_length) + expect(subject.memo).to eq({ 'foo' => 'bar' }) end it 'freezes the info' do diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb new file mode 100644 index 00000000..a74c3387 --- /dev/null +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -0,0 +1,81 @@ +require 'temporal/workflow/executor' +require 'temporal/workflow/history' +require 'temporal/workflow' + +describe Temporal::Workflow::Executor do + subject { described_class.new(workflow, history, workflow_metadata, config) } + + let(:workflow_started_event) { Fabricate(:api_workflow_execution_started_event, event_id: 1) } + let(:history) do + Temporal::Workflow::History.new([ + workflow_started_event, + Fabricate(:api_workflow_task_scheduled_event, event_id: 2), + Fabricate(:api_workflow_task_started_event, event_id: 3), + Fabricate(:api_workflow_task_completed_event, event_id: 4) + ]) + end + let(:workflow) { TestWorkflow } + let(:workflow_metadata) { Fabricate(:workflow_metadata) } + let(:config) { Temporal::Configuration.new } + + class TestWorkflow < Temporal::Workflow + def execute + 'test' + end + end + + describe '#run' do + it 'runs a workflow' do + allow(workflow).to receive(:execute_in_context).and_call_original + + subject.run + + expect(workflow) + .to have_received(:execute_in_context) + .with( + an_instance_of(Temporal::Workflow::Context), + nil + ) + end + + it 'returns a complete workflow decision' do + decisions = subject.run + + expect(decisions.length).to eq(1) + + decision_id, decision = decisions.first + expect(decision_id).to eq(history.events.length + 1) + expect(decision).to be_an_instance_of(Temporal::Workflow::Command::CompleteWorkflow) + expect(decision.result).to eq('test') + end + + it 'generates workflow metadata' do + allow(Temporal::Metadata::Workflow).to receive(:new).and_call_original + payload = Temporal::Api::Common::V1::Payload.new( + metadata: { 'encoding' => 'json/plain' }, + data: '"bar"'.b + ) + header = + Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload, { 'Foo' => payload }) + workflow_started_event.workflow_execution_started_event_attributes.header = + Fabricate(:api_header, fields: header) + + subject.run + + event_attributes = workflow_started_event.workflow_execution_started_event_attributes + expect(Temporal::Metadata::Workflow) + .to have_received(:new) + .with( + namespace: workflow_metadata.namespace, + id: workflow_metadata.workflow_id, + name: event_attributes.workflow_type.name, + run_id: event_attributes.original_execution_run_id, + attempt: event_attributes.attempt, + task_queue: event_attributes.task_queue.name, + run_started_at: workflow_started_event.event_time.to_time, + memo: {}, + headers: {'Foo' => 'bar'} + ) + end + end +end \ No newline at end of file diff --git a/spec/unit/lib/temporal/workflow/history/event_target_spec.rb b/spec/unit/lib/temporal/workflow/history/event_target_spec.rb index 717c4572..2f2c80bd 100644 --- a/spec/unit/lib/temporal/workflow/history/event_target_spec.rb +++ b/spec/unit/lib/temporal/workflow/history/event_target_spec.rb @@ -21,5 +21,21 @@ expect(subject.type).to eq(described_class::CANCEL_TIMER_REQUEST_TYPE) end end + + context 'when event is ACTIVITY_CANCELED' do + let(:raw_event) { Fabricate(:api_activity_task_canceled_event) } + + it 'sets type to activity' do + expect(subject.type).to eq(described_class::ACTIVITY_TYPE) + end + end + + context 'when event is ACTIVITY_TASK_CANCEL_REQUESTED' do + let(:raw_event) { Fabricate(:api_activity_task_cancel_requested_event) } + + it 'sets type to cancel_activity_request' do + expect(subject.type).to eq(described_class::CANCEL_ACTIVITY_REQUEST_TYPE) + end + end end end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index d1537bcc..695b3da6 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -44,7 +44,7 @@ reported_error = nil reported_metadata = nil - Temporal.configuration.on_error do |error, metadata: nil| + config.on_error do |error, metadata: nil| reported_error = error reported_metadata = metadata end @@ -154,7 +154,7 @@ reported_error = nil reported_metadata = nil - Temporal.configuration.on_error do |error, metadata: nil| + config.on_error do |error, metadata: nil| reported_error = error reported_metadata = metadata end diff --git a/spec/unit/lib/temporal_spec.rb b/spec/unit/lib/temporal_spec.rb index 0d9daefd..47ccd73d 100644 --- a/spec/unit/lib/temporal_spec.rb +++ b/spec/unit/lib/temporal_spec.rb @@ -28,6 +28,10 @@ describe '.register_namespace' do it_behaves_like 'a forwarded method', :register_namespace, 'test-namespace', 'This is a test namespace' end + + describe '.describe_namespace' do + it_behaves_like 'a forwarded method', :describe_namespace, 'test-namespace' + end describe '.signal_workflow' do it_behaves_like 'a forwarded method', :signal_workflow, 'TestWorkflow', 'TST_SIGNAL', 'x', 'y' diff --git a/temporal.gemspec b/temporal.gemspec index 59ef1192..c8589c9e 100644 --- a/temporal.gemspec +++ b/temporal.gemspec @@ -21,4 +21,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rspec' spec.add_development_dependency 'fabrication' spec.add_development_dependency 'grpc-tools' + spec.add_development_dependency 'yard' end