Ruby 4.0.0dev (2025-12-23 revision d517e04806616d2384fd2e1e3aa63eea99036669)
concurrent_set.c (d517e04806616d2384fd2e1e3aa63eea99036669)
1#include "internal.h"
2#include "internal/gc.h"
3#include "internal/concurrent_set.h"
4#include "ruby/atomic.h"
5#include "vm_sync.h"
6
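/*
 * Each entry's hash field packs two things into one VALUE: the low bits hold
 * the key's hash (masked by CONCURRENT_SET_HASH_MASK) and the top bit is the
 * "continuation" flag, meaning the probe sequence continues past this slot.
 * A stored hash of 0 marks an empty slot, which is why concurrent_set_hash()
 * below never returns 0.
 */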
#define CONCURRENT_SET_CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
#define CONCURRENT_SET_HASH_MASK (~CONCURRENT_SET_CONTINUATION_BIT)

enum concurrent_set_special_values {
    CONCURRENT_SET_EMPTY,
    CONCURRENT_SET_DELETED,
    CONCURRENT_SET_MOVED,
    CONCURRENT_SET_SPECIAL_VALUE_COUNT
};

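/*
 * Key slots below CONCURRENT_SET_SPECIAL_VALUE_COUNT are sentinels rather
 * than real keys: EMPTY (0) is an unused slot, DELETED is a tombstone left
 * behind when a chained entry is removed, and MOVED marks a slot whose entry
 * has been migrated to a new table during a resize. Real keys are asserted
 * to be VALUEs no smaller than CONCURRENT_SET_SPECIAL_VALUE_COUNT, so they
 * can never collide with the sentinels.
 */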
struct concurrent_set_entry {
    VALUE hash;
    VALUE key;
};

struct concurrent_set {
    rb_atomic_t size;
    unsigned int capacity;
    unsigned int deleted_entries;
    const struct rb_concurrent_set_funcs *funcs;
    struct concurrent_set_entry *entries;
};

static void
concurrent_set_mark_continuation(struct concurrent_set_entry *entry, VALUE curr_hash_and_flags)
{
    if (curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT) return;

    RUBY_ASSERT((curr_hash_and_flags & CONCURRENT_SET_HASH_MASK) != 0);

    VALUE new_hash = curr_hash_and_flags | CONCURRENT_SET_CONTINUATION_BIT;
    VALUE prev_hash = rbimpl_atomic_value_cas(&entry->hash, curr_hash_and_flags, new_hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);

    // At the moment we only expect to race against another thread that is
    // also setting the continuation bit.
    // In the future, if deletion becomes concurrent, this will need adjusting.
    RUBY_ASSERT(prev_hash == curr_hash_and_flags || prev_hash == new_hash);
    (void)prev_hash;
}

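/*
 * Compute a key's hash truncated to CONCURRENT_SET_HASH_MASK. A result of 0
 * is remapped because 0 in an entry's hash field means "empty slot".
 */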
static VALUE
concurrent_set_hash(const struct concurrent_set *set, VALUE key)
{
    VALUE hash = set->funcs->hash(key);
    hash &= CONCURRENT_SET_HASH_MASK;
    if (hash == 0) {
        hash ^= CONCURRENT_SET_HASH_MASK;
    }
    RUBY_ASSERT(hash != 0);
    RUBY_ASSERT(!(hash & CONCURRENT_SET_CONTINUATION_BIT));
    return hash;
}

static void
concurrent_set_free(void *ptr)
{
    struct concurrent_set *set = ptr;
    xfree(set->entries);
}

static size_t
concurrent_set_size(const void *ptr)
{
    const struct concurrent_set *set = ptr;
    return sizeof(struct concurrent_set) +
        (set->capacity * sizeof(struct concurrent_set_entry));
}

/* Hack: Though it would be trivial, we're intentionally avoiding WB-protecting
 * this object. This prevents the object from aging and ensures it can always be
 * collected in a minor GC.
 * Longer term this deserves a better way to reclaim memory promptly.
 */
static void
concurrent_set_mark(void *ptr)
{
    (void)ptr;
}

static const rb_data_type_t concurrent_set_type = {
    .wrap_struct_name = "VM/concurrent_set",
    .function = {
        .dmark = concurrent_set_mark,
        .dfree = concurrent_set_free,
        .dsize = concurrent_set_size,
    },
    /* Hack: NOT WB_PROTECTED on purpose (see above) */
    .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_EMBEDDABLE
};

VALUE
rb_concurrent_set_new(const struct rb_concurrent_set_funcs *funcs, int capacity)
{
    struct concurrent_set *set;
    VALUE obj = TypedData_Make_Struct(0, struct concurrent_set, &concurrent_set_type, set);
    set->funcs = funcs;
    set->entries = ZALLOC_N(struct concurrent_set_entry, capacity);
    set->capacity = capacity;
    return obj;
}

rb_atomic_t
rb_concurrent_set_size(VALUE set_obj)
{
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    return RUBY_ATOMIC_LOAD(set->size);
}

struct concurrent_set_probe {
    int idx;
    int d;
    int mask;
};

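/*
 * Probing uses triangular numbers: each step advances the index by an
 * increasing delta (1, then 2, then 3, ...), so the offsets from the home
 * slot are 1, 3, 6, 10, ... With a power-of-two capacity (asserted below)
 * this sequence visits every slot exactly once before repeating.
 */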
static int
concurrent_set_probe_start(struct concurrent_set_probe *probe, struct concurrent_set *set, VALUE hash)
{
    RUBY_ASSERT((set->capacity & (set->capacity - 1)) == 0);
    probe->d = 0;
    probe->mask = set->capacity - 1;
    probe->idx = hash & probe->mask;
    return probe->idx;
}

static int
concurrent_set_probe_next(struct concurrent_set_probe *probe)
{
    probe->d++;
    probe->idx = (probe->idx + probe->d) & probe->mask;
    return probe->idx;
}

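/*
 * Resizing runs under the VM lock (see concurrent_set_try_resize below). Each
 * old entry's key is atomically exchanged with CONCURRENT_SET_MOVED so that
 * lock-free readers and inserters racing with the resize notice the migration,
 * briefly take the VM lock to wait for it to finish, and retry against the new
 * table. The new table stays private to this thread until the final release
 * store to *set_obj_ptr publishes it, so it can be filled without atomics.
 */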
static void
concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
{
    // Check if another thread has already resized.
    if (rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE) != old_set_obj) {
        return;
    }

    struct concurrent_set *old_set = RTYPEDDATA_GET_DATA(old_set_obj);

    // This may overcount by up to the number of threads concurrently attempting to insert.
    // GC may also happen between now and the set being rebuilt.
    int expected_size = rbimpl_atomic_load(&old_set->size, RBIMPL_ATOMIC_RELAXED) - old_set->deleted_entries;

    // NOTE: the new capacity must make sense with the load factor; don't change one without checking the other.
    struct concurrent_set_entry *old_entries = old_set->entries;
    int old_capacity = old_set->capacity;
    int new_capacity = old_capacity * 2;
    if (new_capacity > expected_size * 8) {
        new_capacity = old_capacity / 2;
    }
    else if (new_capacity > expected_size * 4) {
        new_capacity = old_capacity;
    }

    // May cause GC and therefore deletions, so this must happen first.
    VALUE new_set_obj = rb_concurrent_set_new(old_set->funcs, new_capacity);
    struct concurrent_set *new_set = RTYPEDDATA_GET_DATA(new_set_obj);

    for (int i = 0; i < old_capacity; i++) {
        struct concurrent_set_entry *old_entry = &old_entries[i];
        VALUE key = rbimpl_atomic_value_exchange(&old_entry->key, CONCURRENT_SET_MOVED, RBIMPL_ATOMIC_ACQUIRE);
        RUBY_ASSERT(key != CONCURRENT_SET_MOVED);

        if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
        if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;

        VALUE hash = rbimpl_atomic_value_load(&old_entry->hash, RBIMPL_ATOMIC_RELAXED) & CONCURRENT_SET_HASH_MASK;
        RUBY_ASSERT(hash != 0);
        RUBY_ASSERT(hash == concurrent_set_hash(old_set, key));

        // Insert key into new_set.
        struct concurrent_set_probe probe;
        int idx = concurrent_set_probe_start(&probe, new_set, hash);

        while (true) {
            struct concurrent_set_entry *entry = &new_set->entries[idx];

            if (entry->hash == CONCURRENT_SET_EMPTY) {
                RUBY_ASSERT(entry->key == CONCURRENT_SET_EMPTY);

                new_set->size++;
                RUBY_ASSERT(new_set->size <= new_set->capacity / 2);

                entry->key = key;
                entry->hash = hash;
                break;
            }

            RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
            entry->hash |= CONCURRENT_SET_CONTINUATION_BIT;
            idx = concurrent_set_probe_next(&probe);
        }
    }

    rbimpl_atomic_value_store(set_obj_ptr, new_set_obj, RBIMPL_ATOMIC_RELEASE);

    RB_GC_GUARD(old_set_obj);
}

static void
concurrent_set_try_resize(VALUE old_set_obj, VALUE *set_obj_ptr)
{
    RB_VM_LOCKING() {
        concurrent_set_try_resize_without_locking(old_set_obj, set_obj_ptr);
    }
}

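/*
 * Lock-free lookup. Each slot's hash is read with acquire ordering before its
 * key so that a concurrently inserted entry is observed consistently (the
 * inserter publishes the hash first, then the key). An empty hash, or a
 * mismatched hash without the continuation bit, ends the probe. A MOVED key
 * means a resize is in progress: take and release the VM lock to wait for it,
 * then retry against the new table. Because this is a weak set, keys that are
 * already garbage are skipped.
 */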
VALUE
rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
{
    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    VALUE set_obj;
    VALUE hash = 0;
    struct concurrent_set *set;
    struct concurrent_set_probe probe;
    int idx;

  retry:
    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
    RUBY_ASSERT(set_obj);
    set = RTYPEDDATA_GET_DATA(set_obj);

    if (hash == 0) {
        // We don't need to recompute the hash on every retry because it should
        // never change.
        hash = concurrent_set_hash(set, key);
    }
    RUBY_ASSERT(hash == concurrent_set_hash(set, key));

    idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;

        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
            return 0;
        }

        if (curr_hash != hash) {
            if (!continuation) {
                return 0;
            }
            idx = concurrent_set_probe_next(&probe);
            continue;
        }

        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
            // In-progress insert: the hash has been written but the key has not been written yet.
            break;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // A resize is in progress; wait for it to finish by taking the VM lock, then retry.
            RB_VM_LOCKING();

            goto retry;
          default: {
            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
                // Skip it and let the GC pass clean it up.
                break;
            }

            if (set->funcs->cmp(key, curr_key)) {
                // We've found a match.
                RB_GC_GUARD(set_obj);
                return curr_key;
            }

            if (!continuation) {
                return 0;
            }

            break;
          }
        }

        idx = concurrent_set_probe_next(&probe);
    }
}

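/*
 * Lock-free insert. If a plain find fails, the key is built with
 * funcs->create and inserted in two steps: first an empty slot's hash is
 * claimed by CAS, then its key. The size counter is bumped before claiming
 * the key; once the table is roughly 75% full a resize is triggered and the
 * insert retries. Whenever probing moves past an occupied slot, that slot's
 * continuation bit is set so later lookups know to keep probing. If an equal
 * key is found instead, the freshly created key is released via funcs->free
 * and the existing one is returned.
 */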
VALUE
rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
{
    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    // First, attempt to find an existing entry.
    {
        VALUE result = rb_concurrent_set_find(set_obj_ptr, key);
        if (result) return result;
    }

    // The first time through we need to call create and compute the hash.
    VALUE set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
    RUBY_ASSERT(set_obj);

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
    key = set->funcs->create(key, data);
    VALUE hash = concurrent_set_hash(set, key);

    struct concurrent_set_probe probe;
    int idx;

    goto start_search;

retry:
    // On retries we only need to reload the set object; the hash never changes.
    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
    RUBY_ASSERT(set_obj);
    set = RTYPEDDATA_GET_DATA(set_obj);

    RUBY_ASSERT(hash == concurrent_set_hash(set, key));

start_search:
    idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;

        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
            // Reserve this slot for our hash value.
            curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
            if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
                // Lost the race; retry the same slot to check the winner's hash.
                continue;
            }

            // CAS succeeded, so these are the values stored.
            curr_hash_and_flags = hash;
            curr_hash = hash;

            // Fall through to try to claim the key.
        }

        if (curr_hash != hash) {
            goto probe_next;
        }

        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY: {
            rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

            // Load factor limit reached at 75% full. ex: prev_size: 48, capacity: 64 -> 48 * 4 >= 64 * 3.
            bool load_factor_reached = (uint64_t)prev_size * 4 >= (uint64_t)set->capacity * 3;

            if (UNLIKELY(load_factor_reached)) {
                concurrent_set_try_resize(set_obj, set_obj_ptr);
                goto retry;
            }

            VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
            if (prev_key == CONCURRENT_SET_EMPTY) {
                RUBY_ASSERT(rb_concurrent_set_find(set_obj_ptr, key) == key);
                RB_GC_GUARD(set_obj);
                return key;
            }
            else {
                // Entry was not inserted.
                rbimpl_atomic_sub(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

                // Another thread won the race, try again at the same location.
                continue;
            }
          }
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // A resize is in progress; wait for it to finish by taking the VM lock, then retry.
            RB_VM_LOCKING();
            goto retry;
          default:
            // We never trigger GC during our search.
            // If the continuation bit wasn't set at the start of our search,
            // any concurrent find with the same hash value would also look at
            // this location and try to swap curr_key.
            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                if (continuation) {
                    goto probe_next;
                }
                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_EMPTY, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
                continue;
            }

            if (set->funcs->cmp(key, curr_key)) {
                // We've found a live match.
                RB_GC_GUARD(set_obj);

                // We created key using set->funcs->create, but we didn't end
                // up inserting it into the set. Free it here to prevent memory
                // leaks.
                if (set->funcs->free) set->funcs->free(key);

                return curr_key;
            }
            break;
        }

      probe_next:
        RUBY_ASSERT(curr_hash_and_flags != CONCURRENT_SET_EMPTY);
        concurrent_set_mark_continuation(entry, curr_hash_and_flags);
        idx = concurrent_set_probe_next(&probe);
    }
}

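/*
 * Deletion requires the VM lock with a barrier, so plain (non-atomic) stores
 * are safe here. If a probe chain passes through the entry (continuation bit
 * set), the slot becomes a DELETED tombstone that keeps the chain intact and
 * is only reclaimed by the next resize; otherwise the slot can be fully reset
 * to EMPTY and the size decremented.
 */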
static void
concurrent_set_delete_entry_locked(struct concurrent_set *set, struct concurrent_set_entry *entry)
{
    ASSERT_vm_locking_with_barrier();

    if (entry->hash & CONCURRENT_SET_CONTINUATION_BIT) {
        entry->hash = CONCURRENT_SET_CONTINUATION_BIT;
        entry->key = CONCURRENT_SET_DELETED;
        set->deleted_entries++;
    }
    else {
        entry->hash = CONCURRENT_SET_EMPTY;
        entry->key = CONCURRENT_SET_EMPTY;
        set->size--;
    }
}

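/*
 * Remove a key by pointer identity rather than funcs->cmp. Callers must hold
 * the VM lock with a barrier, so entries can be read without atomics and no
 * resize can be in flight (hence MOVED is treated as a bug here). Returns the
 * deleted key, or 0 if it was not present.
 */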
VALUE
rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
{
    ASSERT_vm_locking_with_barrier();

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    VALUE hash = concurrent_set_hash(set, key);

    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_key = entry->key;

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
            // We didn't find our entry to delete.
            return 0;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_delete_by_identity: moved entry");
            break;
          default:
            if (key == curr_key) {
                RUBY_ASSERT((entry->hash & CONCURRENT_SET_HASH_MASK) == hash);
                concurrent_set_delete_entry_locked(set, entry);
                return curr_key;
            }
            break;
        }

        idx = concurrent_set_probe_next(&probe);
    }
}

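/*
 * Iterate every live entry under the VM lock with a barrier. The callback
 * receives a pointer to the key slot so it can replace the key in place (for
 * example when objects move), and may return ST_STOP to end the iteration or
 * ST_DELETE to remove the current entry.
 */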
void
rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key, void *data), void *data)
{
    ASSERT_vm_locking_with_barrier();

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    for (unsigned int i = 0; i < set->capacity; i++) {
        struct concurrent_set_entry *entry = &set->entries[i];
        VALUE key = entry->key;

        switch (key) {
          case CONCURRENT_SET_EMPTY:
          case CONCURRENT_SET_DELETED:
            continue;
          case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_foreach_with_replace: moved entry");
            break;
          default: {
            int ret = callback(&entry->key, data);
            switch (ret) {
              case ST_STOP:
                return;
              case ST_DELETE:
                concurrent_set_delete_entry_locked(set, entry);
                break;
            }
            break;
          }
        }
    }
}