egglog_core_relations/containers/
mod.rs

1//! Support for containers
2//!
3//! Containers behave a lot like base values. They are implemented differently because
4//! their ids share a space with other Ids in the egraph and as a result, their ids need to be
5//! sparse.
6//!
//! This is a relatively "eager" implementation of containers, reflecting egglog's current
8//! semantics. One could imagine a variant of containers in which they behave more like egglog
9//! functions than base values.
10
11use std::{
12    any::{Any, TypeId},
13    hash::{Hash, Hasher},
14    ops::Deref,
15};
16
17use crate::numeric_id::{DenseIdMap, IdVec, NumericId, define_id};
18use crossbeam_queue::SegQueue;
19use dashmap::SharedValue;
20use rayon::{
21    iter::{ParallelBridge, ParallelIterator},
22    prelude::*,
23};
24use rustc_hash::FxHasher;
25
26use crate::{
27    ColumnId, CounterId, ExecutionState, Offset, SubsetRef, TableId, TaggedRowBuffer, Value,
28    WrappedTable,
29    common::{DashMap, IndexSet, SubsetTracker},
30    parallel_heuristics::{parallelize_inter_container_op, parallelize_intra_container_op},
31    table_spec::Rebuilder,
32};
33
34#[cfg(test)]
35mod tests;
36
37define_id!(pub ContainerValueId, u32, "an identifier for containers");
38
39pub trait MergeFn:
40    Fn(&mut ExecutionState, Value, Value) -> Value + dyn_clone::DynClone + Send + Sync
41{
42}
43impl<T: Fn(&mut ExecutionState, Value, Value) -> Value + Clone + Send + Sync> MergeFn for T {}
44
45// Implements `Clone` for `Box<dyn MergeFn>`.
46dyn_clone::clone_trait_object!(MergeFn);
47
48#[derive(Clone, Default)]
49struct ContainerIds {
50    ids: IndexSet<TypeId>,
51}
52
53impl ContainerIds {
54    fn insert(&mut self, ty: TypeId) -> ContainerValueId {
55        if let Some(idx) = self.ids.get_index_of(&ty) {
56            ContainerValueId::from_usize(idx)
57        } else {
58            let idx = self.ids.len();
59            self.ids.insert(ty);
60            ContainerValueId::from_usize(idx)
61        }
62    }
63
64    fn get(&self, ty: &TypeId) -> Option<ContainerValueId> {
65        self.ids.get_index_of(ty).map(ContainerValueId::from_usize)
66    }
67}
68
/// The registry of all container types and the container values of each type.
#[derive(Clone, Default)]
pub struct ContainerValues {
    /// Queried during rebuilds for the recent updates of the table being rebuilt against, which
    /// are used to attempt an incremental rebuild.
    subset_tracker: SubsetTracker,
    /// Maps the `TypeId` of each registered container type to its dense id.
    container_ids: ContainerIds,
    /// The type-erased per-type environments, indexed by the ids from `container_ids`.
    data: DenseIdMap<ContainerValueId, Box<dyn DynamicContainerEnv + Send + Sync>>,
}
75
76/// Summary returned by container rebuild.
77///
78/// `changed` means some container entry changed during rebuild, either because
79/// its contents changed or because its outer id canonicalized.
80///
81/// `dirty_ids` is narrower: it records container ids whose semantics changed
82/// while their stored outer id stayed stable. Ordinary table rebuild already
83/// handles changed-id cases; these ids need a follow-up parent-row refresh.
84///
85/// For example, `l(vec-of(w(k(b))))` can rebuild to `l(vec-of(k(b)))` without
86/// changing the `Vec` id. The row is now newly matchable, but seminaive will
87/// miss it unless the parent row is retimestamped.
88#[derive(Clone, Default)]
89pub struct ContainerRebuildSummary {
90    changed: bool,
91    // Container ids whose semantics changed in a way that may not produce a
92    // fresh parent-row delta during ordinary table rebuild.
93    dirty_ids: IndexSet<Value>,
94}
95
96impl ContainerRebuildSummary {
97    /// Returns whether any container entry changed during rebuild.
98    pub fn changed(&self) -> bool {
99        self.changed
100    }
101
102    /// Returns the container ids whose parent rows may need retimestamping.
103    pub fn dirty_ids(&self) -> &IndexSet<Value> {
104        &self.dirty_ids
105    }
106
107    fn note_change(&mut self) {
108        self.changed = true;
109    }
110
111    fn note_dirty_id(&mut self, value: Value) {
112        self.changed = true;
113        self.dirty_ids.insert(value);
114    }
115
116    fn extend(&mut self, other: Self) {
117        self.changed |= other.changed;
118        self.dirty_ids.extend(other.dirty_ids);
119    }
120}
121
122impl ContainerValues {
123    pub fn new() -> Self {
124        Default::default()
125    }
126
127    fn get<C: ContainerValue>(&self) -> Option<&ContainerEnv<C>> {
128        let id = self.container_ids.get(&TypeId::of::<C>())?;
129        let res = self.data.get(id)?.as_any();
130        Some(res.downcast_ref::<ContainerEnv<C>>().unwrap())
131    }
132
133    /// Iterate over the containers of the given type.
134    pub fn for_each<C: ContainerValue>(&self, mut f: impl FnMut(&C, Value)) {
135        let Some(env) = self.get::<C>() else {
136            return;
137        };
138        for ent in env.to_id.iter() {
139            f(ent.key(), *ent.value());
140        }
141    }
142
143    /// Get the container associated with the value `val` in the database. The caller must know the
144    /// type of the container.
145    ///
146    /// The return type of this function may contain lock guards. Attempts to modify the contents
147    /// of the containers database may deadlock if the given guard has not been dropped.
148    pub fn get_val<C: ContainerValue>(&self, val: Value) -> Option<impl Deref<Target = C> + '_> {
149        self.get::<C>()?.get_container(val)
150    }
151
152    pub fn register_val<C: ContainerValue>(
153        &self,
154        container: C,
155        exec_state: &mut ExecutionState,
156    ) -> Value {
157        let env = self
158            .get::<C>()
159            .expect("must register container type before registering a value");
160        env.get_or_insert(&container, exec_state)
161    }
162
163    /// Apply the given rebuild to the contents of each container.
164    pub fn rebuild_all(
165        &mut self,
166        table_id: TableId,
167        table: &WrappedTable,
168        exec_state: &mut ExecutionState,
169    ) -> ContainerRebuildSummary {
170        let Some(rebuilder) = table.rebuilder(&[]) else {
171            return Default::default();
172        };
173        let to_scan = rebuilder.hint_col().map(|_| {
174            // We may attempt an incremental rebuild.
175            self.subset_tracker.recent_updates(table_id, table)
176        });
177        if parallelize_inter_container_op(self.data.next_id().index()) {
178            self.data
179                .iter_mut()
180                .zip(std::iter::repeat_with(|| exec_state.clone()))
181                .par_bridge()
182                .map(|((_, env), mut exec_state)| {
183                    env.apply_rebuild(
184                        table,
185                        &*rebuilder,
186                        to_scan.as_ref().map(|x| x.as_ref()),
187                        &mut exec_state,
188                    )
189                })
190                .reduce(ContainerRebuildSummary::default, |mut acc, summary| {
191                    acc.extend(summary);
192                    acc
193                })
194        } else {
195            let mut summary = ContainerRebuildSummary::default();
196            for (_, env) in self.data.iter_mut() {
197                summary.extend(env.apply_rebuild(
198                    table,
199                    &*rebuilder,
200                    to_scan.as_ref().map(|x| x.as_ref()),
201                    exec_state,
202                ));
203            }
204            summary
205        }
206    }
207
208    /// Add a new container type to the given [`ContainerValue`] instance.
209    ///
210    /// Container types need a meaans of generating fresh ids (`id_counter`) along with a means of
211    /// merging conflicting ids (`merge_fn`).
212    pub fn register_type<C: ContainerValue>(
213        &mut self,
214        id_counter: CounterId,
215        merge_fn: impl MergeFn + 'static,
216    ) -> ContainerValueId {
217        let id = self.container_ids.insert(TypeId::of::<C>());
218        self.data.get_or_insert(id, || {
219            Box::new(ContainerEnv::<C>::new(Box::new(merge_fn), id_counter))
220        });
221        id
222    }
223}
224
/// A trait implemented by container types.
///
/// Containers behave a lot like base values, but they include extra trait methods to support
/// rebuilding of container contents and merging containers that become equal after a rebuild pass
/// has taken place.
pub trait ContainerValue: Hash + Eq + Clone + Send + Sync + 'static {
    /// Rebuild this container in place according to the given [`Rebuilder`].
    ///
    /// The return value reports whether the container was modified. If this method returns
    /// `false` then the container must not have been modified (i.e. it must hash to the same
    /// value, and compare equal to a copy of itself before the call).
    fn rebuild_contents(&mut self, rebuilder: &dyn Rebuilder) -> bool;

    /// Iterate over the contents of the container.
    ///
    /// Note that containers can be more structured than just a sequence of values. This iterator
    /// is used to populate an index that in turn is used to speed up rebuilds. If a value in the
    /// container is eligible for a rebuild and it is not mentioned by this iterator, the outer
    /// container registry may skip rebuilding this container.
    fn iter(&self) -> impl Iterator<Item = Value> + '_;
}
245
/// A type-erased view of the environment holding all containers of one type.
///
/// This lets environments for different container types be stored together as boxed trait
/// objects and rebuilt uniformly.
pub trait DynamicContainerEnv: Any + dyn_clone::DynClone + Send + Sync {
    /// View this environment as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Rebuild every container in this environment according to `rebuilder`.
    ///
    /// `subset`, when present, is the set of rows of `table` to scan for an incremental rebuild.
    /// Returns a summary of what changed.
    fn apply_rebuild(
        &mut self,
        table: &WrappedTable,
        rebuilder: &dyn Rebuilder,
        subset: Option<SubsetRef>,
        exec_state: &mut ExecutionState,
    ) -> ContainerRebuildSummary;
}
256
257// Implements `Clone` for `Box<dyn DynamicContainerEnv>`.
258dyn_clone::clone_trait_object!(DynamicContainerEnv);
259
260fn hash_container(container: &impl ContainerValue) -> u64 {
261    let mut hasher = FxHasher::default();
262    container.hash(&mut hasher);
263    hasher.finish()
264}
265
/// The environment holding every container of a single type `C`.
#[derive(Clone)]
struct ContainerEnv<C: Eq + Hash> {
    /// Resolves the conflict when a container ends up associated with two ids.
    merge_fn: Box<dyn MergeFn>,
    /// Counter used to allocate fresh ids for newly registered containers.
    counter: CounterId,
    /// Map from a container to its id.
    to_id: DashMap<C, Value>,
    /// Map from an id to where its container lives in `to_id`: the container's hash code and
    /// the index of the `to_id` shard holding it.
    to_container: DashMap<Value, (usize /* hash code */, usize /* map */)>,
    /// Map from a Value to the set of ids of containers that contain that value.
    val_index: DashMap<Value, IndexSet<Value>>,
}
275
276impl<C: ContainerValue> DynamicContainerEnv for ContainerEnv<C> {
277    fn as_any(&self) -> &dyn Any {
278        self
279    }
280
281    fn apply_rebuild(
282        &mut self,
283        table: &WrappedTable,
284        rebuilder: &dyn Rebuilder,
285        subset: Option<SubsetRef>,
286        exec_state: &mut ExecutionState,
287    ) -> ContainerRebuildSummary {
288        if let Some(subset) = subset
289            && incremental_rebuild(
290                subset.size(),
291                self.to_id.len(),
292                parallelize_intra_container_op(self.to_id.len()),
293            )
294        {
295            return self.apply_rebuild_incremental(
296                table,
297                rebuilder,
298                exec_state,
299                subset,
300                rebuilder.hint_col().unwrap(),
301            );
302        }
303        self.apply_rebuild_nonincremental(rebuilder, exec_state)
304    }
305}
306
307impl<C: ContainerValue> ContainerEnv<C> {
308    pub fn new(merge_fn: Box<dyn MergeFn>, counter: CounterId) -> Self {
309        Self {
310            merge_fn,
311            counter,
312            to_id: DashMap::default(),
313            to_container: DashMap::default(),
314            val_index: DashMap::default(),
315        }
316    }
317
318    fn get_or_insert(&self, container: &C, exec_state: &mut ExecutionState) -> Value {
319        if let Some(value) = self.to_id.get(container) {
320            return *value;
321        }
322
323        // Time to insert a new mapping. First, insert into `to_container`: the moment that we
324        // insert a new value into `to_id`, someone else can return it from another call to
325        // `get_or_insert` and then feed that value to `get_container`.
326
327        let value = Value::from_usize(exec_state.inc_counter(self.counter));
328        let target_map = self.to_id.determine_map(container);
329        // This assertion is here because in parallel rebuilding we use `to_container` to
330        // compute the intended shard for to_id, because we have a mutable borrow of
331        // `to_container` that means we cannot call `determine_map` on `to_id`.
332        debug_assert_eq!(
333            target_map,
334            self.to_container
335                .determine_shard(hash_container(container) as usize)
336        );
337        self.to_container
338            .insert(value, (hash_container(container) as usize, target_map));
339
340        // Now insert into `to_id`, handling the case where a different thread is doing the same
341        // thing.
342        match self.to_id.entry(container.clone()) {
343            dashmap::Entry::Vacant(vac) => {
344                // Common case: insert the mapping in to_id and update the index.
345                vac.insert(value);
346                for val in container.iter() {
347                    self.val_index.entry(val).or_default().insert(value);
348                }
349                value
350            }
351            dashmap::Entry::Occupied(occ) => {
352                // Someone inserted `container` into the mapping since we looked it up. Remove the
353                // mapping that we inserted into `to_container` (we won't use it), and instead
354                // return the "winning" value.
355                let res = *occ.get();
356                std::mem::drop(occ); // drop the lock.
357                self.to_container.remove(&value);
358                res
359            }
360        }
361    }
362
363    fn insert_owned(&self, container: C, value: Value, exec_state: &mut ExecutionState) -> Value {
364        let hc = hash_container(&container);
365        let target_map = self.to_id.determine_map(&container);
366        match self.to_id.entry(container) {
367            dashmap::Entry::Occupied(mut occ) => {
368                let result = (self.merge_fn)(exec_state, *occ.get(), value);
369                let old_val = *occ.get();
370                if result != old_val {
371                    self.to_container.remove(&old_val);
372                    self.to_container.insert(result, (hc as usize, target_map));
373                    *occ.get_mut() = result;
374                    for val in occ.key().iter() {
375                        let mut index = self.val_index.entry(val).or_default();
376                        index.swap_remove(&old_val);
377                        index.insert(result);
378                    }
379                }
380                result
381            }
382            dashmap::Entry::Vacant(vacant_entry) => {
383                self.to_container.insert(value, (hc as usize, target_map));
384                for val in vacant_entry.key().iter() {
385                    self.val_index.entry(val).or_default().insert(value);
386                }
387                vacant_entry.insert(value);
388                value
389            }
390        }
391    }
392
393    fn reinsert_incremental(
394        &self,
395        container: C,
396        old_id: Value,
397        rebuilt_id: Value,
398        container_changed: bool,
399        exec_state: &mut ExecutionState,
400        summary: &mut ContainerRebuildSummary,
401    ) {
402        if container_changed || rebuilt_id != old_id {
403            summary.note_change();
404        }
405        if rebuilt_id != old_id {
406            // Parent rows will get a real delta from ordinary table rebuild, so
407            // we only need an explicit refresh when the outer id stayed stable.
408            self.to_container.remove(&old_id);
409        }
410        let actual = self.insert_owned(container, rebuilt_id, exec_state);
411        if container_changed && rebuilt_id == old_id && actual == old_id {
412            summary.note_dirty_id(old_id);
413        }
414    }
415
416    fn apply_rebuild_incremental(
417        &mut self,
418        table: &WrappedTable,
419        rebuilder: &dyn Rebuilder,
420        exec_state: &mut ExecutionState,
421        to_scan: SubsetRef,
422        search_col: ColumnId,
423    ) -> ContainerRebuildSummary {
424        // NB: there is no parallel implementation as of now.
425        //
426        // Implementing one should be straightforward, but we should wait for a real benchmark that
427        // requires it. It's possible that incremental rebuilding will only be profitable when the
428        // total number of ids to rebuild is small, in which case the overhead of parallelism may
429        // not be worth it in the first place.
430        let mut summary = ContainerRebuildSummary::default();
431        let mut buf = TaggedRowBuffer::new(1);
432        table.scan_project(
433            to_scan,
434            &[search_col],
435            Offset::new(0),
436            usize::MAX,
437            &[],
438            &mut buf,
439        );
440        // For each value in the buffer, rebuild all containers that mention it.
441        let mut to_rebuild = IndexSet::<Value>::default();
442        for (_, row) in buf.iter() {
443            to_rebuild.insert(row[0]);
444            let Some(ids) = self.val_index.get(&row[0]) else {
445                continue;
446            };
447            to_rebuild.extend(&*ids);
448        }
449        for id in to_rebuild {
450            let Some((hc, target_map)) = self.to_container.get(&id).map(|x| *x) else {
451                continue;
452            };
453            let shard_mut = self.to_id.shards_mut()[target_map].get_mut();
454            let Some((mut container, _)) =
455                shard_mut.remove_entry(hc as u64, |(_, v)| *v.get() == id)
456            else {
457                continue;
458            };
459            let rebuilt_id = rebuilder.rebuild_val(id);
460            let container_changed = container.rebuild_contents(rebuilder);
461            self.reinsert_incremental(
462                container,
463                id,
464                rebuilt_id,
465                container_changed,
466                exec_state,
467                &mut summary,
468            );
469        }
470        summary
471    }
472
473    fn apply_rebuild_nonincremental(
474        &mut self,
475        rebuilder: &dyn Rebuilder,
476        exec_state: &mut ExecutionState,
477    ) -> ContainerRebuildSummary {
478        if parallelize_inter_container_op(self.to_id.len()) {
479            return self.apply_rebuild_nonincremental_parallel(rebuilder, exec_state);
480        }
481        let mut summary = ContainerRebuildSummary::default();
482        let mut to_reinsert = Vec::new();
483        let shards = self.to_id.shards_mut();
484        for shard in shards.iter_mut() {
485            let shard = shard.get_mut();
486            // SAFETY: the iterator does not outlive `shard`.
487            for bucket in unsafe { shard.iter() } {
488                // SAFETY: the bucket is valid; we just got it from the iterator.
489                let (container, val) = unsafe { bucket.as_mut() };
490                let old_val = *val.get();
491                let new_val = rebuilder.rebuild_val(old_val);
492                let container_changed = container.rebuild_contents(rebuilder);
493                if !container_changed && new_val == old_val {
494                    // Nothing changed about this entry. Leave it in place.
495                    continue;
496                }
497                summary.note_change();
498                if container_changed {
499                    // The container changed. Remove both map entries then reinsert.
500                    // SAFETY: This is a valid bucket. Furthermore, iterators remain valid if
501                    // buckets they have already yielded have been removed.
502                    let ((container, _), _) = unsafe { shard.remove(bucket) };
503                    self.to_container.remove(&old_val);
504                    to_reinsert.push((container, new_val, new_val == old_val));
505                } else {
506                    // Just the value changed. Leave the container in place.
507                    *val.get_mut() = new_val;
508                    let prev = self.to_container.remove(&old_val).unwrap().1;
509                    self.to_container.insert(new_val, prev);
510                }
511            }
512        }
513        for (container, val, stable_id) in to_reinsert {
514            let actual = self.insert_owned(container, val, exec_state);
515            // Refresh only when rebuild changed container semantics in place.
516            // If the outer id changed, ordinary table rebuild already creates a
517            // fresh parent-row delta for seminaive to follow.
518            if stable_id && actual == val {
519                summary.note_dirty_id(val);
520            }
521        }
522        summary
523    }
524
    /// Parallel counterpart of `apply_rebuild_nonincremental`.
    ///
    /// Runs in two phases. The first rebuilds the entries of every `to_id` shard in parallel,
    /// updating id-only changes in place and moving containers whose contents changed into a
    /// per-destination-shard queue. The second drains those queues in parallel, one task per
    /// shard, inserting each container or merging it with an equal container already present.
    /// Ids that need a parent-row refresh are collected in a shared queue and folded into the
    /// returned summary at the end.
    fn apply_rebuild_nonincremental_parallel(
        &mut self,
        rebuilder: &dyn Rebuilder,
        exec_state: &mut ExecutionState,
    ) -> ContainerRebuildSummary {
        // This is very similar to the serial variant. The main difference is that
        // `to_reinsert` isn't a flat vector. It's instead a vector of queues - one per
        // destination map shard. This lets us do a bulk insertion in parallel without having
        // to grab a lock per container.
        let mut to_reinsert =
            IdVec::<usize /* to_id shard */, SegQueue<(C, Value, bool)>>::default();
        to_reinsert.resize_with(self.to_id.shards().len(), Default::default);

        let shards = self.to_id.shards_mut();
        // Phase 1: rebuild each shard's entries. `changed` ends up true if any shard saw a change
        // (the maximum over the per-shard booleans).
        let changed = shards
            .par_iter_mut()
            .map(|shard| {
                let mut changed = false;
                let shard = shard.get_mut();
                // SAFETY: the iterator does not outlive `shard`.
                for bucket in unsafe { shard.iter() } {
                    // SAFETY: the bucket is valid; we just got it from the iterator.
                    let (container, val) = unsafe { bucket.as_mut() };
                    let old_val = *val.get();
                    let new_val = rebuilder.rebuild_val(old_val);
                    let container_changed = container.rebuild_contents(rebuilder);
                    if !container_changed && new_val == old_val {
                        // Nothing changed about this entry. Leave it in place.
                        continue;
                    }
                    changed = true;
                    if container_changed {
                        // The container changed. Remove both map entries then reinsert.
                        // SAFETY: This is a valid bucket. Furthermore, iterators remain valid if
                        // buckets they have already yielded have been removed.
                        let ((container, _), _) = unsafe { shard.remove(bucket) };
                        self.to_container.remove(&old_val);
                        // Spooky: we're using `to_container` to determine the shard for
                        // `to_id`. We are assuming that the # shards determination is
                        // deterministic here. There is a debug assertion in `get_or_insert`
                        // that attempts to verify this.
                        let shard = self
                            .to_container
                            .determine_shard(hash_container(&container) as usize);
                        // The flag records whether the id was left unchanged by the rebuild.
                        to_reinsert[shard].push((container, new_val, new_val == old_val));
                    } else {
                        // Just the value changed. Leave the container in place.
                        *val.get_mut() = new_val;
                        let prev = self.to_container.remove(&old_val).unwrap().1;
                        self.to_container.insert(new_val, prev);
                    }
                }
                changed
            })
            .max()
            .unwrap_or(false);

        // Phase 2: reinsert the queued containers. Each task owns one shard and drains the queue
        // destined for it, using its own clone of the execution state.
        let dirty_ids = SegQueue::new();
        shards
            .iter_mut()
            .enumerate()
            .map(|(i, shard)| (i, shard, exec_state.clone()))
            .par_bridge()
            .for_each(|(shard_id, shard, mut exec_state)| {
                // This bit is a real slog. Once Dashmap updates from RawTable to HashTable for
                // the underlying shard, this will get a little better.
                //
                // NB: We are probably leaving some parallelism on the floor with these calls
                // to `to_container` and `val_index`.
                let shard = shard.get_mut();
                let queue = &to_reinsert[shard_id];
                while let Some((container, val, stable_id)) = queue.pop() {
                    let hc = hash_container(&container);
                    let target_map = self.to_container.determine_shard(hc as usize);
                    match shard.find_or_find_insert_slot(
                        hc,
                        |(c, _)| c == &container,
                        |(c, _)| hash_container(c),
                    ) {
                        Ok(bucket) => {
                            // An equal container is already stored: merge the two ids.
                            // SAFETY: the bucket is valid; we just got it from the shard and
                            // we have not done any operations that can invalidate the bucket.
                            let (container, val_slot) = unsafe { bucket.as_mut() };
                            let old_val = *val_slot.get();
                            let result = (self.merge_fn)(&mut exec_state, old_val, val);
                            if result != old_val {
                                // The stored id changed: repoint the reverse map and the index.
                                self.to_container.remove(&old_val);
                                self.to_container.insert(result, (hc as usize, target_map));
                                *val_slot.get_mut() = result;
                                for val in container.iter() {
                                    let mut index = self.val_index.entry(val).or_default();
                                    index.swap_remove(&old_val);
                                    index.insert(result);
                                }
                            }
                            // As in the serial path, only same-id semantic
                            // changes need an explicit parent-row refresh.
                            if stable_id && result == val {
                                dirty_ids.push(val);
                            }
                        }
                        Err(slot) => {
                            // No equal container is present: insert under `val`.
                            self.to_container.insert(val, (hc as usize, target_map));
                            for v in container.iter() {
                                self.val_index.entry(v).or_default().insert(val);
                            }
                            // SAFETY: We just got this slot from `find_or_find_insert_slot`
                            // and we have not mutated the map at all since then.
                            unsafe {
                                shard.insert_in_slot(hc, slot, (container, SharedValue::new(val)));
                            }
                            if stable_id {
                                dirty_ids.push(val);
                            }
                        }
                    }
                }
            });
        // Assemble the summary from the phase-1 change flag and the dirty ids from phase 2.
        let mut summary = ContainerRebuildSummary::default();
        if changed {
            summary.note_change();
        }
        while let Some(value) = dirty_ids.pop() {
            summary.note_dirty_id(value);
        }
        summary
    }
652
653    fn get_container(&self, value: Value) -> Option<impl Deref<Target = C> + '_> {
654        let (hc, target_map) = *self.to_container.get(&value)?;
655        let shard = &self.to_id.shards()[target_map];
656        let read_guard = shard.read();
657        let val_ptr: *const (C, _) = shard
658            .read()
659            .find(hc as u64, |(_, v)| *v.get() == value)?
660            .as_ptr();
661        struct ValueDeref<'a, T, Guard> {
662            _guard: Guard,
663            data: &'a T,
664        }
665
666        impl<T, Guard> Deref for ValueDeref<'_, T, Guard> {
667            type Target = T;
668
669            fn deref(&self) -> &T {
670                self.data
671            }
672        }
673
674        Some(ValueDeref {
675            _guard: read_guard,
676            // SAFETY: the value will remain valid for as long as `read_guard` is in scope.
677            data: unsafe {
678                let unwrapped: &(C, _) = &*val_ptr;
679                &unwrapped.0
680            },
681        })
682    }
683}
684
/// Decide whether an incremental rebuild is likely to beat a full rebuild.
///
/// `uf_size` is the number of recently updated entries to scan, `table_size` is the number of
/// containers stored, and `parallel` says whether the full rebuild would run in parallel.
///
/// Incremental rebuilding is chosen only for tables with more than 1000 entries, and only when
/// the table is at least `factor` times larger than the update set. The factor is 512 when the
/// full rebuild would be parallel and 8 otherwise.
fn incremental_rebuild(uf_size: usize, table_size: usize, parallel: bool) -> bool {
    let factor: usize = if parallel { 512 } else { 8 };
    // `uf_size * factor <= table_size` is equivalent to `uf_size <= table_size / factor` for
    // unsigned integers, and the division form cannot overflow for very large `uf_size`.
    table_size > 1000 && uf_size <= table_size / factor
}
691}