2#include "internal/gc.h"
3#include "internal/concurrent_set.h"
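/*
 * Concurrent set of VALUE keys: an open-addressing hash table wrapped in a
 * TypedData object. Lookup and insertion synchronize with atomic loads and
 * compare-and-swap; deletion and iteration (below) assert that the VM lock
 * is held.
 *
 * Each slot stores the key's hash next to the key itself. The top bit of
 * the stored hash is a "continuation" flag marking slots that some probe
 * sequence has stepped past; the remaining bits hold the hash value.
 */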
#define CONCURRENT_SET_CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
#define CONCURRENT_SET_HASH_MASK (~CONCURRENT_SET_CONTINUATION_BIT)
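/*
 * Low VALUE numbers are reserved as sentinel keys: an empty slot, a deleted
 * tombstone, and a slot whose contents were migrated to a resized table.
 * Real keys must therefore compare >= CONCURRENT_SET_SPECIAL_VALUE_COUNT,
 * which the public entry points assert.
 */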
enum concurrent_set_special_values {
    CONCURRENT_SET_EMPTY, /* 0: matches zero-initialized slots */
    CONCURRENT_SET_DELETED,
    CONCURRENT_SET_MOVED,
    CONCURRENT_SET_SPECIAL_VALUE_COUNT
};
struct concurrent_set {
    unsigned int capacity;
    unsigned int deleted_entries;
    rb_atomic_t size;
    /* ... entries array and the funcs callback table ... */
};
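/*
 * When an insert probes past an occupied slot, that slot is marked with the
 * continuation bit. Deleting such a slot later must leave a tombstone rather
 * than an empty slot, so that lookups keep probing past it.
 */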
static void
concurrent_set_mark_continuation(struct concurrent_set_entry *entry, VALUE curr_hash_and_flags)
{
    if (curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT) return;

    RUBY_ASSERT((curr_hash_and_flags & CONCURRENT_SET_HASH_MASK) != 0);

    VALUE new_hash = curr_hash_and_flags | CONCURRENT_SET_CONTINUATION_BIT;
    VALUE prev_hash = rbimpl_atomic_value_cas(&entry->hash, curr_hash_and_flags, new_hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
    /* The CAS may lose only to another thread setting the same bit. */
    RUBY_ASSERT(prev_hash == curr_hash_and_flags || prev_hash == new_hash);
}
static VALUE
concurrent_set_hash(struct concurrent_set *set, VALUE key)
{
    VALUE hash = set->funcs->hash(key);
    hash &= CONCURRENT_SET_HASH_MASK;
    /* Hash 0 is reserved for empty slots; remap it to a nonzero value. */
    if (hash == 0) hash ^= CONCURRENT_SET_HASH_MASK;
    RUBY_ASSERT(!(hash & CONCURRENT_SET_CONTINUATION_BIT));
    return hash;
}
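/*
 * TypedData hooks: the set is itself a GC-managed object, so it defines
 * mark/free/size callbacks and registers them in an rb_data_type_t whose
 * flags include RUBY_TYPED_FREE_IMMEDIATELY and RUBY_TYPED_EMBEDDABLE.
 */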
concurrent_set_free(void *ptr)

concurrent_set_size(const void *ptr)

concurrent_set_mark(void *ptr)

    .dmark = concurrent_set_mark,
    .dfree = concurrent_set_free,
    .dsize = concurrent_set_size,

    .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_EMBEDDABLE
    /* rb_concurrent_set_new() records the requested capacity: */
    set->capacity = capacity;
rb_concurrent_set_size(VALUE set_obj)
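/*
 * Probing (concurrent_set_probe_start / concurrent_set_probe_next): the
 * capacity is always a power of two, so capacity - 1 doubles as the index
 * mask. A probe starts at hash & mask and each step advances the index by
 * the probe's current stride.
 */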
    RUBY_ASSERT((set->capacity & (set->capacity - 1)) == 0);

    probe->mask = set->capacity - 1;
    probe->idx = hash & probe->mask;

    probe->idx = (probe->idx + probe->d) & probe->mask;
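/*
 * Resizing builds a brand-new set object, migrates every live key into it,
 * and then publishes it with a release store to the caller's table pointer.
 * While migrating, old slots are poisoned with CONCURRENT_SET_MOVED so
 * concurrent readers and writers can tell the table has been replaced. The
 * new capacity is derived from the live entry count: grow by default, stay
 * put or shrink when most slots are empty or tombstoned.
 */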
concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)

    /* Bail out if another thread already replaced the table. */
    if (rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE) != old_set_obj) {
        return;
    }

    struct concurrent_set *old_set = RTYPEDDATA_GET_DATA(old_set_obj);

    /* Live entries, not counting tombstones. */
    int expected_size = rbimpl_atomic_load(&old_set->size, RBIMPL_ATOMIC_RELAXED) - old_set->deleted_entries;

    /* Grow by default; keep or halve the capacity when the table is sparse. */
    int old_capacity = old_set->capacity;
    int new_capacity = old_capacity * 2;
    if (new_capacity > expected_size * 8) {
        new_capacity = old_capacity / 2;
    }
    else if (new_capacity > expected_size * 4) {
        new_capacity = old_capacity;
    }

    VALUE new_set_obj = rb_concurrent_set_new(old_set->funcs, new_capacity);
    struct concurrent_set *new_set = RTYPEDDATA_GET_DATA(new_set_obj);

    for (int i = 0; i < old_capacity; i++) {
        /* Poison the old slot so concurrent callers observe CONCURRENT_SET_MOVED. */
        VALUE key = rbimpl_atomic_value_exchange(&old_entry->key, CONCURRENT_SET_MOVED, RBIMPL_ATOMIC_ACQUIRE);

        if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;

        VALUE hash = rbimpl_atomic_value_load(&old_entry->hash, RBIMPL_ATOMIC_RELAXED) & CONCURRENT_SET_HASH_MASK;

        RUBY_ASSERT(hash == concurrent_set_hash(old_set, key));

        int idx = concurrent_set_probe_start(&probe, new_set, hash);

        if (entry->hash == CONCURRENT_SET_EMPTY) {
            RUBY_ASSERT(new_set->size <= new_set->capacity / 2);

        /* Occupied slot: preserve the probe chain and keep probing. */
        RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
        entry->hash |= CONCURRENT_SET_CONTINUATION_BIT;
        idx = concurrent_set_probe_next(&probe);

    /* Publish the new table. */
    rbimpl_atomic_value_store(set_obj_ptr, new_set_obj, RBIMPL_ATOMIC_RELEASE);
concurrent_set_try_resize(VALUE old_set_obj, VALUE *set_obj_ptr)

    concurrent_set_try_resize_without_locking(old_set_obj, set_obj_ptr);
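/*
 * Lookup. Loads the current table and probes from hash & mask onward,
 * returning the stored key once funcs->cmp() reports a match. An empty slot
 * that no probe chain continues past ends the search. Seeing
 * CONCURRENT_SET_MOVED means the table was resized underneath us, so the
 * lookup starts over on the new table. Keys that have already become
 * garbage are never returned.
 */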
rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)

    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);

    set = RTYPEDDATA_GET_DATA(set_obj);

    hash = concurrent_set_hash(set, key);

    /* On retries the cached hash must still match. */
    RUBY_ASSERT(hash == concurrent_set_hash(set, key));

    idx = concurrent_set_probe_start(&probe, set, hash);

    VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
    VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
    bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;

    /* No insertion has ever claimed this slot, so the key cannot be here. */
    if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {

    /* Hash mismatch: this slot belongs to some other key. */
    if (curr_hash != hash) {

        idx = concurrent_set_probe_next(&probe);

    VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

    case CONCURRENT_SET_EMPTY:

    case CONCURRENT_SET_DELETED:

    case CONCURRENT_SET_MOVED: /* the table was resized; retry on the new one */

    /* The stored key is already dead; never hand it out. */
    if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {

    /* Found an equal key. */
    if (set->funcs->cmp(key, curr_key)) {

    idx = concurrent_set_probe_next(&probe);
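/*
 * Find-or-insert. The fast path is a plain rb_concurrent_set_find(). On a
 * miss, funcs->create() builds the key to store, then a slot is claimed in
 * two steps: first the hash is CASed into an empty slot, then the key
 * itself. Losing either race falls back to examining what the winner wrote.
 * The size counter is bumped before the key is published, and when the
 * table reaches 75% occupancy a resize is attempted first. If another
 * thread ends up inserting an equal key, the freshly created key is
 * released through funcs->free() (when provided) and the winner's key is
 * returned.
 */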
rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)

    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    /* Fast path: the key may already be present. */
    VALUE result = rb_concurrent_set_find(set_obj_ptr, key);
    if (result) return result;

    VALUE set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);

    /* Build the key that will actually be stored. */
    key = set->funcs->create(key, data);
    VALUE hash = concurrent_set_hash(set, key);

    /* (Re)load the current table; a resize may have replaced it. */
    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);

    set = RTYPEDDATA_GET_DATA(set_obj);

    RUBY_ASSERT(hash == concurrent_set_hash(set, key));

    idx = concurrent_set_probe_start(&probe, set, hash);

    VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
    VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
    bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;

    if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
        /* Claim the slot's hash word first. */
        curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
        if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) { /* lost the race to another insert */

        /* CAS succeeded: the slot now carries our hash. */
        curr_hash_and_flags = hash;

    /* Hash mismatch: this slot belongs to some other key. */
    if (curr_hash != hash) {

    VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

    case CONCURRENT_SET_EMPTY: {
        /* We own the hash word; count the new entry before publishing it. */
        rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

        /* Resize once the table reaches 75% occupancy. */
        bool load_factor_reached = (uint64_t)prev_size * 4 >= (uint64_t)set->capacity * 3;

        if (UNLIKELY(load_factor_reached)) {
            concurrent_set_try_resize(set_obj, set_obj_ptr);

        /* Publish the key. */
        VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
        if (prev_key == CONCURRENT_SET_EMPTY) {
            RUBY_ASSERT(rb_concurrent_set_find(set_obj_ptr, key) == key);

        /* Someone else took the slot; undo the size bump. */
        rbimpl_atomic_sub(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

    case CONCURRENT_SET_DELETED:

    case CONCURRENT_SET_MOVED:

    /* The resident key is already dead; try to reclaim its slot. */
    if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {

        rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_EMPTY, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);

    /* An equal key is already present: discard ours and return the winner's. */
    if (set->funcs->cmp(key, curr_key)) {

        if (set->funcs->free) set->funcs->free(key);

    /* This slot stays occupied by a different key; mark the probe chain and move on. */
    RUBY_ASSERT(curr_hash_and_flags != CONCURRENT_SET_EMPTY);
    concurrent_set_mark_continuation(entry, curr_hash_and_flags);
    idx = concurrent_set_probe_next(&probe);
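/*
 * Deletion helpers below run with the VM lock held, so they can use plain
 * (non-atomic) stores. A slot whose continuation bit is set must become a
 * DELETED tombstone so that longer probe chains keep working; otherwise it
 * can simply be reset to empty.
 */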
static void
concurrent_set_delete_entry_locked(struct concurrent_set *set, struct concurrent_set_entry *entry)
{
    ASSERT_vm_locking_with_barrier();

    if (entry->hash & CONCURRENT_SET_CONTINUATION_BIT) {
        /* Other keys probed past this slot: leave a tombstone. */
        entry->hash = CONCURRENT_SET_CONTINUATION_BIT;
        entry->key = CONCURRENT_SET_DELETED;
        set->deleted_entries++;
    }
    else {
        entry->hash = CONCURRENT_SET_EMPTY;
        entry->key = CONCURRENT_SET_EMPTY;
    }
}
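/*
 * Removes a key matched by identity (==) rather than by funcs->cmp(). Runs
 * under the VM lock; encountering a CONCURRENT_SET_MOVED sentinel means the
 * caller was handed a stale, already-replaced table, which is a bug.
 */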
rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)

    ASSERT_vm_locking_with_barrier();

    VALUE hash = concurrent_set_hash(set, key);

    int idx = concurrent_set_probe_start(&probe, set, hash);

    VALUE curr_key = entry->key;

    case CONCURRENT_SET_EMPTY:

    case CONCURRENT_SET_DELETED:

    case CONCURRENT_SET_MOVED:
        rb_bug("rb_concurrent_set_delete_by_identity: moved entry");

    if (key == curr_key) {
        RUBY_ASSERT((entry->hash & CONCURRENT_SET_HASH_MASK) == hash);
        concurrent_set_delete_entry_locked(set, entry);

    idx = concurrent_set_probe_next(&probe);
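/*
 * Iterates every live entry under the VM lock. The callback gets a pointer
 * to the stored key so it can replace the key in place; depending on the
 * callback's return value, iteration may stop or the current entry may be
 * deleted.
 */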
rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key, void *data), void *data)

    ASSERT_vm_locking_with_barrier();

    for (unsigned int i = 0; i < set->capacity; i++) {
        VALUE key = entry->key;

        case CONCURRENT_SET_EMPTY:
        case CONCURRENT_SET_DELETED:

        case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_foreach_with_replace: moved entry");

        int ret = callback(&entry->key, data);

            concurrent_set_delete_entry_locked(set, entry);
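/*
 * Usage sketch (illustrative, not part of this file): a client supplies a
 * callback table and a VALUE slot holding the set, then interns keys via
 * rb_concurrent_set_find_or_insert(). The callback names (hash, cmp,
 * create, free) match the ones used above; the struct tag and exact layout
 * of the funcs table are assumptions here and live in
 * internal/concurrent_set.h.
 *
 *     static const struct rb_concurrent_set_funcs my_funcs = {
 *         .hash   = my_hash,    // VALUE key -> hash bits
 *         .cmp    = my_cmp,     // do two keys represent the same element?
 *         .create = my_create,  // build the key that is actually stored
 *         .free   = my_free,    // release a key that lost an insert race
 *     };
 *
 *     static VALUE my_set;      // must stay reachable by the GC (e.g. rb_gc_register_address)
 *
 *     void
 *     my_init(void)
 *     {
 *         my_set = rb_concurrent_set_new(&my_funcs, 64);  // power-of-two capacity
 *     }
 *
 *     VALUE
 *     my_intern(VALUE key, void *data)
 *     {
 *         // Returns the key already in the set, or stores funcs->create(key, data).
 *         return rb_concurrent_set_find_or_insert(&my_set, key, data);
 *     }
 */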