egglog_core_relations/containers/
mod.rs

1//! Support for containers
2//!
3//! Containers behave a lot like base values. They are implemented differently because
4//! their ids share a space with other Ids in the egraph and as a result, their ids need to be
5//! sparse.
6//!
//! This is a relatively "eager" implementation of containers, reflecting egglog's current
8//! semantics. One could imagine a variant of containers in which they behave more like egglog
9//! functions than base values.
10
11use std::{
12    any::{Any, TypeId},
13    hash::{Hash, Hasher},
14    ops::Deref,
15};
16
17use crate::numeric_id::{DenseIdMap, IdVec, NumericId, define_id};
18use crossbeam_queue::SegQueue;
19use dashmap::SharedValue;
20use rayon::{
21    iter::{ParallelBridge, ParallelIterator},
22    prelude::*,
23};
24use rustc_hash::FxHasher;
25
26use crate::{
27    ColumnId, CounterId, ExecutionState, Offset, SubsetRef, TableId, TaggedRowBuffer, Value,
28    WrappedTable,
29    common::{DashMap, IndexSet, SubsetTracker},
30    parallel_heuristics::{parallelize_inter_container_op, parallelize_intra_container_op},
31    table_spec::Rebuilder,
32};
33
34#[cfg(test)]
35mod tests;
36
37define_id!(pub ContainerValueId, u32, "an identifier for containers");
38
39pub trait MergeFn:
40    Fn(&mut ExecutionState, Value, Value) -> Value + dyn_clone::DynClone + Send + Sync
41{
42}
43impl<T: Fn(&mut ExecutionState, Value, Value) -> Value + Clone + Send + Sync> MergeFn for T {}
44
45// Implements `Clone` for `Box<dyn MergeFn>`.
46dyn_clone::clone_trait_object!(MergeFn);
47
48#[derive(Clone, Default)]
49struct ContainerIds {
50    ids: IndexSet<TypeId>,
51}
52
53impl ContainerIds {
54    fn insert(&mut self, ty: TypeId) -> ContainerValueId {
55        if let Some(idx) = self.ids.get_index_of(&ty) {
56            ContainerValueId::from_usize(idx)
57        } else {
58            let idx = self.ids.len();
59            self.ids.insert(ty);
60            ContainerValueId::from_usize(idx)
61        }
62    }
63
64    fn get(&self, ty: &TypeId) -> Option<ContainerValueId> {
65        self.ids.get_index_of(ty).map(ContainerValueId::from_usize)
66    }
67}
68
/// Registry of container environments, one per registered container type.
///
/// Each [`ContainerValue`] type registered through `register_type` is assigned a
/// [`ContainerValueId`] and gets its own type-erased environment in `data`.
#[derive(Clone, Default)]
pub struct ContainerValues {
    // Consulted by `rebuild_all` to obtain the recent updates of the table being rebuilt
    // against, which enables incremental rebuilds.
    subset_tracker: SubsetTracker,
    // Maps the `TypeId` of each registered container type to its `ContainerValueId`.
    container_ids: ContainerIds,
    // Type-erased per-type environments, keyed by `ContainerValueId`.
    data: DenseIdMap<ContainerValueId, Box<dyn DynamicContainerEnv + Send + Sync>>,
}
75
76/// Summary returned by container rebuild.
77///
78/// `changed` means some container entry changed during rebuild, either because
79/// its contents changed or because its outer id canonicalized.
80///
81/// `dirty_ids` is narrower: it records container ids whose semantics changed
82/// while their stored outer id stayed stable. Ordinary table rebuild already
83/// handles changed-id cases; these ids need a follow-up parent-row refresh.
84/// This includes containers that changed directly and containers whose
85/// contained containers changed in place.
86///
87/// For example, `l(vec-of(w(k(b))))` can rebuild to `l(vec-of(k(b)))` without
88/// changing the `Vec` id. The row is now newly matchable, but seminaive will
89/// miss it unless the parent row is retimestamped.
90#[derive(Clone, Default)]
91pub struct ContainerRebuildSummary {
92    changed: bool,
93    // Container ids whose semantics changed in a way that may not produce a
94    // fresh parent-row delta during ordinary table rebuild.
95    dirty_ids: IndexSet<Value>,
96}
97
98impl ContainerRebuildSummary {
99    /// Returns whether any container entry changed during rebuild.
100    pub fn changed(&self) -> bool {
101        self.changed
102    }
103
104    /// Returns the container ids whose parent rows may need retimestamping.
105    pub fn dirty_ids(&self) -> &IndexSet<Value> {
106        &self.dirty_ids
107    }
108
109    fn note_change(&mut self) {
110        self.changed = true;
111    }
112
113    fn note_dirty_id(&mut self, value: Value) {
114        self.changed = true;
115        self.dirty_ids.insert(value);
116    }
117
118    fn extend(&mut self, other: Self) {
119        self.changed |= other.changed;
120        self.dirty_ids.extend(other.dirty_ids);
121    }
122}
123
124impl ContainerValues {
125    pub fn new() -> Self {
126        Default::default()
127    }
128
129    fn get<C: ContainerValue>(&self) -> Option<&ContainerEnv<C>> {
130        let id = self.container_ids.get(&TypeId::of::<C>())?;
131        let res = self.data.get(id)?.as_any();
132        Some(res.downcast_ref::<ContainerEnv<C>>().unwrap())
133    }
134
135    /// Iterate over the containers of the given type.
136    pub fn for_each<C: ContainerValue>(&self, mut f: impl FnMut(&C, Value)) {
137        let Some(env) = self.get::<C>() else {
138            return;
139        };
140        for ent in env.to_id.iter() {
141            f(ent.key(), *ent.value());
142        }
143    }
144
145    /// Get the container associated with the value `val` in the database. The caller must know the
146    /// type of the container.
147    ///
148    /// The return type of this function may contain lock guards. Attempts to modify the contents
149    /// of the containers database may deadlock if the given guard has not been dropped.
150    pub fn get_val<C: ContainerValue>(&self, val: Value) -> Option<impl Deref<Target = C> + '_> {
151        self.get::<C>()?.get_container(val)
152    }
153
154    pub fn register_val<C: ContainerValue>(
155        &self,
156        container: C,
157        exec_state: &mut ExecutionState,
158    ) -> Value {
159        let env = self
160            .get::<C>()
161            .expect("must register container type before registering a value");
162        env.get_or_insert(&container, exec_state)
163    }
164
165    /// Apply the given rebuild to the contents of each container.
166    pub fn rebuild_all(
167        &mut self,
168        table_id: TableId,
169        table: &WrappedTable,
170        exec_state: &mut ExecutionState,
171    ) -> ContainerRebuildSummary {
172        let Some(rebuilder) = table.rebuilder(&[]) else {
173            return Default::default();
174        };
175        let to_scan = rebuilder.hint_col().map(|_| {
176            // We may attempt an incremental rebuild.
177            self.subset_tracker.recent_updates(table_id, table)
178        });
179        let mut summary = if parallelize_inter_container_op(self.data.next_id().index()) {
180            self.data
181                .iter_mut()
182                .zip(std::iter::repeat_with(|| exec_state.clone()))
183                .par_bridge()
184                .map(|((_, env), mut exec_state)| {
185                    env.apply_rebuild(
186                        table,
187                        &*rebuilder,
188                        to_scan.as_ref().map(|x| x.as_ref()),
189                        &mut exec_state,
190                    )
191                })
192                .reduce(ContainerRebuildSummary::default, |mut acc, summary| {
193                    acc.extend(summary);
194                    acc
195                })
196        } else {
197            let mut summary = ContainerRebuildSummary::default();
198            for (_, env) in self.data.iter_mut() {
199                summary.extend(env.apply_rebuild(
200                    table,
201                    &*rebuilder,
202                    to_scan.as_ref().map(|x| x.as_ref()),
203                    exec_state,
204                ));
205            }
206            summary
207        };
208        self.expand_dirty_id_closure(&mut summary);
209        summary
210    }
211
212    /// Add ancestor containers to the dirty-id set until it is transitively closed.
213    ///
214    /// A rebuild can change a container's semantics in place without changing
215    /// its id. If that container is itself stored inside another container,
216    /// the parent container has also changed semantically even though no direct
217    /// rebuild touched its contents. For example, with
218    /// `(p (vec-of (vec-of (w (b)))))` and `(rewrite (w x) x)`, the inner
219    /// `Vec` rebuilds in place to `vec-of (b)`. Without this closure, only the
220    /// inner `Vec` id is dirty; the outer `Vec` row is not retimestamped, so a
221    /// later rule like `(rewrite (p (vec-of (vec-of (b)))) (b))` can miss the
222    /// newly matchable parent row.
223    fn expand_dirty_id_closure(&self, summary: &mut ContainerRebuildSummary) {
224        let mut frontier = summary.dirty_ids.clone();
225        let mut seen = frontier.iter().copied().collect::<IndexSet<_>>();
226
227        while !frontier.is_empty() {
228            let mut next = IndexSet::default();
229            for (_, env) in self.data.iter() {
230                env.extend_containers_containing(&frontier, &mut next);
231            }
232            frontier.clear();
233            for value in next {
234                if seen.insert(value) {
235                    summary.note_dirty_id(value);
236                    frontier.insert(value);
237                }
238            }
239        }
240    }
241
242    /// Add a new container type to the given [`ContainerValue`] instance.
243    ///
244    /// Container types need a meaans of generating fresh ids (`id_counter`) along with a means of
245    /// merging conflicting ids (`merge_fn`).
246    pub fn register_type<C: ContainerValue>(
247        &mut self,
248        id_counter: CounterId,
249        merge_fn: impl MergeFn + 'static,
250    ) -> ContainerValueId {
251        let id = self.container_ids.insert(TypeId::of::<C>());
252        self.data.get_or_insert(id, || {
253            Box::new(ContainerEnv::<C>::new(Box::new(merge_fn), id_counter))
254        });
255        id
256    }
257}
258
/// A trait implemented by container types.
///
/// Containers behave a lot like base values, but they include extra trait methods to support
/// rebuilding of container contents and merging containers that become equal after a rebuild pass
/// has taken place.
pub trait ContainerValue: Hash + Eq + Clone + Send + Sync + 'static {
    /// Rebuild the contents of this container in place according to the given [`Rebuilder`].
    ///
    /// Returns whether the container was modified. If this method returns `false` then the
    /// container must not have been modified (i.e. it must hash to the same value, and compare
    /// equal to a copy of itself before the call).
    fn rebuild_contents(&mut self, rebuilder: &dyn Rebuilder) -> bool;

    /// Iterate over the contents of the container.
    ///
    /// Note that containers can be more structured than just a sequence of values. This iterator
    /// is used to populate an index that in turn is used to speed up rebuilds. If a value in the
    /// container is eligible for a rebuild and it is not mentioned by this iterator, the outer
    /// container registry may skip rebuilding this container.
    fn iter(&self) -> impl Iterator<Item = Value> + '_;
}
279
/// Type-erased interface to the environment holding all containers of one type.
///
/// [`ContainerValues`] stores one boxed implementor per registered container type and drives
/// rebuilds through this trait without knowing the concrete container type.
pub trait DynamicContainerEnv: Any + dyn_clone::DynClone + Send + Sync {
    /// View this environment as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Rebuild the containers in this environment using `rebuilder`, returning a summary of
    /// what changed.
    ///
    /// `subset`, when present, is a set of rows in `table` that an implementation may scan to
    /// rebuild only the affected containers instead of all of them.
    fn apply_rebuild(
        &mut self,
        table: &WrappedTable,
        rebuilder: &dyn Rebuilder,
        subset: Option<SubsetRef>,
        exec_state: &mut ExecutionState,
    ) -> ContainerRebuildSummary;
    /// Add ids for containers in this environment that contain any `values`.
    ///
    /// This uses the container content index populated from
    /// [`ContainerValue::iter`] and lets callers climb from dirty child ids to
    /// all directly containing parent container ids.
    fn extend_containers_containing(&self, values: &IndexSet<Value>, out: &mut IndexSet<Value>);
}
296
297// Implements `Clone` for `Box<dyn DynamicContainerEnv>`.
298dyn_clone::clone_trait_object!(DynamicContainerEnv);
299
300fn hash_container(container: &impl ContainerValue) -> u64 {
301    let mut hasher = FxHasher::default();
302    container.hash(&mut hasher);
303    hasher.finish()
304}
305
/// The environment for a single container type `C`: a two-way mapping between containers and
/// their ids, plus an index from contained values to the containers that mention them.
#[derive(Clone)]
struct ContainerEnv<C: Eq + Hash> {
    /// Called to pick the resulting id when a (re)inserted container is equal to one that is
    /// already present under a different id.
    merge_fn: Box<dyn MergeFn>,
    /// Counter used to allocate ids for newly inserted containers.
    counter: CounterId,
    /// Map from a container to its id.
    to_id: DashMap<C, Value>,
    /// Map from an id to where its container lives in `to_id`: the container's hash code and
    /// the index of the `to_id` shard ("map") holding it.
    to_container: DashMap<Value, (usize /* hash code */, usize /* map */)>,
    /// Map from a Value to the set of ids of containers that contain that value.
    val_index: DashMap<Value, IndexSet<Value>>,
}
315
316impl<C: ContainerValue> DynamicContainerEnv for ContainerEnv<C> {
317    fn as_any(&self) -> &dyn Any {
318        self
319    }
320
321    fn apply_rebuild(
322        &mut self,
323        table: &WrappedTable,
324        rebuilder: &dyn Rebuilder,
325        subset: Option<SubsetRef>,
326        exec_state: &mut ExecutionState,
327    ) -> ContainerRebuildSummary {
328        if let Some(subset) = subset
329            && incremental_rebuild(
330                subset.size(),
331                self.to_id.len(),
332                parallelize_intra_container_op(self.to_id.len()),
333            )
334        {
335            return self.apply_rebuild_incremental(
336                table,
337                rebuilder,
338                exec_state,
339                subset,
340                rebuilder.hint_col().unwrap(),
341            );
342        }
343        self.apply_rebuild_nonincremental(rebuilder, exec_state)
344    }
345
346    fn extend_containers_containing(&self, values: &IndexSet<Value>, out: &mut IndexSet<Value>) {
347        for value in values {
348            if let Some(containers) = self.val_index.get(value) {
349                out.extend(containers.iter().copied());
350            }
351        }
352    }
353}
354
355impl<C: ContainerValue> ContainerEnv<C> {
356    pub fn new(merge_fn: Box<dyn MergeFn>, counter: CounterId) -> Self {
357        Self {
358            merge_fn,
359            counter,
360            to_id: DashMap::default(),
361            to_container: DashMap::default(),
362            val_index: DashMap::default(),
363        }
364    }
365
366    fn get_or_insert(&self, container: &C, exec_state: &mut ExecutionState) -> Value {
367        if let Some(value) = self.to_id.get(container) {
368            return *value;
369        }
370
371        // Time to insert a new mapping. First, insert into `to_container`: the moment that we
372        // insert a new value into `to_id`, someone else can return it from another call to
373        // `get_or_insert` and then feed that value to `get_container`.
374
375        let value = Value::from_usize(exec_state.inc_counter(self.counter));
376        let target_map = self.to_id.determine_map(container);
377        // This assertion is here because in parallel rebuilding we use `to_container` to
378        // compute the intended shard for to_id, because we have a mutable borrow of
379        // `to_container` that means we cannot call `determine_map` on `to_id`.
380        debug_assert_eq!(
381            target_map,
382            self.to_container
383                .determine_shard(hash_container(container) as usize)
384        );
385        self.to_container
386            .insert(value, (hash_container(container) as usize, target_map));
387
388        // Now insert into `to_id`, handling the case where a different thread is doing the same
389        // thing.
390        match self.to_id.entry(container.clone()) {
391            dashmap::Entry::Vacant(vac) => {
392                // Common case: insert the mapping in to_id and update the index.
393                vac.insert(value);
394                for val in container.iter() {
395                    self.val_index.entry(val).or_default().insert(value);
396                }
397                value
398            }
399            dashmap::Entry::Occupied(occ) => {
400                // Someone inserted `container` into the mapping since we looked it up. Remove the
401                // mapping that we inserted into `to_container` (we won't use it), and instead
402                // return the "winning" value.
403                let res = *occ.get();
404                std::mem::drop(occ); // drop the lock.
405                self.to_container.remove(&value);
406                res
407            }
408        }
409    }
410
411    fn insert_owned(&self, container: C, value: Value, exec_state: &mut ExecutionState) -> Value {
412        let hc = hash_container(&container);
413        let target_map = self.to_id.determine_map(&container);
414        match self.to_id.entry(container) {
415            dashmap::Entry::Occupied(mut occ) => {
416                let result = (self.merge_fn)(exec_state, *occ.get(), value);
417                let old_val = *occ.get();
418                if result != old_val {
419                    self.to_container.remove(&old_val);
420                    self.to_container.insert(result, (hc as usize, target_map));
421                    *occ.get_mut() = result;
422                    for val in occ.key().iter() {
423                        let mut index = self.val_index.entry(val).or_default();
424                        index.swap_remove(&old_val);
425                        index.insert(result);
426                    }
427                }
428                result
429            }
430            dashmap::Entry::Vacant(vacant_entry) => {
431                self.to_container.insert(value, (hc as usize, target_map));
432                for val in vacant_entry.key().iter() {
433                    self.val_index.entry(val).or_default().insert(value);
434                }
435                vacant_entry.insert(value);
436                value
437            }
438        }
439    }
440
441    fn reinsert_incremental(
442        &self,
443        container: C,
444        old_id: Value,
445        rebuilt_id: Value,
446        container_changed: bool,
447        exec_state: &mut ExecutionState,
448        summary: &mut ContainerRebuildSummary,
449    ) {
450        if container_changed || rebuilt_id != old_id {
451            summary.note_change();
452        }
453        if rebuilt_id != old_id {
454            // Parent rows will get a real delta from ordinary table rebuild, so
455            // we only need an explicit refresh when the outer id stayed stable.
456            self.to_container.remove(&old_id);
457        }
458        let actual = self.insert_owned(container, rebuilt_id, exec_state);
459        if container_changed && rebuilt_id == old_id && actual == old_id {
460            summary.note_dirty_id(old_id);
461        }
462    }
463
464    fn apply_rebuild_incremental(
465        &mut self,
466        table: &WrappedTable,
467        rebuilder: &dyn Rebuilder,
468        exec_state: &mut ExecutionState,
469        to_scan: SubsetRef,
470        search_col: ColumnId,
471    ) -> ContainerRebuildSummary {
472        // NB: there is no parallel implementation as of now.
473        //
474        // Implementing one should be straightforward, but we should wait for a real benchmark that
475        // requires it. It's possible that incremental rebuilding will only be profitable when the
476        // total number of ids to rebuild is small, in which case the overhead of parallelism may
477        // not be worth it in the first place.
478        let mut summary = ContainerRebuildSummary::default();
479        let mut buf = TaggedRowBuffer::new(1);
480        table.scan_project(
481            to_scan,
482            &[search_col],
483            Offset::new(0),
484            usize::MAX,
485            &[],
486            &mut buf,
487        );
488        // For each value in the buffer, rebuild all containers that mention it.
489        let mut to_rebuild = IndexSet::<Value>::default();
490        for (_, row) in buf.iter() {
491            to_rebuild.insert(row[0]);
492            let Some(ids) = self.val_index.get(&row[0]) else {
493                continue;
494            };
495            to_rebuild.extend(&*ids);
496        }
497        for id in to_rebuild {
498            let Some((hc, target_map)) = self.to_container.get(&id).map(|x| *x) else {
499                continue;
500            };
501            let shard_mut = self.to_id.shards_mut()[target_map].get_mut();
502            let Some((mut container, _)) =
503                shard_mut.remove_entry(hc as u64, |(_, v)| *v.get() == id)
504            else {
505                continue;
506            };
507            let rebuilt_id = rebuilder.rebuild_val(id);
508            let container_changed = container.rebuild_contents(rebuilder);
509            self.reinsert_incremental(
510                container,
511                id,
512                rebuilt_id,
513                container_changed,
514                exec_state,
515                &mut summary,
516            );
517        }
518        summary
519    }
520
    /// Rebuild every container in this environment by walking all shards of `to_id`.
    ///
    /// Entries whose contents changed are removed during the walk and reinserted afterwards
    /// through `insert_owned` (which handles collisions with existing containers); entries
    /// where only the id changed are updated in place. Dispatches to the parallel variant
    /// when the heuristic says so.
    fn apply_rebuild_nonincremental(
        &mut self,
        rebuilder: &dyn Rebuilder,
        exec_state: &mut ExecutionState,
    ) -> ContainerRebuildSummary {
        // NOTE(review): `apply_rebuild` consults `parallelize_intra_container_op` for this
        // same size; confirm that the `inter` heuristic is the intended one here.
        if parallelize_inter_container_op(self.to_id.len()) {
            return self.apply_rebuild_nonincremental_parallel(rebuilder, exec_state);
        }
        let mut summary = ContainerRebuildSummary::default();
        // Changed containers are collected here and reinserted only after the walk, so the
        // shards are not grown while they are being iterated.
        let mut to_reinsert = Vec::new();
        let shards = self.to_id.shards_mut();
        for shard in shards.iter_mut() {
            let shard = shard.get_mut();
            // SAFETY: the iterator does not outlive `shard`.
            for bucket in unsafe { shard.iter() } {
                // SAFETY: the bucket is valid; we just got it from the iterator.
                let (container, val) = unsafe { bucket.as_mut() };
                let old_val = *val.get();
                let new_val = rebuilder.rebuild_val(old_val);
                let container_changed = container.rebuild_contents(rebuilder);
                if !container_changed && new_val == old_val {
                    // Nothing changed about this entry. Leave it in place.
                    continue;
                }
                summary.note_change();
                if container_changed {
                    // The container changed. Remove both map entries then reinsert.
                    // SAFETY: This is a valid bucket. Furthermore, iterators remain valid if
                    // buckets they have already yielded have been removed.
                    let ((container, _), _) = unsafe { shard.remove(bucket) };
                    self.to_container.remove(&old_val);
                    // The third field records whether the outer id stayed stable.
                    to_reinsert.push((container, new_val, new_val == old_val));
                } else {
                    // Just the value changed. Leave the container in place.
                    // NOTE(review): `val_index` is not touched on this path, so it keeps
                    // listing `old_val` for this container's contents -- confirm intended.
                    *val.get_mut() = new_val;
                    let prev = self.to_container.remove(&old_val).unwrap().1;
                    self.to_container.insert(new_val, prev);
                }
            }
        }
        for (container, val, stable_id) in to_reinsert {
            let actual = self.insert_owned(container, val, exec_state);
            // Refresh only when rebuild changed container semantics in place.
            // If the outer id changed, ordinary table rebuild already creates a
            // fresh parent-row delta for seminaive to follow.
            if stable_id && actual == val {
                summary.note_dirty_id(val);
            }
        }
        summary
    }
572
    /// Parallel counterpart of `apply_rebuild_nonincremental`.
    ///
    /// Phase one walks the `to_id` shards in parallel, rebuilding entries and queueing
    /// changed containers by destination shard. Phase two drains each queue into its shard in
    /// parallel, merging with existing equal containers where needed. Each phase-two task
    /// works on its own clone of `exec_state`.
    fn apply_rebuild_nonincremental_parallel(
        &mut self,
        rebuilder: &dyn Rebuilder,
        exec_state: &mut ExecutionState,
    ) -> ContainerRebuildSummary {
        // This is very similar to the serial variant. The main difference is that
        // `to_reinsert` isn't a flat vector. It's instead a vector of queues - one per
        // destination map shard. This lets us do a bulk insertion in parallel without having
        // to grab a lock per container.
        let mut to_reinsert =
            IdVec::<usize /* to_id shard */, SegQueue<(C, Value, bool)>>::default();
        to_reinsert.resize_with(self.to_id.shards().len(), Default::default);

        let shards = self.to_id.shards_mut();
        // Phase one: rebuild in place, one task per shard. `changed` ends up true if any
        // shard saw a change (`max` over booleans).
        let changed = shards
            .par_iter_mut()
            .map(|shard| {
                let mut changed = false;
                let shard = shard.get_mut();
                // SAFETY: the iterator does not outlive `shard`.
                for bucket in unsafe { shard.iter() } {
                    // SAFETY: the bucket is valid; we just got it from the iterator.
                    let (container, val) = unsafe { bucket.as_mut() };
                    let old_val = *val.get();
                    let new_val = rebuilder.rebuild_val(old_val);
                    let container_changed = container.rebuild_contents(rebuilder);
                    if !container_changed && new_val == old_val {
                        // Nothing changed about this entry. Leave it in place.
                        continue;
                    }
                    changed = true;
                    if container_changed {
                        // The container changed. Remove both map entries then reinsert.
                        // SAFETY: This is a valid bucket. Furthermore, iterators remain valid if
                        // buckets they have already yielded have been removed.
                        let ((container, _), _) = unsafe { shard.remove(bucket) };
                        self.to_container.remove(&old_val);
                        // Spooky: we're using `to_container` to determine the shard for
                        // `to_id`. We are assuming that the # shards determination is
                        // deterministic here. There is a debug assertion in `get_or_insert`
                        // that attempts to verify this.
                        let shard = self
                            .to_container
                            .determine_shard(hash_container(&container) as usize);
                        // The third field records whether the outer id stayed stable.
                        to_reinsert[shard].push((container, new_val, new_val == old_val));
                    } else {
                        // Just the value changed. Leave the container in place.
                        *val.get_mut() = new_val;
                        let prev = self.to_container.remove(&old_val).unwrap().1;
                        self.to_container.insert(new_val, prev);
                    }
                }
                changed
            })
            .max()
            .unwrap_or(false);

        // Phase two: drain each shard's queue into that shard. Dirty ids are gathered in a
        // concurrent queue and folded into the summary at the end.
        let dirty_ids = SegQueue::new();
        shards
            .iter_mut()
            .enumerate()
            .map(|(i, shard)| (i, shard, exec_state.clone()))
            .par_bridge()
            .for_each(|(shard_id, shard, mut exec_state)| {
                // This bit is a real slog. Once Dashmap updates from RawTable to HashTable for
                // the underlying shard, this will get a little better.
                //
                // NB: We are probably leaving some parallelism on the floor with these calls
                // to `to_container` and `val_index`.
                let shard = shard.get_mut();
                let queue = &to_reinsert[shard_id];
                while let Some((container, val, stable_id)) = queue.pop() {
                    let hc = hash_container(&container);
                    let target_map = self.to_container.determine_shard(hc as usize);
                    match shard.find_or_find_insert_slot(
                        hc,
                        |(c, _)| c == &container,
                        |(c, _)| hash_container(c),
                    ) {
                        Ok(bucket) => {
                            // An equal container already lives in this shard: merge ids.
                            // SAFETY: the bucket is valid; we just got it from the shard and
                            // we have not done any operations that can invalidate the bucket.
                            let (container, val_slot) = unsafe { bucket.as_mut() };
                            let old_val = *val_slot.get();
                            let result = (self.merge_fn)(&mut exec_state, old_val, val);
                            if result != old_val {
                                self.to_container.remove(&old_val);
                                self.to_container.insert(result, (hc as usize, target_map));
                                *val_slot.get_mut() = result;
                                for val in container.iter() {
                                    let mut index = self.val_index.entry(val).or_default();
                                    index.swap_remove(&old_val);
                                    index.insert(result);
                                }
                            }
                            // As in the serial path, only same-id semantic
                            // changes need an explicit parent-row refresh.
                            if stable_id && result == val {
                                dirty_ids.push(val);
                            }
                        }
                        Err(slot) => {
                            // No equal container present: insert as a new entry.
                            self.to_container.insert(val, (hc as usize, target_map));
                            for v in container.iter() {
                                self.val_index.entry(v).or_default().insert(val);
                            }
                            // SAFETY: We just got this slot from `find_or_find_insert_slot`
                            // and we have not mutated the map at all since then.
                            unsafe {
                                shard.insert_in_slot(hc, slot, (container, SharedValue::new(val)));
                            }
                            if stable_id {
                                dirty_ids.push(val);
                            }
                        }
                    }
                }
            });
        let mut summary = ContainerRebuildSummary::default();
        if changed {
            summary.note_change();
        }
        while let Some(value) = dirty_ids.pop() {
            summary.note_dirty_id(value);
        }
        summary
    }
700
701    fn get_container(&self, value: Value) -> Option<impl Deref<Target = C> + '_> {
702        let (hc, target_map) = *self.to_container.get(&value)?;
703        let shard = &self.to_id.shards()[target_map];
704        let read_guard = shard.read();
705        let val_ptr: *const (C, _) = shard
706            .read()
707            .find(hc as u64, |(_, v)| *v.get() == value)?
708            .as_ptr();
709        struct ValueDeref<'a, T, Guard> {
710            _guard: Guard,
711            data: &'a T,
712        }
713
714        impl<T, Guard> Deref for ValueDeref<'_, T, Guard> {
715            type Target = T;
716
717            fn deref(&self) -> &T {
718                self.data
719            }
720        }
721
722        Some(ValueDeref {
723            _guard: read_guard,
724            // SAFETY: the value will remain valid for as long as `read_guard` is in scope.
725            data: unsafe {
726                let unwrapped: &(C, _) = &*val_ptr;
727                &unwrapped.0
728            },
729        })
730    }
731}
732
/// Decide whether an incremental rebuild is worthwhile.
///
/// `uf_size` is the number of recently updated rows to scan, `table_size` is the number of
/// containers in the environment, and `parallel` says whether the full rebuild would run in
/// parallel. Incremental rebuilding is chosen only when there are more than 1000 containers
/// and `uf_size` times a cost factor (512 if `parallel`, 8 otherwise) does not exceed
/// `table_size`.
fn incremental_rebuild(uf_size: usize, table_size: usize, parallel: bool) -> bool {
    let factor: usize = if parallel { 512 } else { 8 };
    // `uf_size * factor <= table_size`, written as a division so that a very large
    // `uf_size` cannot overflow. The two forms agree exactly for unsigned integers.
    table_size > 1000 && uf_size <= table_size / factor
}