Skip to content
This repository has been archived by the owner on Nov 23, 2024. It is now read-only.

Commit

Permalink
Add ability to suppress InvalidIncrementException
Browse files Browse the repository at this point in the history
Add option to suppress exceptions so that clients can update the
`auto_increment_increment` while the alloc servers are in use.
  • Loading branch information
mriddle committed Apr 2, 2020
1 parent f7ecd48 commit 862e9b1
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 37 deletions.
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ The `increment_by` value configured here does not dictate the value on your allo

Here's a complete list of the options you can use:

| Name | Default | Description |
| --------------------- | ------------------------------------------ | ---------------------------------------------------------------------------------------------------------- |
| `:disabled` | `false` | Disable GlobalUid entirely |
| `:connection_timeout` | 3 seconds | Timeout for connecting to a global UID server |
| `:query_timeout` | 10 seconds | Timeout for retrieving a global UID from a server before we move on to the next server |
| `:connection_retry` | 10 minutes | After failing to connect or query a UID server, how long before we retry |
| `:notifier` | A proc calling `ActiveRecord::Base.logger` | This proc is called with two parameters upon UID server failure -- an exception and a message |
| `:increment_by` | 5 | Used to validate number of ID servers, preventing connections if there are more servers than the given increment |
| Name | Default | Description |
| -------------------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------ |
| `:disabled` | `false` | Disable GlobalUid entirely |
| `:connection_timeout` | 3 seconds | Timeout for connecting to a global UID server |
| `:query_timeout` | 10 seconds | Timeout for retrieving a global UID from a server before we move on to the next server |
| `:connection_retry` | 10 minutes | After failing to connect or query a UID server, how long before we retry |
| `:notifier` | A proc calling `ActiveRecord::Base.logger` | This proc is called with two parameters upon UID server failure -- an exception and a message |
| `:increment_by` | 5 | Used for validation, compared with the value on the alloc servers to prevent allocation of duplicate IDs |
| `:suppress_increment_exceptions` | `false` | Suppress configuration validation, allowing updates to `auto_increment_increment` while alloc servers in use |

### Migration

Expand Down
55 changes: 36 additions & 19 deletions lib/global_uid/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class Base
:query_timeout => 10,
:increment_by => 5, # This will define the maximum number of servers that you can have
:disabled => false,
:per_process_affinity => true
:per_process_affinity => true,
:suppress_increment_exceptions => false
}

def self.servers
Expand Down Expand Up @@ -95,7 +96,6 @@ def self.disconnect!

def self.setup_connections!
connection_timeout = self.global_uid_options[:connection_timeout]
increment_by = self.global_uid_options[:increment_by]

if self.servers.nil?
self.servers = init_server_info
Expand All @@ -111,9 +111,7 @@ def self.setup_connections!

begin
connection = new_connection(info[:name], connection_timeout)
if !connection.nil? && increment_by != (db_increment = connection.select_value("SELECT @@auto_increment_increment"))
raise InvalidIncrementPreventionException, "Configured: '#{increment_by}', Found: '#{db_increment}' on '#{connection.current_database}'"
end
fetch_and_validate_increment!(connection)

info[:cx] = connection
info[:retry_at] = Time.now + self.global_uid_options[:connection_retry] if connection.nil?
Expand Down Expand Up @@ -169,20 +167,11 @@ def self.get_connections
with_connections {}
end

def self.validate_increment!(new_record_id, connection)
server = self.servers.find { |s| connection.current_database.include?(s[:name]) }

unless server[:allocated_ids] << new_record_id
raise InvalidIncrementException, "Recently allocated IDs: #{server[:allocated_ids]} on '#{connection.current_database}'"
end
end

def self.get_uid_for_class(klass)
with_connections do |connection|
Timeout.timeout(self.global_uid_options[:query_timeout], TimeoutException) do
id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES ('a')")
validate_increment!(id, connection)
return id
return validate_increment!(id, connection)
end
end
raise NoServersAvailableException, "All global UID servers are gone!"
Expand All @@ -192,17 +181,45 @@ def self.get_many_uids_for_class(klass, count)
return [] unless count > 0
with_connections do |connection|
Timeout.timeout(self.global_uid_options[:query_timeout], TimeoutException) do
increment_by = connection.select_value("SELECT @@auto_increment_increment")
if increment_by != self.global_uid_options[:increment_by]
raise InvalidIncrementPreventionException, "Configured: '#{self.global_uid_options[:increment_by]}', Found: '#{increment_by}' on '#{connection.current_database}'"
end
increment_by = fetch_and_validate_increment!(connection)
start_id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES " + (["('a')"] * count).join(','))
return start_id.step(start_id + (count-1) * increment_by, increment_by).to_a
end
end
raise NoServersAvailableException, "All global UID servers are gone!"
end

def self.validate_increment!(new_record_id, connection)
server = self.servers.find { |s| connection.current_database.include?(s[:name]) }

begin
unless server[:allocated_ids] << new_record_id
raise InvalidIncrementException, \
"Recently allocated IDs: #{server[:allocated_ids]} on '#{connection.current_database}'"
end
return new_record_id
rescue InvalidIncrementException => e
raise unless self.global_uid_options[:suppress_increment_exceptions]
notify e, "#{e.message}"
return new_record_id
end
end

def self.fetch_and_validate_increment!(connection)
return if connection.nil?
db_increment = connection.select_value("SELECT @@auto_increment_increment")

begin
return db_increment if db_increment == self.global_uid_options[:increment_by]
raise InvalidIncrementPreventionException, "Configured: '#{self.global_uid_options[:increment_by]}', " \
"Found: '#{db_increment}' on '#{connection.current_database}'"
rescue InvalidIncrementPreventionException => e
raise unless self.global_uid_options[:suppress_increment_exceptions]
notify e, "#{e.message}"
return db_increment
end
end

def self.global_uid_options=(options)
@global_uid_options = GLOBAL_UID_DEFAULTS.merge(options.symbolize_keys)
end
Expand Down
99 changes: 89 additions & 10 deletions test/global_uid_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
ActiveRecord::Base.establish_connection(:test)
reset_connections!
restore_defaults!

# Randomize connections for test processes to ensure they're not
# sticky during tests
GlobalUid::Base.global_uid_options[:per_process_affinity] = false
end

after do
Expand Down Expand Up @@ -232,6 +236,76 @@ def table_exists?(connection, table)
end
end

describe "Updating the auto_increment_increment on active alloc servers" do
let(:updated_increment) { GlobalUid::Base::GLOBAL_UID_DEFAULTS[:increment_by] * 2 }

before do
CreateWithNoParams.up
CreateWithoutGlobalUIDs.up

# Set an interesting offset to ensure there's no impact
ActiveRecord::Base.configurations.to_h["test_id_server_1"]["variables"]["auto_increment_offset"] = 2
ActiveRecord::Base.configurations.to_h["test_id_server_2"]["variables"]["auto_increment_offset"] = 4
reset_connections!

@notifications = []
GlobalUid::Base.global_uid_options[:notifier] = Proc.new do |exception, message|
GlobalUid::Base::GLOBAL_UID_DEFAULTS[:notifier].call(exception, message)
@notifications << exception.class
end
end

describe 'with increment exceptions raised' do
it 'takes the servers out of the pool, preventing usage during update' do
assert_raises(GlobalUid::NoServersAvailableException) do
with_modified_connections(increment: updated_increment, servers: ["test_id_server_1", "test_id_server_2"]) do
25.times { WithGlobalUID.create! }
end
end
end
end

describe "with increment exceptions suppressed " do
before do
GlobalUid::Base.global_uid_options[:suppress_increment_exceptions] = true
end

it "allows the increment to be updated" do
# Prefill alloc servers with a few records
records = []
records += (25.times.map { WithGlobalUID.create! })

# With only one server updated
with_modified_connections(increment: updated_increment, servers: ["test_id_server_1"]) do
records += 50.times.map { WithGlobalUID.create! }
end

# With all servers updated
with_modified_connections(increment: updated_increment, servers: ["test_id_server_1", "test_id_server_2"]) do
records += 25.times.map { WithGlobalUID.create! }
end

# Verify that unique records were created, exceptions weren't raised
# during the update and the client was notified of the discrepancy.
seen = {}
records.each do |record|
refute_nil record.id
refute seen.has_key?(record.id)
seen[record.id] = true
end

assert_equal(100, records.length)
assert_includes(@notifications, GlobalUid::InvalidIncrementPreventionException)
end
end

after do
reset_connections!
CreateWithNoParams.down
CreateWithoutGlobalUIDs.down
end
end

describe "With GlobalUID" do
before do
CreateWithNoParams.up
Expand All @@ -255,7 +329,7 @@ def table_exists?(connection, table)
@notifications = []
GlobalUid::Base.global_uid_options[:notifier] = Proc.new do |exception, message|
GlobalUid::Base::GLOBAL_UID_DEFAULTS[:notifier].call(exception, message)
@notifications << exception
@notifications << exception.class
end
end

Expand Down Expand Up @@ -300,15 +374,7 @@ def table_exists?(connection, table)

describe "and only one server reports a value other than what's configured" do
it "notifies the client when configuration incorrect during initialization" do
modified_connection = Proc.new do |name, connection_timeout, offset, increment_by|
config = ActiveRecord::Base.configurations.to_h[name]
ActiveRecord::Base.mysql2_connection(config).tap do |connection|
connection.execute("SET SESSION auto_increment_increment = 42") if name == "test_id_server_1"
end
end

GlobalUid::Base.stub :new_connection, modified_connection do
reset_connections!
with_modified_connections(increment: 42, servers: ["test_id_server_1"]) do
test_unique_ids(32)
assert_includes(@notifications, GlobalUid::InvalidIncrementPreventionException)
end
Expand Down Expand Up @@ -534,4 +600,17 @@ def restore_defaults!
def show_create_sql(klass, table)
klass.connection.select_rows("show create table #{table}")[0][1]
end

def with_modified_connections(increment:, servers:)
modified_connection = Proc.new do |name, _, _, _|
config = ActiveRecord::Base.configurations.to_h[name]
ActiveRecord::Base.mysql2_connection(config).tap do |connection|
connection.execute("SET SESSION auto_increment_increment = #{increment}") if servers.include?(name)
end
end
GlobalUid::Base.stub :new_connection, modified_connection do
reset_connections!
yield
end
end
end

0 comments on commit 862e9b1

Please sign in to comment.