Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the object cache to better account for race conditions #13204

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ src/js_embed

# vim generated
*.swp
*~

# Generated test scaffolding
src/no_warning_test.cc
Expand Down
4 changes: 1 addition & 3 deletions ruby/ext/google/protobuf_c/defs.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ static VALUE DescriptorPool_alloc(VALUE klass) {

RB_OBJ_WRITE(ret, &self->def_to_descriptor, rb_hash_new());
self->symtab = upb_DefPool_New();
ObjectCache_Add(self->symtab, ret);

return ret;
return ObjectCache_GetSet(self->symtab, ret);
}

/*
Expand Down
6 changes: 4 additions & 2 deletions ruby/ext/google/protobuf_c/map.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ VALUE Map_GetRubyWrapper(upb_Map* map, upb_CType key_type, TypeInfo value_type,
if (val == Qnil) {
val = Map_alloc(cMap);
Map* self;
ObjectCache_Add(map, val);
TypedData_Get_Struct(val, Map, &Map_type, self);
self->map = map;
self->arena = arena;
Expand All @@ -103,6 +102,7 @@ VALUE Map_GetRubyWrapper(upb_Map* map, upb_CType key_type, TypeInfo value_type,
const upb_MessageDef* val_m = self->value_type_info.def.msgdef;
self->value_type_class = Descriptor_DefToClass(val_m);
}
return ObjectCache_GetSet(map, val);
}

return val;
Expand Down Expand Up @@ -319,7 +319,9 @@ static VALUE Map_init(int argc, VALUE* argv, VALUE _self) {

self->map = upb_Map_New(Arena_get(self->arena), self->key_type,
self->value_type_info.type);
ObjectCache_Add(self->map, _self);
VALUE stored = ObjectCache_GetSet(self->map, _self);
(void)stored;
PBRUBY_ASSERT(stored == _self);

if (init_arg != Qnil) {
Map_merge_into_self(_self, init_arg);
Expand Down
4 changes: 3 additions & 1 deletion ruby/ext/google/protobuf_c/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ void Message_InitPtr(VALUE self_, upb_Message* msg, VALUE arena) {
Message* self = ruby_to_Message(self_);
self->msg = msg;
RB_OBJ_WRITE(self_, &self->arena, arena);
ObjectCache_Add(msg, self_);
VALUE stored = ObjectCache_GetSet(msg, self_);
(void)stored;
fowles marked this conversation as resolved.
Show resolved Hide resolved
PBRUBY_ASSERT(stored == self_);
}

VALUE Message_GetArena(VALUE msg_rb) {
Expand Down
166 changes: 20 additions & 146 deletions ruby/ext/google/protobuf_c/protobuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,164 +252,39 @@ void Arena_register(VALUE module) {
// Object Cache
// -----------------------------------------------------------------------------

// A pointer -> Ruby Object cache that keeps references to Ruby wrapper
// objects. This allows us to look up any Ruby wrapper object by the address
// of the object it is wrapping. That way we can avoid ever creating two
// different wrapper objects for the same C object, which saves memory and
// preserves object identity.
//
// We use WeakMap for the cache. For Ruby <2.7 we also need a secondary Hash
// to store WeakMap keys because Ruby <2.7 WeakMap doesn't allow non-finalizable
// keys.
//
// We also need the secondary Hash if sizeof(long) < sizeof(VALUE), because this
// means it may not be possible to fit a pointer into a Fixnum. Keys are
// pointers, and if they fit into a Fixnum, Ruby doesn't collect them, but if
// they overflow and require allocating a Bignum, they could get collected
// prematurely, thus removing the cache entry. This happens on 64-bit Windows,
// on which pointers are 64 bits but longs are 32 bits. In this case, we enable
// the secondary Hash to hold the keys and prevent them from being collected.

#if RUBY_API_VERSION_CODE >= 20700 && SIZEOF_LONG >= SIZEOF_VALUE
#define USE_SECONDARY_MAP 0
#else
#define USE_SECONDARY_MAP 1
#endif

#if USE_SECONDARY_MAP

// Maps Numeric -> Object. The object is then used as a key into the WeakMap.
// This is needed for Ruby <2.7 where a number cannot be a key to WeakMap.
// The object is used only for its identity; it does not contain any data.
VALUE secondary_map = Qnil;

// Mutations to the map are under a mutex, because SeconaryMap_MaybeGC()
// iterates over the map which cannot happen in parallel with insertions, or
// Ruby will throw:
// can't add a new key into hash during iteration (RuntimeError)
VALUE secondary_map_mutex = Qnil;

// Lambda that will GC entries from the secondary map that are no longer present
// in the primary map.
VALUE gc_secondary_map_lambda = Qnil;
ID length;

extern VALUE weak_obj_cache;

static void SecondaryMap_Init() {
rb_gc_register_address(&secondary_map);
rb_gc_register_address(&gc_secondary_map_lambda);
rb_gc_register_address(&secondary_map_mutex);
secondary_map = rb_hash_new();
gc_secondary_map_lambda = rb_eval_string(
"->(secondary, weak) {\n"
" secondary.delete_if { |k, v| !weak.key?(v) }\n"
"}\n");
secondary_map_mutex = rb_mutex_new();
length = rb_intern("length");
}

// The secondary map is a regular Hash, and will never shrink on its own.
// The main object cache is a WeakMap that will automatically remove entries
// when the target object is no longer reachable, but unless we manually
// remove the corresponding entries from the secondary map, it will grow
// without bound.
//
// To avoid this unbounded growth we periodically remove entries from the
// secondary map that are no longer present in the WeakMap. The logic of
// how often to perform this GC is an artbirary tuning parameter that
// represents a straightforward CPU/memory tradeoff.
//
// Requires: secondary_map_mutex is held.
static void SecondaryMap_MaybeGC() {
PBRUBY_ASSERT(rb_mutex_locked_p(secondary_map_mutex) == Qtrue);
size_t weak_len = NUM2ULL(rb_funcall(weak_obj_cache, length, 0));
size_t secondary_len = RHASH_SIZE(secondary_map);
if (secondary_len < weak_len) {
// Logically this case should not be possible: a valid entry cannot exist in
// the weak table unless there is a corresponding entry in the secondary
// table. It should *always* be the case that secondary_len >= weak_len.
//
// However ObjectSpace::WeakMap#length (and therefore weak_len) is
// unreliable: it overreports its true length by including non-live objects.
// However these non-live objects are not yielded in iteration, so we may
// have previously deleted them from the secondary map in a previous
// invocation of SecondaryMap_MaybeGC().
//
// In this case, we can't measure any waste, so we just return.
return;
}
size_t waste = secondary_len - weak_len;
// GC if we could remove at least 2000 entries or 20% of the table size
// (whichever is greater). Since the cost of the GC pass is O(N), we
// want to make sure that we condition this on overall table size, to
// avoid O(N^2) CPU costs.
size_t threshold = PBRUBY_MAX(secondary_len * 0.2, 2000);
if (waste > threshold) {
rb_funcall(gc_secondary_map_lambda, rb_intern("call"), 2, secondary_map,
weak_obj_cache);
}
}

// Requires: secondary_map_mutex is held by this thread iff create == true.
static VALUE SecondaryMap_Get(VALUE key, bool create) {
PBRUBY_ASSERT(!create || rb_mutex_locked_p(secondary_map_mutex) == Qtrue);
VALUE ret = rb_hash_lookup(secondary_map, key);
if (ret == Qnil && create) {
SecondaryMap_MaybeGC();
ret = rb_class_new_instance(0, NULL, rb_cObject);
rb_hash_aset(secondary_map, key, ret);
}
return ret;
}

#endif

// Requires: secondary_map_mutex is held by this thread iff create == true.
static VALUE ObjectCache_GetKey(const void *key, bool create) {
VALUE key_val = (VALUE)key;
PBRUBY_ASSERT((key_val & 3) == 0);
VALUE ret = LL2NUM(key_val >> 2);
#if USE_SECONDARY_MAP
ret = SecondaryMap_Get(ret, create);
#endif
return ret;
}

// Public ObjectCache API.

VALUE weak_obj_cache = Qnil;
ID item_get;
ID item_set;
ID item_getset;

static void ObjectCache_Init(VALUE protobuf) {
item_get = rb_intern("get");
item_getset = rb_intern("getset");

static void ObjectCache_Init() {
rb_gc_register_address(&weak_obj_cache);
VALUE klass = rb_eval_string("ObjectSpace::WeakMap");
weak_obj_cache = rb_class_new_instance(0, NULL, klass);
item_get = rb_intern("[]");
item_set = rb_intern("[]=");
#if USE_SECONDARY_MAP
SecondaryMap_Init();
#if RUBY_API_VERSION_CODE >= 20700 && SIZEOF_LONG >= SIZEOF_VALUE
VALUE cache_class = rb_const_get(protobuf, rb_intern("ObjectCache"));
#else
VALUE cache_class = rb_const_get(protobuf, rb_intern("LegacyObjectCache"));
#endif

weak_obj_cache = rb_class_new_instance(0, NULL, cache_class);
rb_const_set(protobuf, rb_intern("OBJECT_CACHE"), weak_obj_cache);
}

void ObjectCache_Add(const void *key, VALUE val) {
PBRUBY_ASSERT(ObjectCache_Get(key) == Qnil);
#if USE_SECONDARY_MAP
rb_mutex_lock(secondary_map_mutex);
#endif
VALUE key_rb = ObjectCache_GetKey(key, true);
rb_funcall(weak_obj_cache, item_set, 2, key_rb, val);
#if USE_SECONDARY_MAP
rb_mutex_unlock(secondary_map_mutex);
#endif
PBRUBY_ASSERT(ObjectCache_Get(key) == val);
VALUE ObjectCache_GetSet(const void *key, VALUE val) {
VALUE key_val = (VALUE)key;
PBRUBY_ASSERT((key_val & 3) == 0);
return rb_funcall(weak_obj_cache, item_getset, 2, LL2NUM(key_val), val);
}

// Returns the cached object for this key, if any. Otherwise returns Qnil.
VALUE ObjectCache_Get(const void *key) {
VALUE key_rb = ObjectCache_GetKey(key, false);
return rb_funcall(weak_obj_cache, item_get, 1, key_rb);
VALUE key_val = (VALUE)key;
PBRUBY_ASSERT((key_val & 3) == 0);
fowles marked this conversation as resolved.
Show resolved Hide resolved
return rb_funcall(weak_obj_cache, item_get, 1, LL2NUM(key_val));
}

/*
Expand Down Expand Up @@ -459,11 +334,10 @@ VALUE Google_Protobuf_deep_copy(VALUE self, VALUE obj) {
// This must be named "Init_protobuf_c" because the Ruby module is named
// "protobuf_c" -- the VM looks for this symbol in our .so.
__attribute__((visibility("default"))) void Init_protobuf_c() {
ObjectCache_Init();

VALUE google = rb_define_module("Google");
VALUE protobuf = rb_define_module_under(google, "Protobuf");

ObjectCache_Init(protobuf);
Arena_register(protobuf);
Defs_register(protobuf);
RepeatedField_register(protobuf);
Expand Down
7 changes: 3 additions & 4 deletions ruby/ext/google/protobuf_c/protobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,9 @@ void Arena_Pin(VALUE arena, VALUE obj);
// being collected (though in Ruby <2.7 is it effectively strong, due to
// implementation limitations).

// Adds an entry to the cache. The "arena" parameter must give the arena that
// "key" was allocated from. In Ruby <2.7.0, it will be used to remove the key
// from the cache when the arena is destroyed.
void ObjectCache_Add(const void* key, VALUE val);
// Tries to add a new entry to the cache, returning the newly installed value or
// the pre-existing entry.
VALUE ObjectCache_GetSet(const void* key, VALUE val);
fowles marked this conversation as resolved.
Show resolved Hide resolved

// Returns the cached object for this key, if any. Otherwise returns Qnil.
VALUE ObjectCache_Get(const void* key);
Expand Down
5 changes: 3 additions & 2 deletions ruby/ext/google/protobuf_c/repeated_field.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ VALUE RepeatedField_GetRubyWrapper(upb_Array* array, TypeInfo type_info,
if (val == Qnil) {
val = RepeatedField_alloc(cRepeatedField);
RepeatedField* self;
ObjectCache_Add(array, val);
TypedData_Get_Struct(val, RepeatedField, &RepeatedField_type, self);
self->array = array;
self->arena = arena;
self->type_info = type_info;
if (self->type_info.type == kUpb_CType_Message) {
self->type_class = Descriptor_DefToClass(type_info.def.msgdef);
}
val = ObjectCache_GetSet(array, val);
fowles marked this conversation as resolved.
Show resolved Hide resolved
}

PBRUBY_ASSERT(ruby_to_RepeatedField(val)->type_info.type == type_info.type);
Expand Down Expand Up @@ -615,7 +615,8 @@ VALUE RepeatedField_init(int argc, VALUE* argv, VALUE _self) {

self->type_info = TypeInfo_FromClass(argc, argv, 0, &self->type_class, &ary);
self->array = upb_Array_New(arena, self->type_info.type);
ObjectCache_Add(self->array, _self);
VALUE stored_val = ObjectCache_GetSet(self->array, _self);
PBRUBY_ASSERT(stored_val == _self);

if (ary != Qnil) {
if (!RB_TYPE_P(ary, T_ARRAY)) {
Expand Down
92 changes: 92 additions & 0 deletions ruby/lib/google/protobuf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,98 @@
# That way the module init can grab references to these.
module Google
module Protobuf
# A pointer -> Ruby Object cache that keeps references to Ruby wrapper
fowles marked this conversation as resolved.
Show resolved Hide resolved
# objects. This allows us to look up any Ruby wrapper object by the address
# of the object it is wrapping. That way we can avoid ever creating two
# different wrapper objects for the same C object, which saves memory and
# preserves object identity.
#
# We use WeakMap for the cache. For Ruby <2.7 we also need a secondary Hash
# to store WeakMap keys because Ruby <2.7 WeakMap doesn't allow non-finalizable
# keys.
#
# We also need the secondary Hash if sizeof(long) < sizeof(VALUE), because this
# means it may not be possible to fit a pointer into a Fixnum. Keys are
# pointers, and if they fit into a Fixnum, Ruby doesn't collect them, but if
# they overflow and require allocating a Bignum, they could get collected
# prematurely, thus removing the cache entry. This happens on 64-bit Windows,
# on which pointers are 64 bits but longs are 32 bits. In this case, we enable
# the secondary Hash to hold the keys and prevent them from being collected.
class ObjectCache
def initialize
@map = ObjectSpace::WeakMap.new
@mutex = Mutex.new
end

def get(key)
@map[key]
end

def getset(key, value)
@map[key] || @mutex.synchronize do
@map[key] ||= value
end
end

def purge
# noop to expose the same interface as LegacyObjectCache
end
end

class LegacyObjectCache
def initialize
@secondary_map = {}
@map = ObjectSpace::WeakMap.new
@mutex = Mutex.new
end

def get(key)
value = if secondary_key = @secondary_map[key]
@map[secondary_key]
else
@mutex.synchronize do
@map[(@secondary_map[key] ||= Object.new)]
end
end

# GC if we could remove at least 2000 entries or 20% of the table size
# (whichever is greater). Since the cost of the GC pass is O(N), we
# want to make sure that we condition this on overall table size, to
# avoid O(N^2) CPU costs.
cutoff = (@secondary_map.size * 0.2).ceil
cutoff = 2_000 if cutoff < 2_000
if (@secondary_map.size - @map.size) > cutoff
purge
end

value
end

def getset(key, value)
if secondary_key = @secondary_map[key]
if old_value = @map[secondary_key]
return old_value
end
end

@mutex.synchronize do
secondary_key ||= (@secondary_map[key] ||= Object.new)
@map[secondary_key] ||= value
end
end

def purge
@mutex.synchronize do
@secondary_map.each do |key, secondary_key|
unless @map.key?(secondary_key)
@secondary_map.delete(key)
end
end
end
nil
end
end

class Error < StandardError; end
class ParseError < Error; end
class TypeError < ::TypeError; end
Expand Down