Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NPE-475: Fix AWS instance credential failures leaving Chore in an unrecoverable state #71

Merged
merged 3 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
**Cleanups**
- N/A

## [v4.7.0](https://github.com/Tapjoy/chore/tree/v4.7.0)

**Features**
- N/A

**Fixed bugs**
- Fix AWS instance credential fetching never retrying on failure
- Fix SQS consumer never recovering from certain AWS client errors such as authentication failures

**Cleanups**
- N/A

## [v4.6.0](https://github.com/Tapjoy/chore/tree/v4.6.0)

**Cleanups**
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.latest.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
chore-core (4.6.0)
chore-core (4.7.0)
aws-sdk-sqs (>= 1)
get_process_mem (>= 0.2.0)
multi_json
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.ruby27.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
chore-core (4.6.0)
chore-core (4.7.0)
aws-sdk-sqs (>= 1)
get_process_mem (>= 0.2.0)
multi_json
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.ruby31.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
chore-core (4.6.0)
chore-core (4.7.0)
aws-sdk-sqs (>= 1)
get_process_mem (>= 0.2.0)
multi_json
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.ruby32.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
chore-core (4.6.0)
chore-core (4.7.0)
aws-sdk-sqs (>= 1)
get_process_mem (>= 0.2.0)
multi_json
Expand Down
7 changes: 6 additions & 1 deletion lib/chore/queues/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ module Chore
module Queues
module SQS
def self.sqs_client
Aws::SQS::Client.new(logger: Chore.logger, log_level: Chore.log_level_to_sym)
Aws::SQS::Client.new(
logger: Chore.logger,
log_level: Chore.log_level_to_sym,
instance_profile_credentials_timeout: 5, # default: 1
instance_profile_credentials_retries: 5, # default: 0
)
end

# Helper method to create queues based on the currently known list as provided by your configured Chore::Jobs
Expand Down
26 changes: 20 additions & 6 deletions lib/chore/queues/sqs/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@ class Consumer < Chore::Consumer
Chore::CLI.register_option 'aws_secret_key', '--aws-secret-key KEY', 'Valid AWS Secret Key'
Chore::CLI.register_option 'dedupe_servers', '--dedupe-servers SERVERS', 'List of mememcache compatible server(s) to use for storing SQS Message Dedupe cache'

# Resets the API client connection and provides @@reset_at so we know when the last time that was done
def self.reset_connection!
@@reset_at = Time.now
end

# @param [String] queue_name Name of SQS queue
# @param [Hash] opts Options
def initialize(queue_name, opts={})
super(queue_name, opts)
raise Chore::TerribleMistake, "Cannot specify a queue polling size greater than 10" if sqs_polling_amount > 10
end

# Resets the API client connection and provides @@reset_at so we know when the last time that was done
def self.reset_connection!
@@reset_at = Time.now
# Ensure that that consumer is capable of running
def verify_connection!
queue.data
end

# Begins requesting messages from SQS, which will invoke the +&handler+ over each message
Expand All @@ -36,9 +41,6 @@ def consume(&handler)
begin
messages = handle_messages(&handler)
sleep (Chore.config.consumer_sleep_interval) if messages.empty?
rescue Aws::SQS::Errors::NonExistentQueue => e
Chore.logger.error "You specified a queue '#{queue_name}' that does not exist. You must create the queue before starting Chore. Shutting down..."
raise Chore::TerribleMistake
rescue => e
Chore.logger.error { "SQSConsumer#Consume: #{e.inspect} #{e.backtrace * "\n"}" }
end
Expand Down Expand Up @@ -86,6 +88,18 @@ def delay(item, backoff_calc)
#
# @return [Array<Aws::SQS::Message>]
def handle_messages(&block)
begin
verify_connection!
rescue => e
# We shut down on connection failures for a few reasons:
# * The AWS SQS client has already been configured to retry on temporal issues like authentication failures
# * We rely on the operating system to re-run chore if it shuts down
# * We don't want chore to keep spinning if there's an unrecoverable exception with the client;
# it's safest to restart chore in these situations
Chore.logger.error "There was a problem verifying the connection to the queue: #{e.message}. Shutting down..."
raise Chore::TerribleMistake
end

msg = queue.receive_messages(:max_number_of_messages => sqs_polling_amount, :attribute_names => ['ApproximateReceiveCount'])
messages = *msg
received_timestamp = Time.now
Expand Down
2 changes: 1 addition & 1 deletion lib/chore/version.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Chore
module Version #:nodoc:
MAJOR = 4
MINOR = 6
MINOR = 7
PATCH = 0

STRING = [ MAJOR, MINOR, PATCH ].join('.')
Expand Down
31 changes: 31 additions & 0 deletions spec/chore/queues/sqs/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def consume(&block)
before do
allow(Aws::SQS::Client).to receive(:new).and_return(sqs)
allow(Aws::SQS::Queue).to receive(:new).and_return(queue)
allow(queue).to receive(:data).and_return(nil)
allow(queue).to receive(:receive_messages).and_return(receive_message_result)
allow(message).to receive(:attributes).and_return({ 'ApproximateReceiveCount' => rand(10) })
end
Expand Down Expand Up @@ -135,6 +136,36 @@ def consume(&block)
end
end
end

context 'on queue lookup failure' do
before(:each) do
allow(consumer).to receive(:verify_connection!).and_raise(Aws::SQS::Errors::NonExistentQueue)
end

it 'should raise exception' do
expect { consume }.to raise_error(Chore::TerribleMistake)
end
end

context 'on aws credential failure' do
before(:each) do
allow(consumer).to receive(:verify_connection!).and_raise(Aws::Errors::MissingCredentialsError)
end

it 'should raise exception' do
expect { consume }.to raise_error(Chore::TerribleMistake)
end
end

context 'on unexpected failure' do
before(:each) do
allow(consumer).to receive(:verify_connection!).and_raise(Seahorse::Client::NetworkingError)
end

it 'should raise exception' do
expect { consume }.to raise_error(Chore::TerribleMistake)
end
end
end

describe "completing work" do
Expand Down
Loading