Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unbounded memory growth for Ruby <2.7. #8429

Merged
merged 4 commits into from Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 79 additions & 6 deletions ruby/ext/google/protobuf_c/protobuf.c
Expand Up @@ -251,14 +251,80 @@ void Arena_register(VALUE module) {
// The object is used only for its identity; it does not contain any data.
VALUE secondary_map = Qnil;

// Mutations to the map are under a mutex, because SeconaryMap_MaybeGC()
// iterates over the map which cannot happen in parallel with insertions, or
// Ruby will throw:
// can't add a new key into hash during iteration (RuntimeError)
VALUE secondary_map_mutex = Qnil;

// Lambda that will GC entries from the secondary map that are no longer present
// in the primary map.
VALUE gc_secondary_map_lambda = Qnil;
ID length;

extern VALUE weak_obj_cache;

static void SecondaryMap_Init() {
rb_gc_register_address(&secondary_map);
rb_gc_register_address(&gc_secondary_map_lambda);
rb_gc_register_address(&secondary_map_mutex);
secondary_map = rb_hash_new();
gc_secondary_map_lambda = rb_eval_string(
"->(secondary, weak) {\n"
" secondary.delete_if { |k, v| !weak.key?(v) }\n"
"}\n");
secondary_map_mutex = rb_mutex_new();
length = rb_intern("length");
}

static VALUE SecondaryMap_Get(VALUE key) {
// The secondary map is a regular Hash, and will never shrink on its own.
// The main object cache is a WeakMap that will automatically remove entries
// when the target object is no longer reachable, but unless we manually
// remove the corresponding entries from the secondary map, it will grow
// without bound.
//
// To avoid this unbounded growth we periodically remove entries from the
// secondary map that are no longer present in the WeakMap. The logic of
// how often to perform this GC is an artbirary tuning parameter that
// represents a straightforward CPU/memory tradeoff.
//
// Requires: secondary_map_mutex is held.
static void SecondaryMap_MaybeGC() {
haberman marked this conversation as resolved.
Show resolved Hide resolved
PBRUBY_ASSERT(rb_mutex_locked_p(secondary_map_mutex) == Qtrue);
size_t weak_len = NUM2ULL(rb_funcall(weak_obj_cache, length, 0));
size_t secondary_len = RHASH_SIZE(secondary_map);
if (secondary_len < weak_len) {
// Logically this case should not be possible: a valid entry cannot exist in
// the weak table unless there is a corresponding entry in the secondary
// table. It should *always* be the case that secondary_len >= weak_len.
//
// However ObjectSpace::WeakMap#length (and therefore weak_len) is
// unreliable: it overreports its true length by including non-live objects.
// However these non-live objects are not yielded in iteration, so we may
// have previously deleted them from the secondary map in a previous
// invocation of SecondaryMap_MaybeGC().
//
// In this case, we can't measure any waste, so we just return.
return;
}
size_t waste = secondary_len - weak_len;
// GC if we could remove at least 2000 entries or 20% of the table size
// (whichever is greater). Since the cost of the GC pass is O(N), we
// want to make sure that we condition this on overall table size, to
// avoid O(N^2) CPU costs.
size_t threshold = PBRUBY_MAX(secondary_len * 0.2, 2000);
if (waste > threshold) {
rb_funcall(gc_secondary_map_lambda, rb_intern("call"), 2,
secondary_map, weak_obj_cache);
}
}

// Requires: secondary_map_mutex is held by this thread iff create == true.
static VALUE SecondaryMap_Get(VALUE key, bool create) {
PBRUBY_ASSERT(!create || rb_mutex_locked_p(secondary_map_mutex) == Qtrue);
VALUE ret = rb_hash_lookup(secondary_map, key);
if (ret == Qnil) {
if (ret == Qnil && create) {
SecondaryMap_MaybeGC();
ret = rb_eval_string("Object.new");
rb_hash_aset(secondary_map, key, ret);
}
Expand All @@ -267,14 +333,15 @@ static VALUE SecondaryMap_Get(VALUE key) {

#endif

static VALUE ObjectCache_GetKey(const void* key) {
// Requires: secondary_map_mutex is held by this thread iff create == true.
static VALUE ObjectCache_GetKey(const void* key, bool create) {
char buf[sizeof(key)];
memcpy(&buf, &key, sizeof(key));
intptr_t key_int = (intptr_t)key;
PBRUBY_ASSERT((key_int & 3) == 0);
VALUE ret = LL2NUM(key_int >> 2);
#if USE_SECONDARY_MAP
ret = SecondaryMap_Get(ret);
ret = SecondaryMap_Get(ret, create);
#endif
return ret;
}
Expand All @@ -298,14 +365,20 @@ static void ObjectCache_Init() {

void ObjectCache_Add(const void* key, VALUE val) {
PBRUBY_ASSERT(ObjectCache_Get(key) == Qnil);
VALUE key_rb = ObjectCache_GetKey(key);
#if USE_SECONDARY_MAP
rb_mutex_lock(secondary_map_mutex);
#endif
VALUE key_rb = ObjectCache_GetKey(key, true);
rb_funcall(weak_obj_cache, item_set, 2, key_rb, val);
#if USE_SECONDARY_MAP
rb_mutex_unlock(secondary_map_mutex);
#endif
PBRUBY_ASSERT(ObjectCache_Get(key) == val);
}

// Returns the cached object for this key, if any. Otherwise returns Qnil.
VALUE ObjectCache_Get(const void* key) {
VALUE key_rb = ObjectCache_GetKey(key);
VALUE key_rb = ObjectCache_GetKey(key, false);
return rb_funcall(weak_obj_cache, item_get, 1, key_rb);
}

Expand Down
2 changes: 2 additions & 0 deletions ruby/ext/google/protobuf_c/protobuf.h
Expand Up @@ -106,6 +106,8 @@ extern VALUE cTypeError;
#define PBRUBY_ASSERT(expr) assert(expr)
#endif

#define PBRUBY_MAX(x, y) (((x) > (y)) ? (x) : (y))

#define UPB_UNUSED(var) (void)var

#endif // __GOOGLE_PROTOBUF_RUBY_PROTOBUF_H__