diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8bf79e62..61af76cd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -100,7 +100,7 @@ jobs: AWS_REGION: us-east-1 AWS_ACCESS_KEY_ID: dummy AWS_SECRET_ACCESS_KEY: dummy - AWS_SERVICE_ENDPOINT: http://localhost:8000 + AWS_ENDPOINT_URL: http://localhost:8000 steps: - uses: actions/checkout@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 278d4801..16451511 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ Unreleased Changes ------------------ +* Issue - No longer rely on caller_runs for backpressure in sqs active job executor (#123). + 3.12.0 (2024-04-02) ------------------ * Feature - Drop support for Ruby 2.3 and Ruby 2.4 (#117). diff --git a/lib/active_job/queue_adapters/amazon_sqs_async_adapter.rb b/lib/active_job/queue_adapters/amazon_sqs_async_adapter.rb index 650b2dce..bba3dfbc 100644 --- a/lib/active_job/queue_adapters/amazon_sqs_async_adapter.rb +++ b/lib/active_job/queue_adapters/amazon_sqs_async_adapter.rb @@ -20,13 +20,13 @@ def _enqueue(job, body = nil, send_message_opts = {}) # FIFO jobs must be queued in order, so do not queue async queue_url = Aws::Rails::SqsActiveJob.config.queue_url_for(job.queue_name) if Aws::Rails::SqsActiveJob.fifo?(queue_url) - super(job, body, send_message_opts) + super else # Serialize is called here because the job’s locale needs to be # determined in this thread and not in some other thread. body = job.serialize Concurrent::Promises - .future { super(job, body, send_message_opts) } + .future { super } .rescue do |e| Rails.logger.error "Failed to queue job #{job}. Reason: #{e}" error_handler = Aws::Rails::SqsActiveJob.config.async_queue_error_handler diff --git a/lib/aws/rails/sqs_active_job/executor.rb b/lib/aws/rails/sqs_active_job/executor.rb index 6b4f852d..0785e956 100644 --- a/lib/aws/rails/sqs_active_job/executor.rb +++ b/lib/aws/rails/sqs_active_job/executor.rb @@ -12,17 +12,41 @@ class Executor max_threads: Concurrent.processor_count, auto_terminate: true, idletime: 60, # 1 minute - fallback_policy: :caller_runs # slow down the producer thread - # TODO: Consider catching the exception and sleeping instead of using :caller_runs + fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled }.freeze def initialize(options = {}) @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options)) @retry_standard_errors = options[:retry_standard_errors] @logger = options[:logger] || ActiveSupport::Logger.new($stdout) + @task_complete = Concurrent::Event.new end def execute(message) + post_task(message) + rescue Concurrent::RejectedExecutionError + # no capacity, wait for a task to complete + @task_complete.reset + @task_complete.wait + retry + end + + def shutdown(timeout = nil) + @executor.shutdown + clean_shutdown = @executor.wait_for_termination(timeout) + if clean_shutdown + @logger.info 'Clean shutdown complete. All executing jobs finished.' + else + @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \ + 'finished cleanly. Unfinished jobs will not be removed from ' \ + 'the queue and can be ru-run once their visibility timeout ' \ + 'passes.' + end + end + + private + + def post_task(message) @executor.post(message) do |message| job = JobRunner.new(message) @logger.info("Running job: #{job.id}[#{job.class_name}]") @@ -43,19 +67,8 @@ def execute(message) else message.delete end - end - end - - def shutdown(timeout = nil) - @executor.shutdown - clean_shutdown = @executor.wait_for_termination(timeout) - if clean_shutdown - @logger.info 'Clean shutdown complete. All executing jobs finished.' - else - @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \ - 'finished cleanly. Unfinished jobs will not be removed from ' \ - 'the queue and can be ru-run once their visibility timeout ' \ - 'passes.' + ensure + @task_complete.set end end end diff --git a/sample_app/bin/start_server b/sample_app/bin/start_server index 524f8fd1..dda5879f 100755 --- a/sample_app/bin/start_server +++ b/sample_app/bin/start_server @@ -6,8 +6,8 @@ if [ -e /workspace/sample_app/tmp/pids/*.pid ]; then rm /workspace/sample_app/tm bundle exec rails db:prepare -aws sqs --endpoint $AWS_SERVICE_ENDPOINT create-queue --queue-name async-job-queue --no-cli-pager -aws ses --endpoint $AWS_SERVICE_ENDPOINT verify-email-identity --email $ACTION_MAILER_EMAIL +aws sqs --endpoint $AWS_ENDPOINT_URL create-queue --queue-name async-job-queue --no-cli-pager +aws ses --endpoint $AWS_ENDPOINT_URL verify-email-identity --email $ACTION_MAILER_EMAIL bin/rails runner Aws::SessionStore::DynamoDB::Table.create_table diff --git a/sample_app/config/initializers/aws.rb b/sample_app/config/initializers/aws.rb deleted file mode 100644 index cae15362..00000000 --- a/sample_app/config/initializers/aws.rb +++ /dev/null @@ -1,5 +0,0 @@ -# frozen_string_literal: true - -Aws.config.update( - endpoint: ENV['AWS_SERVICE_ENDPOINT'], -) diff --git a/sample_app/docker-compose.yml b/sample_app/docker-compose.yml index 4231c64a..bd3db0c0 100644 --- a/sample_app/docker-compose.yml +++ b/sample_app/docker-compose.yml @@ -7,7 +7,7 @@ services: dockerfile: ./Dockerfile command: ["bundle", "exec", "rails", "s", "-b", "0.0.0.0"] environment: - AWS_SERVICE_ENDPOINT: http://localstack:4566 + AWS_ENDPOINT_URL: http://localstack:4566 AWS_REGION: us-east-1 AWS_ACCESS_KEY_ID: dummy AWS_SECRET_ACCESS_KEY: dummy @@ -28,7 +28,7 @@ services: dockerfile: ./Dockerfile command: ["bundle", "exec", "aws_sqs_active_job", "--queue", "default"] environment: - AWS_SERVICE_ENDPOINT: http://localstack:4566 + AWS_ENDPOINT_URL: http://localstack:4566 AWS_REGION: us-east-1 AWS_ACCESS_KEY_ID: dummy AWS_SECRET_ACCESS_KEY: dummy diff --git a/test/aws/rails/sqs_active_job/executor_test.rb b/test/aws/rails/sqs_active_job/executor_test.rb index 4d013b9d..bbeac0be 100644 --- a/test/aws/rails/sqs_active_job/executor_test.rb +++ b/test/aws/rails/sqs_active_job/executor_test.rb @@ -24,7 +24,7 @@ module SqsActiveJob # message is a reserved minitest name let(:msg) { double(data: double(body: body)) } let(:executor) { Executor.new } - let(:runner) { double('runner', id: 'jobid', class_name: 'jobclass') } + let(:runner) { double('runner', id: 'jobid', class_name: 'jobclass', exception_executions?: false) } it 'executes the job and deletes the message' do expect(JobRunner).to receive(:new).and_return(runner) @@ -53,6 +53,25 @@ module SqsActiveJob executor.shutdown # give the job a chance to run end end + + describe 'backpressure' do + let(:executor) { Executor.new(max_threads: 1, max_queue: 1) } + let(:trigger) { Concurrent::Event.new } + + it 'waits for a tasks to complete before attempting to post new tasks' do + task_complete_event = executor.instance_variable_get(:@task_complete) + expect(JobRunner).to receive(:new).at_least(:once).and_return(runner) + allow(runner).to receive(:run) do + trigger.wait + end + executor.execute(msg) # first message runs + executor.execute(msg) # second message enters queue + expect(task_complete_event).to receive(:wait).at_least(:once) do + trigger.set # unblock the task + end + executor.execute(msg) # third message triggers wait + end + end end describe '#shutdown' do