Skip to content

Commit

Permalink
Merge pull request coinbase#103 from stripe-private-oss-forks/jeffsch…
Browse files Browse the repository at this point in the history
…oner/idle-metrics

Thread pool and poller metrics
  • Loading branch information
jeffschoner-stripe authored and GitHub Enterprise committed Oct 20, 2022
2 parents 85c6bae + 70efdb4 commit 88eb156
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 3 deletions.
8 changes: 7 additions & 1 deletion lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ def poll_loop

task = poll_for_task
last_poll_time = Time.now

Temporal.metrics.increment(
Temporal::MetricKeys::ACTIVITY_POLLER_POLL_COMPLETED,
metrics_tags.merge(received_task: !task.nil?.to_s)
)

next unless task&.activity_type

thread_pool.schedule { process(task) }
Expand All @@ -98,7 +104,7 @@ def process(task)
end

def thread_pool
@thread_pool ||= ThreadPool.new(options[:thread_pool_size])
@thread_pool ||= ThreadPool.new(options[:thread_pool_size], { pool_name: 'activity_task_poller' })
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions lib/temporal/metric_keys.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
module Temporal
class MetricKeys
ACTIVITY_POLLER_TIME_SINCE_LAST_POLL = 'activity_poller.time_since_last_poll'.freeze
ACTIVITY_POLLER_POLL_COMPLETED = 'activity_poller.poll_completed'.freeze
ACTIVITY_TASK_QUEUE_TIME = 'activity_task.queue_time'.freeze
ACTIVITY_TASK_LATENCY = 'activity_task.latency'.freeze

WORKFLOW_POLLER_TIME_SINCE_LAST_POLL = 'workflow_poller.time_since_last_poll'.freeze
WORKFLOW_POLLER_POLL_COMPLETED = 'workflow_poller.poll_completed'.freeze
WORKFLOW_TASK_QUEUE_TIME = 'workflow_task.queue_time'.freeze
WORKFLOW_TASK_LATENCY = 'workflow_task.latency'.freeze
WORKFLOW_TASK_EXECUTION_FAILED = 'workflow_task.execution_failed'.freeze

THREAD_POOL_AVAILABLE_THREADS = 'thread_pool.available_threads'.freeze
THREAD_POOL_QUEUE_SIZE = 'thread_pool.queue_size'.freeze
end
end
12 changes: 11 additions & 1 deletion lib/temporal/thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ module Temporal
class ThreadPool
attr_reader :size

def initialize(size)
def initialize(size, metrics_tags)
@size = size
@metrics_tags = metrics_tags
@queue = Queue.new
@mutex = Mutex.new
@availability = ConditionVariable.new
Expand All @@ -20,6 +21,11 @@ def initialize(size)
end
end

def report_metrics
Temporal.metrics.gauge(Temporal::MetricKeys::THREAD_POOL_AVAILABLE_THREADS, @available_threads, @metrics_tags)
Temporal.metrics.gauge(Temporal::MetricKeys::THREAD_POOL_QUEUE_SIZE, @queue.size, @metrics_tags)
end

def wait_for_available_threads
@mutex.synchronize do
while @available_threads <= 0
Expand All @@ -33,6 +39,8 @@ def schedule(&block)
@available_threads -= 1
@queue << block
end

report_metrics
end

def shutdown
Expand All @@ -56,6 +64,8 @@ def poll
@available_threads += 1
@availability.signal
end

report_metrics
end
end
end
Expand Down
8 changes: 7 additions & 1 deletion lib/temporal/workflow/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ def poll_loop

task = poll_for_task
last_poll_time = Time.now

Temporal.metrics.increment(
Temporal::MetricKeys::WORKFLOW_POLLER_POLL_COMPLETED,
metrics_tags.merge(received_task: !task.nil?.to_s)
)

next unless task&.workflow_type

thread_pool.schedule { process(task) }
Expand All @@ -98,7 +104,7 @@ def process(task)
end

def thread_pool
@thread_pool ||= ThreadPool.new(options[:thread_pool_size])
@thread_pool ||= ThreadPool.new(options[:thread_pool_size], { pool_name: 'workflow_task_poller' })
end

def binary_checksum
Expand Down

0 comments on commit 88eb156

Please sign in to comment.