From 4b3bdbc250ddd02da2ea060e97d28107a576848e Mon Sep 17 00:00:00 2001 From: Matthew Riddle Date: Mon, 23 Mar 2020 14:46:53 +0100 Subject: [PATCH] ID validation [Breaking change] This patch adds `auto_increment_increment` validation to the following scenarios: * Connection initalization * Generating one ID * Generating multiple IDs These changes should have no noticeable impact on performance and are thread safe. The intent is to prevent data corruption when a server is misconfigured and in the event that one is found, it's removed from the pool of available alloc servers allowing creation to continue with the valid alloc servers. Validation during initialization is done by comparing the client configuration and with what's set on each server. If the server is misconfigured, an error is reported via the `notifier` and it's left out of the pool. Validation during creation happens by comparing each pair of identifiers and ensuring they've been incremented correctly by the configured amount. If the `auto_increment_increment` changes while the clients are using the server, the alloc server is removed from the list of servers and an error is reported via the `notifier`. An InvalidIncrementException is raised if all servers are misconfigured. The changeset is considered a breaking change as any clients upgrading will see an increment exception if their `increment_by` was set incorrectly, this previously would have gone unnoticed. [squashing] Add more detail to what the spec is doing Addressing this comment > Could we please expand a bit on this comment? Maybe tell which IDs are > created now, and which IDs would be created after calling > updated_increment once or twice. > It feels like I have to read a lot of code to understand this test in > its current form. I could do that now, but I know that I may well be > to lazy to do so when looking at this test a year or two in the future Asserting the IDs could lead to spurious failures since it'll come from one of the two servers. [squashing] Rename max_size to max_window_size max_size is misleading, one may presume that it's the maximum amount of IDs allowed to be allocated for the server. If they passed 100000 in, it would have a negative performance hit [squashing] Rename allocation methods The method names were inconsistent, the method checks the allocation, comparing it with the previous. It doesn't check the increment, except in `validate_connection_increment` which remains unchanged [squashing] Cleanup allocator validation The previous implementation was a mess, it was cleaned up in the follow- up PR, bringing it strait into this PR. The InvalidIncrementException message has also been improved, when the ID returned is an unexpected value, we'll query the database before closing the connection to get the set `auto_increment_increment` value. E.g GlobalUid::InvalidIncrementException Configured: '5', Found: '42' on 'global_uid_test_id_server_1'. Recently allocated IDs: [43, 85] Also, while re-reading the code I realised that the previous `alert` implementation would log the error twice when exceptions aren't suppressed --- Changelog.md | 3 + README.md | 17 ++-- lib/global_uid.rb | 2 + lib/global_uid/allocator.rb | 67 ++++++++++++ lib/global_uid/base.rb | 30 ++++-- test/lib/allocator_test.rb | 96 ++++++++++++++++++ test/lib/global_uid_test.rb | 197 ++++++++++++++++++++++++++++++++---- 7 files changed, 377 insertions(+), 35 deletions(-) create mode 100644 lib/global_uid/allocator.rb create mode 100644 test/lib/allocator_test.rb diff --git a/Changelog.md b/Changelog.md index 81c072f..ddd448d 100644 --- a/Changelog.md +++ b/Changelog.md @@ -6,6 +6,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added +- [Breaking change] ID Validation, ensure the ID coming back has been incremented using the configured `auto_increment_increment`. (https://github.com/zendesk/global_uid/pull/63) + ### Removed - Removed the `dry_run` option (https://github.com/zendesk/global_uid/pull/64) - Removed `GlobalUid::ServerVariables` module (https://github.com/zendesk/global_uid/pull/66) diff --git a/README.md b/README.md index 8d426a5..4e2dea5 100644 --- a/README.md +++ b/README.md @@ -49,14 +49,15 @@ The `increment_by` value configured here does not dictate the value on your allo Here's a complete list of the options you can use: -| Name | Default | Description | -| --------------------- | ------------------------------------------ | ---------------------------------------------------------------------------------------------------------- | -| `:disabled` | `false` | Disable GlobalUid entirely | -| `:connection_timeout` | 3 seconds | Timeout for connecting to a global UID server | -| `:query_timeout` | 10 seconds | Timeout for retrieving a global UID from a server before we move on to the next server | -| `:connection_retry` | 10 minutes | After failing to connect or query a UID server, how long before we retry | -| `:notifier` | A proc calling `ActiveRecord::Base.logger` | This proc is called with two parameters upon UID server failure -- an exception and a message | -| `:increment_by` | 5 | Used to validate number of ID servers, preventing connections if there are more servers than the given increment | +| Name | Default | Description | +| -------------------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------ | +| `:disabled` | `false` | Disable GlobalUid entirely | +| `:connection_timeout` | 3 seconds | Timeout for connecting to a global UID server | +| `:query_timeout` | 10 seconds | Timeout for retrieving a global UID from a server before we move on to the next server | +| `:connection_retry` | 10 minutes | After failing to connect or query a UID server, how long before we retry | +| `:notifier` | A proc calling `ActiveRecord::Base.logger` | This proc is called with two parameters upon UID server failure -- an exception and a message | +| `:increment_by` | 5 | Used for validation, compared with the value on the alloc servers to prevent allocation of duplicate IDs | +| `:suppress_increment_exceptions` | `false` | Suppress configuration validation, allowing updates to `auto_increment_increment` while alloc servers in use | ### Migration diff --git a/lib/global_uid.rb b/lib/global_uid.rb index 9a0e7f5..c792950 100644 --- a/lib/global_uid.rb +++ b/lib/global_uid.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true require "global_uid/base" +require "global_uid/allocator" require "global_uid/active_record_extension" require "global_uid/has_and_belongs_to_many_builder_extension" require "global_uid/migration_extension" @@ -9,6 +10,7 @@ module GlobalUid class NoServersAvailableException < StandardError ; end class ConnectionTimeoutException < StandardError ; end class TimeoutException < StandardError ; end + class InvalidIncrementException < StandardError ; end end ActiveRecord::Base.send(:include, GlobalUid::ActiveRecordExtension) diff --git a/lib/global_uid/allocator.rb b/lib/global_uid/allocator.rb new file mode 100644 index 0000000..380a99c --- /dev/null +++ b/lib/global_uid/allocator.rb @@ -0,0 +1,67 @@ +module GlobalUid + class Allocator + attr_reader :recent_allocations, :max_window_size, :incrementing_by, :connection + + def initialize(incrementing_by:, connection:) + @recent_allocations = [] + @max_window_size = 5 + @incrementing_by = incrementing_by + @connection = connection + validate_connection_increment + end + + def allocate_one(table) + identifier = connection.insert("REPLACE INTO #{table} (stub) VALUES ('a')") + allocate(identifier) + end + + def allocate_many(table, count:) + increment_by = validate_connection_increment + + start_id = connection.insert("REPLACE INTO #{table} (stub) VALUES " + (["('a')"] * count).join(',')) + identifiers = start_id.step(start_id + (count - 1) * increment_by, increment_by).to_a + identifiers.each { |identifier| allocate(identifier) } + identifiers + end + + private + + def allocate(identifier) + recent_allocations.shift if recent_allocations.size >= max_window_size + recent_allocations << identifier + + if !valid_allocation? + db_increment = connection.select_value("SELECT @@auto_increment_increment") + message = "Configured: '#{incrementing_by}', Found: '#{db_increment}' on '#{connection.current_database}'. Recently allocated IDs: #{recent_allocations}" + alert(InvalidIncrementException.new(message)) + end + + identifier + end + + def valid_allocation? + recent_allocations[1..-1].all? do |identifier| + (identifier > recent_allocations[0]) && + (identifier - recent_allocations[0]) % incrementing_by == 0 + end + end + + def validate_connection_increment + db_increment = connection.select_value("SELECT @@auto_increment_increment") + + if db_increment != incrementing_by + alert(InvalidIncrementException.new("Configured: '#{incrementing_by}', Found: '#{db_increment}' on '#{connection.current_database}'")) + end + + db_increment + end + + def alert(exception) + if GlobalUid::Base.global_uid_options[:suppress_increment_exceptions] + GlobalUid::Base.notify(exception, exception.message) + else + raise exception + end + end + end +end diff --git a/lib/global_uid/base.rb b/lib/global_uid/base.rb index b52d84a..ffe4c8e 100644 --- a/lib/global_uid/base.rb +++ b/lib/global_uid/base.rb @@ -11,9 +11,10 @@ class Base :connection_retry => 10.minutes, :notifier => Proc.new { |exception, message| ActiveRecord::Base.logger.error("GlobalUID error: #{exception.class} #{message}") }, :query_timeout => 10, - :increment_by => 5, # This will define the maximum number of servers that you can have + :increment_by => 5, # This will define the maximum number of servers that you can have :disabled => false, - :per_process_affinity => true + :per_process_affinity => true, + :suppress_increment_exceptions => false } def self.servers @@ -94,6 +95,7 @@ def self.disconnect! def self.setup_connections! connection_timeout = self.global_uid_options[:connection_timeout] + increment_by = self.global_uid_options[:increment_by] if self.servers.nil? self.servers = init_server_info @@ -107,9 +109,18 @@ def self.setup_connections! if info[:new?] || ( info[:retry_at] && Time.now > info[:retry_at] ) info[:new?] = false - connection = new_connection(info[:name], connection_timeout) - info[:cx] = connection - info[:retry_at] = Time.now + self.global_uid_options[:connection_retry] if connection.nil? + begin + connection = new_connection(info[:name], connection_timeout) + info[:cx] = connection + if connection.nil? + info[:retry_at] = Time.now + self.global_uid_options[:connection_retry] + else + info[:allocator] = Allocator.new(incrementing_by: increment_by, connection: connection) + end + rescue InvalidIncrementException => e + notify e, "#{e.message}" + info[:cx] = nil + end end end @@ -160,9 +171,9 @@ def self.get_connections def self.get_uid_for_class(klass) with_connections do |connection| + server = self.servers.find { |s| connection.current_database.include?(s[:name]) } Timeout.timeout(self.global_uid_options[:query_timeout], TimeoutException) do - id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES ('a')") - return id + return server[:allocator].allocate_one(klass.global_uid_table) end end raise NoServersAvailableException, "All global UID servers are gone!" @@ -171,10 +182,9 @@ def self.get_uid_for_class(klass) def self.get_many_uids_for_class(klass, count) return [] unless count > 0 with_connections do |connection| + server = self.servers.find { |s| connection.current_database.include?(s[:name]) } Timeout.timeout(self.global_uid_options[:query_timeout], TimeoutException) do - increment_by = connection.select_value("SELECT @@auto_increment_increment") - start_id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES " + (["('a')"] * count).join(',')) - return start_id.step(start_id + (count-1) * increment_by, increment_by).to_a + return server[:allocator].allocate_many(klass.global_uid_table, count: count) end end raise NoServersAvailableException, "All global UID servers are gone!" diff --git a/test/lib/allocator_test.rb b/test/lib/allocator_test.rb new file mode 100644 index 0000000..1bb4a8a --- /dev/null +++ b/test/lib/allocator_test.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true +require_relative '../test_helper' + +describe GlobalUid::Allocator do + let(:connection) { mock('connection') } + let(:increment_by) { 10 } + let(:allocator) { GlobalUid::Allocator.new(incrementing_by: increment_by, connection: connection) } + let(:table) { mock('table_class').class } + + before do + restore_defaults! + connection.stubs(:current_database).returns('database_name') + end + + describe 'with a configured increment_by that differs from the connections auto_increment_increment' do + it 'raises an exception to prevent erroneous ID allocation' do + connection.stubs(:select_value).with('SELECT @@auto_increment_increment').returns(50) + exception = assert_raises(GlobalUid::InvalidIncrementException) do + GlobalUid::Allocator.new(incrementing_by: 10, connection: connection) + end + + assert_equal("Configured: '10', Found: '50' on 'database_name'", exception.message) + end + end + + describe 'with a configured increment_by that matches the connections auto_increment_increment' do + before do + connection.stubs(:select_value).with('SELECT @@auto_increment_increment').returns(increment_by) + end + + describe '#allocate_one' do + it 'allocates IDs, maintaining a small rolling selection of IDs for comparison' do + [10, 20, 30, 40, 50, 60, 70, 80].each do |id| + connection.expects(:insert).returns(id) + allocator.allocate_one(table) + end + + assert_equal(5, allocator.max_window_size) + assert_equal([40, 50, 60, 70, 80], allocator.recent_allocations) + end + + describe 'gap between ID not divisible by increment_by' do + it 'raises an error' do + connection.expects(:insert).returns(20) + allocator.allocate_one(table) + + connection.stubs(:select_value).with('SELECT @@auto_increment_increment').returns(5) + connection.expects(:insert).returns(25) + exception = assert_raises(GlobalUid::InvalidIncrementException) do + allocator.allocate_one(table) + end + + assert_equal("Configured: '10', Found: '5' on 'database_name'. Recently allocated IDs: [20, 25]", exception.message) + end + end + + describe 'ID value does not increment upwards' do + it 'raises an error' do + connection.expects(:insert).returns(20) + allocator.allocate_one(table) + + connection.expects(:insert).returns(20 - increment_by) + exception = assert_raises(GlobalUid::InvalidIncrementException) do + allocator.allocate_one(table) + end + + assert_equal("Configured: '10', Found: '10' on 'database_name'. Recently allocated IDs: [20, 10]", exception.message) + end + end + end + + describe '#allocate_many' do + it 'allocates IDs, maintaining a small rolling selection of IDs for comparison' do + connection.expects(:insert) + .with("REPLACE INTO Mocha::Mock (stub) VALUES ('a'),('a'),('a'),('a'),('a'),('a'),('a'),('a')") + .returns(10) + allocator.allocate_many(table, count: 8) + + assert_equal(5, allocator.max_window_size) + assert_equal([40, 50, 60, 70, 80], allocator.recent_allocations) + end + + describe 'with a configured increment_by that differs from the active connection auto_increment_increment' do + it 'raises an error' do + connection.stubs(:select_value).with('SELECT @@auto_increment_increment').returns(5) + connection.expects(:insert).never + exception = assert_raises(GlobalUid::InvalidIncrementException) do + allocator.allocate_many(table, count: 8) + end + + assert_equal("Configured: '10', Found: '5' on 'database_name'", exception.message) + end + end + end + end +end diff --git a/test/lib/global_uid_test.rb b/test/lib/global_uid_test.rb index 0d97f38..a43ea8f 100644 --- a/test/lib/global_uid_test.rb +++ b/test/lib/global_uid_test.rb @@ -214,6 +214,69 @@ def table_exists?(connection, table) end end + describe "Updating the auto_increment_increment on active alloc servers" do + before do + CreateWithNoParams.up + CreateWithoutGlobalUIDs.up + + @notifications = [] + GlobalUid::Base.global_uid_options[:notifier] = Proc.new do |exception, message| + GlobalUid::Base::GLOBAL_UID_DEFAULTS[:notifier].call(exception, message) + @notifications << exception.class + end + end + + describe 'with increment exceptions raised' do + it 'takes the servers out of the pool, preventing usage during update' do + assert_raises(GlobalUid::NoServersAvailableException) do + # Double the increment_by value and set it on the database connection (auto_increment_increment) + # Record creation will fail as all connections will be rejected since they're configured incorrectly. + # The client is expecting `auto_increment_increment` to equal the configured `increment_by` + with_modified_connections(increment: 10, servers: ["test_id_server_1", "test_id_server_2"]) do + 25.times { WithGlobalUID.create! } + end + end + end + end + + describe "with increment exceptions suppressed " do + before do + GlobalUid::Base.global_uid_options[:suppress_increment_exceptions] = true + end + + it "allows the increment to be updated" do + # Prefill alloc servers with a few records, initializing a connection to both alloc servers + test_unique_ids(25) + assert_empty(@notifications) + + # Update the active `test_id_server_1` connection, setting a `auto_increment_increment` + # value that differs to what's configured and expected + # The change should be noted and record creation should continue on both servers + with_modified_connections(increment: 10, servers: ["test_id_server_1"]) do + test_unique_ids(25) + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + assert_equal(2, GlobalUid::Base.get_connections.length) + end + + # Update both active `test_id_server_1` and `test_id_server_2` connections, setting a `auto_increment_increment` + # value that differs to what's configured and expected + # The change should be noted and record creation should continue on both servers + @notifications = [] + with_modified_connections(increment: 10, servers: ["test_id_server_1", "test_id_server_2"]) do + test_unique_ids(25) + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + assert_equal(2, GlobalUid::Base.get_connections.length) + end + end + end + + after do + reset_connections! + CreateWithNoParams.down + CreateWithoutGlobalUIDs.down + end + end + describe "With GlobalUID" do before do CreateWithNoParams.up @@ -231,34 +294,121 @@ def table_exists?(connection, table) it "get a unique id" do test_unique_ids end + + describe 'when the auto_increment_increment changes' do + before do + @notifications = [] + GlobalUid::Base.global_uid_options[:notifier] = Proc.new do |exception, message| + GlobalUid::Base::GLOBAL_UID_DEFAULTS[:notifier].call(exception, message) + @notifications << exception.class + end + end + + describe "and all servers report a value other than what's configured" do + it "raises an exception when configuration incorrect during initialization" do + GlobalUid::Base.global_uid_options[:increment_by] = 42 + reset_connections! + assert_raises(GlobalUid::NoServersAvailableException) { test_unique_ids(10) } + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + + it "raises an exception, preventing duplicate ID generation" do + GlobalUid::Base.with_connections do |con| + con.execute("SET SESSION auto_increment_increment = 42") + end + + assert_raises(GlobalUid::NoServersAvailableException) { test_unique_ids(10) } + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + + it "raises an exception before attempting to generate many UIDs" do + GlobalUid::Base.with_connections do |con| + con.execute("SET SESSION auto_increment_increment = 42") + end + + assert_raises GlobalUid::NoServersAvailableException do + WithGlobalUID.generate_many_uids(10) + end + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + + it "doesn't cater for increment_by being increased by a factor of x" do + GlobalUid::Base.with_connections do |connection| + connection.execute("SET SESSION auto_increment_increment = #{GlobalUid::Base::GLOBAL_UID_DEFAULTS[:increment_by] * 2}") + end + # Due to multiple processes and threads sharing the same alloc server, identifiers may be provisioned + # before the current thread receives its next one. We rely on the gap being divisible by the configured increment + test_unique_ids(10) + assert_empty(@notifications) + end + end + + describe "and only one server reports a value other than what's configured" do + it "notifies the client when configuration incorrect during initialization" do + with_modified_connections(increment: 42, servers: ["test_id_server_1"]) do + + # Trigger the exception, one call may not hit the server, there's still a 1/(2^32) chance of failure. + test_unique_ids(32) + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + end + + it "notifies the client and continues with the other connection" do + con = GlobalUid::Base.get_connections.first + con.execute("SET SESSION auto_increment_increment = 42") + + # Trigger the exception, one call may not hit the server, there's still a 1/(2^32) chance of failure. + test_unique_ids(32) + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + + it "notifies the client and continues when attempting to generate many UIDs" do + con = GlobalUid::Base.get_connections.first + con.execute("SET SESSION auto_increment_increment = 42") + + # Trigger the exception, one call may not hit the server, there's still a 1/(2^32) chance of failure. + 32.times { WithGlobalUID.generate_many_uids(10) } + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + end + end end describe "With a timing out server" do - before do - reset_connections! - @a_decent_cx = GlobalUid::Base.new_connection(GlobalUid::Base.global_uid_servers.first, 50) - ActiveRecord::Base.stubs(:mysql2_connection).raises(GlobalUid::ConnectionTimeoutException).then.returns(@a_decent_cx) - @connections = GlobalUid::Base.get_connections + def with_timed_out_connection(server:, end_time:) + modified_connection = lambda do |config| + if config["database"].include?(server) + raise GlobalUid::ConnectionTimeoutException if end_time > Time.now + end + + ActiveRecord::Base.__minitest_stub__mysql2_connection(config) + end + ActiveRecord::Base.stub :mysql2_connection, modified_connection do + reset_connections! + yield + end end it "limp along with one functioning server" do - assert_includes @connections, @a_decent_cx - assert_equal GlobalUid::Base.global_uid_servers.size - 1, @connections.size, "get_connections size" + with_timed_out_connection(server: "test_id_server_1", end_time: Time.now + 10.minutes) do + test_unique_ids(10) + assert_equal 1, GlobalUid::Base.get_connections.size + assert_equal 'global_uid_test_id_server_2', GlobalUid::Base.get_connections[0].current_database + end end it "eventually retry the connection and get it back in place" do - # clear the state machine expectation - ActiveRecord::Base.mysql2_connection rescue nil - ActiveRecord::Base.mysql2_connection rescue nil + with_timed_out_connection(server: "test_id_server_1", end_time: Time.now + 10.minutes) do + test_unique_ids(10) + assert_equal 1, GlobalUid::Base.get_connections.size + assert_equal 'global_uid_test_id_server_2', GlobalUid::Base.get_connections[0].current_database - awhile = Time.now + 10.hours - Time.stubs(:now).returns(awhile) - - assert_equal GlobalUid::Base.get_connections.size, GlobalUid::Base.global_uid_servers.size - end + after_timeout_end_time = Time.now + 11.minutes + Time.stubs(:now).returns(after_timeout_end_time) - it "get some unique ids" do - test_unique_ids + test_unique_ids(10) + assert_equal 2, GlobalUid::Base.get_connections.size + end end end @@ -406,4 +556,17 @@ def parent_child_fork_values def show_create_sql(klass, table) klass.connection.select_rows("show create table #{table}")[0][1] end + + def with_modified_connections(increment:, servers:) + modified_connection = lambda do |name, _connection_timeout| + config = ActiveRecord::Base.configurations.to_h[name] + ActiveRecord::Base.mysql2_connection(config).tap do |connection| + connection.execute("SET SESSION auto_increment_increment = #{increment}") if servers.include?(name) + end + end + GlobalUid::Base.stub :new_connection, modified_connection do + reset_connections! + yield + end + end end