From 862e9b1d3597539703d082f63642e80e6b4478ba Mon Sep 17 00:00:00 2001 From: Matthew Riddle Date: Mon, 30 Mar 2020 16:08:17 +0200 Subject: [PATCH] Add ability to suppress InvalidIncrementException Add option to suppress exceptions so that clients can update the `auto_increment_increment` while the alloc servers are in use. --- README.md | 17 +++---- lib/global_uid/base.rb | 55 +++++++++++++++-------- test/global_uid_test.rb | 99 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 134 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 8d426a5..4e2dea5 100644 --- a/README.md +++ b/README.md @@ -49,14 +49,15 @@ The `increment_by` value configured here does not dictate the value on your allo Here's a complete list of the options you can use: -| Name | Default | Description | -| --------------------- | ------------------------------------------ | ---------------------------------------------------------------------------------------------------------- | -| `:disabled` | `false` | Disable GlobalUid entirely | -| `:connection_timeout` | 3 seconds | Timeout for connecting to a global UID server | -| `:query_timeout` | 10 seconds | Timeout for retrieving a global UID from a server before we move on to the next server | -| `:connection_retry` | 10 minutes | After failing to connect or query a UID server, how long before we retry | -| `:notifier` | A proc calling `ActiveRecord::Base.logger` | This proc is called with two parameters upon UID server failure -- an exception and a message | -| `:increment_by` | 5 | Used to validate number of ID servers, preventing connections if there are more servers than the given increment | +| Name | Default | Description | +| -------------------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------ | +| `:disabled` | `false` | Disable GlobalUid entirely | +| `:connection_timeout` | 3 seconds | Timeout for connecting to a global UID server | +| `:query_timeout` | 10 seconds | Timeout for retrieving a global UID from a server before we move on to the next server | +| `:connection_retry` | 10 minutes | After failing to connect or query a UID server, how long before we retry | +| `:notifier` | A proc calling `ActiveRecord::Base.logger` | This proc is called with two parameters upon UID server failure -- an exception and a message | +| `:increment_by` | 5 | Used for validation, compared with the value on the alloc servers to prevent allocation of duplicate IDs | +| `:suppress_increment_exceptions` | `false` | Suppress configuration validation, allowing updates to `auto_increment_increment` while alloc servers in use | ### Migration diff --git a/lib/global_uid/base.rb b/lib/global_uid/base.rb index f4b5407..8466e4b 100644 --- a/lib/global_uid/base.rb +++ b/lib/global_uid/base.rb @@ -13,7 +13,8 @@ class Base :query_timeout => 10, :increment_by => 5, # This will define the maximum number of servers that you can have :disabled => false, - :per_process_affinity => true + :per_process_affinity => true, + :suppress_increment_exceptions => false } def self.servers @@ -95,7 +96,6 @@ def self.disconnect! def self.setup_connections! connection_timeout = self.global_uid_options[:connection_timeout] - increment_by = self.global_uid_options[:increment_by] if self.servers.nil? self.servers = init_server_info @@ -111,9 +111,7 @@ def self.setup_connections! begin connection = new_connection(info[:name], connection_timeout) - if !connection.nil? && increment_by != (db_increment = connection.select_value("SELECT @@auto_increment_increment")) - raise InvalidIncrementPreventionException, "Configured: '#{increment_by}', Found: '#{db_increment}' on '#{connection.current_database}'" - end + fetch_and_validate_increment!(connection) info[:cx] = connection info[:retry_at] = Time.now + self.global_uid_options[:connection_retry] if connection.nil? @@ -169,20 +167,11 @@ def self.get_connections with_connections {} end - def self.validate_increment!(new_record_id, connection) - server = self.servers.find { |s| connection.current_database.include?(s[:name]) } - - unless server[:allocated_ids] << new_record_id - raise InvalidIncrementException, "Recently allocated IDs: #{server[:allocated_ids]} on '#{connection.current_database}'" - end - end - def self.get_uid_for_class(klass) with_connections do |connection| Timeout.timeout(self.global_uid_options[:query_timeout], TimeoutException) do id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES ('a')") - validate_increment!(id, connection) - return id + return validate_increment!(id, connection) end end raise NoServersAvailableException, "All global UID servers are gone!" @@ -192,10 +181,7 @@ def self.get_many_uids_for_class(klass, count) return [] unless count > 0 with_connections do |connection| Timeout.timeout(self.global_uid_options[:query_timeout], TimeoutException) do - increment_by = connection.select_value("SELECT @@auto_increment_increment") - if increment_by != self.global_uid_options[:increment_by] - raise InvalidIncrementPreventionException, "Configured: '#{self.global_uid_options[:increment_by]}', Found: '#{increment_by}' on '#{connection.current_database}'" - end + increment_by = fetch_and_validate_increment!(connection) start_id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES " + (["('a')"] * count).join(',')) return start_id.step(start_id + (count-1) * increment_by, increment_by).to_a end @@ -203,6 +189,37 @@ def self.get_many_uids_for_class(klass, count) raise NoServersAvailableException, "All global UID servers are gone!" end + def self.validate_increment!(new_record_id, connection) + server = self.servers.find { |s| connection.current_database.include?(s[:name]) } + + begin + unless server[:allocated_ids] << new_record_id + raise InvalidIncrementException, \ + "Recently allocated IDs: #{server[:allocated_ids]} on '#{connection.current_database}'" + end + return new_record_id + rescue InvalidIncrementException => e + raise unless self.global_uid_options[:suppress_increment_exceptions] + notify e, "#{e.message}" + return new_record_id + end + end + + def self.fetch_and_validate_increment!(connection) + return if connection.nil? + db_increment = connection.select_value("SELECT @@auto_increment_increment") + + begin + return db_increment if db_increment == self.global_uid_options[:increment_by] + raise InvalidIncrementPreventionException, "Configured: '#{self.global_uid_options[:increment_by]}', " \ + "Found: '#{db_increment}' on '#{connection.current_database}'" + rescue InvalidIncrementPreventionException => e + raise unless self.global_uid_options[:suppress_increment_exceptions] + notify e, "#{e.message}" + return db_increment + end + end + def self.global_uid_options=(options) @global_uid_options = GLOBAL_UID_DEFAULTS.merge(options.symbolize_keys) end diff --git a/test/global_uid_test.rb b/test/global_uid_test.rb index 1808271..1ae653b 100644 --- a/test/global_uid_test.rb +++ b/test/global_uid_test.rb @@ -9,6 +9,10 @@ ActiveRecord::Base.establish_connection(:test) reset_connections! restore_defaults! + + # Randomize connections for test processes to ensure they're not + # sticky during tests + GlobalUid::Base.global_uid_options[:per_process_affinity] = false end after do @@ -232,6 +236,76 @@ def table_exists?(connection, table) end end + describe "Updating the auto_increment_increment on active alloc servers" do + let(:updated_increment) { GlobalUid::Base::GLOBAL_UID_DEFAULTS[:increment_by] * 2 } + + before do + CreateWithNoParams.up + CreateWithoutGlobalUIDs.up + + # Set an interesting offset to ensure there's no impact + ActiveRecord::Base.configurations.to_h["test_id_server_1"]["variables"]["auto_increment_offset"] = 2 + ActiveRecord::Base.configurations.to_h["test_id_server_2"]["variables"]["auto_increment_offset"] = 4 + reset_connections! + + @notifications = [] + GlobalUid::Base.global_uid_options[:notifier] = Proc.new do |exception, message| + GlobalUid::Base::GLOBAL_UID_DEFAULTS[:notifier].call(exception, message) + @notifications << exception.class + end + end + + describe 'with increment exceptions raised' do + it 'takes the servers out of the pool, preventing usage during update' do + assert_raises(GlobalUid::NoServersAvailableException) do + with_modified_connections(increment: updated_increment, servers: ["test_id_server_1", "test_id_server_2"]) do + 25.times { WithGlobalUID.create! } + end + end + end + end + + describe "with increment exceptions suppressed " do + before do + GlobalUid::Base.global_uid_options[:suppress_increment_exceptions] = true + end + + it "allows the increment to be updated" do + # Prefill alloc servers with a few records + records = [] + records += (25.times.map { WithGlobalUID.create! }) + + # With only one server updated + with_modified_connections(increment: updated_increment, servers: ["test_id_server_1"]) do + records += 50.times.map { WithGlobalUID.create! } + end + + # With all servers updated + with_modified_connections(increment: updated_increment, servers: ["test_id_server_1", "test_id_server_2"]) do + records += 25.times.map { WithGlobalUID.create! } + end + + # Verify that unique records were created, exceptions weren't raised + # during the update and the client was notified of the discrepancy. + seen = {} + records.each do |record| + refute_nil record.id + refute seen.has_key?(record.id) + seen[record.id] = true + end + + assert_equal(100, records.length) + assert_includes(@notifications, GlobalUid::InvalidIncrementPreventionException) + end + end + + after do + reset_connections! + CreateWithNoParams.down + CreateWithoutGlobalUIDs.down + end + end + describe "With GlobalUID" do before do CreateWithNoParams.up @@ -255,7 +329,7 @@ def table_exists?(connection, table) @notifications = [] GlobalUid::Base.global_uid_options[:notifier] = Proc.new do |exception, message| GlobalUid::Base::GLOBAL_UID_DEFAULTS[:notifier].call(exception, message) - @notifications << exception + @notifications << exception.class end end @@ -300,15 +374,7 @@ def table_exists?(connection, table) describe "and only one server reports a value other than what's configured" do it "notifies the client when configuration incorrect during initialization" do - modified_connection = Proc.new do |name, connection_timeout, offset, increment_by| - config = ActiveRecord::Base.configurations.to_h[name] - ActiveRecord::Base.mysql2_connection(config).tap do |connection| - connection.execute("SET SESSION auto_increment_increment = 42") if name == "test_id_server_1" - end - end - - GlobalUid::Base.stub :new_connection, modified_connection do - reset_connections! + with_modified_connections(increment: 42, servers: ["test_id_server_1"]) do test_unique_ids(32) assert_includes(@notifications, GlobalUid::InvalidIncrementPreventionException) end @@ -534,4 +600,17 @@ def restore_defaults! def show_create_sql(klass, table) klass.connection.select_rows("show create table #{table}")[0][1] end + + def with_modified_connections(increment:, servers:) + modified_connection = Proc.new do |name, _, _, _| + config = ActiveRecord::Base.configurations.to_h[name] + ActiveRecord::Base.mysql2_connection(config).tap do |connection| + connection.execute("SET SESSION auto_increment_increment = #{increment}") if servers.include?(name) + end + end + GlobalUid::Base.stub :new_connection, modified_connection do + reset_connections! + yield + end + end end