diff --git a/.rubocop.yml b/.rubocop.yml
index e9f70da9..c2d0303a 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -3,3 +3,6 @@ inherit_from:
 
 AllCops:
   TargetRubyVersion: 2.4
+
+Layout/BeginEndAlignment:
+  EnforcedStyleAlignWith: start_of_line
diff --git a/dev.yml b/dev.yml
index 2f16955b..4ce17165 100644
--- a/dev.yml
+++ b/dev.yml
@@ -6,7 +6,7 @@ up:
   - mysql-client@5.7:
      or: [mysql@5.7]
      conflicts: [mysql-connector-c, mysql, mysql-client]
-  - ruby: 2.6.5
+  - ruby: 2.7.2
   - railgun
   - bundler
diff --git a/lib/identity_cache.rb b/lib/identity_cache.rb
index c370be5f..cc01e702 100644
--- a/lib/identity_cache.rb
+++ b/lib/identity_cache.rb
@@ -65,6 +65,8 @@ class UnsupportedAssociationError < StandardError; end
   class DerivedModelError < StandardError; end
 
+  class LockWaitTimeout < StandardError; end
+
   mattr_accessor :cache_namespace
   self.cache_namespace = "IDC:#{CACHE_VERSION}:"
@@ -141,10 +143,13 @@ def should_use_cache? # :nodoc:
     #
     # == Parameters
     # +key+ A cache key string
+    # +cache_fetcher_options+ A hash of options to pass to the cache backend
     #
-    def fetch(key)
+    def fetch(key, cache_fetcher_options = {})
       if should_use_cache?
-        unmap_cached_nil_for(cache.fetch(key) { map_cached_nil_for(yield) })
+        unmap_cached_nil_for(cache.fetch(key, cache_fetcher_options) do
+          map_cached_nil_for(yield)
+        end)
       else
         yield
       end
diff --git a/lib/identity_cache/cache_fetcher.rb b/lib/identity_cache/cache_fetcher.rb
index 2688bfc6..d4c046e8 100644
--- a/lib/identity_cache/cache_fetcher.rb
+++ b/lib/identity_cache/cache_fetcher.rb
@@ -1,8 +1,52 @@
 # frozen_string_literal: true
+
+require 'securerandom'
+
 module IdentityCache
   class CacheFetcher
     attr_accessor :cache_backend
 
+    EMPTY_HASH = {}.freeze
+
+    class FillLock
+      FILL_LOCKED = :fill_locked
+      FAILED_CLIENT_ID = 'fill_failed'
+
+      class << self
+        def from_cache(marker, client_id, data_version)
+          raise ArgumentError unless marker == FILL_LOCKED
+          new(client_id: client_id, data_version: data_version)
+        end
+
+        def cache_value?(cache_value)
+          cache_value.is_a?(Array) && cache_value.length == 3 && cache_value.first == FILL_LOCKED
+        end
+      end
+
+      attr_reader :client_id, :data_version
+
+      def initialize(client_id:, data_version:)
+        @client_id = client_id
+        @data_version = data_version
+      end
+
+      def cache_value
+        [FILL_LOCKED, client_id, data_version]
+      end
+
+      def mark_failed
+        @client_id = FAILED_CLIENT_ID
+      end
+
+      def fill_failed?
+        @client_id == FAILED_CLIENT_ID
+      end
+
+      def ==(other)
+        self.class == other.class && client_id == other.client_id && data_version == other.data_version
+      end
+    end
+
     def initialize(cache_backend)
       @cache_backend = cache_backend
     end
@@ -25,27 +69,198 @@ def fetch_multi(keys, &block)
       results
     end
 
-    def fetch(key)
-      result = nil
-      yielded = false
-      @cache_backend.cas(key) do |value|
-        yielded = true
-        unless IdentityCache::DELETED == value
-          result = value
-          break
+    def fetch(key, fill_lock_duration: nil, lock_wait_tries: 2)
+      if fill_lock_duration && IdentityCache.should_fill_cache?
+        fetch_with_fill_lock(key, fill_lock_duration, lock_wait_tries) do
+          yield
+        end
+      else
+        fetch_without_fill_lock(key) { yield }
+      end
+    end
+
+    private
+
+    def fetch_without_fill_lock(key)
+      data = nil
+      upsert(key) do |value|
+        value = nil if value == IdentityCache::DELETED || FillLock.cache_value?(value)
+        unless value.nil?
+          return value
         end
-        result = yield
+        data = yield
         break unless IdentityCache.should_fill_cache?
-        result
+        data
+      end
+      data
+    end
+
+    def fetch_with_fill_lock(key, fill_lock_duration, lock_wait_tries)
+      raise ArgumentError, 'fill_lock_duration must be greater than 0.0' unless fill_lock_duration > 0.0
+      raise ArgumentError, 'lock_wait_tries must be greater than 0' unless lock_wait_tries > 0
+      lock = nil
+      using_fallback_key = false
+      expiration_options = EMPTY_HASH
+      (lock_wait_tries + 2).times do # +2 is for first attempt and retry with fallback key
+        result = fetch_or_take_lock(key, old_lock: lock, **expiration_options)
+        case result
+        when FillLock
+          lock = result
+          if lock.client_id == client_id # have lock
+            data = begin
+              yield
+            rescue
+              mark_fill_failure_on_lock(key, expiration_options)
+              raise
+            end
+
+            if !fill_with_lock(key, data, lock, expiration_options) && !using_fallback_key
+              # fallback to storing data in the fallback key so it is available to clients waiting on the lock
+              expiration_options = fallback_key_expiration_options(fill_lock_duration)
+              @cache_backend.write(lock_fill_fallback_key(key, lock), data, expiration_options)
+            end
+            return data
+          else
+            raise LockWaitTimeout if lock_wait_tries <= 0
+            lock_wait_tries -= 1
+
+            # If fill failed in the other client, then it might be failing fast
+            # so avoid waiting the typical amount of time for a lock wait. The
+            # semian gem can be used to handle failing fast when the database is slow.
+            if lock.fill_failed?
+              return fetch_without_fill_lock(key) { yield }
+            end
+
+            # lock wait
+            sleep(fill_lock_duration)
+            # loop around to retry fetch_or_take_lock
+          end
+        when IdentityCache::DELETED # interrupted by cache invalidation
+          if using_fallback_key
+            raise "unexpected cache invalidation of versioned fallback key"
+          elsif lock
+            # Cache invalidated during lock wait, use a versioned fallback key
+            # to avoid further cache invalidation interruptions.
+            using_fallback_key = true
+            key = lock_fill_fallback_key(key, lock)
+            expiration_options = fallback_key_expiration_options(fill_lock_duration)
+            # loop around to retry with fallback key
+          else
+            # Cache invalidation prevented lock from being taken or read, so we don't
+            # have a data version to use to build a shared fallback key. In the future
+            # we could add the data version to the cache invalidation value so a fallback
+            # key could be used here. For now, we assume that a cache invalidation occurring
+            # just after the cache wasn't filled is more likely a sign of a key that is
+            # written more than read (which this cache isn't a good fit for), rather than
+            # a thundering herd of reads.
+            return yield
+          end
+        when nil # Errors talking to memcached
+          return yield
+        else # hit
+          return result
+        end
+      end
+      raise "unexpected number of loop iterations"
+    end
+
+    def mark_fill_failure_on_lock(key, expiration_options)
+      @cache_backend.cas(key, expiration_options) do |value|
+        break unless FillLock.cache_value?(value)
+        lock = FillLock.from_cache(*value)
+        break if lock.client_id != client_id
+        lock.mark_failed
+        lock.cache_value
+      end
+    end
+
+    def upsert(key, expiration_options = EMPTY_HASH)
+      yielded = false
+      upserted = @cache_backend.cas(key, expiration_options) do |value|
+        yielded = true
+        yield value
       end
       unless yielded
-        result = yield
-        add(key, result)
+        data = yield nil
+        upserted = add(key, data, expiration_options)
       end
-      result
+      upserted
     end
 
-    private
+    def fetch_or_take_lock(key, old_lock:, **expiration_options)
+      new_lock = nil
+      upserted = upsert(key, expiration_options) do |value|
+        if value.nil? || value == IdentityCache::DELETED
+          if old_lock # cache invalidated
+            return value
+          else
+            new_lock = FillLock.new(client_id: client_id, data_version: SecureRandom.uuid)
+          end
+        elsif FillLock.cache_value?(value)
+          fetched_lock = FillLock.from_cache(*value)
+          if old_lock == fetched_lock
+            # preserve data version since there haven't been any cache invalidations
+            new_lock = FillLock.new(client_id: client_id, data_version: old_lock.data_version)
+          elsif old_lock && fetched_lock.data_version != old_lock.data_version
+            # Cache was invalidated, then another lock was taken during a lock wait.
+            # Treat it as any other cache invalidation, where the caller will switch
+            # to the fallback key.
+            return IdentityCache::DELETED
+          else
+            return fetched_lock
+          end
+        else # hit
+          return value
+        end
+        new_lock.cache_value # take lock
+      end
+
+      return new_lock if upserted
+
+      value = @cache_backend.read(key)
+      if FillLock.cache_value?(value)
+        FillLock.from_cache(*value)
+      else
+        value
+      end
+    end
+
+    def fill_with_lock(key, data, my_lock, expiration_options)
+      upserted = upsert(key, expiration_options) do |value|
+        return false if value.nil? || value == IdentityCache::DELETED
+        return true unless FillLock.cache_value?(value) # already filled
+        current_lock = FillLock.from_cache(*value)
+        if current_lock.data_version != my_lock.data_version
+          return false # invalidated then relocked
+        end
+        data
+      end
+
+      upserted
+    end
+
+    def lock_fill_fallback_key(key, lock)
+      "lock_fill:#{lock.data_version}:#{key}"
+    end
+
+    def fallback_key_expiration_options(fill_lock_duration)
+      # Override the default TTL for the fallback key lock since it won't be used for very long.
+      expires_in = fill_lock_duration * 2
+
+      # memcached uses integer number of seconds for TTL so round up to avoid having
+      # the cache store round down with `to_i`
+      expires_in = expires_in.ceil
+
+      # memcached TTL only gets part of the first second (https://github.com/memcached/memcached/issues/307),
+      # so increase TTL by 1 to compensate
+      expires_in += 1
+
+      { expires_in: expires_in }
+    end
+
+    def client_id
+      @client_id ||= SecureRandom.uuid
+    end
 
     def cas_multi(keys)
       result = nil
@@ -81,8 +296,9 @@ def add_multi(keys)
       result.each { |k, v| add(k, v) }
     end
 
-    def add(key, value)
-      @cache_backend.write(key, value, unless_exist: true) if IdentityCache.should_fill_cache?
+    def add(key, value, expiration_options = EMPTY_HASH)
+      return false unless IdentityCache.should_fill_cache?
+      @cache_backend.write(key, value, { unless_exist: true, **expiration_options })
     end
   end
 end
diff --git a/lib/identity_cache/cache_key_loader.rb b/lib/identity_cache/cache_key_loader.rb
index 4025858f..4cfcc9c5 100644
--- a/lib/identity_cache/cache_key_loader.rb
+++ b/lib/identity_cache/cache_key_loader.rb
@@ -25,12 +25,12 @@ class << self
       # @param cache_fetcher [_CacheFetcher]
       # @param db_key Reference to what to load from the database.
       # @return The database value corresponding to the database key.
-      def load(cache_fetcher, db_key)
+      def load(cache_fetcher, db_key, cache_fetcher_options = {})
         cache_key = cache_fetcher.cache_key(db_key)
 
         db_value = nil
-        cache_value = IdentityCache.fetch(cache_key) do
+        cache_value = IdentityCache.fetch(cache_key, cache_fetcher_options) do
           db_value = cache_fetcher.load_one_from_db(db_key)
           cache_fetcher.cache_encode(db_value)
         end
diff --git a/lib/identity_cache/cached/primary_index.rb b/lib/identity_cache/cached/primary_index.rb
index ab7db75f..18c93198 100644
--- a/lib/identity_cache/cached/primary_index.rb
+++ b/lib/identity_cache/cached/primary_index.rb
@@ -9,11 +9,11 @@ def initialize(model)
         @model = model
       end
 
-      def fetch(id)
+      def fetch(id, cache_fetcher_options)
         id = cast_id(id)
         return unless id
 
         record = if model.should_use_cache?
-          object = CacheKeyLoader.load(self, id)
+          object = CacheKeyLoader.load(self, id, cache_fetcher_options)
           if object && object.id != id
             IdentityCache.logger.error(
               <<~MSG.squish
diff --git a/lib/identity_cache/fallback_fetcher.rb b/lib/identity_cache/fallback_fetcher.rb
index 4813678c..638b98c8 100644
--- a/lib/identity_cache/fallback_fetcher.rb
+++ b/lib/identity_cache/fallback_fetcher.rb
@@ -32,7 +32,10 @@ def fetch_multi(keys)
       results
     end
 
-    def fetch(key)
+    def fetch(key, **cache_fetcher_options)
+      unless cache_fetcher_options.empty?
+        raise ArgumentError, "unsupported cache_fetcher options: #{cache_fetcher_options.keys.join(', ')}"
+      end
       result = @cache_backend.read(key)
       if result.nil?
         result = yield
diff --git a/lib/identity_cache/memoized_cache_proxy.rb b/lib/identity_cache/memoized_cache_proxy.rb
index 743e67a9..b452cc40 100644
--- a/lib/identity_cache/memoized_cache_proxy.rb
+++ b/lib/identity_cache/memoized_cache_proxy.rb
@@ -69,7 +69,7 @@ def delete(key)
       end
     end
 
-    def fetch(key)
+    def fetch(key, cache_fetcher_options = {})
       memo_misses = 0
       cache_misses = 0
 
@@ -78,7 +78,7 @@ def fetch(key)
       value = fetch_memoized(key) do
         memo_misses = 1
-        @cache_fetcher.fetch(key) do
+        @cache_fetcher.fetch(key, **cache_fetcher_options) do
           cache_misses = 1
           instrument_duration(payload, :resolve_miss_time) do
             yield
diff --git a/lib/identity_cache/with_primary_index.rb b/lib/identity_cache/with_primary_index.rb
index 76400e15..da65f459 100644
--- a/lib/identity_cache/with_primary_index.rb
+++ b/lib/identity_cache/with_primary_index.rb
@@ -97,21 +97,47 @@ def exists_with_identity_cache?(id)
       !!fetch_by_id(id)
     end
 
-    # Default fetcher added to the model on inclusion, it behaves like
-    # ActiveRecord::Base.where(id: id).first
-    def fetch_by_id(id, includes: nil)
+    # Fetch the record by its primary key from the cache or read from
+    # the database and fill the cache on a cache miss. This behaves like
+    # `where(id: id).readonly.first` being called on the model.
+    #
+    # @param id Primary key value for the record to fetch.
+    # @param includes [Hash|Array|Symbol] Cached associations to prefetch from
+    #   the cache on the returned record
+    # @param fill_lock_duration [Float] If provided, take a fill lock around cache fills
+    #   and wait for this duration for cache to be filled when reading a lock provided
+    #   by another client. Defaults to not setting the fill lock and trying to fill the
+    #   cache from the database regardless of the presence of another client's fill lock.
+    #   Set this to just above the typical amount of time it takes to do a cache fill.
+    # @param lock_wait_tries [Integer] Only applicable if fill_lock_duration is provided,
+    #   in which case it specifies the number of times to do a lock wait. After the first
+    #   lock wait it will try to take the lock, so will only do following lock waits due
+    #   to another client taking the lock first. If another lock wait would be needed after
+    #   reaching the limit, then a `LockWaitTimeout` exception is raised. Default is 2. Use
+    #   this to control the maximum total lock wait duration
+    #   (`lock_wait_tries * fill_lock_duration`).
+    # @raise [LockWaitTimeout] Timeout after waiting `lock_wait_tries * fill_lock_duration`
+    #   seconds for `lock_wait_tries` other clients to fill the cache.
+    # @return [self|nil] An instance of this model for the record with the specified id or
+    #   `nil` if no record with this `id` exists in the database
+    def fetch_by_id(id, includes: nil, **cache_fetcher_options)
       ensure_base_model
       raise_if_scoped
-      record = cached_primary_index.fetch(id)
+      record = cached_primary_index.fetch(id, cache_fetcher_options)
       prefetch_associations(includes, [record]) if record && includes
       record
     end
 
-    # Default fetcher added to the model on inclusion, it behaves like
-    # ActiveRecord::Base.find, but will raise IdentityCache::RecordNotFound
-    # if the id is not in the cache.
-    def fetch(id, includes: nil)
-      fetch_by_id(id, includes: includes) || raise(
+    # Fetch the record by its primary key from the cache or read from
+    # the database and fill the cache on a cache miss. This behaves like
+    # `readonly.find(id)` being called on the model.
+    #
+    # @param (see #fetch_by_id)
+    # @raise (see #fetch_by_id)
+    # @raise [ActiveRecord::RecordNotFound] if the record isn't found
+    # @return [self] An instance of this model for the record with the specified id
+    def fetch(id, **options)
+      fetch_by_id(id, **options) || raise(
         IdentityCache::RecordNotFound,
         "Couldn't find #{name} with ID=#{id}"
       )
     end
diff --git a/test/attribute_cache_test.rb b/test/attribute_cache_test.rb
index 533bbb22..d79000d2 100644
--- a/test/attribute_cache_test.rb
+++ b/test/attribute_cache_test.rb
@@ -20,7 +20,7 @@ def test_attribute_values_are_fetched_and_returned_on_cache_misses
     assert_queries(1) do
       assert_equal('foo', AssociatedRecord.fetch_name_by_id(1))
     end
-    assert(fetch.has_been_called_with?(@name_attribute_key))
+    assert(fetch.has_been_called_with?(@name_attribute_key, {}))
   end
 
   def test_attribute_values_are_returned_on_cache_hits
diff --git a/test/cache_fetcher_test.rb b/test/cache_fetcher_test.rb
new file mode 100644
index 00000000..c9cb9724
--- /dev/null
+++ b/test/cache_fetcher_test.rb
@@ -0,0 +1,193 @@
+# frozen_string_literal: true
+
+require "test_helper"
+require "socket"
+
+class CacheFetcherTest < IdentityCache::TestCase
+  attr_reader :key, :cache_fetcher
+
+  def setup
+    super
+    @cache_fetcher = IdentityCache::CacheFetcher.new(backend)
+    @key = "key"
+  end
+
+  def test_fetch_without_lock_miss
+    assert_memcache_operations(2) do # get, add
+      assert_equal(:fill_data, cache_fetcher.fetch(key) { :fill_data })
+    end
+    assert_equal(:fill_data, backend.read(key))
+  end
+
+  def test_fetch_miss
+    assert_memcache_operations(3) do # get (miss), add (lock), get+cas (fill)
+      assert_equal(:fill_data, cache_fetcher.fetch(key, fill_lock_duration: 0.9) { :fill_data })
+    end
+    assert_equal(:fill_data, backend.read(key))
+  end
+
+  def test_fetch_hit
+    cache_fetcher.fetch(key) { :hit_data }
+    assert_memcache_operations(1) do # get
+      assert_equal(:hit_data, cache_fetcher.fetch(key, fill_lock_duration: 0.9) { flunk("unexpected yield") })
+    end
+  end
+
+  def test_fetch_lock_wait
+    other_client_takes_lock
+
+    # fill during lock wait
+    other_client_operations = 1
+    cache_fetcher.expects(:sleep).with do |duration|
+      assert_memcache_operations(other_client_operations) do # get+cas
+        other_cache_fetcher.fetch(key) { :fill_data }
+      end
+      duration == 0.9
+    end
+
+    assert_memcache_operations(2 + other_client_operations) do # get (miss), get (hit)
+      assert_equal(:fill_data, cache_fetcher.fetch(key, fill_lock_duration: 0.9) { flunk("unexpected yield") })
+    end
+  end
+
+  def test_fetch_lock_wait_timeout
+    other_client_takes_lock
+
+    cache_fetcher.expects(:sleep).with(0.9)
+    assert_memcache_operations(3) do # get (miss), get+cas (miss, take lock), get+cas (fill)
+      assert_equal(:fill_data, cache_fetcher.fetch(key, fill_lock_duration: 0.9) { :fill_data })
+    end
+    assert_equal(:fill_data, backend.read(key))
+  end
+
+  def test_fetch_lock_wait_with_cache_invalidation
+    other_client_takes_lock
+
+    # invalidate during lock wait
+    other_client_operations = 1
+    cache_fetcher.expects(:sleep).with do |duration|
+      assert_memcache_operations(other_client_operations) do
+        other_cache_fetcher.delete(key)
+      end
+      duration == 0.9
+    end
+
+    assert_memcache_operations(3 + other_client_operations) do # get (miss), get (invalidated), get+cas (fallback key)
+      assert_equal(:fill_data, cache_fetcher.fetch(key, fill_lock_duration: 0.9) { :fill_data })
+    end
+    assert_equal(IdentityCache::DELETED, backend.read(key))
+  end
+
+  def test_fetch_lock_wait_with_cache_invalidation_and_relock
+    second_client = other_cache_fetcher
+    third_client = IdentityCache::CacheFetcher.new(backend)
+
+    second_client_fiber = Fiber.new do
+      second_client.fetch(key, fill_lock_duration: 0.9) do
+        Fiber.yield
+        :second_client_data
+      end
+    end
+    second_client_fiber.resume
+
+    # invalidate during lock wait
+    other_client_operations = 4
+    cache_fetcher.expects(:sleep).with do |duration|
+      assert_memcache_operations(other_client_operations) do
+        third_client.delete(key)
+        other_client_takes_lock(third_client) # get+cas
+        second_client_fiber.resume # get (new lock), get+cas (fallback key)
+      end
+      duration == 0.9
+    end
+
+    assert_memcache_operations(3 + other_client_operations) do # get (other lock), get (new lock), get (fallback key)
+      assert_equal(:second_client_data, cache_fetcher.fetch(key, fill_lock_duration: 0.9) { flunk("unexpected yield") })
+    end
+    key_value = backend.read(key)
+    assert_equal([:fill_locked, third_client.send(:client_id)], key_value.first(2))
+  end
+
+  def test_fetch_lock_wait_tries_reached
+    data_version = SecureRandom.uuid
+    lock = write_lock(data_version: data_version)
+    other_client_operations = 3
+    cache_fetcher.expects(:sleep).times(3).with do |duration|
+      lock = write_lock(data_version: data_version)
+      duration == 0.9
+    end
+    assert_memcache_operations(4 + other_client_operations) do # get (miss) * 4
+      assert_raises(IdentityCache::LockWaitTimeout) do
+        cache_fetcher.fetch(key, fill_lock_duration: 0.9, lock_wait_tries: 3)
+      end
+    end
+    assert_equal(lock, backend.read(key))
+  end
+
+  def test_fetch_lock_attempt_interrupted_with_cache_invalidation
+    cache_fetcher.expects(:sleep).never
+    backend.expects(:cas).returns(false).with do |got_key|
+      other_cache_fetcher.delete(key)
+      key == got_key
+    end
+    other_client_operations = 1
+    assert_memcache_operations(2 + other_client_operations) do # add (lock), get (lock), excludes mocked cas
+      assert_equal(:fill_data, cache_fetcher.fetch(key, fill_lock_duration: 0.9) { :fill_data })
+    end
+    assert_equal(IdentityCache::DELETED, backend.read(key))
+  end
+
+  def test_fetch_with_memcached_down
+    cache_fetcher = IdentityCache::CacheFetcher.new(unconnected_cache_backend)
+    cache_fetcher.expects(:sleep).never
+    # 3 operations because the underlying cache store doesn't distinguish between
+    # request failures and error responses (e.g. NOT_FOUND or NOT_STORED)
+    assert_memcache_operations(3) do # get (miss), add (lock), get (read lock)
+      assert_equal(:miss_data, cache_fetcher.fetch(key, fill_lock_duration: 0.9) { :miss_data })
+    end
+  end
+
+  def test_fetch_with_database_down
+    IdentityCache::CacheFetcher.any_instance.expects(:sleep).never
+    exc = assert_raises(RuntimeError) do
+      other_cache_fetcher.fetch(key, fill_lock_duration: 0.9) { raise 'database down' }
+    end
+    assert_equal('database down', exc.message)
+    exc = assert_raises(RuntimeError) do
+      cache_fetcher.fetch(key, fill_lock_duration: 0.9) { raise 'database still down' }
+    end
+    assert_equal('database still down', exc.message)
+  end
+
+  private
+
+  def write_lock(client_id: SecureRandom.uuid, data_version:)
+    lock = IdentityCache::CacheFetcher::FillLock.new(client_id: client_id, data_version: data_version)
+    other_cache_fetcher.write(key, lock)
+    lock
+  end
+
+  def other_client_takes_lock(cache_fetcher = other_cache_fetcher)
+    cache_fetcher.fetch(key, fill_lock_duration: 0.9) do
+      break # skip filling
+    end
+  end
+
+  def other_cache_fetcher
+    @other_cache_fetcher ||= IdentityCache::CacheFetcher.new(backend)
+  end
+
+  def unconnected_cache_backend
+    CacheConnection.build_backend(address: "127.0.0.1:#{open_port}").tap do |backend|
+      backend.extend(IdentityCache::MemCacheStoreCAS) if backend.is_a?(ActiveSupport::Cache::MemCacheStore)
+    end
+  end
+
+  def open_port
+    socket = Socket.new(:INET, :STREAM)
+    socket.bind(Addrinfo.tcp('127.0.0.1', 0))
+    socket.local_address.ip_port
+  ensure
+    socket&.close
+  end
+end
diff --git a/test/denormalized_has_one_test.rb b/test/denormalized_has_one_test.rb
index b8c231a7..2e747f45 100644
--- a/test/denormalized_has_one_test.rb
+++ b/test/denormalized_has_one_test.rb
@@ -23,16 +23,8 @@ def test_on_cache_miss_record_should_embed_associated_object
     assert_equal(@record, record_from_cache_miss)
     assert_not_nil(@record.fetch_associated)
     assert_equal(@record.associated, record_from_cache_miss.fetch_associated)
-    assert(
-      fetch.has_been_called_with?(
-        @cached_attribute.cache_key('foo')
-      )
-    )
-    assert(
-      fetch.has_been_called_with?(
-        @record.primary_cache_index_key
-      )
-    )
+    assert(fetch.has_been_called_with?(@cached_attribute.cache_key('foo'), {}))
+    assert(fetch.has_been_called_with?(@record.primary_cache_index_key, {}))
   end
 
   def test_on_cache_miss_record_should_embed_nil_object
@@ -50,16 +42,8 @@ def test_on_cache_miss_record_should_embed_nil_object
     5.times do
       assert_nil(record_from_cache_miss.fetch_associated)
     end
-    assert(
-      fetch.has_been_called_with?(
-        @cached_attribute.cache_key('foo')
-      )
-    )
-    assert(
-      fetch.has_been_called_with?(
-        @record.primary_cache_index_key
-      )
-    )
+    assert(fetch.has_been_called_with?(@cached_attribute.cache_key('foo'), {}))
+    assert(fetch.has_been_called_with?(@record.primary_cache_index_key, {}))
   end
 
   def test_on_record_from_the_db_will_use_normal_association
diff --git a/test/fetch_test.rb b/test/fetch_test.rb
index f4f497e0..fc4fa1fc 100644
--- a/test/fetch_test.rb
+++ b/test/fetch_test.rb
@@ -23,13 +23,13 @@ def test_fetch_with_garbage_input
   end
 
   def test_fetch_cache_hit
-    IdentityCache.cache.expects(:fetch).with(@blob_key).returns(@cached_value)
+    IdentityCache.cache.expects(:fetch).with(@blob_key, {}).returns(@cached_value)
     assert_equal(@record, Item.fetch(1))
   end
 
   def test_fetch_cache_hit_publishes_hydration_notification
-    IdentityCache.cache.expects(:fetch).with(@blob_key).returns(@cached_value)
+    IdentityCache.cache.expects(:fetch).with(@blob_key, {}).returns(@cached_value)
 
     events = 0
     subscriber = ActiveSupport::Notifications.subscribe('hydration.identity_cache') do |_, _, _, _, payload|
@@ -43,7 +43,8 @@ def test_fetch_cache_hit_publishes_hydration_notification
   end
 
   def test_fetch_cache_hit_publishes_cache_notification
-    IdentityCache.cache.cache_fetcher.expects(:fetch).with(@blob_key).returns(@cached_value)
+    expected_kwargs = {}
+    IdentityCache.cache.cache_fetcher.expects(:fetch).with(@blob_key, **expected_kwargs).returns(@cached_value)
     expected = { memoizing: false, resolve_miss_time: 0, memo_hits: 0, cache_hits: 1, cache_misses: 0 }
 
     events = 0
@@ -59,7 +60,8 @@ def test_fetch_cache_hit_publishes_cache_notification
 
   def test_fetch_memoized_hit_publishes_cache_notification
     subscriber = nil
-    IdentityCache.cache.cache_fetcher.expects(:fetch).with(@blob_key).returns(@cached_value)
+    expected_kwargs = {}
+    IdentityCache.cache.cache_fetcher.expects(:fetch).with(@blob_key, **expected_kwargs).returns(@cached_value)
     expected = { memoizing: true, resolve_miss_time: 0, memo_hits: 1, cache_hits: 0, cache_misses: 0 }
 
     IdentityCache.cache.with_memoization do
@@ -81,7 +83,7 @@ def test_fetch_hit_cache_namespace
     IdentityCache.cache_namespace = proc { |model| "#{model.table_name}:#{old_ns}" }
 
     new_blob_key = "items:#{@blob_key}"
-    IdentityCache.cache.expects(:fetch).with(new_blob_key).returns(@cached_value)
+    IdentityCache.cache.expects(:fetch).with(new_blob_key, {}).returns(@cached_value)
 
     assert_equal(@record, Item.fetch(1))
   ensure
@@ -89,7 +91,7 @@ def test_fetch_hit_cache_namespace
   end
 
   def test_exists_with_identity_cache_when_cache_hit
-    IdentityCache.cache.expects(:fetch).with(@blob_key).returns(@cached_value)
+    IdentityCache.cache.expects(:fetch).with(@blob_key, {}).returns(@cached_value)
 
     assert(Item.exists_with_identity_cache?(1))
   end
@@ -99,7 +101,7 @@ def test_exists_with_identity_cache_when_cache_miss_and_in_db
     Item.cached_primary_index.expects(:load_one_from_db).with(1).once.returns(@record)
 
     assert(Item.exists_with_identity_cache?(1))
-    assert(fetch.has_been_called_with?(@blob_key))
+    assert(fetch.has_been_called_with?(@blob_key, {}))
   end
 
   def test_exists_with_identity_cache_when_cache_miss_and_not_in_db
@@ -107,7 +109,7 @@ def test_exists_with_identity_cache_when_cache_miss_and_not_in_db
     Item.cached_primary_index.expects(:load_one_from_db).with(1).once.returns(nil)
 
     assert(!Item.exists_with_identity_cache?(1))
-    assert(fetch.has_been_called_with?(@blob_key))
+    assert(fetch.has_been_called_with?(@blob_key, {}))
   end
 
   def test_fetch_miss_published_dehydration_notification
@@ -147,7 +149,7 @@ def test_fetch_miss
     fetch = Spy.on(IdentityCache.cache, :fetch).and_return { |_, &block| block.call.tap { |result| results << result } }
 
     assert_equal(@record, Item.fetch(1))
-    assert(fetch.has_been_called_with?(@blob_key))
+    assert(fetch.has_been_called_with?(@blob_key, {}))
     assert_equal([@cached_value], results)
   end
@@ -162,11 +164,11 @@ def test_fetch_conflict
       @record.expire_cache
       @record
     end
-    add = Spy.on(fetcher, :add).and_call_through
+    write = Spy.on(backend, :write).and_call_through
 
     assert_equal(@record, Item.fetch(1))
 
     assert(load_one_from_db.has_been_called_with?(1))
-    assert(add.has_been_called_with?(@blob_key, @cached_value))
+    assert(write.has_been_called_with?(@blob_key, @cached_value, unless_exist: true))
 
     assert_equal(IdentityCache::DELETED, backend.read(@record.primary_cache_index_key))
   end
@@ -178,24 +180,24 @@ def test_fetch_conflict_after_delete
       @record.expire_cache
       @record
     end
-    add = Spy.on(IdentityCache.cache.cache_fetcher, :add).and_call_through
+    write = Spy.on(backend, :write).and_call_through
 
     assert_equal(@record, Item.fetch(1))
 
     assert(load_one_from_db.has_been_called_with?(1))
-    refute(add.has_been_called?)
+    refute(write.calls.any? { |call| call.args.last.key?(:unless_exist) })
 
     assert_equal(IdentityCache::DELETED, backend.read(@record.primary_cache_index_key))
   end
 
   def test_fetch_by_id_not_found_should_return_nil
     nonexistent_record_id = 10
-    fetcher.expects(:add).with(@blob_key + '0', IdentityCache::CACHED_NIL)
+    fetcher.expects(:add).with(@blob_key + '0', IdentityCache::CACHED_NIL, {})
 
     assert_nil(Item.fetch_by_id(nonexistent_record_id))
   end
 
   def test_fetch_not_found_should_raise
     nonexistent_record_id = 10
-    fetcher.expects(:add).with(@blob_key + '0', IdentityCache::CACHED_NIL)
+    fetcher.expects(:add).with(@blob_key + '0', IdentityCache::CACHED_NIL, {})
 
     assert_raises(IdentityCache::RecordNotFound) { Item.fetch(nonexistent_record_id) }
   end
@@ -219,7 +221,7 @@ def test_fetch_id_coercion
 
   def test_fetch_by_title_hit
     values = []
-    fetch = Spy.on(IdentityCache.cache, :fetch).and_return do |key, &block|
+    fetch = Spy.on(IdentityCache.cache, :fetch, {}).and_return do |key, &block|
       case key
       # Read record with title bob
       when @index_key then block.call.tap { |val| values << val }
@@ -233,35 +235,28 @@ def test_fetch_by_title_hit
 
     assert_equal(@record, Item.fetch_by_title('bob'))
     assert_equal([1], values)
-    assert(fetch.has_been_called_with?(@index_key))
-    assert(fetch.has_been_called_with?(@blob_key))
+    assert(fetch.has_been_called_with?(@index_key, {}))
+    assert(fetch.has_been_called_with?(@blob_key, {}))
   end
 
   def test_fetch_by_title_cache_namespace
     Item.send(:include, SwitchNamespace)
-    IdentityCache.cache.expects(:fetch).with("ns:#{@index_key}").returns(1)
-    IdentityCache.cache.expects(:fetch).with("ns:#{@blob_key}").returns(@cached_value)
+    IdentityCache.cache.expects(:fetch).with("ns:#{@index_key}", {}).returns(1)
+    IdentityCache.cache.expects(:fetch).with("ns:#{@blob_key}", {}).returns(@cached_value)
 
     assert_equal(@record, Item.fetch_by_title('bob'))
   end
 
-  def test_fetch_by_title_stores_idcnil
-    Item.connection.expects(:exec_query).once.returns(ActiveRecord::Result.new([], []))
-    add = Spy.on(fetcher, :add).and_call_through
-    fetch = Spy.on(fetcher, :fetch).and_call_through
-    assert_nil(Item.fetch_by_title('bob')) # exec_query => nil
-
-    assert_nil(Item.fetch_by_title('bob')) # returns cached nil
-    assert_nil(Item.fetch_by_title('bob')) # returns cached nil
-
-    assert(add.has_been_called_with?(@index_key, IdentityCache::CACHED_NIL))
-    assert_equal(3, fetch.calls.length)
+  def test_fetch_by_title_caches_nil
+    assert_nil(Item.fetch_by_title('missing'))
+
+    assert_no_queries do
+      assert_nil(Item.fetch_by_title('missing'))
+    end
   end
 
   def test_fetch_by_bang_method
-    Item.connection.expects(:exec_query).returns(ActiveRecord::Result.new([], []))
     assert_raises(IdentityCache::RecordNotFound) do
-      Item.fetch_by_title!('bob')
+      Item.fetch_by_title!('missing')
     end
   end
@@ -306,7 +301,7 @@ def test_fetch_on_derived_model_raises
 
   def test_returned_records_are_readonly_on_cache_hit
     IdentityCache.with_fetch_read_only_records do
-      IdentityCache.cache.expects(:fetch).with(@blob_key).returns(@cached_value)
+      IdentityCache.cache.expects(:fetch).with(@blob_key, {}).returns(@cached_value)
       assert(Item.fetch(1).readonly?)
     end
   end
 
@@ -317,7 +312,7 @@ def test_returned_records_are_readonly_on_cache_miss
       Item.cached_primary_index.expects(:load_one_from_db).with(1).once.returns(@record)
 
       assert(Item.exists_with_identity_cache?(1))
-      assert(fetch.has_been_called_with?(@blob_key))
+      assert(fetch.has_been_called_with?(@blob_key, {}))
       assert(Item.fetch(1).readonly?)
     end
   end
@@ -329,7 +324,7 @@ def test_returned_records_are_not_readonly_with_open_transactions
       Item.cached_primary_index.expects(:load_one_from_db).with(1).once.returns(@record)
 
       refute(IdentityCache.should_use_cache?)
-      refute(fetch.has_been_called_with?(@blob_key))
+      refute(fetch.has_been_called_with?(@blob_key, {}))
       refute(Item.fetch(1).readonly?, "Fetched item was read-only")
     end
   end
@@ -345,4 +340,26 @@ def test_respects_should_use_cache_on_record
       end
     end
   end
+
+  def test_fetch_supports_lock_wait_options
+    @record.save
+
+    # only one client reads from the database on concurrent fetch with lock wait
+    assert_queries(1) do
+      # other client takes fill lock
+      cache_key = Item.cached_primary_index.cache_key(@record.id)
+      lock = IdentityCache::CacheFetcher::FillLock.new(client_id: SecureRandom.uuid, data_version: SecureRandom.uuid)
+      IdentityCache.cache.cache_fetcher.write(cache_key, lock.cache_value)
+
+      # fetch waits on lock
+      IdentityCache.cache.cache_fetcher.expects(:sleep).with do |duration|
+        # other client fills cache
+        assert_queries(1) { Item.fetch(@record.id) }
+        duration == 0.1
+      end
+      Item.fetch(@record.id, fill_lock_duration: 0.1, lock_wait_tries: 1)
+
+      assert_equal(@record, Item.fetch(@record.id))
+    end
+  end
 end
diff --git a/test/helpers/cache_connection.rb b/test/helpers/cache_connection.rb
index 0f3c5032..ff957b63 100644
--- a/test/helpers/cache_connection.rb
+++ b/test/helpers/cache_connection.rb
@@ -16,15 +16,19 @@ def host
   end
 
   def backend
-    @backend ||= case ENV['ADAPTER']
+    @backend ||= build_backend
+  end
+
+  def build_backend(address: "#{host}:11211")
+    case ENV['ADAPTER']
    when nil, 'dalli'
      require 'active_support/cache/mem_cache_store'
-      ActiveSupport::Cache::MemCacheStore.new("#{host}:11211", failover: false, expires_in: 6.hours.to_i)
+      ActiveSupport::Cache::MemCacheStore.new(address, failover: false, expires_in: 6.hours.to_i)
    when 'memcached'
      require 'memcached_store'
      require 'active_support/cache/memcached_store'
      ActiveSupport::Cache::MemcachedStore.prepend(MemcachedStoreInstrumentation)
-      ActiveSupport::Cache::MemcachedStore.new("#{host}:11211", support_cas: true, auto_eject_hosts: false)
+      ActiveSupport::Cache::MemcachedStore.new(address, support_cas: true, auto_eject_hosts: false)
    else
      raise "Unknown adapter: #{ENV['ADAPTER']}"
    end
diff --git a/test/memoized_cache_proxy_test.rb b/test/memoized_cache_proxy_test.rb
index 7ce27461..56b24e16 100644
--- a/test/memoized_cache_proxy_test.rb
+++ b/test/memoized_cache_proxy_test.rb
@@ -41,7 +41,8 @@ def test_fetch_should_short_circuit_on_falsy_memoized_values
   end
 
   def test_fetch_should_try_memcached_on_not_memoized_values
-    fetcher.expects(:fetch).with('foo').returns('bar')
+    expected_kwargs = {}
+    fetcher.expects(:fetch).with('foo', **expected_kwargs).returns('bar')
 
     IdentityCache.cache.with_memoization do
       assert_equal('bar', IdentityCache.cache.fetch('foo'))
diff --git a/test/readonly_test.rb b/test/readonly_test.rb
index 70c063ff..1f4893ea 100644
--- a/test/readonly_test.rb
+++ b/test/readonly_test.rb
@@ -48,7 +48,7 @@ def test_fetch_should_not_update_cache
       assert_equal(@record, Item.fetch(1))
     end
     assert_nil(backend.read(@record.primary_cache_index_key))
-    assert(fetch.has_been_called_with?(@record.primary_cache_index_key))
+    assert(fetch.has_been_called_with?(@record.primary_cache_index_key, {}))
   end
 
   def test_fetch_multi_should_not_update_cache