Ruby 3.5.0dev (2025-09-17 revision 6094cb51136418a7b545baa84b6ede0aaeb2eaac)
concurrent_set.c (6094cb51136418a7b545baa84b6ede0aaeb2eaac)
#include "internal.h"
#include "internal/gc.h"
#include "internal/concurrent_set.h"
#include "ruby_atomic.h"
#include "ruby/atomic.h"
#include "vm_sync.h"

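/* Sentinel key values. Slot keys below CONCURRENT_SET_SPECIAL_VALUE_COUNT are
 * reserved: EMPTY marks a never-used slot, DELETED is a tombstone left by a
 * removal or a collected weak reference, and MOVED marks a slot whose contents
 * have been migrated to a resized table. Real keys must therefore be >=
 * CONCURRENT_SET_SPECIAL_VALUE_COUNT (asserted at the public entry points). */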
enum concurrent_set_special_values {
    CONCURRENT_SET_EMPTY,
    CONCURRENT_SET_DELETED,
    CONCURRENT_SET_MOVED,
    CONCURRENT_SET_SPECIAL_VALUE_COUNT
};

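/* One slot of the open-addressing table. The hash is cached next to the key so
 * that lookups and resizes can usually skip recomputing it; a hash of 0 means
 * either an insert that has not yet published its hash or a genuinely zero
 * hash, and is recomputed where it matters. */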
struct concurrent_set_entry {
    VALUE hash;
    VALUE key;
};

struct concurrent_set {
    rb_atomic_t size;
    unsigned int capacity;
    unsigned int deleted_entries;
    const struct rb_concurrent_set_funcs *funcs;
    struct concurrent_set_entry *entries;
};

static void
concurrent_set_free(void *ptr)
{
    struct concurrent_set *set = ptr;
    xfree(set->entries);
}

static size_t
concurrent_set_size(const void *ptr)
{
    const struct concurrent_set *set = ptr;
    return sizeof(struct concurrent_set) +
           (set->capacity * sizeof(struct concurrent_set_entry));
}

/* Hack: Though it would be trivial, we're intentionally avoiding WB-protecting
 * this object. This prevents the object from aging and ensures it can always be
 * collected in a minor GC.
 * Longer term this deserves a better way to reclaim memory promptly.
 */
static void
concurrent_set_mark(void *ptr)
{
    (void)ptr;
}

static const rb_data_type_t concurrent_set_type = {
    .wrap_struct_name = "VM/concurrent_set",
    .function = {
        .dmark = concurrent_set_mark,
        .dfree = concurrent_set_free,
        .dsize = concurrent_set_size,
    },
    /* Hack: NOT WB_PROTECTED on purpose (see above) */
    .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_EMBEDDABLE
};

VALUE
rb_concurrent_set_new(const struct rb_concurrent_set_funcs *funcs, int capacity)
{
    struct concurrent_set *set;
    VALUE obj = TypedData_Make_Struct(0, struct concurrent_set, &concurrent_set_type, set);
    set->funcs = funcs;
    set->entries = ZALLOC_N(struct concurrent_set_entry, capacity);
    set->capacity = capacity;
    return obj;
}

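/* Usage sketch (illustrative only, not part of this file). The callback table
 * lives in internal/concurrent_set.h; the field names below are inferred from
 * the call sites in this file, and the "my_*" callbacks are hypothetical:
 *
 *     static const struct rb_concurrent_set_funcs my_funcs = {
 *         .hash = my_hash,     // stable hash for a key
 *         .cmp = my_cmp,       // non-zero when two keys are equal
 *         .create = my_create, // builds the value that is actually stored
 *         .free = my_free,     // optional; reclaims a created-but-unused value
 *     };
 *
 *     // The capacity must be a power of two (asserted in concurrent_set_probe_start).
 *     VALUE set_obj = rb_concurrent_set_new(&my_funcs, 32);
 *     // The set is passed by pointer because a resize publishes a replacement object.
 *     VALUE found = rb_concurrent_set_find_or_insert(&set_obj, key, NULL);
 */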
rb_atomic_t
rb_concurrent_set_size(VALUE set_obj)
{
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    return RUBY_ATOMIC_LOAD(set->size);
}

struct concurrent_set_probe {
    int idx;
    int d;
    int mask;
};

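/* Triangular (quadratic) probing: each step advances by an increasing delta
 * (1, 2, 3, ...), so the k-th probe lands on hash + k*(k+1)/2, masked to the
 * table size. Because the capacity is a power of two (asserted below), this
 * sequence visits every slot before repeating. */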
static int
concurrent_set_probe_start(struct concurrent_set_probe *probe, struct concurrent_set *set, VALUE hash)
{
    RUBY_ASSERT((set->capacity & (set->capacity - 1)) == 0);
    probe->d = 0;
    probe->mask = set->capacity - 1;
    probe->idx = hash & probe->mask;
    return probe->idx;
}

static int
concurrent_set_probe_next(struct concurrent_set_probe *probe)
{
    probe->d++;
    probe->idx = (probe->idx + probe->d) & probe->mask;
    return probe->idx;
}

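/* Rebuild the table into a freshly allocated set object, sized from the
 * estimated number of live entries (size minus tombstones): capacity doubles
 * by default, but stays the same or halves when few live entries remain. Each
 * old slot is atomically exchanged with MOVED so that concurrent readers
 * notice the migration and retry against the new table once the resizer,
 * which holds the VM lock, has published it. */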
static void
concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
{
    // Check if another thread has already resized.
    if (RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr) != old_set_obj) {
        return;
    }

    struct concurrent_set *old_set = RTYPEDDATA_GET_DATA(old_set_obj);

    // This may overcount by up to the number of threads concurrently attempting to insert.
    // GC may also happen between now and the set being rebuilt.
    int expected_size = RUBY_ATOMIC_LOAD(old_set->size) - old_set->deleted_entries;

    struct concurrent_set_entry *old_entries = old_set->entries;
    int old_capacity = old_set->capacity;
    int new_capacity = old_capacity * 2;
    if (new_capacity > expected_size * 8) {
        new_capacity = old_capacity / 2;
    }
    else if (new_capacity > expected_size * 4) {
        new_capacity = old_capacity;
    }

    // May cause GC and therefore deletes, so must happen first.
    VALUE new_set_obj = rb_concurrent_set_new(old_set->funcs, new_capacity);
    struct concurrent_set *new_set = RTYPEDDATA_GET_DATA(new_set_obj);

    for (int i = 0; i < old_capacity; i++) {
        struct concurrent_set_entry *entry = &old_entries[i];
        VALUE key = RUBY_ATOMIC_VALUE_EXCHANGE(entry->key, CONCURRENT_SET_MOVED);
        RUBY_ASSERT(key != CONCURRENT_SET_MOVED);

        if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
        if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;

        VALUE hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
        if (hash == 0) {
            // Either an in-progress insert or an extremely unlikely 0 hash.
            // Re-calculate the hash.
            hash = old_set->funcs->hash(key);
        }
        RUBY_ASSERT(hash == old_set->funcs->hash(key));

        // Insert key into new_set.
        struct concurrent_set_probe probe;
        int idx = concurrent_set_probe_start(&probe, new_set, hash);

        while (true) {
            struct concurrent_set_entry *entry = &new_set->entries[idx];

            if (entry->key == CONCURRENT_SET_EMPTY) {
                new_set->size++;

                RUBY_ASSERT(new_set->size <= new_set->capacity / 2);
                RUBY_ASSERT(entry->hash == 0);

                entry->key = key;
                entry->hash = hash;
                break;
            }
            else {
                RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
            }

            idx = concurrent_set_probe_next(&probe);
        }
    }

    RUBY_ATOMIC_VALUE_SET(*set_obj_ptr, new_set_obj);

    RB_GC_GUARD(old_set_obj);
}

static void
concurrent_set_try_resize(VALUE old_set_obj, VALUE *set_obj_ptr)
{
    RB_VM_LOCKING() {
        concurrent_set_try_resize_without_locking(old_set_obj, set_obj_ptr);
    }
}

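/* Lock-free lookup. Returns the key stored in the set that compares equal to
 * `key`, or 0 if there is no match. Hitting a MOVED sentinel means a resize is
 * in progress: briefly taking the VM lock waits for the resizer to finish, and
 * the lookup then retries against the replacement table. */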
VALUE
rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
{
    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    VALUE set_obj;
    VALUE hash = 0;

  retry:
    set_obj = RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr);
    RUBY_ASSERT(set_obj);
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    if (hash == 0) {
        // We don't need to recompute the hash on every retry because it should
        // never change.
        hash = set->funcs->hash(key);
    }
    RUBY_ASSERT(hash == set->funcs->hash(key));

    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_key = RUBY_ATOMIC_VALUE_LOAD(entry->key);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
            return 0;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // Wait
            RB_VM_LOCKING();

            goto retry;
          default: {
            VALUE curr_hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
            if (curr_hash != 0 && curr_hash != hash) break;

            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
                // Skip it and mark it as deleted.
                RUBY_ATOMIC_VALUE_CAS(entry->key, curr_key, CONCURRENT_SET_DELETED);
                break;
            }

            if (set->funcs->cmp(key, curr_key)) {
                // We've found a match.
                RB_GC_GUARD(set_obj);
                return curr_key;
            }

            break;
          }
        }

        idx = concurrent_set_probe_next(&probe);
    }
}

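/* Find an equal key, or insert a value created from `key`. The insert path
 * first reserves room by atomically incrementing size (triggering a resize and
 * a retry when more than half of the slots are already claimed), then
 * publishes the key with a CAS on the slot; the hash is stored afterwards,
 * which is why readers treat a hash of 0 as "recompute". If another thread
 * races us and an equal key is already present, the value built by
 * funcs->create is released through funcs->free (when provided) and the
 * existing key is returned. */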
VALUE
rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
{
    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);

    bool inserting = false;
    VALUE set_obj;
    VALUE hash = 0;

  retry:
    set_obj = RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr);
    RUBY_ASSERT(set_obj);
    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    if (hash == 0) {
        // We don't need to recompute the hash on every retry because it should
        // never change.
        hash = set->funcs->hash(key);
    }
    RUBY_ASSERT(hash == set->funcs->hash(key));

    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_key = RUBY_ATOMIC_VALUE_LOAD(entry->key);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY: {
            // Not in set
            if (!inserting) {
                key = set->funcs->create(key, data);
                RUBY_ASSERT(hash == set->funcs->hash(key));
                inserting = true;
            }

            rb_atomic_t prev_size = RUBY_ATOMIC_FETCH_ADD(set->size, 1);

            if (UNLIKELY(prev_size > set->capacity / 2)) {
                concurrent_set_try_resize(set_obj, set_obj_ptr);

                goto retry;
            }

            curr_key = RUBY_ATOMIC_VALUE_CAS(entry->key, CONCURRENT_SET_EMPTY, key);
            if (curr_key == CONCURRENT_SET_EMPTY) {
                RUBY_ATOMIC_VALUE_SET(entry->hash, hash);

                RB_GC_GUARD(set_obj);
                return key;
            }
            else {
                // Entry was not inserted.
                RUBY_ATOMIC_DEC(set->size);

                // Another thread won the race, try again at the same location.
                continue;
            }
          }
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            // Wait
            RB_VM_LOCKING();

            goto retry;
          default: {
            VALUE curr_hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash);
            if (curr_hash != 0 && curr_hash != hash) break;

            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
                // Skip it and mark it as deleted.
                RUBY_ATOMIC_VALUE_CAS(entry->key, curr_key, CONCURRENT_SET_DELETED);
                break;
            }

            if (set->funcs->cmp(key, curr_key)) {
                // We've found a match.
                RB_GC_GUARD(set_obj);

                if (inserting) {
                    // We created key using set->funcs->create, but we didn't end
                    // up inserting it into the set. Free it here to prevent memory
                    // leaks.
                    if (set->funcs->free) set->funcs->free(key);
                }

                return curr_key;
            }

            break;
          }
        }

        idx = concurrent_set_probe_next(&probe);
    }
}

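/* Remove the entry whose key is identical (same VALUE) to `key`; funcs->cmp is
 * not consulted. Must be called with the VM lock held. The slot becomes a
 * DELETED tombstone rather than EMPTY so that probe chains for other keys stay
 * intact. Returns the removed key, or 0 if it was not present. */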
VALUE
rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
{
    // Assumes the caller holds the VM lock and has issued a barrier (there is
    // no assert for the barrier).
    ASSERT_vm_locking();

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    VALUE hash = set->funcs->hash(key);

    struct concurrent_set_probe probe;
    int idx = concurrent_set_probe_start(&probe, set, hash);

    while (true) {
        struct concurrent_set_entry *entry = &set->entries[idx];
        VALUE curr_key = RUBY_ATOMIC_VALUE_LOAD(entry->key);

        switch (curr_key) {
          case CONCURRENT_SET_EMPTY:
            // We didn't find our entry to delete.
            return 0;
          case CONCURRENT_SET_DELETED:
            break;
          case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_delete_by_identity: moved entry");
            break;
          default:
            if (key == curr_key) {
                entry->key = CONCURRENT_SET_DELETED;
                set->deleted_entries++;
                return curr_key;
            }
            break;
        }

        idx = concurrent_set_probe_next(&probe);
    }
}

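/* Iterate over every live entry while the VM lock is held. The callback
 * receives a pointer to the stored key, so it may replace the key in place;
 * returning ST_STOP ends the iteration and ST_DELETE turns the current slot
 * into a tombstone. */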
void
rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key, void *data), void *data)
{
    // Assumes the caller holds the VM lock and has issued a barrier (there is
    // no assert for the barrier).
    ASSERT_vm_locking();

    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);

    for (unsigned int i = 0; i < set->capacity; i++) {
        struct concurrent_set_entry *entry = &set->entries[i];
        VALUE key = set->entries[i].key;

        switch (key) {
          case CONCURRENT_SET_EMPTY:
          case CONCURRENT_SET_DELETED:
            continue;
          case CONCURRENT_SET_MOVED:
            rb_bug("rb_concurrent_set_foreach_with_replace: moved entry");
            break;
          default: {
            int ret = callback(&entry->key, data);
            switch (ret) {
              case ST_STOP:
                return;
              case ST_DELETE:
                set->entries[i].key = CONCURRENT_SET_DELETED;
                set->deleted_entries++;
                break;
            }
            break;
          }
        }
    }
}