Skip to content

Commit

Permalink
Add support for fill lock with lock wait to avoid thundering herd pro…
Browse files Browse the repository at this point in the history
…blem
  • Loading branch information
dylanahsmith committed Sep 15, 2020
1 parent 721e1bb commit 9ddfa3a
Show file tree
Hide file tree
Showing 15 changed files with 500 additions and 96 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ inherit_from:

AllCops:
TargetRubyVersion: 2.4

Layout/RescueEnsureAlignment:
Enabled: false # Reenable once https://github.com/rubocop-hq/rubocop/issues/6918 is fixed
8 changes: 6 additions & 2 deletions lib/identity_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class InverseAssociationError < StandardError; end
class UnsupportedScopeError < StandardError; end
class UnsupportedAssociationError < StandardError; end
class DerivedModelError < StandardError; end
class LockWaitTimeout < StandardError; end

mattr_accessor :cache_namespace
self.cache_namespace = "IDC:#{CACHE_VERSION}:"
Expand Down Expand Up @@ -117,10 +118,13 @@ def should_use_cache? # :nodoc:
#
# == Parameters
# +key+ A cache key string
# +cache_fetcher_options+ A hash of options to pass to the cache backend
#
def fetch(key)
def fetch(key, cache_fetcher_options = {})
if should_use_cache?
unmap_cached_nil_for(cache.fetch(key) { map_cached_nil_for yield })
unmap_cached_nil_for(cache.fetch(key, cache_fetcher_options) do
map_cached_nil_for yield
end)
else
yield
end
Expand Down
234 changes: 218 additions & 16 deletions lib/identity_cache/cache_fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,52 @@
# frozen_string_literal: true

require 'securerandom'

module IdentityCache
class CacheFetcher
attr_accessor :cache_backend

EMPTY_HASH = {}.freeze

class FillLock
FILL_LOCKED = :fill_locked
FAILED_CLIENT_ID = 'fill_failed'

class << self
def from_cache(marker, client_id, data_version)
raise ArgumentError unless marker == FILL_LOCKED
new(client_id: client_id, data_version: data_version)
end

def cache_value?(cache_value)
cache_value.is_a?(Array) && cache_value.length == 3 && cache_value.first == FILL_LOCKED
end
end

attr_reader :client_id, :data_version

def initialize(client_id:, data_version:)
@client_id = client_id
@data_version = data_version
end

def cache_value
[FILL_LOCKED, client_id, data_version]
end

def mark_failed
@client_id = FAILED_CLIENT_ID
end

def fill_failed?
@client_id == FAILED_CLIENT_ID
end

def ==(other)
self.class == other.class && client_id == other.client_id && data_version == other.data_version
end
end

def initialize(cache_backend)
@cache_backend = cache_backend
end
Expand All @@ -25,27 +69,184 @@ def fetch_multi(keys, &block)
results
end

def fetch(key)
result = nil
yielded = false
@cache_backend.cas(key) do |value|
yielded = true
unless IdentityCache::DELETED == value
result = value
break
def fetch(key, fill_lock_duration: nil, lock_wait_limit: 2)
if fill_lock_duration && IdentityCache.should_fill_cache?
fetch_with_fill_lock(key, fill_lock_duration, lock_wait_limit) do
yield
end
else
fetch_without_fill_lock(key) { yield }
end
end

private

def fetch_without_fill_lock(key)
data = nil
upsert(key) do |value|
value = nil if value == IdentityCache::DELETED || FillLock.cache_value?(value)
unless value.nil?
return value
end
result = yield
data = yield
break unless IdentityCache.should_fill_cache?
result
data
end
data
end

def fetch_with_fill_lock(key, fill_lock_duration, lock_wait_limit)
raise ArgumentError, 'fill_lock_duration must be greater than 0.0' unless fill_lock_duration > 0.0
raise ArgumentError, 'lock_wait_limit must be greater than 0' unless lock_wait_limit > 0
lock = nil
using_fallback_key = false
expiration_options = EMPTY_HASH
(lock_wait_limit + 2).times do # +2 is for first attempt and retry with fallback key
result = fetch_or_take_lock(key, old_lock: lock, **expiration_options)
case result
when FillLock
lock = result
if lock.client_id == client_id # have lock
data = begin
yield
rescue
mark_fill_failure_on_lock(key, expiration_options)
raise
end

if !fill_with_lock(key, data, lock, expiration_options) && !using_fallback_key
# fallback to storing data in the fallback key so it is available to clients waiting on the lock
expiration_options = fallback_key_expiration_options(fill_lock_duration)
@cache_backend.write(lock_fill_fallback_key(key, lock), data, **expiration_options)
end
return data
else
raise LockWaitTimeout if lock_wait_limit <= 0
lock_wait_limit -= 1

# If fill failed in the other client, then it might be failing fast
# so avoid waiting the typical amount of time for a lock wait. The
# semian gem can be used to handle failing fast when the database is slow.
if lock.fill_failed?
return fetch_without_fill_lock(key) { yield }
end

# lock wait
sleep(fill_lock_duration)
# loop around to retry fetch_or_take_lock
end
when IdentityCache::DELETED # interrupted by cache invalidation
if lock && !using_fallback_key
# cache invalidated during lock wait, try fallback key
using_fallback_key = true
key = lock_fill_fallback_key(key, lock)
expiration_options = fallback_key_expiration_options(fill_lock_duration)
# loop around to retry with fallback key
else
# cache invalidation prevented lock from being taken
return yield
end
when nil # Errors talking to memcached
return yield
else # hit
return result
end
end
raise "unexpected number of loop iterations"
end

def mark_fill_failure_on_lock(key, expiration_options)
@cache_backend.cas(key, expiration_options) do |value|
break unless FillLock.cache_value?(value)
lock = FillLock.from_cache(*value)
break if lock.client_id != client_id
lock.mark_failed
lock.cache_value
end
end

def upsert(key, expiration_options = EMPTY_HASH)
yielded = false
upserted = @cache_backend.cas(key, expiration_options) do |value|
yielded = true
yield value
end
unless yielded
result = yield
add(key, result)
data = yield nil
upserted = add(key, data, expiration_options)
end
result
upserted
end

private
def fetch_or_take_lock(key, old_lock:, **expiration_options)
new_lock = nil
upserted = upsert(key, expiration_options) do |value|
if value.nil? || value == IdentityCache::DELETED
if old_lock # cache invalidated
return value
else
new_lock = FillLock.new(client_id: client_id, data_version: SecureRandom.uuid)
end
elsif FillLock.cache_value?(value)
fetched_lock = FillLock.from_cache(*value)
if old_lock == fetched_lock
# preserve data version since there hasn't been any cache invalidations
new_lock = FillLock.new(client_id: client_id, data_version: old_lock.data_version)
else
return fetched_lock
end
else # hit
return value
end
new_lock.cache_value # take lock
end

return new_lock if upserted

value = @cache_backend.read(key)
if FillLock.cache_value?(value)
FillLock.from_cache(*value)
else
value
end
end

def fill_with_lock(key, data, my_lock, expiration_options)
upserted = upsert(key, expiration_options) do |value|
return false if value.nil? || value == IdentityCache::DELETED
return true unless FillLock.cache_value?(value) # already filled
current_lock = FillLock.from_cache(*value)
if current_lock.data_version != my_lock.data_version
return false # invalidated then relocked
end
data
end

upserted
end

def lock_fill_fallback_key(key, lock)
"lock_fill:#{lock.data_version}:#{key}"
end

def fallback_key_expiration_options(fill_lock_duration)
# Override the default TTL for the fallback key lock since it won't be used for very long.
expires_in = fill_lock_duration * 2

# memcached uses integer number of seconds for TTL so round up to avoid having
# the cache store round down with `to_i`
expires_in = expires_in.ceil

# memcached TTL only gets part of the first second (https://github.com/memcached/memcached/issues/307),
# so increase TTL by 1 to compensate
expires_in += 1

{ expires_in: expires_in }
end

def client_id
@client_id ||= SecureRandom.uuid
end

def cas_multi(keys)
result = nil
Expand Down Expand Up @@ -81,8 +282,9 @@ def add_multi(keys)
result.each { |k, v| add(k, v) }
end

def add(key, value)
@cache_backend.write(key, value, unless_exist: true) if IdentityCache.should_fill_cache?
def add(key, value, **expiration_options)
return false unless IdentityCache.should_fill_cache?
@cache_backend.write(key, value, { unless_exist: true, **expiration_options })
end
end
end
4 changes: 2 additions & 2 deletions lib/identity_cache/cache_key_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ class << self
# @param cache_fetcher [_CacheFetcher]
# @param db_key Reference to what to load from the database.
# @return The database value corresponding to the database key.
def load(cache_fetcher, db_key)
def load(cache_fetcher, db_key, cache_fetcher_options = {})
cache_key = cache_fetcher.cache_key(db_key)

db_value = nil

cache_value = IdentityCache.fetch(cache_key) do
cache_value = IdentityCache.fetch(cache_key, cache_fetcher_options) do
db_value = cache_fetcher.load_one_from_db(db_key)
cache_fetcher.cache_encode(db_value)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/identity_cache/cached/primary_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ def initialize(model)
@model = model
end

def fetch(id)
def fetch(id, cache_fetcher_options)
id = cast_id(id)
return unless id
record = if model.should_use_cache?
object = CacheKeyLoader.load(self, id)
object = CacheKeyLoader.load(self, id, cache_fetcher_options)
if object && object.id != id
IdentityCache.logger.error(
<<~MSG.squish
Expand Down
5 changes: 4 additions & 1 deletion lib/identity_cache/fallback_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def fetch_multi(keys)
results
end

def fetch(key)
def fetch(key, **cache_fetcher_options)
unless cache_fetcher_options.empty?
raise ArgumentError, "unsupported cache_fetcher options: #{cache_fetcher_options.keys.join(', ')}"
end
result = @cache_backend.read(key)
if result.nil?
result = yield
Expand Down
4 changes: 2 additions & 2 deletions lib/identity_cache/memoized_cache_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def delete(key)
end
end

def fetch(key)
def fetch(key, cache_fetcher_options = {})
memo_misses = 0
cache_misses = 0

Expand All @@ -78,7 +78,7 @@ def fetch(key)

value = fetch_memoized(key) do
memo_misses = 1
@cache_fetcher.fetch(key) do
@cache_fetcher.fetch(key, cache_fetcher_options) do
cache_misses = 1
instrument_duration(payload, :resolve_miss_time) do
yield
Expand Down
Loading

0 comments on commit 9ddfa3a

Please sign in to comment.