Skip to content

Commit

Permalink
fix: Use Standard Interface for Metrics Tags
Browse files Browse the repository at this point in the history
Convert keyword argument into hashes in order to fulfill Temporal::Metrics API contract obligations.

Fixes: #90
Signed-off-by: Progyan Bhattacharya <[email protected]>
  • Loading branch information
0xTheProDev committed May 20, 2023
1 parent 41a8e92 commit a4a65c1
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 35 deletions.
2 changes: 1 addition & 1 deletion lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def poll_for_task
def process(task)
middleware_chain = Middleware::Chain.new(middleware)

TaskProcessor.new(task, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool).process
TaskProcessor.new(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool).process
end

def poll_retry_seconds
Expand Down
18 changes: 14 additions & 4 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ class Activity
class TaskProcessor
include Concerns::Payloads

def initialize(task, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool)
def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool)
@task = task
@task_queue = task_queue
@namespace = namespace
@metadata = Metadata.generate_activity_metadata(task, namespace)
@task_token = task.task_token
Expand All @@ -28,7 +29,7 @@ def process
start_time = Time.now

Temporal.logger.debug("Processing Activity task", metadata.to_h)
Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME, queue_time_ms, activity: activity_name, namespace: namespace, workflow: metadata.workflow_name)
Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME, queue_time_ms, metric_tags)

context = Activity::Context.new(connection, metadata, config, heartbeat_thread_pool)

Expand All @@ -52,13 +53,22 @@ def process
end

time_diff_ms = ((Time.now - start_time) * 1000).round
Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, time_diff_ms, activity: activity_name, namespace: namespace, workflow: metadata.workflow_name)
Temporal.metrics.timing(Temporal::MetricKeys::ACTIVITY_TASK_LATENCY, time_diff_ms, metric_tags)
Temporal.logger.debug("Activity task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end

def metric_tags
{
activity: activity_name,
namespace: namespace,
task_queue: task_queue,
workflow: metadata.workflow_name
}
end

private

attr_reader :task, :namespace, :task_token, :activity_name, :activity_class,
attr_reader :task, :task_queue, :namespace, :task_token, :activity_name, :activity_class,
:middleware_chain, :metadata, :config, :heartbeat_thread_pool

def connection
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def process(task)
middleware_chain = Middleware::Chain.new(middleware)
workflow_middleware_chain = Middleware::Chain.new(workflow_middleware)

TaskProcessor.new(task, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum).process
TaskProcessor.new(task, task_queue, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum).process
end

def thread_pool
Expand Down
19 changes: 14 additions & 5 deletions lib/temporal/workflow/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def query_args
MAX_FAILED_ATTEMPTS = 1
LEGACY_QUERY_KEY = :legacy_query

def initialize(task, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
def initialize(task, task_queue, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
@task = task
@task_queue = task_queue
@namespace = namespace
@metadata = Metadata.generate_workflow_task_metadata(task, namespace)
@task_token = task.task_token
Expand All @@ -41,7 +42,7 @@ def process
start_time = Time.now

Temporal.logger.debug("Processing Workflow task", metadata.to_h)
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, workflow: workflow_name, namespace: namespace)
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, metric_tags)

if !workflow_class
raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker'
Expand Down Expand Up @@ -73,13 +74,21 @@ def process
fail_task(error)
ensure
time_diff_ms = ((Time.now - start_time) * 1000).round
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, workflow: workflow_name, namespace: namespace)
Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, metric_tags)
Temporal.logger.debug("Workflow task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end

def metric_tags
{
workflow: workflow_name,
namespace: namespace,
task_queue: task_queue
}
end

private

attr_reader :task, :namespace, :task_token, :workflow_name, :workflow_class,
attr_reader :task, :task_queue, :namespace, :task_token, :workflow_name, :workflow_class,
:middleware_chain, :workflow_middleware_chain, :metadata, :config, :binary_checksum

def connection
Expand Down Expand Up @@ -152,7 +161,7 @@ def complete_query(result)
end

def fail_task(error)
Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, workflow: workflow_name, namespace: namespace)
Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, metric_tags)
Temporal.logger.error('Workflow task failed', metadata.to_h.merge(error: error.inspect))
Temporal.logger.debug(error.backtrace.join("\n"))

Expand Down
4 changes: 2 additions & 2 deletions spec/unit/lib/temporal/activity/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def poll(task, times: 1)

expect(Temporal::Activity::TaskProcessor)
.to have_received(:new)
.with(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool)
.with(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool)
expect(task_processor).to have_received(:process)
end

Expand Down Expand Up @@ -143,7 +143,7 @@ def call(_); end
expect(Temporal::Middleware::Chain).to have_received(:new).with(middleware)
expect(Temporal::Activity::TaskProcessor)
.to have_received(:new)
.with(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool)
.with(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool)
end
end
end
Expand Down
21 changes: 14 additions & 7 deletions spec/unit/lib/temporal/activity/task_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
require 'temporal/scheduled_thread_pool'

describe Temporal::Activity::TaskProcessor do
subject { described_class.new(task, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) }
subject { described_class.new(task, task_queue, namespace, lookup, middleware_chain, config, heartbeat_thread_pool) }

let(:namespace) { 'test-namespace' }
let(:task_queue) { 'test-queue' }
let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) }
let(:task) do
Fabricate(
Expand Down Expand Up @@ -143,9 +144,11 @@
.with(
Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME,
an_instance_of(Integer),
activity: activity_name,
namespace: namespace,
workflow: workflow_name
hash_including({
activity: activity_name,
namespace: namespace,
workflow: workflow_name
})
)
end

Expand All @@ -159,6 +162,7 @@
an_instance_of(Integer),
activity: activity_name,
namespace: namespace,
task_queue: task_queue,
workflow: workflow_name
)
end
Expand Down Expand Up @@ -234,9 +238,11 @@
.with(
Temporal::MetricKeys::ACTIVITY_TASK_QUEUE_TIME,
an_instance_of(Integer),
activity: activity_name,
namespace: namespace,
workflow: workflow_name
hash_including({
activity: activity_name,
namespace: namespace,
workflow: workflow_name
})
)
end

Expand All @@ -250,6 +256,7 @@
an_instance_of(Integer),
activity: activity_name,
namespace: namespace,
task_queue: task_queue,
workflow: workflow_name
)
end
Expand Down
4 changes: 2 additions & 2 deletions spec/unit/lib/temporal/workflow/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def poll(task, times: 1)

expect(Temporal::Workflow::TaskProcessor)
.to have_received(:new)
.with(task, namespace, lookup, empty_middleware_chain, empty_middleware_chain, config, binary_checksum)
.with(task, task_queue, namespace, lookup, empty_middleware_chain, empty_middleware_chain, config, binary_checksum)
expect(task_processor).to have_received(:process)
end

Expand Down Expand Up @@ -151,7 +151,7 @@ def call(_); end
expect(Temporal::Middleware::Chain).to have_received(:new).with(workflow_middleware)
expect(Temporal::Workflow::TaskProcessor)
.to have_received(:new)
.with(task, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
.with(task, task_queue, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum)
end
end
end
Expand Down
39 changes: 26 additions & 13 deletions spec/unit/lib/temporal/workflow/task_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
require 'temporal/workflow/task_processor'

describe Temporal::Workflow::TaskProcessor do
subject { described_class.new(task, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) }
subject { described_class.new(task, task_queue, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) }

let(:namespace) { 'test-namespace' }
let(:task_queue) { 'test-queue' }
let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) }
let(:query) { nil }
let(:queries) { nil }
Expand Down Expand Up @@ -69,8 +70,10 @@
.to have_received(:increment)
.with(
Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED,
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end
end
Expand Down Expand Up @@ -187,8 +190,10 @@
.with(
Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME,
an_instance_of(Integer),
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end

Expand All @@ -200,8 +205,10 @@
.with(
Temporal::MetricKeys::WORKFLOW_TASK_LATENCY,
an_instance_of(Integer),
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end
end
Expand Down Expand Up @@ -235,8 +242,10 @@
.to have_received(:increment)
.with(
Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED,
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end
end
Expand Down Expand Up @@ -296,8 +305,10 @@
.with(
Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME,
an_instance_of(Integer),
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end

Expand All @@ -309,8 +320,10 @@
.with(
Temporal::MetricKeys::WORKFLOW_TASK_LATENCY,
an_instance_of(Integer),
workflow: workflow_name,
namespace: namespace
hash_including({
workflow: workflow_name,
namespace: namespace
})
)
end
end
Expand Down

0 comments on commit a4a65c1

Please sign in to comment.