Skip to content

Commit

Permalink
No longer rely on caller_runs for backpressure in sqs active job exec…
Browse files Browse the repository at this point in the history
…utor (#124)
  • Loading branch information
alextwoods authored Jun 6, 2024
1 parent f054371 commit 62c99c0
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ jobs:
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID: dummy
AWS_SECRET_ACCESS_KEY: dummy
AWS_SERVICE_ENDPOINT: http://localhost:8000
AWS_ENDPOINT_URL: http://localhost:8000
steps:
- uses: actions/checkout@v4

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Issue - No longer rely on caller_runs for backpressure in sqs active job executor (#123).

3.12.0 (2024-04-02)
------------------
* Feature - Drop support for Ruby 2.3 and Ruby 2.4 (#117).
Expand Down
4 changes: 2 additions & 2 deletions lib/active_job/queue_adapters/amazon_sqs_async_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ def _enqueue(job, body = nil, send_message_opts = {})
# FIFO jobs must be queued in order, so do not queue async
queue_url = Aws::Rails::SqsActiveJob.config.queue_url_for(job.queue_name)
if Aws::Rails::SqsActiveJob.fifo?(queue_url)
super(job, body, send_message_opts)
super
else
# Serialize is called here because the job’s locale needs to be
# determined in this thread and not in some other thread.
body = job.serialize
Concurrent::Promises
.future { super(job, body, send_message_opts) }
.future { super }
.rescue do |e|
Rails.logger.error "Failed to queue job #{job}. Reason: #{e}"
error_handler = Aws::Rails::SqsActiveJob.config.async_queue_error_handler
Expand Down
43 changes: 28 additions & 15 deletions lib/aws/rails/sqs_active_job/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,41 @@ class Executor
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 60, # 1 minute
fallback_policy: :caller_runs # slow down the producer thread
# TODO: Consider catching the exception and sleeping instead of using :caller_runs
fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled
}.freeze

def initialize(options = {})
@executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options))
@retry_standard_errors = options[:retry_standard_errors]
@logger = options[:logger] || ActiveSupport::Logger.new($stdout)
@task_complete = Concurrent::Event.new
end

def execute(message)
post_task(message)
rescue Concurrent::RejectedExecutionError
# no capacity, wait for a task to complete
@task_complete.reset
@task_complete.wait
retry
end

def shutdown(timeout = nil)
@executor.shutdown
clean_shutdown = @executor.wait_for_termination(timeout)
if clean_shutdown
@logger.info 'Clean shutdown complete. All executing jobs finished.'
else
@logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \
'finished cleanly. Unfinished jobs will not be removed from ' \
'the queue and can be ru-run once their visibility timeout ' \
'passes.'
end
end

private

def post_task(message)
@executor.post(message) do |message|
job = JobRunner.new(message)
@logger.info("Running job: #{job.id}[#{job.class_name}]")
Expand All @@ -43,19 +67,8 @@ def execute(message)
else
message.delete
end
end
end

def shutdown(timeout = nil)
@executor.shutdown
clean_shutdown = @executor.wait_for_termination(timeout)
if clean_shutdown
@logger.info 'Clean shutdown complete. All executing jobs finished.'
else
@logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \
'finished cleanly. Unfinished jobs will not be removed from ' \
'the queue and can be ru-run once their visibility timeout ' \
'passes.'
ensure
@task_complete.set
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions sample_app/bin/start_server
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ if [ -e /workspace/sample_app/tmp/pids/*.pid ]; then rm /workspace/sample_app/tm

bundle exec rails db:prepare

aws sqs --endpoint $AWS_SERVICE_ENDPOINT create-queue --queue-name async-job-queue --no-cli-pager
aws ses --endpoint $AWS_SERVICE_ENDPOINT verify-email-identity --email $ACTION_MAILER_EMAIL
aws sqs --endpoint $AWS_ENDPOINT_URL create-queue --queue-name async-job-queue --no-cli-pager
aws ses --endpoint $AWS_ENDPOINT_URL verify-email-identity --email $ACTION_MAILER_EMAIL

bin/rails runner Aws::SessionStore::DynamoDB::Table.create_table

Expand Down
5 changes: 0 additions & 5 deletions sample_app/config/initializers/aws.rb

This file was deleted.

4 changes: 2 additions & 2 deletions sample_app/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
dockerfile: ./Dockerfile
command: ["bundle", "exec", "rails", "s", "-b", "0.0.0.0"]
environment:
AWS_SERVICE_ENDPOINT: http://localstack:4566
AWS_ENDPOINT_URL: http://localstack:4566
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID: dummy
AWS_SECRET_ACCESS_KEY: dummy
Expand All @@ -28,7 +28,7 @@ services:
dockerfile: ./Dockerfile
command: ["bundle", "exec", "aws_sqs_active_job", "--queue", "default"]
environment:
AWS_SERVICE_ENDPOINT: http://localstack:4566
AWS_ENDPOINT_URL: http://localstack:4566
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID: dummy
AWS_SECRET_ACCESS_KEY: dummy
Expand Down
21 changes: 20 additions & 1 deletion test/aws/rails/sqs_active_job/executor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module SqsActiveJob
# message is a reserved minitest name
let(:msg) { double(data: double(body: body)) }
let(:executor) { Executor.new }
let(:runner) { double('runner', id: 'jobid', class_name: 'jobclass') }
let(:runner) { double('runner', id: 'jobid', class_name: 'jobclass', exception_executions?: false) }

it 'executes the job and deletes the message' do
expect(JobRunner).to receive(:new).and_return(runner)
Expand Down Expand Up @@ -53,6 +53,25 @@ module SqsActiveJob
executor.shutdown # give the job a chance to run
end
end

describe 'backpressure' do
let(:executor) { Executor.new(max_threads: 1, max_queue: 1) }
let(:trigger) { Concurrent::Event.new }

it 'waits for a tasks to complete before attempting to post new tasks' do
task_complete_event = executor.instance_variable_get(:@task_complete)
expect(JobRunner).to receive(:new).at_least(:once).and_return(runner)
allow(runner).to receive(:run) do
trigger.wait
end
executor.execute(msg) # first message runs
executor.execute(msg) # second message enters queue
expect(task_complete_event).to receive(:wait).at_least(:once) do
trigger.set # unblock the task
end
executor.execute(msg) # third message triggers wait
end
end
end

describe '#shutdown' do
Expand Down

0 comments on commit 62c99c0

Please sign in to comment.