egglog_core_relations/free_join/mod.rs

1//! Execute queries against a database using a variant of Free Join.
2use std::{
3    mem,
4    sync::{
5        Arc,
6        atomic::{AtomicUsize, Ordering},
7    },
8};
9
10use crate::{
11    common::IndexSet,
12    hash_index::IndexCatalog,
13    numeric_id::{DenseIdMap, DenseIdMapWithReuse, NumericId, define_id},
14};
15use egglog_concurrency::{NotificationList, ResettableOnceLock};
16use rayon::prelude::*;
17use smallvec::SmallVec;
18
19use crate::{
20    BaseValues, ContainerRebuildSummary, ContainerValues, PoolSet, QueryEntry, TupleIndex, Value,
21    action::{
22        Bindings, DbView,
23        mask::{Mask, MaskIter, ValueSource},
24    },
25    dependency_graph::DependencyGraph,
26    hash_index::{ColumnIndex, Index, IndexBase},
27    offsets::Subset,
28    parallel_heuristics::parallelize_db_level_op,
29    pool::{Pool, Pooled, with_pool_set},
30    query::{Query, RuleSetBuilder},
31    table_spec::{
32        ColumnId, Constraint, MutationBuffer, Table, TableSpec, WrappedTable, WrappedTableRef,
33    },
34};
35
36use self::plan::Plan;
37use crate::action::ExecutionState;
38
39pub(crate) mod execute;
40pub(crate) mod frame_update;
41pub(crate) mod plan;
42
43define_id!(
44    pub AtomId,
45    u32,
46    "A component of a query consisting of a function and a list of variables or constants"
47);
48define_id!(pub Variable, u32, "a variable in a query", pretty "Var");
49
50impl Variable {
51    pub fn placeholder() -> Variable {
52        Variable::new(!0)
53    }
54}
55
56define_id!(pub TableId, u32, "a table in the database");
57
58impl TableId {
59    pub fn dummy() -> TableId {
60        TableId::new(u32::MAX)
61    }
62
63    pub fn is_dummy(&self) -> bool {
64        self.rep == u32::MAX
65    }
66}
67
68define_id!(pub(crate) ActionId, u32, "an identifier picking out the RHS of a rule");
69
/// The constraints on a single table, split by how cheaply they can be evaluated.
///
/// Produced by `Database::process_constraints`. `ProcessedConstraints::dummy` builds an
/// instance with an empty subset and no constraints.
#[derive(Debug)]
pub(crate) struct ProcessedConstraints {
    /// The subset of the table matching the fast constraints. If there are no
    /// fast constraints then this is the full table.
    pub(crate) subset: Subset,
    /// The constraints that can be evaluated quickly (O(log(n)) or O(1)).
    pub(crate) fast: Pooled<Vec<Constraint>>,
    /// The constraints that require an O(n) scan to evaluate.
    pub(crate) slow: Pooled<Vec<Constraint>>,
}
80
81impl Clone for ProcessedConstraints {
82    fn clone(&self) -> Self {
83        ProcessedConstraints {
84            subset: self.subset.clone(),
85            fast: Pooled::cloned(&self.fast),
86            slow: Pooled::cloned(&self.slow),
87        }
88    }
89}
90
91impl ProcessedConstraints {
92    /// The size of the subset of the table matching the fast constraints.
93    fn approx_size(&self) -> usize {
94        self.subset.size()
95    }
96
97    pub(crate) fn dummy() -> ProcessedConstraints {
98        ProcessedConstraints {
99            subset: Subset::empty(),
100            fast: Pooled::new(Vec::new()),
101            slow: Pooled::new(Vec::new()),
102        }
103    }
104}
105
/// A reference to an atom together with a list of its columns.
///
/// NOTE(review): `VarInfo::occurrences` stores these, so `vars` presumably lists the columns
/// of `atom` where the variable appears. Confirm against the planner.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SubAtom {
    /// The atom being referenced.
    pub(crate) atom: AtomId,
    /// Columns of `atom`. Inline storage for two avoids a heap allocation in the common case.
    pub(crate) vars: SmallVec<[ColumnId; 2]>,
}
111
112impl SubAtom {
113    pub(crate) fn new(atom: AtomId) -> SubAtom {
114        SubAtom {
115            atom,
116            vars: Default::default(),
117        }
118    }
119}
120
/// Metadata about one variable of a rule.
#[derive(Debug, Clone)]
pub(crate) struct VarInfo {
    /// The atoms (and columns within them) where this variable occurs.
    pub(crate) occurrences: Vec<SubAtom>,
    /// Whether or not this variable shows up in the "actions" portion of a
    /// rule.
    pub(crate) used_in_rhs: bool,
    /// NOTE(review): presumably whether the variable is bound by the rule's actions rather
    /// than by its query. Confirm against the rule builder.
    pub(crate) defined_in_rhs: bool,
    /// Optional human-readable name (assumed to be for debugging/pretty-printing; TODO confirm).
    pub(crate) name: Option<Arc<str>>,
}
130
/// A shared hash index over a tuple of columns.
///
/// It lives in a `ResettableOnceLock`, so `Database::merge_all` can reset it and it is
/// rebuilt on the next access.
pub(crate) type HashIndex = Arc<ResettableOnceLock<Index<TupleIndex>>>;
/// The single-column counterpart of [`HashIndex`].
pub(crate) type HashColumnIndex = Arc<ResettableOnceLock<Index<ColumnIndex>>>;
133
/// A table registered in a [`Database`], together with its metadata and cached indexes.
pub struct TableInfo {
    /// The name passed to `Database::add_table_named`, if any.
    pub(crate) name: Option<Arc<str>>,
    /// The table's schema, captured from `Table::spec` when the table was added.
    pub(crate) spec: TableSpec,
    /// The table itself.
    pub(crate) table: WrappedTable,
    /// Hash indexes over several columns, keyed by the columns they index.
    pub(crate) indexes: IndexCatalog<SmallVec<[ColumnId; 4]>, HashIndex>,
    /// Single-column hash indexes, keyed by column.
    pub(crate) column_indexes: IndexCatalog<ColumnId, HashColumnIndex>,
}
141
142impl TableInfo {
143    pub fn table(&self) -> &WrappedTable {
144        &self.table
145    }
146
147    pub fn name(&self) -> Option<&str> {
148        self.name.as_deref()
149    }
150
151    pub fn spec(&self) -> &TableSpec {
152        &self.spec
153    }
154}
155
156impl Clone for TableInfo {
157    fn clone(&self) -> Self {
158        fn deep_clone_map<K: Clone + std::hash::Hash + Eq, TI: IndexBase + Clone>(
159            map: &IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>>,
160            table: WrappedTableRef,
161        ) -> IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>> {
162            map.map(|table_ref| {
163                let (k, v) = table_ref;
164                let v: Index<TI> = v
165                    .get_or_update(|index| {
166                        index.refresh(table);
167                    })
168                    .clone();
169                (k.clone(), Arc::new(ResettableOnceLock::new(v)))
170            })
171        }
172        TableInfo {
173            name: self.name.clone(),
174            spec: self.spec.clone(),
175            table: self.table.dyn_clone(),
176            indexes: deep_clone_map(&self.indexes, self.table.as_ref()),
177            column_indexes: deep_clone_map(&self.column_indexes, self.table.as_ref()),
178        }
179    }
180}
181
// Identifies one of the database's counters (see `Database::add_counter`).
define_id!(pub CounterId, u32, "A counter accessible to actions, useful for generating unique Ids.");
// Identifies a registered external function (see `Database::add_external_function`).
define_id!(pub ExternalFunctionId, u32, "A user-defined operation that can be invoked from a query");

/// External functions allow external callers to manipulate database state in
/// near-arbitrary ways.
///
/// This is a useful, if low-level, interface for extending this database with
/// functionality and state not built into the core model.
///
/// Implementors must be cloneable through `dyn_clone::DynClone`, which lets
/// `Box<dyn ExternalFunction>` (and so a cloned `Database`) be cloned. They must also be
/// `Send + Sync`, presumably so they can be called from rules executing in parallel.
pub trait ExternalFunction: dyn_clone::DynClone + Send + Sync {
    /// Invoke the function on `args` with mutable access to the current
    /// [`ExecutionState`]. If a value is not returned, halt the execution of the current
    /// rule.
    fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value>;
}
195
196/// Automatically generate an `ExternalFunction` implementation from a function.
197pub fn make_external_func<
198    F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
199>(
200    f: F,
201) -> impl ExternalFunction {
202    #[derive(Clone)]
203    struct Wrapped<F>(F);
204    impl<F> ExternalFunction for Wrapped<F>
205    where
206        F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
207    {
208        fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value> {
209            (self.0)(state, args)
210        }
211    }
212    Wrapped(f)
213}
214
/// A vectorized variant of [`ExternalFunction::invoke`] to avoid repeated dynamic dispatch.
///
/// Results go into a pooled vector, which is then bound to `out_var` in `bindings` with
/// `Bindings::insert`.
///
/// NOTE(review): `for_each_binding_with_mask!` presumably visits the rows selected by `mask`,
/// resolving `args` against `bindings`. `Value::stale` looks like the filler for rows whose
/// call returns `None`. Confirm both against the macro and `fill_vec`.
pub(crate) fn invoke_batch(
    this: &dyn ExternalFunction,
    state: &mut ExecutionState,
    mask: &mut Mask,
    bindings: &mut Bindings,
    args: &[QueryEntry],
    out_var: Variable,
) {
    // Borrow an output buffer from the ambient pool set rather than allocating a new Vec.
    let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool());
    let mut out = pool.get();
    // Reserve room for one value per mask position up front.
    out.reserve(mask.len());
    for_each_binding_with_mask!(mask, args, bindings, |iter| {
        iter.fill_vec(&mut out, Value::stale, |_, args| {
            this.invoke(state, args.as_slice())
        });
    });
    bindings.insert(out_var, &out);
}
234
/// A variant of [`invoke_batch`] that overwrites the output variable,
/// rather than assigning all new values.
///
/// NOTE(review): `assign_vec_and_retain` presumably writes each successful result into the
/// existing column and drops rows (from `mask`) whose call returns `None`. Confirm against its
/// implementation.
///
/// # Panics
///
/// This method will panic if `out_var` doesn't already have an appropriately-sized
/// vector bound in `bindings`.
pub(crate) fn invoke_batch_assign(
    this: &dyn ExternalFunction,
    state: &mut ExecutionState,
    mask: &mut Mask,
    bindings: &mut Bindings,
    args: &[QueryEntry],
    out_var: Variable,
) {
    // Detach the output column so it can be mutated while `bindings` is borrowed by the
    // iteration below; it is reattached with `replace` afterwards.
    let mut out = bindings.take(out_var).expect("out_var must be bound");
    for_each_binding_with_mask!(mask, args, bindings, |iter| {
        iter.assign_vec_and_retain(&mut out.vals, |_, args| this.invoke(state, &args))
    });
    bindings.replace(out);
}
254
// Implements `Clone` for `Box<dyn ExternalFunction>`, which `Database`'s derived `Clone`
// needs for its `external_functions` field.
dyn_clone::clone_trait_object!(ExternalFunction);

/// The registered external functions, indexed by [`ExternalFunctionId`].
///
/// Backed by a `DenseIdMapWithReuse`, so ids released through
/// `Database::free_external_function` may presumably be handed out again.
pub(crate) type ExternalFunctions =
    DenseIdMapWithReuse<ExternalFunctionId, Box<dyn ExternalFunction>>;
260
/// The database's counters: one `AtomicUsize` per [`CounterId`].
///
/// Reads and increments take `&self`, so counters can be updated through a shared
/// reference such as the one held by `DbView`.
#[derive(Default)]
pub(crate) struct Counters(DenseIdMap<CounterId, AtomicUsize>);
263
264impl Clone for Counters {
265    fn clone(&self) -> Counters {
266        let mut map = DenseIdMap::new();
267        for (k, v) in self.0.iter() {
268            // NB: we may want to experiment with Ordering::Relaxed here.
269            map.insert(k, AtomicUsize::new(v.load(Ordering::SeqCst)));
270        }
271        Counters(map)
272    }
273}
274
275impl Counters {
276    pub(crate) fn read(&self, ctr: CounterId) -> usize {
277        self.0[ctr].load(Ordering::Acquire)
278    }
279    pub(crate) fn inc(&self, ctr: CounterId) -> usize {
280        // We synchronize with `read_counter` but not with other increments.
281        // NB: we may want to experiment with Ordering::Relaxed here.
282        self.0[ctr].fetch_add(1, Ordering::Release)
283    }
284}
285
/// A collection of tables and indexes over them.
///
/// A database also owns the memory pools used by its tables. Dropping a `Database` clears the
/// ambient pool sets on the current thread and on every rayon worker.
///
/// Cloning yields an independent copy: tables and their indexes are deep-cloned, and each
/// counter's current value is copied into a new atomic.
#[derive(Clone, Default)]
pub struct Database {
    // NB: some fields are pub(crate) to allow some internal modules to avoid
    // borrowing the whole database.
    /// Every registered table, with its metadata and cached indexes.
    pub(crate) tables: DenseIdMap<TableId, TableInfo>,
    // TODO: having a single AtomicUsize per counter can lead to contention. We
    // should look into prefetching counters when creating a new ExecutionState
    // and incrementing locally. Note that the batch size shouldn't be too big
    // because we keep an array per id in the UF.
    /// Counters created with `add_counter`.
    pub(crate) counters: Counters,
    /// Functions registered with `add_external_function`.
    pub(crate) external_functions: ExternalFunctions,
    /// Container values, rebuilt through `rebuild_containers`.
    container_values: ContainerValues,
    /// `notification_list` contains the list of tables that have been modified since the last call
    /// to [`Database::merge_all`].
    notification_list: NotificationList<TableId>,
    // Tracks the relative dependencies between tables during merge operations.
    deps: DependencyGraph,
    /// Base values exposed through `base_values` / `base_values_mut`.
    base_values: BaseValues,
    /// A rough estimate of the total size of the database.
    ///
    /// This is primarily used to determine whether or not to attempt to do some operations in
    /// parallel. `merge_all` recomputes it exactly as the sum of table lengths;
    /// `merge_table` and `clear_table` only adjust it incrementally.
    total_size_estimate: usize,
}
313
314impl Database {
315    /// Create an empty Database.
316    ///
317    /// Queries are executed using the current rayon thread pool, which defaults to the global
318    /// thread pool.
319    pub fn new() -> Database {
320        Database::default()
321    }
322
323    /// Initialize a new rulse set to run against this database.
324    pub fn new_rule_set(&mut self) -> RuleSetBuilder<'_> {
325        RuleSetBuilder::new(self)
326    }
327
328    /// Add a new external function to the database.
329    pub fn add_external_function(
330        &mut self,
331        f: Box<dyn ExternalFunction + 'static>,
332    ) -> ExternalFunctionId {
333        self.external_functions.push(f)
334    }
335
336    /// Free an existing external function. Make sure not to use `id` afterwards.
337    pub fn free_external_function(&mut self, id: ExternalFunctionId) {
338        self.external_functions.take(id);
339    }
340
341    pub fn base_values(&self) -> &BaseValues {
342        &self.base_values
343    }
344
345    pub fn base_values_mut(&mut self) -> &mut BaseValues {
346        &mut self.base_values
347    }
348
349    pub fn container_values(&self) -> &ContainerValues {
350        &self.container_values
351    }
352
353    pub fn container_values_mut(&mut self) -> &mut ContainerValues {
354        &mut self.container_values
355    }
356
357    pub fn rebuild_containers(&mut self, table_id: TableId) -> ContainerRebuildSummary {
358        let mut containers = mem::take(&mut self.container_values);
359        let table = &self.tables[table_id].table;
360        let res = self.with_execution_state(|state| containers.rebuild_all(table_id, table, state));
361        self.container_values = containers;
362        res
363    }
364
365    /// Apply the value-level rebuild encoded by `func_id` to all the tables in `to_rebuild`.
366    ///
367    /// The native [`Table::apply_rebuild`] method takes a `next_ts` argument for filling in new
368    /// values in a table like [`crate::SortedWritesTable`] where values in a certain column need
369    /// to be inserted in sorted order; the `next_ts` argument to this method is passed to
370    /// `apply_rebuild` for this purpose.
371    pub fn apply_rebuild(
372        &mut self,
373        func_id: TableId,
374        to_rebuild: &[TableId],
375        next_ts: Value,
376    ) -> bool {
377        let func = self.tables.take(func_id).unwrap();
378        self.run_on_tables(to_rebuild, |_, info, view| {
379            info.table.apply_rebuild(
380                func_id,
381                &func.table,
382                next_ts,
383                &mut ExecutionState::new(*view, Default::default()),
384            )
385        });
386        self.tables.insert(func_id, func);
387        self.merge_all()
388    }
389
390    pub fn refresh_rows_for_values(
391        &mut self,
392        to_refresh: &[TableId],
393        dirty_ids: &[Value],
394        next_ts: Value,
395    ) -> bool {
396        if dirty_ids.is_empty() {
397            return false;
398        }
399        // This is the follow-up for `ContainerRebuildSummary::dirty_ids()`.
400        // These ids changed semantics without changing identity, so parent
401        // rows can become newly matchable without getting an ordinary table
402        // delta.
403        //
404        // It must run after ordinary table rebuild, which already handles
405        // changed-id cases by rewriting parent rows to the new id.
406        self.run_on_tables(to_refresh, |_, info, _| {
407            info.table.refresh_rows_for_values(dirty_ids, next_ts)
408        });
409        self.merge_all()
410    }
411
412    fn run_on_tables(
413        &mut self,
414        table_ids: &[TableId],
415        run: impl for<'a> Fn(TableId, &mut TableInfo, &DbView<'a>) -> bool + Sync,
416    ) {
417        if parallelize_db_level_op(self.total_size_estimate) {
418            let mut tables = Vec::with_capacity(table_ids.len());
419            for id in table_ids {
420                tables.push((*id, self.tables.take(*id).unwrap()));
421            }
422            let view = self.read_only_view();
423            tables.par_iter_mut().for_each(|(id, info)| {
424                if run(*id, info, &view) {
425                    self.notification_list.notify(*id);
426                }
427            });
428            for (id, info) in tables {
429                self.tables.insert(id, info);
430            }
431        } else {
432            for id in table_ids {
433                let mut info = self.tables.take(*id).unwrap();
434                let changed = {
435                    let view = self.read_only_view();
436                    run(*id, &mut info, &view)
437                };
438                if changed {
439                    self.notification_list.notify(*id);
440                }
441                self.tables.insert(*id, info);
442            }
443        }
444    }
445
446    /// Run `f` with access to an `ExecutionState` mapped to this database.
447    pub fn with_execution_state<R>(&self, f: impl FnOnce(&mut ExecutionState) -> R) -> R {
448        let mut state = ExecutionState::new(self.read_only_view(), Default::default());
449        f(&mut state)
450    }
451
452    /// Like [`Database::with_execution_state`], but also reports whether `f`
453    /// staged any mutation through the execution state. Callers can use the
454    /// flag to skip a subsequent `merge_all` when the closure was read-only.
455    pub fn with_execution_state_tracked<R>(
456        &self,
457        f: impl FnOnce(&mut ExecutionState) -> R,
458    ) -> (R, bool) {
459        let mut state = ExecutionState::new(self.read_only_view(), Default::default());
460        let result = f(&mut state);
461        (result, state.changed)
462    }
463
464    pub(crate) fn read_only_view(&self) -> DbView<'_> {
465        DbView {
466            table_info: &self.tables,
467            counters: &self.counters,
468            external_funcs: &self.external_functions,
469            bases: &self.base_values,
470            containers: &self.container_values,
471            notification_list: &self.notification_list,
472        }
473    }
474
475    /// Estimate the size of the table. If a constraint is provided, return an
476    /// estimate of the size of the subset of the table matching the constraint.
477    pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
478        let table_info = self
479            .tables
480            .get(table)
481            .expect("table must be declared in the current database");
482        let table = &table_info.table;
483        if let Some(c) = c {
484            if let Some(sub) = table.fast_subset(&c) {
485                // In the case where a the constraint can be computed quickly,
486                // we do not filter for staleness, which may over-approximate.
487                sub.size()
488            } else {
489                table.refine_one(table.refine_live(table.all()), &c).size()
490            }
491        } else {
492            table.len()
493        }
494    }
495
496    /// Create a new counter for this database.
497    ///
498    /// These counters can be used to generate unique ids as part of an action.
499    pub fn add_counter(&mut self) -> CounterId {
500        self.counters.0.push(AtomicUsize::new(0))
501    }
502
503    /// Increment the given counter and return its previous value.
504    pub fn inc_counter(&self, counter: CounterId) -> usize {
505        self.counters.inc(counter)
506    }
507
508    /// Get the current value of the given counter.
509    pub fn read_counter(&self, counter: CounterId) -> usize {
510        self.counters.read(counter)
511    }
512
    /// A helper for merging all pending updates. Used to write to the database after updates have
    /// been staged. Returns true if any tuples were added.
    ///
    /// Exposed for testing purposes.
    ///
    /// Useful for out-of-band insertions into the database.
    ///
    /// Merging runs to a fixed point, because merging one table can stage updates to others.
    /// Those tables are picked up from the notification list on the next iteration. Batches of
    /// fewer than four pending tables go to the `merge_simple` fast path, which ignores the
    /// dependency graph. Larger batches are merged stratum by stratum, in parallel when the
    /// size estimate says so.
    ///
    /// The returned flag is also set when a merge's `ExecutionState` reports a staged change
    /// (`es.changed`), not only when tuples were added.
    ///
    /// Afterwards every cached index on every table is reset, and the size estimate used to
    /// choose parallel execution is recomputed.
    ///
    /// # Panics
    ///
    /// Index resetting uses `Arc::get_mut(..).unwrap()`. It therefore panics if any cached index
    /// `Arc` is still shared (cloned outside its table) when the merge finishes.
    pub fn merge_all(&mut self) -> bool {
        let mut ever_changed = false;
        // Decided once, from the estimate as of the start of this call.
        let do_parallel = parallelize_db_level_op(self.total_size_estimate);
        let mut to_merge = IndexSet::default();
        loop {
            to_merge.clear();
            // Drain the set of tables notified since the last reset.
            let to_merge_vec = self.notification_list.reset();
            if to_merge_vec.len() < 4 {
                // `merge_simple` keeps draining notifications until none remain, so the fixed
                // point is reached once it returns.
                ever_changed |= self.merge_simple(to_merge_vec);
                break;
            }
            for table in to_merge_vec {
                to_merge.insert(table);
            }

            let mut changed = false;
            let mut tables_merging = DenseIdMap::<
                TableId,
                (
                    // The info needed to merge this table.
                    Option<TableInfo>,
                    // Pre-allocated write buffers, according to the table's declared write
                    // dependencies.
                    DenseIdMap<TableId, Box<dyn MutationBuffer>>,
                ),
            >::with_capacity(self.tables.n_ids());
            // NOTE(review): a pending table that belongs to no stratum would not be merged here;
            // presumably `strata()` covers every table. Confirm in `DependencyGraph`.
            for stratum in self.deps.strata() {
                // Initialize the write dependencies first.
                for table in stratum.intersection(&to_merge).copied() {
                    let mut bufs = DenseIdMap::default();
                    for dep in self.deps.write_deps(table) {
                        if let Some(info) = self.tables.get(dep) {
                            bufs.insert(dep, info.table.new_buffer());
                        }
                    }
                    tables_merging.insert(table, (None, bufs));
                }
                // Then initialize read dependencies (this two-phase structure is why we have an
                // Option in the tables_merging map).
                // Buffers must be created while every table is still in `self.tables`, because a
                // write dependency may itself be merging in this stratum. Only after that are the
                // merging tables detached from `self.tables`.
                for table in stratum.intersection(&to_merge).copied() {
                    tables_merging[table].0 = Some(self.tables.unwrap_val(table));
                }
                let db = self.read_only_view();
                // `max` over bools is `true` iff any merge reported a change. Unlike `any`, it
                // never short-circuits, so every table in the stratum really is merged.
                changed |= if do_parallel {
                    tables_merging
                        .par_iter_mut()
                        .map(|(_, (info, buffers))| {
                            let mut es = ExecutionState::new(db, mem::take(buffers));
                            info.as_mut().unwrap().table.merge(&mut es).added || es.changed
                        })
                        .max()
                        .unwrap_or(false)
                } else {
                    tables_merging
                        .iter_mut()
                        .map(|(_, (info, buffers))| {
                            let mut es = ExecutionState::new(db, mem::take(buffers));
                            info.as_mut().unwrap().table.merge(&mut es).added || es.changed
                        })
                        .max()
                        .unwrap_or(false)
                };
                // Reattach the merged tables before moving to the next stratum.
                for (id, (table, _)) in tables_merging.drain() {
                    self.tables.insert(id, table.unwrap());
                }
            }
            ever_changed |= changed;
        }
        // Reset all indexes to force an update on the next access.
        let mut size_estimate = 0;
        for (_, info) in self.tables.iter_mut() {
            info.column_indexes.update(|_, ti| {
                Arc::get_mut(ti).unwrap().reset();
            });
            info.indexes.update(|_, ti| {
                Arc::get_mut(ti).unwrap().reset();
            });
            size_estimate += info.table.len();
        }
        // Recompute the estimate exactly; incremental adjustments elsewhere may have drifted.
        self.total_size_estimate = size_estimate;
        ever_changed
    }
601
602    /// A "fast path" merge method that is not optimized for parallelism and does not respect read
603    /// and write dependencies. This ends up being faster than the full "strata-aware" option in
604    /// the body of `merge_all`.
605    fn merge_simple(&mut self, mut to_merge: SmallVec<[TableId; 4]>) -> bool {
606        let mut changed = false;
607        while !to_merge.is_empty() {
608            for table_id in to_merge.iter().copied() {
609                let mut info = self.tables.unwrap_val(table_id);
610                let mut es = ExecutionState::new(self.read_only_view(), Default::default());
611                changed |= info.table.merge(&mut es).added || es.changed;
612                self.tables.insert(table_id, info);
613            }
614            to_merge = self.notification_list.reset();
615        }
616        changed
617    }
618
619    /// A low-level helper for merging pending updates to a particular function.
620    ///
621    /// Callers should prefer `merge_all`, as the process of merging the data
622    /// for a particular table may cause other updates to be buffered
623    /// elesewhere. The `merge_all` method runs merges to a fixed point to avoid
624    /// surprises here.
625    pub fn merge_table(&mut self, table: TableId) -> bool {
626        let mut info = self.tables.unwrap_val(table);
627        self.total_size_estimate = self.total_size_estimate.wrapping_sub(info.table.len());
628        let table_changed = info.table.merge(&mut ExecutionState::new(
629            self.read_only_view(),
630            Default::default(),
631        ));
632        self.total_size_estimate = self.total_size_estimate.wrapping_add(info.table.len());
633        self.tables.insert(table, info);
634        table_changed.added
635    }
636
637    /// Get id of the next table to be added to the database.
638    ///
639    /// This can be useful for "knot tying", when tables need to reference their
640    /// own id.
641    pub fn next_table_id(&self) -> TableId {
642        self.tables.next_id()
643    }
644
645    /// Add a table with the given schema to the database.
646    ///
647    /// The table must have a compatible spec with `types` (e.g. same number of
648    /// columns).
649    pub fn add_table<T: Table + Sized + 'static>(
650        &mut self,
651        table: T,
652        read_deps: impl IntoIterator<Item = TableId>,
653        write_deps: impl IntoIterator<Item = TableId>,
654    ) -> TableId {
655        self.add_table_impl(table, None, read_deps, write_deps)
656    }
657
658    pub fn add_table_named<T: Table + Sized + 'static>(
659        &mut self,
660        table: T,
661        name: Arc<str>,
662        read_deps: impl IntoIterator<Item = TableId>,
663        write_deps: impl IntoIterator<Item = TableId>,
664    ) -> TableId {
665        self.add_table_impl(table, Some(name), read_deps, write_deps)
666    }
667
668    fn add_table_impl<T: Table + Sized + 'static>(
669        &mut self,
670        table: T,
671        name: Option<Arc<str>>,
672        read_deps: impl IntoIterator<Item = TableId>,
673        write_deps: impl IntoIterator<Item = TableId>,
674    ) -> TableId {
675        let spec = table.spec();
676        let table = WrappedTable::new(table);
677        let res = self.tables.push(TableInfo {
678            name,
679            spec,
680            table,
681            indexes: IndexCatalog::new(),
682            column_indexes: IndexCatalog::new(),
683        });
684        self.deps.add_table(res, read_deps, write_deps);
685        res
686    }
687
688    /// Get direct mutable access to the table.
689    ///
690    /// This method is useful for out-of-band access to databse state.
691    ///
692    /// **NOTE:** It is legal to call [`Table::new_buffer`] on the returned table handle, and use
693    /// that to stage updates to the given table via [`MutationBuffer::stage_insert`] or
694    /// [`MutationBuffer::stage_remove`], however this is *likely to be a source of bugs*.
695    ///
696    /// Updates staged in this way will not cause `table` to be marked as having pending changes in
697    /// the next call to [`Database::merge_all`]. Instead, such users should use
698    /// [`Database::new_buffer`], which plumbs this signal through correctly, or better yet,
699    /// perform all updates through an [`ExecutionState`] or a [`crate::RuleBuilder`]. If these
700    /// options do not work, then calling [`Database::merge_table`] directly will force a merge
701    /// call on the table.
702    pub fn get_table(&self, table: TableId) -> &WrappedTable {
703        &self
704            .tables
705            .get(table)
706            .expect("must access a table that has been declared in this database")
707            .table
708    }
709
710    /// Get a handle on the given table along with metadata about it.
711    ///
712    ///
713    /// **NOTE:** See the note on [`Database::get_table`] around manually staging updates.
714    pub fn get_table_info(&self, table: TableId) -> &TableInfo {
715        self.tables
716            .get(table)
717            .expect("must access a table that has been declared in this database")
718    }
719
720    /// Create a new mutation buffer for the table with id `id`.
721    ///
722    /// This will marked the given table as potentially changed for the next round of merging.
723    /// Unlike calling [`Table::new_buffer`] on a table returned from a getter, this method also
724    /// triggers change notification metadata that is read by [`Database::merge_all`].
725    pub fn new_buffer(&self, id: TableId) -> Box<dyn MutationBuffer> {
726        self.notification_list.notify(id);
727        self.get_table(id).new_buffer()
728    }
729
730    pub(crate) fn process_constraints(
731        &self,
732        table: TableId,
733        cs: &[Constraint],
734    ) -> ProcessedConstraints {
735        let table_info = &self.tables[table];
736        let (mut subset, mut fast, mut slow) = table_info.table.split_fast_slow(cs);
737        slow.retain(|c| {
738            let (col, val) = match c {
739                Constraint::EqConst { col, val } => (*col, *val),
740                Constraint::Eq { .. }
741                | Constraint::LtConst { .. }
742                | Constraint::GtConst { .. }
743                | Constraint::LeConst { .. }
744                | Constraint::GeConst { .. } => return true,
745            };
746            // We are looking up by a constant: this is something we can build
747            // an index for as long as the column is cacheable.
748            if *table_info
749                .spec
750                .uncacheable_columns
751                .get(col)
752                .unwrap_or(&false)
753            {
754                return true;
755            }
756            // We have or will build an index: upgrade this constraint to
757            // 'fast'.
758            fast.push(c.clone());
759            let index = get_column_index_from_tableinfo(table_info, col);
760            match index.get().unwrap().get_subset(&val) {
761                Some(s) => {
762                    with_pool_set(|ps| subset.intersect(s, &ps.get_pool()));
763                }
764                None => {
765                    // There are no rows matching this key! We can constrain this to nothing.
766                    subset = Subset::empty();
767                }
768            }
769            // Remove this constraint from the slow list.
770            false
771        });
772        ProcessedConstraints { subset, fast, slow }
773    }
774
775    /// Get direct mutable access to the table.
776    ///
777    /// This method is useful for out-of-band access to databse state.
778    ///
779    /// **NOTE:** See the warning around staging updates to handles returned through this method in
780    /// the documentation for [`Database::get_table`].
781    pub fn get_table_mut(&mut self, id: TableId) -> &mut dyn Table {
782        &mut *self
783            .tables
784            .get_mut(id)
785            .expect("must access a table that has been declared in this database")
786            .table
787    }
788
789    /// Remove every row from the given table.
790    ///
791    /// This is intended as a faster alternative to staging a per-row
792    /// `stage_remove` for every key in the table. The underlying [`Table::clear`]
793    /// implementation drops the row storage in bulk and bumps the table's major
794    /// generation, so any cached indexes/subsets observed by future readers will
795    /// be lazily rebuilt against the now-empty table. Any pending staged
796    /// inserts or removes for this table are dropped (they pre-dated the clear,
797    /// so they no longer make sense once the table is empty).
798    ///
799    /// This method also resets the cached column- and key-indexes for the
800    /// table so subsequent merges can take the `Arc::get_mut`-based reset path,
801    /// matching the invariant maintained by [`Database::merge_all`].
802    ///
803    /// This does **not** flush pending changes for *other* tables; it is the
804    /// caller's responsibility to call [`Database::merge_all`] beforehand if
805    /// they need staged updates from a previous step to land before the clear.
806    pub fn clear_table(&mut self, table: TableId) {
807        let info = self
808            .tables
809            .get_mut(table)
810            .expect("must access a table that has been declared in this database");
811        let prev_len = info.table.len();
812        info.table.clear();
813        // The version bump from `clear` is enough on its own to make the
814        // indexes self-refresh on next access (see `Index::refresh`). We still
815        // reset them eagerly here so that the next `merge_all` sees the same
816        // "indexes are resettable" state it expects after a successful merge.
817        info.column_indexes.update(|_, ti| {
818            if let Some(arc) = Arc::get_mut(ti) {
819                arc.reset();
820            }
821        });
822        info.indexes.update(|_, ti| {
823            if let Some(arc) = Arc::get_mut(ti) {
824                arc.reset();
825            }
826        });
827        self.total_size_estimate = self.total_size_estimate.wrapping_sub(prev_len);
828    }
829
830    pub(crate) fn plan_query(&mut self, query: Query) -> Plan {
831        plan::plan_query(query, ColumnCardEst::new(self))
832    }
833}
834
835impl Drop for Database {
836    fn drop(&mut self) {
837        // Clean up the ambient thread pool.
838        //
839        // Calling mem::forget on the egraph can result in much faster execution times.
840        with_pool_set(PoolSet::clear);
841        rayon::broadcast(|_| with_pool_set(PoolSet::clear));
842    }
843}
844
845/// The core logic behind getting and updating a hash index.
846///
847/// This is in a separate function to allow us to reuse it while already
848/// borrowing a `TableInfo`.
849fn get_index_from_tableinfo(table_info: &TableInfo, cols: &[ColumnId]) -> HashIndex {
850    let index: Arc<_> = table_info.indexes.get_or_insert(cols.into(), || {
851        Arc::new(ResettableOnceLock::new(Index::new(
852            cols.to_vec(),
853            TupleIndex::new(cols.len()),
854        )))
855    });
856    index.get_or_update(|index| {
857        index.refresh(table_info.table.as_ref());
858    });
859    debug_assert!(
860        !index
861            .get()
862            .unwrap()
863            .needs_refresh(table_info.table.as_ref())
864    );
865    index
866}
867
868/// The core logic behind getting and updating a column index.
869///
870/// This is the single-column analog to [`get_index_from_tableinfo`].
871fn get_column_index_from_tableinfo(table_info: &TableInfo, col: ColumnId) -> HashColumnIndex {
872    let index: Arc<_> = table_info.column_indexes.get_or_insert(col, || {
873        Arc::new(ResettableOnceLock::new(Index::new(
874            vec![col],
875            ColumnIndex::new(),
876        )))
877    });
878    index.get_or_update(|index| {
879        index.refresh(table_info.table.as_ref());
880    });
881    debug_assert!(
882        !index
883            .get()
884            .unwrap()
885            .needs_refresh(table_info.table.as_ref())
886    );
887    index
888}
889
/// Column-cardinality estimator handed to the query planner.
///
/// Holds a shared borrow of the [`Database`]; estimates are answered by
/// looking up (and, if necessary, creating and refreshing) the table's
/// per-column indexes.
#[derive(Clone)]
pub struct ColumnCardEst<'a> {
    db: &'a Database,
}
894
895impl ColumnCardEst<'_> {
896    pub fn new(db: &Database) -> ColumnCardEst<'_> {
897        ColumnCardEst { db }
898    }
899
900    pub fn col_uniqueness(&self, table: TableId, col: ColumnId) -> ColUniqueness {
901        let col_idx = get_column_index_from_tableinfo(&self.db.tables[table], col);
902        let table = &self.db.tables[table].table;
903        ColUniqueness {
904            col_size: col_idx.get().unwrap().len(),
905            table_size: table.len(),
906        }
907    }
908}
909
910impl std::fmt::Debug for ColumnCardEst<'_> {
911    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
912        f.debug_struct("ColumnCardEst").finish_non_exhaustive()
913    }
914}
915
/// A coarse cardinality estimate for a column of a table, used by the query
/// planner to decide which variable to eliminate next during tree
/// decomposition.
///
/// `table_size` is the number of rows in the (sub)table and `col_size` is the
/// number of distinct values in the column. Their ratio
/// (`table_size / col_size`) approximates the average number of rows that share
/// a given value of the column: a smaller ratio means the column is closer to
/// being unique and therefore cheaper to join on. [`ColUniqueness`] is ordered
/// by this ratio (see the [`Ord`] impl), so the planner prefers variables with
/// the most selective (most unique) columns.
///
/// Degenerate estimates are ordered so that [`Ord`] remains a total order:
/// an empty (sub)table (`table_size == 0`) compares as ratio 0, the most
/// selective possible value. A non-empty table with `col_size == 0` compares as
/// an infinite ratio, the least selective.
#[derive(Copy, Clone, Debug)]
pub struct ColUniqueness {
    table_size: usize,
    col_size: usize,
}

impl Default for ColUniqueness {
    /// A neutral estimate: one row with one distinct value (ratio 1).
    fn default() -> ColUniqueness {
        ColUniqueness {
            table_size: 1,
            col_size: 1,
        }
    }
}

impl ColUniqueness {
    /// Estimate uniqueness for a subset of `subset_size` rows of this table.
    ///
    /// The distinct-value count is scaled proportionally to the subset size and
    /// clamped to at least 1 for a non-empty subset. An empty table or subset
    /// yields the empty estimate `(0, 0)`.
    #[allow(dead_code)] // not yet wired up into the planner
    fn scale(&self, subset_size: usize) -> ColUniqueness {
        if self.table_size == 0 || subset_size == 0 {
            return ColUniqueness {
                table_size: 0,
                col_size: 0,
            };
        }
        // Computed in u128 so the product can neither overflow nor saturate.
        let scaled = self.col_size as u128 * subset_size as u128 / self.table_size as u128;
        // A non-empty subset has at least one distinct value. Without the clamp,
        // integer division rounds small estimates down to 0, which `cmp` would
        // treat as an infinitely non-unique column.
        let col_size = usize::try_from(scaled).unwrap_or(usize::MAX).max(1);
        ColUniqueness {
            table_size: subset_size,
            col_size,
        }
    }

    /// Combine two estimates: row counts multiply (saturating at `usize::MAX`)
    /// and the distinct-value count is the larger of the two.
    fn join(&self, other: &ColUniqueness) -> ColUniqueness {
        ColUniqueness {
            table_size: self.table_size.saturating_mul(other.table_size),
            col_size: self.col_size.max(other.col_size),
        }
    }

    /// Number of distinct values in the column.
    #[allow(dead_code)] // not yet wired up into the planner
    fn col_size(&self) -> usize {
        self.col_size
    }

    /// The `(numerator, denominator)` of the rows-per-distinct-value ratio,
    /// widened to u128 so that two of them can be cross-multiplied exactly.
    ///
    /// The empty estimate `0 / 0` is normalised to `0 / 1`. Otherwise it would
    /// compare equal to every other estimate and break transitivity of [`Ord`].
    fn ratio_parts(&self) -> (u128, u128) {
        let denominator = if self.table_size == 0 && self.col_size == 0 {
            1
        } else {
            self.col_size
        };
        // Widening usize -> u128 is lossless on all supported (<= 64-bit) targets.
        (self.table_size as u128, denominator as u128)
    }
}

impl PartialEq for ColUniqueness {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for ColUniqueness {}

impl PartialOrd for ColUniqueness {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ColUniqueness {
    /// Order by the ratio `table_size / col_size` (smaller = more unique).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (num_a, den_a) = self.ratio_parts();
        let (num_b, den_b) = other.ratio_parts();
        // `a/b` vs `c/d` <=> `a*d` vs `c*b` for non-negative terms. The product
        // of two 64-bit values fits in u128, so the comparison is exact with
        // no saturation-induced ties.
        (num_a * den_b).cmp(&(num_b * den_a))
    }
}