Skip to content

Commit

Permalink
Update queues to be latency based and add monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
andersonkrs committed Oct 12, 2024
1 parent b686c14 commit 5dff484
Show file tree
Hide file tree
Showing 17 changed files with 45 additions and 119 deletions.
16 changes: 4 additions & 12 deletions app/controllers/health_check_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,11 @@ class HealthCheckController < ApplicationController
)

def index
queues = SolidQueue::Helper.queues_from_config
active_workers = SolidQueue::Process
.where(kind: "Worker")
.where("last_heartbeat_at >= ?", SolidQueue.process_alive_threshold.ago)

queues.each do |queue|
unless SolidQueue::Helper.active_processes_for_queue?(queue_name: queue)
render json: { message: "No workers active for queue #{queue}" }, status: :service_unavailable
return
end

if SolidQueue::Helper.stuck_executions_for_queue?(queue_name: queue, threshold: 3.hours)
render json: { message: "Stuck executions on #{queue}" }, status: :service_unavailable
return
end
end
return head :service_unavailable if active_workers.none?

head :ok
end
Expand Down
23 changes: 23 additions & 0 deletions app/controllers/latency_check_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
class LatencyCheckController < ApplicationController
http_basic_authenticate_with(
name: Rails.application.credentials.health_check[:username],
password: Rails.application.credentials.health_check[:password]
)

def show
queue = SolidQueue::Queue.find_by_name(params.require(:queue))

if queue.latency > threshold
response = { queue: queue.name, latency: ActiveSupport::Duration.build(queue.latency.to_i).inspect }
render json: response, status: :service_unavailable
else
head :ok
end
end

private

def threshold
params[:threshold_in_seconds].to_i.seconds
end
end
7 changes: 0 additions & 7 deletions app/jobs/cron_job.rb

This file was deleted.

15 changes: 0 additions & 15 deletions app/jobs/rake_job.rb

This file was deleted.

13 changes: 0 additions & 13 deletions app/jobs/user/crawler_pipeline_job.rb

This file was deleted.

2 changes: 1 addition & 1 deletion app/jobs/user/periodic_mal_sync_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class User::PeriodicMALSyncJob < ApplicationJob
queue_as :low
queue_as :within_10_hours

uniqueness_control key: -> { _1.id }, expires_in: 48.hours

Expand Down
2 changes: 1 addition & 1 deletion app/jobs/user/schedule_periodic_mal_sync_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class User::SchedulePeriodicMALSyncJob < ApplicationJob
queue_as :default
queue_as :within_3_minutes

def perform
User.eligible_for_mal_sync.find_each do |user|
Expand Down
15 changes: 0 additions & 15 deletions app/jobs/user/update_geolocation_job.rb

This file was deleted.

3 changes: 2 additions & 1 deletion app/models/concerns/purgeable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def purge_due_later

class PurgeDueLaterJob < ApplicationJob
limits_concurrency to: 1, key: ->(klass) { klass }
queue_as :logging

queue_as :within_3_minutes

retry_on ActiveRecord::StatementInvalid, attempts: 3, wait: :polynomially_longer

Expand Down
2 changes: 1 addition & 1 deletion app/models/user/calendar_images.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class GenerateJob < ApplicationJob

limits_concurrency to: 2, key: :screenshots, duration: 2.hours

queue_as :screenshots
queue_as :within_3_minutes

def perform(user)
user.calendar_images.generate
Expand Down
2 changes: 0 additions & 2 deletions app/models/user/geolocatable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ def geocode_async
end

class FetchGeolocationJob < ApplicationJob
queue_as :default

def perform(user)
user.geocode
user.save!(validate: false)
Expand Down
2 changes: 1 addition & 1 deletion app/models/user/mal_syncable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def eligible_for_mal_sync?
end

def enqueue_immediate_sync
User::PeriodicMALSyncJob.set(queue: :default).perform_later(self)
User::PeriodicMALSyncJob.set(queue: :within_30_seconds, priority: 0).perform_later(self)
end
end
end
14 changes: 8 additions & 6 deletions config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ class Application < Rails::Application
config.geocoder = config_for(:geocoder)
config.mal_api = config_for(:mal_api)

config.active_job.queue_adapter = :solid_queue
config.active_job.queue_name = :within_30_seconds
config.solid_queue.connects_to = { database: { writing: :queue } }

config.active_storage.service = :local
config.active_storage.variant_processor = :vips
config.active_storage.queues.analysis = :active_storage
config.active_storage.queues.purge = :active_storage
config.active_storage.queues.mirror = :active_storage
config.active_storage.queues.transform = :active_storage

config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }
config.active_storage.queues.analysis = :within_3_minutes
config.active_storage.queues.purge = :within_3_minutes
config.active_storage.queues.mirror = :within_3_minutes
config.active_storage.queues.transform = :within_3_minutes

config.mission_control.jobs.base_controller_class = "AdminController"

Expand Down
5 changes: 0 additions & 5 deletions config/database.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ sqlite_defaults: &sqlite_defaults
# Solid queue takes two queries by default + number of queue threads
pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 10 } %>
timeout: 5000
default_transaction_mode: immediate
pragmas:
mmap_size: 64000000

development:
primary:
Expand Down Expand Up @@ -40,8 +37,6 @@ production:
<<: *sqlite_defaults
database: "storage/malheatmap_primary_production.sqlite3"
migrations_paths: "db/migrate"
pragmas:
mmap_size: 256000000
ops:
<<: *sqlite_defaults
database: "storage/malheatmap_ops_production.sqlite3"
Expand Down
6 changes: 3 additions & 3 deletions config/queue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ default: &default
- polling_interval: 1
batch_size: 500
workers:
- queues: [default, active_storage, logging, screenshots, solid_queue_recurring]
- queues: [within_30_seconds, within_3_minutes, solid_queue_recurring]
threads: 5
processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
polling_interval: 0.5
- queues: [low]
- queues: [within_10_hours]
threads: 1
processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
polling_interval: 0.5
polling_interval: 2

development:
<<: *default
Expand Down
1 change: 1 addition & 0 deletions config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
match "/500" => "application#internal_error", :via => :all, :as => "internal_error"
get "/health-check" => "health_check#index", :as => "health_check"
get "/up" => "monitoring#show"
get "/latency-check/:queue" => "latency_check#show", as: "latency_check"

mount MissionControl::Jobs::Engine, at: "/jobs"
mount SolidErrors::Engine, at: "/errors"
Expand Down
36 changes: 0 additions & 36 deletions test/controllers/health_check_controller_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,40 +50,4 @@ class HealthCheckControllerTest < ActionDispatch::IntegrationTest

assert_response :ok
end

test "returns service unavailable if there is a stuck execution expired" do
job = SolidQueue::Job.create!(queue_name: "default", class_name: "CronJob")
process = SolidQueue::Process.create!({
name: "test",
last_heartbeat_at: 10.seconds.ago,
kind: "Worker",
pid: 123,
metadata: {
queues: "screenshots,default,active_storage,low,logging,solid_queue_recurring"
}
})
process.claimed_executions.create(job: job, created_at: 6.hours.ago)

get health_check_path, headers: { "HTTP_AUTHORIZATION" => @authorization }

assert_response :service_unavailable
end

test "returns ok if there is a stuck execution but not expired" do
job = SolidQueue::Job.create!(queue_name: "backups", class_name: "CronJob")
process = SolidQueue::Process.create!({
name: "test",
last_heartbeat_at: 10.seconds.ago,
kind: "Worker",
pid: 123,
metadata: {
queues: "screenshots,default,active_storage,low,logging,solid_queue_recurring"
}
})
process.claimed_executions.create(job: job, created_at: (2.hours + 10.minutes).ago)

get health_check_path, headers: { "HTTP_AUTHORIZATION" => @authorization }

assert_response :ok
end
end

0 comments on commit 5dff484

Please sign in to comment.