Add support for fill lock with lock wait to avoid thundering herd problem #373

Merged · 6 commits · Jan 15, 2021
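In brief: on a cache miss, fetch can now take a fill lock in memcached so that a single client fills the cache while concurrent clients briefly wait for the fill instead of all querying the database at once. A minimal usage sketch of the options added by this PR (the key and block body are illustrative, not from the diff):

require 'identity_cache'

# On a miss, the first client takes the fill lock and runs the block; other
# clients sleep fill_lock_duration between attempts, up to lock_wait_tries
# times, then read the filled value instead of loading from the database.
value = IdentityCache.fetch("example:key", fill_lock_duration: 0.1, lock_wait_tries: 2) do
  expensive_database_load # hypothetical loader
end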
3 changes: 3 additions & 0 deletions .rubocop.yml
@@ -3,3 +3,6 @@ inherit_from:

AllCops:
TargetRubyVersion: 2.4

Layout/BeginEndAlignment:
EnforcedStyleAlignWith: start_of_line
2 changes: 1 addition & 1 deletion dev.yml
@@ -6,7 +6,7 @@ up:
- [email protected]:
or: [[email protected]]
conflicts: [mysql-connector-c, mysql, mysql-client]
- - ruby: 2.6.5
+ - ruby: 2.7.2
- railgun
- bundler

9 changes: 7 additions & 2 deletions lib/identity_cache.rb
@@ -65,6 +65,8 @@ class UnsupportedAssociationError < StandardError; end

class DerivedModelError < StandardError; end

class LockWaitTimeout < StandardError; end

mattr_accessor :cache_namespace
self.cache_namespace = "IDC:#{CACHE_VERSION}:"

@@ -141,10 +143,13 @@ def should_use_cache? # :nodoc:
#
# == Parameters
# +key+ A cache key string
# +cache_fetcher_options+ A hash of options to pass to the cache backend
#
- def fetch(key)
+ def fetch(key, cache_fetcher_options = {})

Review comment:

Nit, but I think you could splat the options hash all the way through. That way you wouldn't have to update any assertions/expectations and everything would keep working as expected, no?

Reply (Contributor Author):

Capturing keyword arguments using ** has the performance disadvantage of duplicating the hash for each method call.

The other annoying thing with the test expectations is that some of them use the spy gem, which doesn't yet support keyword arguments (although I've opened ryanong/spy#12 for that issue). This prevents me from explicitly passing options through without a splat for any method that is referenced by Spy.on in the test suite.

Reply (Contributor Author):

My upstream spy PR has been merged and released, so I'll see what I can clean up as a result of that.

if should_use_cache?
- unmap_cached_nil_for(cache.fetch(key) { map_cached_nil_for(yield) })
+ unmap_cached_nil_for(cache.fetch(key, cache_fetcher_options) do
+ map_cached_nil_for(yield)
+ end)
else
yield
end
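A standalone sketch of the ** trade-off discussed in the thread above (illustrative, not part of the PR): capturing keywords with ** allocates a fresh Hash on every call, while passing an options Hash positionally hands the same object through.

def with_double_splat(**opts) # opts is a newly allocated Hash on each call
  opts
end

def with_options_hash(opts = {}) # the caller's Hash is passed through as-is
  opts
end

options = { fill_lock_duration: 0.1 }
p with_double_splat(**options).equal?(options) # => false (hash duplicated per call)
p with_options_hash(options).equal?(options)   # => true (same object, no copy)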
248 changes: 232 additions & 16 deletions lib/identity_cache/cache_fetcher.rb
@@ -1,8 +1,52 @@
# frozen_string_literal: true

require 'securerandom'

module IdentityCache
class CacheFetcher
attr_accessor :cache_backend

EMPTY_HASH = {}.freeze

class FillLock
FILL_LOCKED = :fill_locked
FAILED_CLIENT_ID = 'fill_failed'

class << self
def from_cache(marker, client_id, data_version)
raise ArgumentError unless marker == FILL_LOCKED
new(client_id: client_id, data_version: data_version)
end

def cache_value?(cache_value)
cache_value.is_a?(Array) && cache_value.length == 3 && cache_value.first == FILL_LOCKED
end
end

attr_reader :client_id, :data_version

def initialize(client_id:, data_version:)
@client_id = client_id
@data_version = data_version
end

def cache_value
[FILL_LOCKED, client_id, data_version]
end

def mark_failed
@client_id = FAILED_CLIENT_ID
end

def fill_failed?
@client_id == FAILED_CLIENT_ID
end

def ==(other)
self.class == other.class && client_id == other.client_id && data_version == other.data_version
end
end

def initialize(cache_backend)
@cache_backend = cache_backend
end
@@ -25,27 +69,198 @@ def fetch_multi(keys, &block)
results
end

- def fetch(key)
-   result = nil
-   yielded = false
-   @cache_backend.cas(key) do |value|
-     yielded = true
-     unless IdentityCache::DELETED == value
-       result = value
-       break
-     end
+ def fetch(key, fill_lock_duration: nil, lock_wait_tries: 2)
+   if fill_lock_duration && IdentityCache.should_fill_cache?
+     fetch_with_fill_lock(key, fill_lock_duration, lock_wait_tries) do
+       yield
+     end
+   else
+     fetch_without_fill_lock(key) { yield }
+   end
+ end

private

def fetch_without_fill_lock(key)
data = nil
upsert(key) do |value|
value = nil if value == IdentityCache::DELETED || FillLock.cache_value?(value)
unless value.nil?
return value
end
- result = yield
+ data = yield
break unless IdentityCache.should_fill_cache?
- result
+ data
end
data
end

def fetch_with_fill_lock(key, fill_lock_duration, lock_wait_tries)
raise ArgumentError, 'fill_lock_duration must be greater than 0.0' unless fill_lock_duration > 0.0
raise ArgumentError, 'lock_wait_tries must be greater than 0' unless lock_wait_tries > 0
lock = nil
using_fallback_key = false
expiration_options = EMPTY_HASH
(lock_wait_tries + 2).times do # +2 is for first attempt and retry with fallback key
result = fetch_or_take_lock(key, old_lock: lock, **expiration_options)
case result
when FillLock
lock = result
if lock.client_id == client_id # have lock
data = begin
yield
rescue
mark_fill_failure_on_lock(key, expiration_options)
raise
end

if !fill_with_lock(key, data, lock, expiration_options) && !using_fallback_key
# fallback to storing data in the fallback key so it is available to clients waiting on the lock
Review comment (Contributor):

Cache invalidations could happen while a cache miss is being resolved, which could prevent clients that are in a lock wait from reading the value after the lock wait.

Is the problem this is addressing that the waiting clients would have to wait even longer, and we might as well serve them stale data since they queried before it was invalidated? I.e., if the lock wait timeout were infinite and the client were okay with higher latency, this wouldn't be necessary?

Reply (Contributor Author):

This won't serve stale data from the perspective of the client. It is similar to how, in MVCC, a reader won't read a write that came in after it started the query and chose a data version for it. However, the data version for a fill lock / fallback key should be thought of as a lower bound in this case.

The problem this is addressing is latency caused by cache invalidations. In the theoretical case where we didn't have to worry about latency, this wouldn't be necessary. However, in practice we don't want the fill lock to turn into a queue: it only allows one client to try to fill at a time, and cache invalidations can overwrite or prevent the cache fill, preventing any other clients from benefiting from their lock wait.

Reply (Contributor):

> This won't serve stale data from the perspective of the client.

Right, that part was clear; I wanted to make sure I fully understood the reason for the extra complexity from the fallback key.

expiration_options = fallback_key_expiration_options(fill_lock_duration)
@cache_backend.write(lock_fill_fallback_key(key, lock), data, expiration_options)
end
return data
else
raise LockWaitTimeout if lock_wait_tries <= 0
lock_wait_tries -= 1

# If the fill failed in the other client, then it might be failing fast,
# so avoid waiting the typical amount of time for a lock wait. The
# semian gem can be used to handle failing fast when the database is slow.
if lock.fill_failed?
return fetch_without_fill_lock(key) { yield }
end

# lock wait
sleep(fill_lock_duration)
# loop around to retry fetch_or_take_lock
end
when IdentityCache::DELETED # interrupted by cache invalidation
if using_fallback_key
raise "unexpected cache invalidation of versioned fallback key"
elsif lock
# Cache invalidated during lock wait, use a versioned fallback key
# to avoid further cache invalidation interruptions.
using_fallback_key = true
key = lock_fill_fallback_key(key, lock)
expiration_options = fallback_key_expiration_options(fill_lock_duration)
# loop around to retry with fallback key
else
# Cache invalidation prevented lock from being taken or read, so we don't
# have a data version to use to build a shared fallback key. In the future
# we could add the data version to the cache invalidation value so a fallback
# key could be used here. For now, we assume that a cache invalidation occurring
# just after the cache wasn't filled is more likely a sign of a key that is
# written more than read (which this cache isn't a good fit for), rather than
# a thundering herd of reads.
return yield
Review comment (Contributor):

Why wouldn't we want to loop and attempt to acquire the fill lock again in this case? I don't see a reason for this worker to not participate in the next round of waiting/filling, since a worker that hits this case won't have done any waiting yet (or it would have a lock value).

Worth noting that it's impossible for this case to happen when using_fallback_key is true, since the fallback key can never have the value IdentityCache::DELETED

Reply (Contributor Author):

The lock wait limit isn't really configured as a lock fill retry limit, which would be there to handle a different scenario. The lock wait limit is mostly about tolerating cache fill delays from the client that has the fill lock. Interruption from a cache invalidation doesn't take away from the number of waits, because it is a separate concern, which is why lock_wait_limit -= 1 isn't even done in the using_fallback_key = true code path.

I had considered whether we should have a configuration for the number of attempts to fetch or take the fill lock, and decided that it wasn't the right solution to the problem, nor the right problem to focus on solving right now. Specifically, the problem comes up with frequently invalidated cache keys, which aren't really a good fit for read-through caching with cache invalidation. A better solution to this problem, if we wanted to address it, would be to add a data version to the cache invalidation value (IdentityCache::DELETED) so we could switch to using the fallback key at this point. Even if we wanted that solution, we would need an extra PR to handle backwards compatibility around deploys so the new cache invalidation value doesn't get interpreted as a cache hit. Either way, we would only benefit from solving this problem if there are still enough more reads than cache invalidations, since we would only be grouping reads for the same data version.

Reply (Contributor, @pushrax, Jan 13, 2021):

I thought I left this comment before but it seems to have gotten lost: I think this context is very useful and should go into the code comment here.

The takeaway I got is that looping here (without decrementing lock_wait_limit) could end up in an unbounded loop (exceeding lock_wait_limit + 2) due to extremely frequent invalidations, and that would be bad, so we'd need another limit. However, that other limit doesn't really make sense, since in this scenario the cache is pretty useless anyway.

It is possible for this case to be hit randomly on a key that isn't invalidating frequently though (just happened to have 2 invalidations in a row). But that should be rare so it's probably not worth handling explicitly.

Reply (Contributor Author):

> Worth noting that it's impossible for this case to happen when using_fallback_key is true, since the fallback key can never have the value IdentityCache::DELETED

Good point; the && !using_fallback_key condition shouldn't be necessary. I'll change that to raise if the fallback key is invalidated, to make the state for that code path clearer.

Reply (Contributor Author):

> It is possible for this case to be hit randomly on a key that isn't invalidating frequently though (just happened to have 2 invalidations in a row). But that should be rare so it's probably not worth handling explicitly.

Yeah, this is the scenario I would like to add the data version to the cache invalidation value for. Mostly to avoid worrying about the scenario from a performance perspective.

However, we should see benefits even in this scenario: it leaves a smaller window of time for a thundering herd of reads to hit the database, since that window would just be the round trip time to memcached, rather than the longer time we have now of multiple queries to the database to load the record along with its embedded associations, plus the time to serialize the cache blob.

end
when nil # Errors talking to memcached
return yield
else # hit
return result
end
end
raise "unexpected number of loop iterations"
Review comment (Member):

It seems a bit rough to have a non-specific exception raised here. What can the caller do about it?

I can see two paths that reach here: if the last iteration of the loop was interrupted by cache invalidation (fetch_or_take_lock returned IdentityCache::DELETED and we have a lock and we didn't previously try using the fallback key), or if we orened on the last iteration of the loop. I feel like we should not raise here, and we should instead try to handle both those paths in a sane way.

Reply (Contributor Author):

> What can the caller do about it?

The caller isn't meant to do anything about it, because getting here means there is a bug in the code. The (lock_wait_limit + 2).times do loop could be replaced with an infinite loop, but I wanted to make it clear that there shouldn't be an infinite loop here, so this check prevents that from happening.

Being interrupted by a cache invalidation would be the fallback case mentioned in the # +2 is for first attempt and retry with fallback key comment at the start of the loop. What does oren stand for? I don't know what you mean by that path.

Reply (Member, @fbogsany, Jul 11, 2019):

> What does oren stand for?

Old #webscale joke that should probably be retired: sleep.

end
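# Illustrative note (not part of the diff): with the default lock_wait_tries = 2,
# the loop above runs at most 4 times before the raise can be reached: the first
# attempt, two lock waits, and one retry against the versioned fallback key.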

def mark_fill_failure_on_lock(key, expiration_options)
@cache_backend.cas(key, expiration_options) do |value|
break unless FillLock.cache_value?(value)
lock = FillLock.from_cache(*value)
break if lock.client_id != client_id
lock.mark_failed
lock.cache_value
end
end

def upsert(key, expiration_options = EMPTY_HASH)
yielded = false
upserted = @cache_backend.cas(key, expiration_options) do |value|
yielded = true
yield value
end
unless yielded
- result = yield
- add(key, result)
+ data = yield nil
+ upserted = add(key, data, expiration_options)
end
- result
+ upserted
end

- private
def fetch_or_take_lock(key, old_lock:, **expiration_options)
new_lock = nil
upserted = upsert(key, expiration_options) do |value|
if value.nil? || value == IdentityCache::DELETED
if old_lock # cache invalidated
return value
else
new_lock = FillLock.new(client_id: client_id, data_version: SecureRandom.uuid)
end
elsif FillLock.cache_value?(value)
fetched_lock = FillLock.from_cache(*value)
if old_lock == fetched_lock
# preserve data version since there hasn't been any cache invalidations
new_lock = FillLock.new(client_id: client_id, data_version: old_lock.data_version)
elsif old_lock && fetched_lock.data_version != old_lock.data_version
# Cache was invalidated, then another lock was taken during a lock wait.
# Treat it as any other cache invalidation, where the caller will switch
# to the fallback key.
return IdentityCache::DELETED
else
return fetched_lock
end
else # hit
return value
end
new_lock.cache_value # take lock
Review comment (Contributor Author, @dylanahsmith, Oct 4, 2018):

Currently the code converts the lock values to primitives so that they can be serialized and deserialized with msgpack. However, an alternative would be to add a special value for msgpack to handle the hydration of these values instead of doing it explicitly in the code. With Marshal as the serializer, it can handle this for us as long as we are careful about not changing the class name or instance variables names in an incompatible way.

end

return new_lock if upserted

value = @cache_backend.read(key)
if FillLock.cache_value?(value)
FillLock.from_cache(*value)
else
value
end
end
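# Sketch of the msgpack alternative mentioned in the review thread above,
# assuming the msgpack-ruby Factory extension-type API (not part of this PR):
#
#   require 'msgpack'
#
#   factory = MessagePack::Factory.new
#   factory.register_type(0x01, FillLock,
#     packer: ->(lock) { MessagePack.pack([lock.client_id, lock.data_version]) },
#     unpacker: ->(payload) {
#       client_id, data_version = MessagePack.unpack(payload)
#       FillLock.new(client_id: client_id, data_version: data_version)
#     })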

def fill_with_lock(key, data, my_lock, expiration_options)
upserted = upsert(key, expiration_options) do |value|
return false if value.nil? || value == IdentityCache::DELETED
return true unless FillLock.cache_value?(value) # already filled
current_lock = FillLock.from_cache(*value)
if current_lock.data_version != my_lock.data_version
return false # invalidated then relocked
end
data
end

upserted
end

def lock_fill_fallback_key(key, lock)
"lock_fill:#{lock.data_version}:#{key}"
end
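# Illustrative example (not part of the diff): for key "idc:blob:Item:1" and a
# lock whose data_version is "77bb", this returns "lock_fill:77bb:idc:blob:Item:1",
# so every waiter that observed the same lock agrees on where the fill will land
# even if the main key is invalidated mid-fill.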

def fallback_key_expiration_options(fill_lock_duration)
# Override the default TTL for the fallback key lock since it won't be used for very long.
expires_in = fill_lock_duration * 2

# memcached uses integer number of seconds for TTL so round up to avoid having
# the cache store round down with `to_i`
expires_in = expires_in.ceil

# memcached TTL only gets part of the first second (https://github.com/memcached/memcached/issues/307),
# so increase TTL by 1 to compensate
expires_in += 1

{ expires_in: expires_in }
end
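# Worked example (illustrative): fallback_key_expiration_options(0.1)
# gives 0.1 * 2 = 0.2, then 0.2.ceil == 1, then 1 + 1 == 2 => { expires_in: 2 }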

def client_id
@client_id ||= SecureRandom.uuid
end

def cas_multi(keys)
result = nil
@@ -81,8 +296,9 @@ def add_multi(keys)
result.each { |k, v| add(k, v) }
end

- def add(key, value)
-   @cache_backend.write(key, value, unless_exist: true) if IdentityCache.should_fill_cache?
+ def add(key, value, expiration_options = EMPTY_HASH)
+   return false unless IdentityCache.should_fill_cache?
+   @cache_backend.write(key, value, { unless_exist: true, **expiration_options })
end
end
end
4 changes: 2 additions & 2 deletions lib/identity_cache/cache_key_loader.rb
@@ -25,12 +25,12 @@ class << self
# @param cache_fetcher [_CacheFetcher]
# @param db_key Reference to what to load from the database.
# @return The database value corresponding to the database key.
- def load(cache_fetcher, db_key)
+ def load(cache_fetcher, db_key, cache_fetcher_options = {})
cache_key = cache_fetcher.cache_key(db_key)

db_value = nil

- cache_value = IdentityCache.fetch(cache_key) do
+ cache_value = IdentityCache.fetch(cache_key, cache_fetcher_options) do
db_value = cache_fetcher.load_one_from_db(db_key)
cache_fetcher.cache_encode(db_value)
end
4 changes: 2 additions & 2 deletions lib/identity_cache/cached/primary_index.rb
@@ -9,11 +9,11 @@ def initialize(model)
@model = model
end

- def fetch(id)
+ def fetch(id, cache_fetcher_options)
id = cast_id(id)
return unless id
record = if model.should_use_cache?
- object = CacheKeyLoader.load(self, id)
+ object = CacheKeyLoader.load(self, id, cache_fetcher_options)
if object && object.id != id
IdentityCache.logger.error(
<<~MSG.squish
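Since PrimaryIndex#fetch now forwards cache_fetcher_options, a model-level call site can opt into the fill lock per call. A hedged sketch, assuming a hypothetical Item model that includes IdentityCache and a model-level fetch that forwards its options down this path:

item = Item.fetch(1, fill_lock_duration: 0.1, lock_wait_tries: 2)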
5 changes: 4 additions & 1 deletion lib/identity_cache/fallback_fetcher.rb
@@ -32,7 +32,10 @@ def fetch_multi(keys)
results
end

- def fetch(key)
+ def fetch(key, **cache_fetcher_options)
unless cache_fetcher_options.empty?
raise ArgumentError, "unsupported cache_fetcher options: #{cache_fetcher_options.keys.join(', ')}"
end
result = @cache_backend.read(key)
if result.nil?
result = yield
4 changes: 2 additions & 2 deletions lib/identity_cache/memoized_cache_proxy.rb
@@ -69,7 +69,7 @@ def delete(key)
end
end

- def fetch(key)
+ def fetch(key, cache_fetcher_options = {})
memo_misses = 0
cache_misses = 0

@@ -78,7 +78,7 @@ def fetch(key)

value = fetch_memoized(key) do
memo_misses = 1
- @cache_fetcher.fetch(key) do
+ @cache_fetcher.fetch(key, **cache_fetcher_options) do
cache_misses = 1
instrument_duration(payload, :resolve_miss_time) do
yield