Ruby 3.5.0dev (2025-10-22 revision 66c12bd5194396fab66338a87ca8f2d89f1d66d0)
concurrent_set.c (66c12bd5194396fab66338a87ca8f2d89f1d66d0)
#include "internal.h"
#include "internal/gc.h"
#include "internal/concurrent_set.h"
#include "ruby/atomic.h"
#include "vm_sync.h"
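
/* Reserved key values: EMPTY marks a never-used slot, DELETED is a tombstone
 * left by removal, and MOVED means the table has been resized and readers must
 * reload the new table through the set pointer. Every valid key is a VALUE
 * numerically greater than these sentinels (asserted throughout). */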
enum concurrent_set_special_values {
    CONCURRENT_SET_EMPTY,
    CONCURRENT_SET_DELETED,
    CONCURRENT_SET_MOVED,
    CONCURRENT_SET_SPECIAL_VALUE_COUNT
};

struct concurrent_set_entry {
    VALUE hash;
    VALUE key;
};

struct concurrent_set {
    rb_atomic_t size;
    unsigned int capacity;
    unsigned int deleted_entries;
    const struct rb_concurrent_set_funcs *funcs;
    struct concurrent_set_entry *entries;
};

static void
concurrent_set_free(void *ptr)
{
    struct concurrent_set *set = ptr;
    xfree(set->entries);
}

static size_t
concurrent_set_size(const void *ptr)
{
    const struct concurrent_set *set = ptr;
    return sizeof(struct concurrent_set) +
        (set->capacity * sizeof(struct concurrent_set_entry));
}

/* Hack: Though it would be trivial, we're intentionally avoiding WB-protecting
 * this object. This prevents the object from aging and ensures it can always be
 * collected in a minor GC.
 * Longer term this deserves a better way to reclaim memory promptly.
 */
static void
concurrent_set_mark(void *ptr)
{
    (void)ptr;
}

static const rb_data_type_t concurrent_set_type = {
    .wrap_struct_name = "VM/concurrent_set",
    .function = {
        .dmark = concurrent_set_mark,
        .dfree = concurrent_set_free,
        .dsize = concurrent_set_size,
    },
    /* Hack: NOT WB_PROTECTED on purpose (see above) */
    .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_EMBEDDABLE
};

VALUE
rb_concurrent_set_new(const struct rb_concurrent_set_funcs *funcs, int capacity)
{
    struct concurrent_set *set;
    VALUE obj = TypedData_Make_Struct(0, struct concurrent_set, &concurrent_set_type, set);
    set->funcs = funcs;
    set->entries = ZALLOC_N(struct concurrent_set_entry, capacity);
    set->capacity = capacity;
    return obj;
}

rb_atomic_t
rb_concurrent_set_size(VALUE set_obj)
{
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    return RUBY_ATOMIC_LOAD(set->size);
}

struct concurrent_set_probe {
    int idx;
    int d;
    int mask;
};

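/* Triangular-number probing: each step advances the index by an increasing
 * delta (1, 2, 3, ...). Because the capacity is a power of two (asserted
 * below), this sequence visits every slot before it repeats. */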
static int
concurrent_set_probe_start(struct concurrent_set_probe *probe, struct concurrent_set *set, VALUE hash)
{
    RUBY_ASSERT((set->capacity & (set->capacity - 1)) == 0);
    probe->d = 0;
    probe->mask = set->capacity - 1;
    probe->idx = hash & probe->mask;
    return probe->idx;
}

static int
concurrent_set_probe_next(struct concurrent_set_probe *probe)
{
    probe->d++;
    probe->idx = (probe->idx + probe->d) & probe->mask;
    return probe->idx;
}

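/* Rebuild the table. This runs with the VM lock held (see
 * concurrent_set_try_resize below): every old slot is atomically replaced with
 * MOVED so that lock-free readers notice the migration, and the finished table
 * is published through *set_obj_ptr with release ordering. */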
static void
concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
{
    // Check if another thread has already resized.
    if (rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE) != old_set_obj) {
        return;
    }

    struct concurrent_set *old_set = RTYPEDDATA_GET_DATA(old_set_obj);

    // This may overcount by up to the number of threads concurrently attempting to insert
    // GC may also happen between now and the set being rebuilt
    int expected_size = rbimpl_atomic_load(&old_set->size, RBIMPL_ATOMIC_RELAXED) - old_set->deleted_entries;

    struct concurrent_set_entry *old_entries = old_set->entries;
    int old_capacity = old_set->capacity;
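    // Start from doubling. If the surviving entries would fill less than 1/8
    // of that doubled table, shrink instead; if less than 1/4, keep the
    // current capacity.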
    int new_capacity = old_capacity * 2;
    if (new_capacity > expected_size * 8) {
        new_capacity = old_capacity / 2;
    }
    else if (new_capacity > expected_size * 4) {
        new_capacity = old_capacity;
    }

    // May cause GC and therefore deletes, so must happen first.
    VALUE new_set_obj = rb_concurrent_set_new(old_set->funcs, new_capacity);
    struct concurrent_set *new_set = RTYPEDDATA_GET_DATA(new_set_obj);

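    // Move every live entry into the new table. Each old slot is swapped to
    // MOVED so concurrent readers know to wait for the migration; empty,
    // deleted, and already-dead (garbage) keys are dropped along the way.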
    for (int i = 0; i < old_capacity; i++) {
        struct concurrent_set_entry *entry = &old_entries[i];
        VALUE key = rbimpl_atomic_value_exchange(&entry->key, CONCURRENT_SET_MOVED, RBIMPL_ATOMIC_ACQUIRE);
        RUBY_ASSERT(key != CONCURRENT_SET_MOVED);

        if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
        if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;

        VALUE hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
        if (hash == 0) {
            // Either in-progress insert or extremely unlikely 0 hash.
            // Re-calculate the hash.
            hash = old_set->funcs->hash(key);
        }
        RUBY_ASSERT(hash == old_set->funcs->hash(key));

        // Insert key into new_set.
        struct concurrent_set_probe probe;
        int idx = concurrent_set_probe_start(&probe, new_set, hash);

        while (true) {
            struct concurrent_set_entry *entry = &new_set->entries[idx];

            if (entry->key == CONCURRENT_SET_EMPTY) {
                new_set->size++;

                RUBY_ASSERT(new_set->size <= new_set->capacity / 2);
                RUBY_ASSERT(entry->hash == 0);

                entry->key = key;
                entry->hash = hash;
                break;
            }
            else {
                RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
            }

            idx = concurrent_set_probe_next(&probe);
        }
    }

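    // Publish the new table. Readers that observed MOVED, as well as all later
    // lookups, will pick it up through *set_obj_ptr.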
    rbimpl_atomic_value_store(set_obj_ptr, new_set_obj, RBIMPL_ATOMIC_RELEASE);

    RB_GC_GUARD(old_set_obj);
}

static void
concurrent_set_try_resize(VALUE old_set_obj, VALUE *set_obj_ptr)
{
    RB_VM_LOCKING() {
        concurrent_set_try_resize_without_locking(old_set_obj, set_obj_ptr);
    }
}

VALUE
rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
{
    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    VALUE set_obj;
    VALUE hash = 0;

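    // The table pointer is re-read on every retry: a concurrent resize
    // publishes a replacement table through *set_obj_ptr and marks every slot
    // of the old table MOVED.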
  retry:
    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
    RUBY_ASSERT(set_obj);
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    if (hash == 0) {
        // We don't need to recompute the hash on every retry because it should
        // never change.
        hash = set->funcs->hash(key);
    }
    RUBY_ASSERT(hash == set->funcs->hash(key));

    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
            return 0;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // The table is being resized. Wait for the resize to finish by
            // briefly taking the VM lock, then retry on the new table.
            RB_VM_LOCKING();

            goto retry;
          default: {
            VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
            if (curr_hash != 0 && curr_hash != hash) break;

            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
                // Skip it and mark it as deleted.
                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
                break;
            }

            if (set->funcs->cmp(key, curr_key)) {
                // We've found a match.
                RB_GC_GUARD(set_obj);
                return curr_key;
            }

            break;
          }
        }

        idx = concurrent_set_probe_next(&probe);
    }
}

VALUE
rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
{
    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    bool inserting = false;
    VALUE set_obj;
    VALUE hash = 0;

  retry:
    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
    RUBY_ASSERT(set_obj);
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    if (hash == 0) {
        // We don't need to recompute the hash on every retry because it should
        // never change.
        hash = set->funcs->hash(key);
    }
    RUBY_ASSERT(hash == set->funcs->hash(key));

    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY: {
            // Not in set
            if (!inserting) {
                key = set->funcs->create(key, data);
                RUBY_ASSERT(hash == set->funcs->hash(key));
                inserting = true;
            }

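            // Optimistically reserve a slot by bumping the size first. If that
            // pushes the table past 50% load, resize and retry on the new
            // table; if the CAS below loses the race, the reservation is
            // returned.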
            rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

            if (UNLIKELY(prev_size > set->capacity / 2)) {
                concurrent_set_try_resize(set_obj, set_obj_ptr);

                goto retry;
            }

            curr_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
            if (curr_key == CONCURRENT_SET_EMPTY) {
                rbimpl_atomic_value_store(&entry->hash, hash, RBIMPL_ATOMIC_RELAXED);

                RB_GC_GUARD(set_obj);
                return key;
            }
            else {
                // Entry was not inserted.
                rbimpl_atomic_sub(&set->size, 1, RBIMPL_ATOMIC_RELAXED);

                // Another thread won the race, try again at the same location.
                continue;
            }
          }
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // The table is being resized. Wait for the resize to finish by
            // briefly taking the VM lock, then retry on the new table.
            RB_VM_LOCKING();

            goto retry;
          default: {
            VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
            if (curr_hash != 0 && curr_hash != hash) break;

            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
                // Skip it and mark it as deleted.
                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
                break;
            }

            if (set->funcs->cmp(key, curr_key)) {
                // We've found a match.
                RB_GC_GUARD(set_obj);

                if (inserting) {
                    // We created key using set->funcs->create, but we didn't end
                    // up inserting it into the set. Free it here to prevent memory
                    // leaks.
                    if (set->funcs->free) set->funcs->free(key);
                }

                return curr_key;
            }

            break;
          }
        }

        idx = concurrent_set_probe_next(&probe);
    }
}

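/* The remaining operations require the caller to hold the VM lock with a
 * barrier (asserted below), so plain, non-atomic reads and writes of the
 * entries are safe. */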
VALUE
rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
{
    ASSERT_vm_locking_with_barrier();

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    VALUE hash = set->funcs->hash(key);

    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_key = entry->key;

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
            // We didn't find our entry to delete.
            return 0;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_delete_by_identity: moved entry");
            break;
          default:
            if (key == curr_key) {
                entry->key = CONCURRENT_SET_DELETED;
                set->deleted_entries++;
                return curr_key;
            }
            break;
        }

        idx = concurrent_set_probe_next(&probe);
    }
}

void
rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key, void *data), void *data)
{
    ASSERT_vm_locking_with_barrier();

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    for (unsigned int i = 0; i < set->capacity; i++) {
        struct concurrent_set_entry *entry = &set->entries[i];
        VALUE key = set->entries[i].key;

        switch (key) {
          case CONCURRENT_SET_EMPTY:
          case CONCURRENT_SET_DELETED:
            continue;
          case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_foreach_with_replace: moved entry");
            break;
          default: {
            int ret = callback(&entry->key, data);
            switch (ret) {
              case ST_STOP:
                return;
              case ST_DELETE:
                set->entries[i].key = CONCURRENT_SET_DELETED;
                set->deleted_entries++;
                break;
            }
            break;
          }
        }
    }
}
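
A minimal usage sketch, not part of the file above: one way a caller could wire up a funcs table to intern frozen strings. The field names (hash, cmp, create, free) are taken from the call sites in this file and the rough signatures are inferred from them; the authoritative definition of struct rb_concurrent_set_funcs lives in internal/concurrent_set.h, and every example_* name below is hypothetical.

/* Assumes the same internal headers as concurrent_set.c, plus internal/string.h
 * for rb_str_hash(). */

static VALUE
example_hash(VALUE key)
{
    return (VALUE)rb_str_hash(key);
}

static bool
example_cmp(VALUE a, VALUE b)
{
    return RTEST(rb_str_equal(a, b));
}

static VALUE
example_create(VALUE key, void *data)
{
    /* Called only when the key is about to be inserted for the first time. */
    (void)data;
    return rb_str_new_frozen(key);
}

static const struct rb_concurrent_set_funcs example_funcs = {
    .hash = example_hash,
    .cmp = example_cmp,
    .create = example_create,
    /* .free is optional; find_or_insert only calls it when another thread wins
     * the insertion race. */
};

/* The set object must stay reachable, e.g. as a registered GC root. */
static VALUE example_set_obj;

static void
example_init(void)
{
    /* Capacity must be a power of two (asserted in concurrent_set_probe_start). */
    example_set_obj = rb_concurrent_set_new(&example_funcs, 32);
    rb_gc_register_address(&example_set_obj);
}

static VALUE
example_intern(VALUE str)
{
    /* Returns the canonical frozen copy of str, inserting it if necessary. */
    return rb_concurrent_set_find_or_insert(&example_set_obj, str, NULL);
}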