diff --git a/Changelog.md b/Changelog.md index 81c072f..ddd448d 100644 --- a/Changelog.md +++ b/Changelog.md @@ -6,6 +6,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added +- [Breaking change] ID Validation, ensure the ID coming back has been incremented using the configured `auto_increment_increment`. (https://github.com/zendesk/global_uid/pull/63) + ### Removed - Removed the `dry_run` option (https://github.com/zendesk/global_uid/pull/64) - Removed `GlobalUid::ServerVariables` module (https://github.com/zendesk/global_uid/pull/66) diff --git a/README.md b/README.md index 8d426a5..4e2dea5 100644 --- a/README.md +++ b/README.md @@ -49,14 +49,15 @@ The `increment_by` value configured here does not dictate the value on your allo Here's a complete list of the options you can use: -| Name | Default | Description | -| --------------------- | ------------------------------------------ | ---------------------------------------------------------------------------------------------------------- | -| `:disabled` | `false` | Disable GlobalUid entirely | -| `:connection_timeout` | 3 seconds | Timeout for connecting to a global UID server | -| `:query_timeout` | 10 seconds | Timeout for retrieving a global UID from a server before we move on to the next server | -| `:connection_retry` | 10 minutes | After failing to connect or query a UID server, how long before we retry | -| `:notifier` | A proc calling `ActiveRecord::Base.logger` | This proc is called with two parameters upon UID server failure -- an exception and a message | -| `:increment_by` | 5 | Used to validate number of ID servers, preventing connections if there are more servers than the given increment | +| Name | Default | Description | +| -------------------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------ | +| `:disabled` | `false` | 
Disable GlobalUid entirely | +| `:connection_timeout` | 3 seconds | Timeout for connecting to a global UID server | +| `:query_timeout` | 10 seconds | Timeout for retrieving a global UID from a server before we move on to the next server | +| `:connection_retry` | 10 minutes | After failing to connect or query a UID server, how long before we retry | +| `:notifier` | A proc calling `ActiveRecord::Base.logger` | This proc is called with two parameters upon UID server failure -- an exception and a message | +| `:increment_by` | 5 | Used for validation, compared with the value on the alloc servers to prevent allocation of duplicate IDs | +| `:suppress_increment_exceptions` | `false` | Suppress configuration validation, allowing updates to `auto_increment_increment` while alloc servers are in use | ### Migration diff --git a/lib/global_uid.rb b/lib/global_uid.rb index 9a0e7f5..c792950 100644 --- a/lib/global_uid.rb +++ b/lib/global_uid.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true require "global_uid/base" +require "global_uid/allocator" require "global_uid/active_record_extension" require "global_uid/has_and_belongs_to_many_builder_extension" require "global_uid/migration_extension" @@ -9,6 +10,7 @@ module GlobalUid class NoServersAvailableException < StandardError ; end class ConnectionTimeoutException < StandardError ; end class TimeoutException < StandardError ; end + class InvalidIncrementException < StandardError ; end end ActiveRecord::Base.send(:include, GlobalUid::ActiveRecordExtension) diff --git a/lib/global_uid/allocator.rb b/lib/global_uid/allocator.rb new file mode 100644 index 0000000..380a99c --- /dev/null +++ b/lib/global_uid/allocator.rb @@ -0,0 +1,67 @@ +module GlobalUid + class Allocator + attr_reader :recent_allocations, :max_window_size, :incrementing_by, :connection + + def initialize(incrementing_by:, connection:) + @recent_allocations = [] + @max_window_size = 5 + @incrementing_by = incrementing_by + @connection = connection + 
validate_connection_increment + end + + def allocate_one(table) + identifier = connection.insert("REPLACE INTO #{table} (stub) VALUES ('a')") + allocate(identifier) + end + + def allocate_many(table, count:) + increment_by = validate_connection_increment + + start_id = connection.insert("REPLACE INTO #{table} (stub) VALUES " + (["('a')"] * count).join(',')) + identifiers = start_id.step(start_id + (count - 1) * increment_by, increment_by).to_a + identifiers.each { |identifier| allocate(identifier) } + identifiers + end + + private + + def allocate(identifier) + recent_allocations.shift if recent_allocations.size >= max_window_size + recent_allocations << identifier + + if !valid_allocation? + db_increment = connection.select_value("SELECT @@auto_increment_increment") + message = "Configured: '#{incrementing_by}', Found: '#{db_increment}' on '#{connection.current_database}'. Recently allocated IDs: #{recent_allocations}" + alert(InvalidIncrementException.new(message)) + end + + identifier + end + + def valid_allocation? + recent_allocations[1..-1].all? 
do |identifier| + (identifier > recent_allocations[0]) && + (identifier - recent_allocations[0]) % incrementing_by == 0 + end + end + + def validate_connection_increment + db_increment = connection.select_value("SELECT @@auto_increment_increment") + + if db_increment != incrementing_by + alert(InvalidIncrementException.new("Configured: '#{incrementing_by}', Found: '#{db_increment}' on '#{connection.current_database}'")) + end + + db_increment + end + + def alert(exception) + if GlobalUid::Base.global_uid_options[:suppress_increment_exceptions] + GlobalUid::Base.notify(exception, exception.message) + else + raise exception + end + end + end +end diff --git a/lib/global_uid/base.rb b/lib/global_uid/base.rb index b52d84a..ffe4c8e 100644 --- a/lib/global_uid/base.rb +++ b/lib/global_uid/base.rb @@ -11,9 +11,10 @@ class Base :connection_retry => 10.minutes, :notifier => Proc.new { |exception, message| ActiveRecord::Base.logger.error("GlobalUID error: #{exception.class} #{message}") }, :query_timeout => 10, - :increment_by => 5, # This will define the maximum number of servers that you can have + :increment_by => 5, # This will define the maximum number of servers that you can have :disabled => false, - :per_process_affinity => true + :per_process_affinity => true, + :suppress_increment_exceptions => false } def self.servers @@ -94,6 +95,7 @@ def self.disconnect! def self.setup_connections! connection_timeout = self.global_uid_options[:connection_timeout] + increment_by = self.global_uid_options[:increment_by] if self.servers.nil? self.servers = init_server_info @@ -107,9 +109,18 @@ def self.setup_connections! if info[:new?] || ( info[:retry_at] && Time.now > info[:retry_at] ) info[:new?] = false - connection = new_connection(info[:name], connection_timeout) - info[:cx] = connection - info[:retry_at] = Time.now + self.global_uid_options[:connection_retry] if connection.nil? 
+ begin + connection = new_connection(info[:name], connection_timeout) + info[:cx] = connection + if connection.nil? + info[:retry_at] = Time.now + self.global_uid_options[:connection_retry] + else + info[:allocator] = Allocator.new(incrementing_by: increment_by, connection: connection) + end + rescue InvalidIncrementException => e + notify e, "#{e.message}" + info[:cx] = nil + end end end @@ -160,9 +171,9 @@ def self.get_connections def self.get_uid_for_class(klass) with_connections do |connection| + server = self.servers.find { |s| connection.current_database.include?(s[:name]) } Timeout.timeout(self.global_uid_options[:query_timeout], TimeoutException) do - id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES ('a')") - return id + return server[:allocator].allocate_one(klass.global_uid_table) end end raise NoServersAvailableException, "All global UID servers are gone!" @@ -171,10 +182,9 @@ def self.get_uid_for_class(klass) def self.get_many_uids_for_class(klass, count) return [] unless count > 0 with_connections do |connection| + server = self.servers.find { |s| connection.current_database.include?(s[:name]) } Timeout.timeout(self.global_uid_options[:query_timeout], TimeoutException) do - increment_by = connection.select_value("SELECT @@auto_increment_increment") - start_id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES " + (["('a')"] * count).join(',')) - return start_id.step(start_id + (count-1) * increment_by, increment_by).to_a + return server[:allocator].allocate_many(klass.global_uid_table, count: count) end end raise NoServersAvailableException, "All global UID servers are gone!" 
diff --git a/test/lib/allocator_test.rb b/test/lib/allocator_test.rb new file mode 100644 index 0000000..1bb4a8a --- /dev/null +++ b/test/lib/allocator_test.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true +require_relative '../test_helper' + +describe GlobalUid::Allocator do + let(:connection) { mock('connection') } + let(:increment_by) { 10 } + let(:allocator) { GlobalUid::Allocator.new(incrementing_by: increment_by, connection: connection) } + let(:table) { mock('table_class').class } + + before do + restore_defaults! + connection.stubs(:current_database).returns('database_name') + end + + describe 'with a configured increment_by that differs from the connections auto_increment_increment' do + it 'raises an exception to prevent erroneous ID allocation' do + connection.stubs(:select_value).with('SELECT @@auto_increment_increment').returns(50) + exception = assert_raises(GlobalUid::InvalidIncrementException) do + GlobalUid::Allocator.new(incrementing_by: 10, connection: connection) + end + + assert_equal("Configured: '10', Found: '50' on 'database_name'", exception.message) + end + end + + describe 'with a configured increment_by that matches the connections auto_increment_increment' do + before do + connection.stubs(:select_value).with('SELECT @@auto_increment_increment').returns(increment_by) + end + + describe '#allocate_one' do + it 'allocates IDs, maintaining a small rolling selection of IDs for comparison' do + [10, 20, 30, 40, 50, 60, 70, 80].each do |id| + connection.expects(:insert).returns(id) + allocator.allocate_one(table) + end + + assert_equal(5, allocator.max_window_size) + assert_equal([40, 50, 60, 70, 80], allocator.recent_allocations) + end + + describe 'gap between ID not divisible by increment_by' do + it 'raises an error' do + connection.expects(:insert).returns(20) + allocator.allocate_one(table) + + connection.stubs(:select_value).with('SELECT @@auto_increment_increment').returns(5) + connection.expects(:insert).returns(25) + exception = 
assert_raises(GlobalUid::InvalidIncrementException) do + allocator.allocate_one(table) + end + + assert_equal("Configured: '10', Found: '5' on 'database_name'. Recently allocated IDs: [20, 25]", exception.message) + end + end + + describe 'ID value does not increment upwards' do + it 'raises an error' do + connection.expects(:insert).returns(20) + allocator.allocate_one(table) + + connection.expects(:insert).returns(20 - increment_by) + exception = assert_raises(GlobalUid::InvalidIncrementException) do + allocator.allocate_one(table) + end + + assert_equal("Configured: '10', Found: '10' on 'database_name'. Recently allocated IDs: [20, 10]", exception.message) + end + end + end + + describe '#allocate_many' do + it 'allocates IDs, maintaining a small rolling selection of IDs for comparison' do + connection.expects(:insert) + .with("REPLACE INTO Mocha::Mock (stub) VALUES ('a'),('a'),('a'),('a'),('a'),('a'),('a'),('a')") + .returns(10) + allocator.allocate_many(table, count: 8) + + assert_equal(5, allocator.max_window_size) + assert_equal([40, 50, 60, 70, 80], allocator.recent_allocations) + end + + describe 'with a configured increment_by that differs from the active connection auto_increment_increment' do + it 'raises an error' do + connection.stubs(:select_value).with('SELECT @@auto_increment_increment').returns(5) + connection.expects(:insert).never + exception = assert_raises(GlobalUid::InvalidIncrementException) do + allocator.allocate_many(table, count: 8) + end + + assert_equal("Configured: '10', Found: '5' on 'database_name'", exception.message) + end + end + end + end +end diff --git a/test/lib/global_uid_test.rb b/test/lib/global_uid_test.rb index 0d97f38..a43ea8f 100644 --- a/test/lib/global_uid_test.rb +++ b/test/lib/global_uid_test.rb @@ -214,6 +214,69 @@ def table_exists?(connection, table) end end + describe "Updating the auto_increment_increment on active alloc servers" do + before do + CreateWithNoParams.up + CreateWithoutGlobalUIDs.up + + 
@notifications = [] + GlobalUid::Base.global_uid_options[:notifier] = Proc.new do |exception, message| + GlobalUid::Base::GLOBAL_UID_DEFAULTS[:notifier].call(exception, message) + @notifications << exception.class + end + end + + describe 'with increment exceptions raised' do + it 'takes the servers out of the pool, preventing usage during update' do + assert_raises(GlobalUid::NoServersAvailableException) do + # Double the increment_by value and set it on the database connection (auto_increment_increment) + # Record creation will fail as all connections will be rejected since they're configured incorrectly. + # The client is expecting `auto_increment_increment` to equal the configured `increment_by` + with_modified_connections(increment: 10, servers: ["test_id_server_1", "test_id_server_2"]) do + 25.times { WithGlobalUID.create! } + end + end + end + end + + describe "with increment exceptions suppressed " do + before do + GlobalUid::Base.global_uid_options[:suppress_increment_exceptions] = true + end + + it "allows the increment to be updated" do + # Prefill alloc servers with a few records, initializing a connection to both alloc servers + test_unique_ids(25) + assert_empty(@notifications) + + # Update the active `test_id_server_1` connection, setting a `auto_increment_increment` + # value that differs to what's configured and expected + # The change should be noted and record creation should continue on both servers + with_modified_connections(increment: 10, servers: ["test_id_server_1"]) do + test_unique_ids(25) + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + assert_equal(2, GlobalUid::Base.get_connections.length) + end + + # Update both active `test_id_server_1` and `test_id_server_2` connections, setting a `auto_increment_increment` + # value that differs to what's configured and expected + # The change should be noted and record creation should continue on both servers + @notifications = [] + with_modified_connections(increment: 10, 
servers: ["test_id_server_1", "test_id_server_2"]) do + test_unique_ids(25) + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + assert_equal(2, GlobalUid::Base.get_connections.length) + end + end + end + + after do + reset_connections! + CreateWithNoParams.down + CreateWithoutGlobalUIDs.down + end + end + describe "With GlobalUID" do before do CreateWithNoParams.up @@ -231,34 +294,121 @@ def table_exists?(connection, table) it "get a unique id" do test_unique_ids end + + describe 'when the auto_increment_increment changes' do + before do + @notifications = [] + GlobalUid::Base.global_uid_options[:notifier] = Proc.new do |exception, message| + GlobalUid::Base::GLOBAL_UID_DEFAULTS[:notifier].call(exception, message) + @notifications << exception.class + end + end + + describe "and all servers report a value other than what's configured" do + it "raises an exception when configuration incorrect during initialization" do + GlobalUid::Base.global_uid_options[:increment_by] = 42 + reset_connections! 
+ assert_raises(GlobalUid::NoServersAvailableException) { test_unique_ids(10) } + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + + it "raises an exception, preventing duplicate ID generation" do + GlobalUid::Base.with_connections do |con| + con.execute("SET SESSION auto_increment_increment = 42") + end + + assert_raises(GlobalUid::NoServersAvailableException) { test_unique_ids(10) } + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + + it "raises an exception before attempting to generate many UIDs" do + GlobalUid::Base.with_connections do |con| + con.execute("SET SESSION auto_increment_increment = 42") + end + + assert_raises GlobalUid::NoServersAvailableException do + WithGlobalUID.generate_many_uids(10) + end + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + + it "doesn't cater for increment_by being increased by a factor of x" do + GlobalUid::Base.with_connections do |connection| + connection.execute("SET SESSION auto_increment_increment = #{GlobalUid::Base::GLOBAL_UID_DEFAULTS[:increment_by] * 2}") + end + # Due to multiple processes and threads sharing the same alloc server, identifiers may be provisioned + # before the current thread receives its next one. We rely on the gap being divisible by the configured increment + test_unique_ids(10) + assert_empty(@notifications) + end + end + + describe "and only one server reports a value other than what's configured" do + it "notifies the client when configuration incorrect during initialization" do + with_modified_connections(increment: 42, servers: ["test_id_server_1"]) do + + # Trigger the exception, one call may not hit the server, there's still a 1/(2^32) chance of failure. 
+ test_unique_ids(32) + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + end + + it "notifies the client and continues with the other connection" do + con = GlobalUid::Base.get_connections.first + con.execute("SET SESSION auto_increment_increment = 42") + + # Trigger the exception, one call may not hit the server, there's still a 1/(2^32) chance of failure. + test_unique_ids(32) + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + + it "notifies the client and continues when attempting to generate many UIDs" do + con = GlobalUid::Base.get_connections.first + con.execute("SET SESSION auto_increment_increment = 42") + + # Trigger the exception, one call may not hit the server, there's still a 1/(2^32) chance of failure. + 32.times { WithGlobalUID.generate_many_uids(10) } + assert_includes(@notifications, GlobalUid::InvalidIncrementException) + end + end + end end describe "With a timing out server" do - before do - reset_connections! - @a_decent_cx = GlobalUid::Base.new_connection(GlobalUid::Base.global_uid_servers.first, 50) - ActiveRecord::Base.stubs(:mysql2_connection).raises(GlobalUid::ConnectionTimeoutException).then.returns(@a_decent_cx) - @connections = GlobalUid::Base.get_connections + def with_timed_out_connection(server:, end_time:) + modified_connection = lambda do |config| + if config["database"].include?(server) + raise GlobalUid::ConnectionTimeoutException if end_time > Time.now + end + + ActiveRecord::Base.__minitest_stub__mysql2_connection(config) + end + ActiveRecord::Base.stub :mysql2_connection, modified_connection do + reset_connections! 
+ yield + end end it "limp along with one functioning server" do - assert_includes @connections, @a_decent_cx - assert_equal GlobalUid::Base.global_uid_servers.size - 1, @connections.size, "get_connections size" + with_timed_out_connection(server: "test_id_server_1", end_time: Time.now + 10.minutes) do + test_unique_ids(10) + assert_equal 1, GlobalUid::Base.get_connections.size + assert_equal 'global_uid_test_id_server_2', GlobalUid::Base.get_connections[0].current_database + end end it "eventually retry the connection and get it back in place" do - # clear the state machine expectation - ActiveRecord::Base.mysql2_connection rescue nil - ActiveRecord::Base.mysql2_connection rescue nil + with_timed_out_connection(server: "test_id_server_1", end_time: Time.now + 10.minutes) do + test_unique_ids(10) + assert_equal 1, GlobalUid::Base.get_connections.size + assert_equal 'global_uid_test_id_server_2', GlobalUid::Base.get_connections[0].current_database - awhile = Time.now + 10.hours - Time.stubs(:now).returns(awhile) - - assert_equal GlobalUid::Base.get_connections.size, GlobalUid::Base.global_uid_servers.size - end + after_timeout_end_time = Time.now + 11.minutes + Time.stubs(:now).returns(after_timeout_end_time) - it "get some unique ids" do - test_unique_ids + test_unique_ids(10) + assert_equal 2, GlobalUid::Base.get_connections.size + end end end @@ -406,4 +556,17 @@ def parent_child_fork_values def show_create_sql(klass, table) klass.connection.select_rows("show create table #{table}")[0][1] end + + def with_modified_connections(increment:, servers:) + modified_connection = lambda do |name, _connection_timeout| + config = ActiveRecord::Base.configurations.to_h[name] + ActiveRecord::Base.mysql2_connection(config).tap do |connection| + connection.execute("SET SESSION auto_increment_increment = #{increment}") if servers.include?(name) + end + end + GlobalUid::Base.stub :new_connection, modified_connection do + reset_connections! + yield + end + end end