Skip to content

Commit

Permalink
Add support for fill lock with lock wait to avoid thundering herd problem
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanahsmith committed Nov 26, 2019
1 parent 10c771a commit 8c1f918
Show file tree
Hide file tree
Showing 11 changed files with 485 additions and 75 deletions.
8 changes: 6 additions & 2 deletions lib/identity_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class InverseAssociationError < StandardError; end
class UnsupportedScopeError < StandardError; end
class UnsupportedAssociationError < StandardError; end
class DerivedModelError < StandardError; end
class LockWaitTimeout < StandardError; end

class << self
include IdentityCache::CacheHash
Expand Down Expand Up @@ -116,10 +117,13 @@ def should_use_cache? # :nodoc:
#
# == Parameters
# +key+ A cache key string
# +cache_fetcher_options+ A hash of options to pass to the cache backend
#
def fetch(key)
def fetch(key, cache_fetcher_options = {})
if should_use_cache?
unmap_cached_nil_for(cache.fetch(key) { map_cached_nil_for yield })
unmap_cached_nil_for(cache.fetch(key, cache_fetcher_options) do
map_cached_nil_for yield
end)
else
yield
end
Expand Down
234 changes: 218 additions & 16 deletions lib/identity_cache/cache_fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,52 @@
# frozen_string_literal: true

require 'securerandom'

module IdentityCache
class CacheFetcher
attr_accessor :cache_backend

EMPTY_HASH = {}.freeze

# Marker object stored at a cache key while one client (the lock owner)
# resolves the value, so that other clients can wait for the fill instead
# of stampeding the database. Serialized into the cache as the 3-element
# array [FILL_LOCKED, client_id, data_version].
class FillLock
  FILL_LOCKED = :fill_locked
  FAILED_CLIENT_ID = 'fill_failed'

  # Rehydrate a lock from its serialized cache representation.
  # Raises ArgumentError if the marker element is wrong.
  def self.from_cache(marker, client_id, data_version)
    raise ArgumentError unless marker == FILL_LOCKED
    new(client_id: client_id, data_version: data_version)
  end

  # True when +cache_value+ looks like a serialized fill lock.
  def self.cache_value?(cache_value)
    cache_value.is_a?(Array) && cache_value.length == 3 && cache_value.first == FILL_LOCKED
  end

  attr_reader :client_id, :data_version

  def initialize(client_id:, data_version:)
    @client_id = client_id
    @data_version = data_version
  end

  # Serialized form to store in the cache.
  def cache_value
    [FILL_LOCKED, client_id, data_version]
  end

  # Record that the lock owner failed to fill, so waiters can stop
  # waiting on this lock and fall back to filling themselves.
  def mark_failed
    @client_id = FAILED_CLIENT_ID
  end

  def fill_failed?
    @client_id == FAILED_CLIENT_ID
  end

  def ==(other)
    self.class == other.class && client_id == other.client_id && data_version == other.data_version
  end
end

def initialize(cache_backend)
@cache_backend = cache_backend
end
Expand All @@ -25,27 +69,184 @@ def fetch_multi(keys, &block)
results
end

def fetch(key)
result = nil
yielded = false
@cache_backend.cas(key) do |value|
yielded = true
unless IdentityCache::DELETED == value
result = value
break
# Public entry point for reading a single key from the cache.
#
# Without +fill_lock_duration+ (or when cache filling is disabled) this is
# a plain read-through fetch. Otherwise the fill-lock protocol is used so
# that a single client fills the cache while the others wait, avoiding a
# thundering herd against the database.
def fetch(key, fill_lock_duration: nil, lock_wait_limit: 2)
  use_fill_lock = fill_lock_duration && IdentityCache.should_fill_cache?
  unless use_fill_lock
    return fetch_without_fill_lock(key) { yield }
  end
  fetch_with_fill_lock(key, fill_lock_duration, lock_wait_limit) { yield }
end

private

# Fetch the value at +key+ without taking a fill lock.
#
# A genuine cache hit (anything other than a missing key, a deletion
# marker, or another client's fill lock) is returned immediately via the
# non-local return inside the CAS block. Otherwise the block is yielded to
# resolve the value, which is written back to the cache unless cache
# filling is disabled.
#
# NOTE: the source as given still contained the pre-commit lines of this
# diff hunk (`result = yield` / `result`) interleaved with the post-commit
# lines, leaving `data` and `result` in conflict; this is the coherent
# post-commit version using `data` only.
def fetch_without_fill_lock(key)
  data = nil
  upsert(key) do |value|
    # Deletion markers and fill locks are treated as cache misses here.
    value = nil if value == IdentityCache::DELETED || FillLock.cache_value?(value)
    unless value.nil?
      return value # cache hit
    end
    data = yield
    # Abort the CAS (leave the cache untouched) when filling is disabled.
    break unless IdentityCache.should_fill_cache?
    data
  end
  data
end

# Fetch +key+ while coordinating concurrent cache fills through a FillLock.
#
# Loop (at most lock_wait_limit + 2 attempts — the first attempt plus a
# possible retry against the fallback key):
# * hit            -> return the cached value
# * miss           -> try to take the fill lock; the winner yields to
#                     resolve the value and fills the cache
# * lock held by
#   another client -> sleep fill_lock_duration and retry; raise
#                     LockWaitTimeout once lock_wait_limit waits are used up
# * invalidated
#   during a wait  -> retry once against a short-lived fallback key so
#                     waiters can still receive the filled value
#
# Yields to resolve the value on a fill. Returns the cached or resolved
# value. Raises LockWaitTimeout or re-raises whatever the block raises
# (after marking the lock as failed so waiters fail fast).
def fetch_with_fill_lock(key, fill_lock_duration, lock_wait_limit)
  raise ArgumentError, 'fill_lock_duration must be greater than 0.0' unless fill_lock_duration > 0.0
  raise ArgumentError, 'lock_wait_limit must be greater than 0' unless lock_wait_limit > 0
  lock = nil
  using_fallback_key = false
  expiration_options = EMPTY_HASH
  (lock_wait_limit + 2).times do # +2 is for first attempt and retry with fallback key
    result = fetch_or_take_lock(key, old_lock: lock, **expiration_options)
    case result
    when FillLock
      lock = result
      if lock.client_id == client_id # have lock
        data = begin
          yield
        rescue
          # Mark the lock failed so waiting clients fail fast instead of
          # sleeping out their full wait budget.
          mark_fill_failure_on_lock(key, expiration_options)
          raise
        end

        if !fill_with_lock(key, data, lock, expiration_options) && !using_fallback_key
          # fallback to storing data in the fallback key so it is available to clients waiting on the lock
          expiration_options = fallback_key_expiration_options(fill_lock_duration)
          @cache_backend.write(lock_fill_fallback_key(key, lock), data, **expiration_options)
        end
        return data
      else
        raise LockWaitTimeout if lock_wait_limit <= 0
        lock_wait_limit -= 1

        # If fill failed in the other client, then it might be failing fast
        # so avoid waiting the typical amount of time for a lock wait. The
        # semian gem can be used to handle failing fast when the database is slow.
        if lock.fill_failed?
          return fetch_without_fill_lock(key) { yield }
        end

        # lock wait
        sleep(fill_lock_duration)
        # loop around to retry fetch_or_take_lock
      end
    when IdentityCache::DELETED # interrupted by cache invalidation
      if lock && !using_fallback_key
        # cache invalidated during lock wait, try fallback key
        using_fallback_key = true
        key = lock_fill_fallback_key(key, lock)
        expiration_options = fallback_key_expiration_options(fill_lock_duration)
        # loop around to retry with fallback key
      else
        # cache invalidation prevented lock from being taken
        return yield
      end
    when nil # Errors talking to memcached
      return yield
    else # hit
      return result
    end
  end
  raise "unexpected number of loop iterations"
end

# Flag our own fill lock at +key+ as failed so that clients waiting on it
# stop sleeping and fill the cache themselves. The CAS is aborted (via
# +break+) when the value is no longer a fill lock or the lock is no
# longer ours.
def mark_fill_failure_on_lock(key, expiration_options)
  @cache_backend.cas(key, **expiration_options) do |cached|
    break if !FillLock.cache_value?(cached)
    held_lock = FillLock.from_cache(*cached)
    break unless held_lock.client_id == client_id
    held_lock.mark_failed
    held_lock.cache_value
  end
end

# Atomically update +key+ with a compare-and-swap, falling back to an add
# when the key does not exist (the CAS block is never yielded).
#
# The block receives the current cache value (nil on the add path) and
# returns the value to store. Returns the backend's truthy result when the
# cache was written, false/nil otherwise.
#
# NOTE: the source as given still contained the pre-commit lines of this
# diff hunk (`result = yield` / `add(key, result)` / `result`) interleaved
# with the post-commit lines; this is the coherent post-commit version.
# The options hash is also splatted into +add+ (whose signature is
# `**expiration_options`) so the call stays valid under Ruby 3's
# keyword-argument separation.
def upsert(key, expiration_options = EMPTY_HASH)
  yielded = false
  upserted = @cache_backend.cas(key, expiration_options) do |value|
    yielded = true
    yield value
  end
  unless yielded
    # CAS never yielded: the key is missing (or CAS unsupported), so
    # resolve the value from scratch and try to add it.
    data = yield nil
    upserted = add(key, data, **expiration_options)
  end
  upserted
end

private
# Read the cache at +key+; on a hit return the stored value, on a miss
# atomically take (or renew) the fill lock.
#
# +old_lock+ is the lock from the previous attempt (nil on the first):
# * key missing/deleted with an old_lock -> a cache invalidation
#   interrupted our wait; return the marker value via non-local return
# * key missing/deleted without old_lock -> take a brand-new lock with a
#   fresh data version
# * another client's lock                -> return that lock (caller waits)
# * our own previous lock                -> renew it, preserving the data
#   version so fill_with_lock can detect invalidation-then-relock
# * anything else                        -> a plain hit; return the value
#
# When the CAS/add did not succeed, re-read the key so the caller sees the
# latest value or lock. Returns a FillLock, IdentityCache::DELETED, a
# cached value, or nil (backend error).
def fetch_or_take_lock(key, old_lock:, **expiration_options)
  new_lock = nil
  upserted = upsert(key, expiration_options) do |value|
    if value.nil? || value == IdentityCache::DELETED
      if old_lock # cache invalidated
        return value
      else
        new_lock = FillLock.new(client_id: client_id, data_version: SecureRandom.uuid)
      end
    elsif FillLock.cache_value?(value)
      fetched_lock = FillLock.from_cache(*value)
      if old_lock == fetched_lock
        # preserve data version since there hasn't been any cache invalidations
        new_lock = FillLock.new(client_id: client_id, data_version: old_lock.data_version)
      else
        return fetched_lock
      end
    else # hit
      return value
    end
    new_lock.cache_value # take lock
  end

  return new_lock if upserted

  value = @cache_backend.read(key)
  if FillLock.cache_value?(value)
    FillLock.from_cache(*value)
  else
    value
  end
end

# Store +data+ at +key+, but only while our fill lock (+my_lock+) is still
# intact. Returns true when the cache ends up filled (by us or already by
# someone else), false when an invalidation or a relock by another client
# prevented the fill.
def fill_with_lock(key, data, my_lock, expiration_options)
  upsert(key, expiration_options) do |current|
    return false if current.nil? || current == IdentityCache::DELETED
    return true unless FillLock.cache_value?(current) # already filled
    # invalidated then relocked by another client
    return false unless FillLock.from_cache(*current).data_version == my_lock.data_version
    data
  end
end

# Key used to expose the filled value to lock waiters after the primary
# key was invalidated; namespaced by the lock's data version so stale
# fills can't be read after a later invalidation.
def lock_fill_fallback_key(key, lock)
  format("lock_fill:%s:%s", lock.data_version, key)
end

# Expiration options for the short-lived fallback key.
#
# The TTL is twice the fill lock duration, rounded up to a whole second
# (memcached truncates fractional TTLs with to_i), plus one extra second
# because memcached only counts part of the first second
# (https://github.com/memcached/memcached/issues/307).
def fallback_key_expiration_options(fill_lock_duration)
  ttl_seconds = (fill_lock_duration * 2).ceil + 1
  { expires_in: ttl_seconds }
end

# Lazily-generated identifier for this cache fetcher instance, stored in
# fill locks so a lock read back from the cache can be recognized as our
# own (see fetch_with_fill_lock / mark_fill_failure_on_lock).
def client_id
  @client_id ||= SecureRandom.uuid
end

def cas_multi(keys)
result = nil
Expand Down Expand Up @@ -81,8 +282,9 @@ def add_multi(keys)
result.each { |k, v| add(k, v) }
end

def add(key, value)
@cache_backend.write(key, value, unless_exist: true) if IdentityCache.should_fill_cache?
# Write +value+ at +key+ only if the key is absent (unless_exist), and
# only when cache filling is enabled. Returns false when filling is
# disabled, otherwise the backend's write result.
def add(key, value, **expiration_options)
  if IdentityCache.should_fill_cache?
    @cache_backend.write(key, value, unless_exist: true, **expiration_options)
  else
    false
  end
end
end
end
5 changes: 4 additions & 1 deletion lib/identity_cache/fallback_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def fetch_multi(keys)
results
end

def fetch(key)
def fetch(key, unsupported_options = {})
unless unsupported_options.empty?
raise ArgumentError, "unsupported keywords: #{unsupported_options.keys.join(', ')}"
end
result = @cache_backend.read(key)
if result.nil?
result = yield
Expand Down
4 changes: 2 additions & 2 deletions lib/identity_cache/memoized_cache_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def delete(key)
end
end

def fetch(key)
def fetch(key, cache_fetcher_options = {})
memo_misses = 0
cache_misses = 0

Expand All @@ -68,7 +68,7 @@ def fetch(key)

value = fetch_memoized(key) do
memo_misses = 1
@cache_fetcher.fetch(key) do
@cache_fetcher.fetch(key, cache_fetcher_options) do
cache_misses = 1
instrument_duration(payload, :resolve_miss_time) do
yield
Expand Down
47 changes: 36 additions & 11 deletions lib/identity_cache/query_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,38 @@ def exists_with_identity_cache?(id)
!!fetch_by_id(id)
end

# Default fetcher added to the model on inclusion, it behaves like
# ActiveRecord::Base.where(id: id).first
def fetch_by_id(id, includes: nil)
# Fetch the record by its primary key from the cache or read from
# the database and fill the cache on a cache miss. This behaves like
# `where(id: id).readonly.first` being called on the model.
#
# @param id Primary key value for the record to fetch.
# @param includes [Hash|Array|Symbol] Cached associations to prefetch from
# the cache on the returned record
# @param fill_lock_duration [Float] If provided, take a fill lock around cache fills
# and wait for this duration for cache to be filled when reading a lock provided
# by another client. Defaults to not setting the fill lock and trying to fill the
# cache from the database regardless of the presence of another client's fill lock.
# Set this to just above the typical amount of time it takes to do a cache fill.
# @param lock_wait_limit [Integer] Only applicable if fill_lock_duration is provided,
# in which case it specifies the number of times to do a lock wait. After the first
# lock wait it will try to take the lock, so will only do following lock waits due
# to another client taking the lock first. If another lock wait would be needed after
# reaching the limit, then a `LockWaitTimeout` exception is raised. Default is 2. Use
# this to control the maximum total lock wait duration
# (`lock_wait_limit * fill_lock_duration`).
# @raise [LockWaitTimeout] Timeout after waiting `lock_wait_limit * fill_lock_duration`
# seconds for `lock_wait_limit` other clients to fill the cache.
# @return [self|nil] An instance of this model for the record with the specified id or
# `nil` if no record with this `id` exists in the database
def fetch_by_id(id, includes: nil, **cache_fetcher_options)
ensure_base_model
raise_if_scoped
raise NotImplementedError, "fetching needs the primary index enabled" unless primary_cache_index_enabled
id = type_for_attribute(primary_key).cast(id)
return unless id
record = if should_use_cache?
object = nil
coder = IdentityCache.fetch(rails_cache_key(id)) do
coder = IdentityCache.fetch(rails_cache_key(id), cache_fetcher_options) do
Encoder.encode(object = resolve_cache_miss(id))
end
object ||= Encoder.decode(coder, self)
Expand All @@ -48,13 +69,17 @@ def fetch_by_id(id, includes: nil)
record
end

# Default fetcher added to the model on inclusion, it behaves like
# ActiveRecord::Base.find, will raise ActiveRecord::RecordNotFound exception
# if id is not in the cache or the db.
def fetch(id, includes: nil)
fetch_by_id(id, includes: includes) or raise(
ActiveRecord::RecordNotFound, "Couldn't find #{name} with ID=#{id}"
)
# Fetch the record by its primary key from the cache or read from
# the database and fill the cache on a cache miss. This behaves like
# `readonly.find(id)` being called on the model.
#
# @param (see #fetch_by_id)
# @raise (see #fetch_by_id)
# @raise [ActiveRecord::RecordNotFound] if the record isn't found
# @return [self] An instance of this model for the record with the specified id
def fetch(id, options = {})
  record = fetch_by_id(id, options)
  unless record
    raise ActiveRecord::RecordNotFound, "Couldn't find #{name} with ID=#{id}"
  end
  record
end

# Default fetcher added to the model on inclusion, if behaves like
Expand Down
2 changes: 1 addition & 1 deletion test/attribute_cache_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def test_attribute_values_are_fetched_and_returned_on_cache_misses
assert_queries(1) do
assert_equal 'foo', AssociatedRecord.fetch_name_by_id(1)
end
assert(fetch.has_been_called_with?(@name_attribute_key))
assert(fetch.has_been_called_with?(@name_attribute_key, {}))
end

def test_attribute_values_are_returned_on_cache_hits
Expand Down
Loading

0 comments on commit 8c1f918

Please sign in to comment.