Add support for fill lock with lock wait to avoid thundering herd problem #373

Merged · 6 commits · Jan 15, 2021

Changes from 2 commits
3 changes: 3 additions & 0 deletions .rubocop.yml
@@ -3,3 +3,6 @@ inherit_from:

AllCops:
TargetRubyVersion: 2.4

Layout/BeginEndAlignment:
EnforcedStyleAlignWith: start_of_line
9 changes: 7 additions & 2 deletions lib/identity_cache.rb
@@ -65,6 +65,8 @@ class UnsupportedAssociationError < StandardError; end

class DerivedModelError < StandardError; end

class LockWaitTimeout < StandardError; end

mattr_accessor :cache_namespace
self.cache_namespace = "IDC:#{CACHE_VERSION}:"

@@ -141,10 +143,13 @@ def should_use_cache? # :nodoc:
#
# == Parameters
# +key+ A cache key string
# +cache_fetcher_options+ A hash of options to pass to the cache backend
#
- def fetch(key)
+ def fetch(key, cache_fetcher_options = {})

nit, but I think you could splat the options hash all the way through. This way you won't have to update any assertions/expectations and everything will keep working as expected, no?

Contributor Author

Capturing keyword arguments using ** has the performance disadvantage of duplicating the hash for each method call.

The other annoying thing with the test expectations is that some of them use the spy gem, which doesn't yet support keyword arguments (although I've opened ryanong/spy#12 for that issue). This prevents me from explicitly passing options through without a splat for any method that is referenced by Spy.on in the test suite.
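
For illustration, a minimal sketch of the trade-off being described (method names are hypothetical, not from this PR):

def takes_hash(key, options = {}) # receives the caller's Hash by reference
  options
end

def takes_splat(key, **options) # collects keywords into a fresh Hash on each call
  options
end

opts = { fill_lock_duration: 0.1 }
takes_hash(:key, opts).equal?(opts)    # => true, same object, no allocation
takes_splat(:key, **opts).equal?(opts) # => false, a duplicate Hash per call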

Contributor Author

My upstream spy PR has been merged and released, so I'll see what I can clean up as a result of that.

if should_use_cache?
- unmap_cached_nil_for(cache.fetch(key) { map_cached_nil_for(yield) })
+ unmap_cached_nil_for(cache.fetch(key, cache_fetcher_options) do
+ map_cached_nil_for(yield)
+ end)
else
yield
end
Expand Down
239 changes: 223 additions & 16 deletions lib/identity_cache/cache_fetcher.rb
@@ -1,8 +1,52 @@
# frozen_string_literal: true

require 'securerandom'

module IdentityCache
class CacheFetcher
attr_accessor :cache_backend

EMPTY_HASH = {}.freeze

class FillLock
FILL_LOCKED = :fill_locked
FAILED_CLIENT_ID = 'fill_failed'

class << self
def from_cache(marker, client_id, data_version)
raise ArgumentError unless marker == FILL_LOCKED
new(client_id: client_id, data_version: data_version)
end

def cache_value?(cache_value)
cache_value.is_a?(Array) && cache_value.length == 3 && cache_value.first == FILL_LOCKED
end
end

attr_reader :client_id, :data_version

def initialize(client_id:, data_version:)
@client_id = client_id
@data_version = data_version
end

def cache_value
[FILL_LOCKED, client_id, data_version]
end

def mark_failed
@client_id = FAILED_CLIENT_ID
end

def fill_failed?
@client_id == FAILED_CLIENT_ID
end

def ==(other)
self.class == other.class && client_id == other.client_id && data_version == other.data_version
end
end

def initialize(cache_backend)
@cache_backend = cache_backend
end
@@ -25,27 +69,189 @@ def fetch_multi(keys, &block)
results
end

- def fetch(key)
- result = nil
- yielded = false
- @cache_backend.cas(key) do |value|
- yielded = true
- unless IdentityCache::DELETED == value
- result = value
- break
+ def fetch(key, fill_lock_duration: nil, lock_wait_limit: 2)
+ if fill_lock_duration && IdentityCache.should_fill_cache?
+ fetch_with_fill_lock(key, fill_lock_duration, lock_wait_limit) do
+ yield
end
+ else
+ fetch_without_fill_lock(key) { yield }
+ end
+ end

private

+ def fetch_without_fill_lock(key)
+ data = nil
+ upsert(key) do |value|
+ value = nil if value == IdentityCache::DELETED || FillLock.cache_value?(value)
+ unless value.nil?
+ return value
+ end
- result = yield
+ data = yield
break unless IdentityCache.should_fill_cache?
- result
+ data
end
+ data
end

def fetch_with_fill_lock(key, fill_lock_duration, lock_wait_limit)
raise ArgumentError, 'fill_lock_duration must be greater than 0.0' unless fill_lock_duration > 0.0
raise ArgumentError, 'lock_wait_limit must be greater than 0' unless lock_wait_limit > 0
lock = nil
using_fallback_key = false
expiration_options = EMPTY_HASH
(lock_wait_limit + 2).times do # +2 is for first attempt and retry with fallback key
result = fetch_or_take_lock(key, old_lock: lock, **expiration_options)
case result
when FillLock
lock = result
if lock.client_id == client_id # have lock
data = begin
yield
rescue
mark_fill_failure_on_lock(key, expiration_options)
raise
end

if !fill_with_lock(key, data, lock, expiration_options) && !using_fallback_key
# fallback to storing data in the fallback key so it is available to clients waiting on the lock
Contributor

Cache invalidations could happen while a cache miss is being resolved which could prevent clients that are in a lock wait from reading the value after the lock wait.

Is the problem this is addressing that the waiting clients would have to wait even longer, and we might as well serve them stale data since they queried before it was invalidated? i.e. if the lock wait timeout is infinite and the client is okay with higher latency, this wouldn't be necessary?

Contributor Author

This won't serve stale data from the perspective of the client. It is similar to how in MVCC the reader won't try to read a write that came in after it has started the query and chosen a data version for the query. However, the data version for a fill lock / fallback key should be thought of as a lower bound in this case.

The problem this is addressing is latency caused by cache invalidations. In the theoretical case where we didn't have to worry about latency, this wouldn't be necessary. However, in practice we don't want the fill lock to turn into a queue, because it only allows one client to try to fill at a time, and cache invalidations can overwrite or prevent the cache fill, preventing any other clients from benefiting from their lock wait.

Contributor

This won't serve stale data from the perspective of the client.

Right, that part was clear; I wanted to make sure I fully understood the reason for the extra complexity from the fallback key.
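
To make the fallback key concrete (illustrative values; the key format comes from lock_fill_fallback_key later in this diff): if the lock's data_version is "v1" and the main key is "IDC:blob:Item:1", the filler writes the data to "lock_fill:v1:IDC:blob:Item:1". Clients waiting on the same lock derive the same fallback key from the lock they observed, so they can still read the fill after their lock wait even if the main key was invalidated in the meantime.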

expiration_options = fallback_key_expiration_options(fill_lock_duration)
@cache_backend.write(lock_fill_fallback_key(key, lock), data, **expiration_options)
end
return data
else
raise LockWaitTimeout if lock_wait_limit <= 0
Contributor

Is it better to fail or to yield? I suppose the worker will have waited a long time by this point, but if lock_wait_limit * fill_lock_duration is much less than the request timeout, falling back to the database would make sense. In Shopify's configuration I imagine that wouldn't be the case though.

Contributor Author

It would certainly be a problem if the lock_wait_limit * fill_lock_duration was configured to be too low. That's probably true of all timeouts, although this one we might only notice during concurrent reads of the same key, making the misconfiguration more subtle.

If this is configured appropriately, then a long delay seems indicative of database load. If we yielded here, then that would likely make the problem much worse, since it would effectively be unleashing the thundering herd this is here to protect against.

Contributor @pushrax (Jan 13, 2021)

I suppose a lock wait timeout should be very rare anyway, since it would imply that several workers were unable to fill the value, but also didn't encounter an error and mark a fill failure.

Under heavy load, does it seem right that the most likely failure outcome is query execution timeouts that cause fill failures to be marked? Or is the idea to set the lock wait time shorter than the query timeout (that actually makes sense here I suppose)? In SFR the query timeout is much shorter (2s), which changes things significantly.

Contributor Author

The lock wait time can end up adding latency in addition to the query itself. For a single lock wait fill_lock_duration ends up being a minimum time to wait, so we don't want it much higher than the typical time the query takes. In the worst case, a client could end up doing lock waits before the query, which could add up to (lock_wait_limit - 1) * fill_lock_duration to the query latency, so we probably want it shorter than the query timeout to avoid effectively doubling it.
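
For example (illustrative numbers): with lock_wait_limit = 2 and fill_lock_duration = 0.1, a client could spend up to (2 - 1) * 0.1 = 0.1 seconds in lock waits before running its own query, so keeping fill_lock_duration well below the query timeout avoids effectively doubling it.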

lock_wait_limit -= 1

# If fill failed in the other client, then it might be failing fast
# so avoid waiting the typical amount of time for a lock wait. The
# semian gem can be used to handle failing fast when the database is slow.
if lock.fill_failed?
return fetch_without_fill_lock(key) { yield }
end

# lock wait
sleep(fill_lock_duration)
# loop around to retry fetch_or_take_lock
end
when IdentityCache::DELETED # interrupted by cache invalidation
if lock && !using_fallback_key
# cache invalidated during lock wait, try fallback key
using_fallback_key = true
key = lock_fill_fallback_key(key, lock)
expiration_options = fallback_key_expiration_options(fill_lock_duration)
# loop around to retry with fallback key
else
# cache invalidation prevented lock from being taken
return yield
Contributor

Why wouldn't we want to loop and attempt to acquire the fill lock again in this case? I don't see a reason for this worker to not participate in the next round of waiting/filling, since a worker that hits this case won't have done any waiting yet (or it would have a lock value).

Worth noting that it's impossible for this case to happen when using_fallback_key is true, since the fallback key can never have the value IdentityCache::DELETED

Contributor Author

The lock wait limit isn't really configured as a lock fill retry limit, which seems like it would be there to handle a different scenario. The lock wait limit is mostly about tolerating cache fill delays from the client that has the fill lock. Interruption from a cache invalidation doesn't take away from the number of waits, because it is a separate concern, thus lock_wait_limit -= 1 isn't even done in the using_fallback_key = true code path.

I had considered whether we should have a configuration for the number of attempts to fetch or take the fill lock and ended up deciding that it wasn't even the right solution to the problem, nor is it the right problem to focus on solving right now. Specifically, the problem comes up from frequently invalidated cache keys, which aren't really a good fit for read-through caching with cache invalidation. A better solution to this problem, if we wanted to address it, would be to add a data version to the cache invalidation value (IdentityCache::DELETED) so we could switch to using the fallback key at this point. Even if we wanted that solution, we would need an extra PR to handle backwards compatibility around deploys so the new cache invalidation value doesn't get interpreted as a cache hit. Either way, we would only benefit from solving this problem if reads still sufficiently outnumber cache invalidations, since we would only be grouping reads for the same data version.
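
A hedged sketch of that idea, which is not part of this PR: the invalidation marker could carry a data version, e.g. a value shaped like [:idc_deleted, data_version] instead of the plain IdentityCache::DELETED sentinel, letting an interrupted waiter derive the fallback key from the invalidation itself rather than giving up and yielding.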

Contributor @pushrax (Jan 13, 2021)

I thought I left this comment before but it seems to have gotten lost: I think this context is very useful and should go into the code comment here.

The takeaway I got is that looping here (without decrementing lock_wait_limit) could end up in an unbounded loop (exceeding lock_wait_limit + 2) due to extremely frequent invalidations, and that would be bad so we'd need another limit. However that other limit doesn't really make sense, since in this scenario the cache is pretty useless anyway.

It is possible for this case to be hit randomly on a key that isn't invalidating frequently though (just happened to have 2 invalidations in a row). But that should be rare so it's probably not worth handling explicitly.

Contributor Author

Worth noting that it's impossible for this case to happen when using_fallback_key is true, since the fallback key can never have the value IdentityCache::DELETED

Good point, the && !using_fallback_key condition shouldn't be necessary. I'll change that to raise if the fallback key is invalidated to make the state for the code path clearer.

Contributor Author

It is possible for this case to be hit randomly on a key that isn't invalidating frequently though (just happened to have 2 invalidations in a row). But that should be rare so it's probably not worth handling explicitly.

Yeah, this is the scenario I would like to add the data version to the cache invalidation value for. Mostly to avoid worrying about the scenario from a performance perspective.

However, we should see the benefits even in this scenario, since it should leave a smaller window of time for a thundering herd of reads to hit the database: just the round trip time to memcached, rather than the longer time we have now of multiple queries to the database to load the record along with its embedded associations, plus the time to serialize the cache blob.

end
when nil # Errors talking to memcached
return yield
else # hit
return result
end
end
raise "unexpected number of loop iterations"
Member

It seems a bit rough to have a non-specific exception raised here. What can the caller do about it?

I can see two paths that reach here: if the last iteration of the loop was interrupted by cache invalidation (fetch_or_take_lock returned IdentityCache::DELETED and we have a lock and we didn't previously try using the fallback key), or if we orened on the last iteration of the loop. I feel like we should not raise here, and we should instead try to handle both those paths in a sane way.

Contributor Author

What can the caller do about it?

The caller isn't meant to do anything about it, because getting here means there is a bug in the code. The (lock_wait_limit + 2).times do loop could be replaced with an infinite loop, but I wanted to make it clear that it shouldn't have an infinite loop here, so this check prevents that from happening.

Being interrupted by a cache invalidation would be the fallback case that is mentioned in the # +2 is for first attempt and retry with fallback key comment at the start of the loop. What does oren stand for? I don't know what you mean by that path.

Member @fbogsany (Jul 11, 2019)

What does oren stand for?

Old #webscale joke that should probably be retired: sleep.

end

def mark_fill_failure_on_lock(key, expiration_options)
@cache_backend.cas(key, expiration_options) do |value|
break unless FillLock.cache_value?(value)
lock = FillLock.from_cache(*value)
break if lock.client_id != client_id
lock.mark_failed
lock.cache_value
end
end

def upsert(key, expiration_options = EMPTY_HASH)
yielded = false
upserted = @cache_backend.cas(key, expiration_options) do |value|
yielded = true
yield value
end
unless yielded
- result = yield
- add(key, result)
+ data = yield nil
+ upserted = add(key, data, expiration_options)
end
- result
+ upserted
end

private
def fetch_or_take_lock(key, old_lock:, **expiration_options)
new_lock = nil
upserted = upsert(key, expiration_options) do |value|
if value.nil? || value == IdentityCache::DELETED
if old_lock # cache invalidated
return value
else
new_lock = FillLock.new(client_id: client_id, data_version: SecureRandom.uuid)
end
elsif FillLock.cache_value?(value)
fetched_lock = FillLock.from_cache(*value)
if old_lock == fetched_lock
# preserve data version since there hasn't been any cache invalidations
new_lock = FillLock.new(client_id: client_id, data_version: old_lock.data_version)
elsif old_lock && fetched_lock.data_version != old_lock.data_version
# Cache was invalidated, then another lock was taken during a lock wait.
# Treat it as any other cache invalidation, where the caller will switch
# to the fallback key.
return IdentityCache::DELETED
else
return fetched_lock
end
else # hit
return value
end
new_lock.cache_value # take lock
Contributor Author @dylanahsmith (Oct 4, 2018)

Currently the code converts the lock values to primitives so that they can be serialized and deserialized with msgpack. However, an alternative would be to add a special value for msgpack to handle the hydration of these values instead of doing it explicitly in the code. With Marshal as the serializer, it can handle this for us as long as we are careful about not changing the class name or instance variable names in an incompatible way.
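
A minimal sketch of that msgpack alternative (assumed usage of msgpack-ruby's extension-type API; this is not what the PR implements):

require 'msgpack'

factory = MessagePack::Factory.new
factory.register_type(
  0x01, # application-chosen extension type id (assumption)
  IdentityCache::CacheFetcher::FillLock,
  packer: ->(lock) { MessagePack.pack([lock.client_id, lock.data_version]) },
  unpacker: ->(data) {
    client_id, data_version = MessagePack.unpack(data)
    IdentityCache::CacheFetcher::FillLock.new(client_id: client_id, data_version: data_version)
  }
)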

end

return new_lock if upserted

value = @cache_backend.read(key)
if FillLock.cache_value?(value)
FillLock.from_cache(*value)
else
value
end
end

def fill_with_lock(key, data, my_lock, expiration_options)
upserted = upsert(key, expiration_options) do |value|
return false if value.nil? || value == IdentityCache::DELETED
return true unless FillLock.cache_value?(value) # already filled
current_lock = FillLock.from_cache(*value)
if current_lock.data_version != my_lock.data_version
return false # invalidated then relocked
end
data
end

upserted
end

def lock_fill_fallback_key(key, lock)
"lock_fill:#{lock.data_version}:#{key}"
end

def fallback_key_expiration_options(fill_lock_duration)
# Override the default TTL for the fallback key lock since it won't be used for very long.
expires_in = fill_lock_duration * 2

# memcached uses integer number of seconds for TTL so round up to avoid having
# the cache store round down with `to_i`
expires_in = expires_in.ceil

# memcached TTL only gets part of the first second (https://github.com/memcached/memcached/issues/307),
# so increase TTL by 1 to compensate
expires_in += 1

{ expires_in: expires_in }
end
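
Worked example with illustrative numbers: for fill_lock_duration = 0.1, expires_in starts at 0.1 * 2 = 0.2, is rounded up to 1 so the cache store's to_i doesn't truncate it to zero, then bumped to 2 to compensate for memcached consuming part of the first second.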

def client_id
@client_id ||= SecureRandom.uuid
end

def cas_multi(keys)
result = nil
@@ -81,8 +287,9 @@ def add_multi(keys)
result.each { |k, v| add(k, v) }
end

- def add(key, value)
- @cache_backend.write(key, value, unless_exist: true) if IdentityCache.should_fill_cache?
+ def add(key, value, **expiration_options)
+ return false unless IdentityCache.should_fill_cache?
+ @cache_backend.write(key, value, { unless_exist: true, **expiration_options })
end
end
end
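
Putting the new options together, a fill-locked fetch might look like this (a sketch based on the signatures in this diff; cache_backend, the key, and Item are placeholders):

# cache_backend stands in for a memcached-backed ActiveSupport cache store
fetcher = IdentityCache::CacheFetcher.new(cache_backend)
value = fetcher.fetch("IDC:blob:Item:1", fill_lock_duration: 0.1, lock_wait_limit: 2) do
  Item.find(1) # only the client holding the fill lock runs this; others lock-wait
end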
4 changes: 2 additions & 2 deletions lib/identity_cache/cache_key_loader.rb
@@ -25,12 +25,12 @@ class << self
# @param cache_fetcher [_CacheFetcher]
# @param db_key Reference to what to load from the database.
# @return The database value corresponding to the database key.
- def load(cache_fetcher, db_key)
+ def load(cache_fetcher, db_key, cache_fetcher_options = {})
cache_key = cache_fetcher.cache_key(db_key)

db_value = nil

- cache_value = IdentityCache.fetch(cache_key) do
+ cache_value = IdentityCache.fetch(cache_key, cache_fetcher_options) do
db_value = cache_fetcher.load_one_from_db(db_key)
cache_fetcher.cache_encode(db_value)
end
4 changes: 2 additions & 2 deletions lib/identity_cache/cached/primary_index.rb
@@ -9,11 +9,11 @@ def initialize(model)
@model = model
end

- def fetch(id)
+ def fetch(id, cache_fetcher_options)
id = cast_id(id)
return unless id
record = if model.should_use_cache?
- object = CacheKeyLoader.load(self, id)
+ object = CacheKeyLoader.load(self, id, cache_fetcher_options)
if object && object.id != id
IdentityCache.logger.error(
<<~MSG.squish
5 changes: 4 additions & 1 deletion lib/identity_cache/fallback_fetcher.rb
@@ -32,7 +32,10 @@ def fetch_multi(keys)
results
end

- def fetch(key)
+ def fetch(key, **cache_fetcher_options)
unless cache_fetcher_options.empty?
raise ArgumentError, "unsupported cache_fetcher options: #{cache_fetcher_options.keys.join(', ')}"
end
result = @cache_backend.read(key)
if result.nil?
result = yield
4 changes: 2 additions & 2 deletions lib/identity_cache/memoized_cache_proxy.rb
@@ -69,7 +69,7 @@ def delete(key)
end
end

- def fetch(key)
+ def fetch(key, cache_fetcher_options = {})
memo_misses = 0
cache_misses = 0

@@ -78,7 +78,7 @@ def fetch(key)

value = fetch_memoized(key) do
memo_misses = 1
- @cache_fetcher.fetch(key) do
+ @cache_fetcher.fetch(key, cache_fetcher_options) do
cache_misses = 1
instrument_duration(payload, :resolve_miss_time) do
yield