egglog_core_relations/free_join/
mod.rs

1//! Execute queries against a database using a variant of Free Join.
2use std::{
3    mem,
4    sync::{
5        Arc,
6        atomic::{AtomicUsize, Ordering},
7    },
8};
9
10use crate::{
11    common::IndexSet,
12    hash_index::IndexCatalog,
13    numeric_id::{DenseIdMap, DenseIdMapWithReuse, NumericId, define_id},
14};
15use egglog_concurrency::{NotificationList, ResettableOnceLock};
16use rayon::prelude::*;
17use smallvec::SmallVec;
18
19use crate::{
20    BaseValues, ContainerValues, PoolSet, QueryEntry, TupleIndex, Value,
21    action::{
22        Bindings, DbView,
23        mask::{Mask, MaskIter, ValueSource},
24    },
25    dependency_graph::DependencyGraph,
26    hash_index::{ColumnIndex, Index, IndexBase},
27    offsets::Subset,
28    parallel_heuristics::parallelize_db_level_op,
29    pool::{Pool, Pooled, with_pool_set},
30    query::{Query, RuleSetBuilder},
31    table_spec::{
32        ColumnId, Constraint, MutationBuffer, Table, TableSpec, WrappedTable, WrappedTableRef,
33    },
34};
35
36use self::plan::Plan;
37use crate::action::ExecutionState;
38
39pub(crate) mod execute;
40pub(crate) mod frame_update;
41pub(crate) mod plan;
42
// Id type for the atoms of a query. The arguments below are the visibility and
// name of the generated type, an integer type (presumably its underlying
// representation -- confirm against `define_id!`), and a description string.
define_id!(
    pub AtomId,
    u32,
    "A component of a query consisting of a function and a list of variables or constants"
);
// Id type for query variables.
define_id!(pub Variable, u32, "a variable in a query");
49
50impl Variable {
51    pub fn placeholder() -> Variable {
52        Variable::new(!0)
53    }
54}
55
// Id type used as the key of `Database::tables`.
define_id!(pub TableId, u32, "a table in the database");
// Crate-internal id type for the right-hand side (the actions) of a rule.
define_id!(pub(crate) ActionId, u32, "an identifier picking out the RHS of a rule");
58
/// The constraints on a single table, split by how expensive they are to
/// evaluate, along with the rows selected by the cheap ones.
///
/// Values of this type are produced by `Database::process_constraints`.
#[derive(Debug)]
pub(crate) struct ProcessedConstraints {
    /// The subset of the table matching the fast constraints. If there are no
    /// fast constraints then this is the full table.
    pub(crate) subset: Subset,
    /// The constraints that can be evaluated quickly (O(log(n)) or O(1)).
    pub(crate) fast: Pooled<Vec<Constraint>>,
    /// The constraints that require an O(n) scan to evaluate.
    pub(crate) slow: Pooled<Vec<Constraint>>,
}
69
70impl Clone for ProcessedConstraints {
71    fn clone(&self) -> Self {
72        ProcessedConstraints {
73            subset: self.subset.clone(),
74            fast: Pooled::cloned(&self.fast),
75            slow: Pooled::cloned(&self.slow),
76        }
77    }
78}
79
80impl ProcessedConstraints {
81    /// The size of the subset of the table matching the fast constraints.
82    fn approx_size(&self) -> usize {
83        self.subset.size()
84    }
85}
86
/// An atom together with a selection of its columns.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SubAtom {
    /// The atom being referred to.
    pub(crate) atom: AtomId,
    /// Columns of `atom` picked out by this sub-atom.
    ///
    /// NOTE(review): despite the name, these are `ColumnId`s -- presumably the
    /// columns at which the relevant variable(s) occur; confirm against the planner.
    pub(crate) vars: SmallVec<[ColumnId; 2]>,
}
92
93impl SubAtom {
94    pub(crate) fn new(atom: AtomId) -> SubAtom {
95        SubAtom {
96            atom,
97            vars: Default::default(),
98        }
99    }
100}
101
/// Metadata tracked for one variable of a query.
#[derive(Debug)]
pub(crate) struct VarInfo {
    /// The places (atom plus columns) where this variable occurs.
    pub(crate) occurrences: Vec<SubAtom>,
    /// Whether or not this variable shows up in the "actions" portion of a
    /// rule.
    pub(crate) used_in_rhs: bool,
    /// NOTE(review): presumably set when the variable is first bound by an
    /// action rather than by the query itself -- confirm against the rule builder.
    pub(crate) defined_in_rhs: bool,
    /// Optional human-readable name for the variable.
    pub(crate) name: Option<Arc<str>>,
}
111
/// Shared handle to an index over a tuple of columns. The index sits behind a
/// `ResettableOnceLock`: `Database::merge_all` resets it and
/// `get_index_from_tableinfo` refreshes it on access.
pub(crate) type HashIndex = Arc<ResettableOnceLock<Index<TupleIndex>>>;
/// Single-column analog of [`HashIndex`].
pub(crate) type HashColumnIndex = Arc<ResettableOnceLock<Index<ColumnIndex>>>;
114
/// A table stored in a [`Database`], along with its metadata and the indexes
/// that have been built over it.
pub struct TableInfo {
    /// Optional name given to the table when it was added.
    pub(crate) name: Option<Arc<str>>,
    /// The spec reported by the table when it was added to the database.
    pub(crate) spec: TableSpec,
    /// The table itself.
    pub(crate) table: WrappedTable,
    /// Indexes over tuples of columns, keyed by the list of indexed columns.
    pub(crate) indexes: IndexCatalog<SmallVec<[ColumnId; 4]>, HashIndex>,
    /// Indexes over a single column, keyed by that column.
    pub(crate) column_indexes: IndexCatalog<ColumnId, HashColumnIndex>,
}
122
impl TableInfo {
    /// The underlying table.
    pub fn table(&self) -> &WrappedTable {
        &self.table
    }

    /// The name of the table, if it was given one.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// The spec recorded for the table when it was added to the database.
    pub fn spec(&self) -> &TableSpec {
        &self.spec
    }
}
136
137impl Clone for TableInfo {
138    fn clone(&self) -> Self {
139        fn deep_clone_map<K: Clone + std::hash::Hash + Eq, TI: IndexBase + Clone>(
140            map: &IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>>,
141            table: WrappedTableRef,
142        ) -> IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>> {
143            map.map(|table_ref| {
144                let (k, v) = table_ref;
145                let v: Index<TI> = v
146                    .get_or_update(|index| {
147                        index.refresh(table);
148                    })
149                    .clone();
150                (k.clone(), Arc::new(ResettableOnceLock::new(v)))
151            })
152        }
153        TableInfo {
154            name: self.name.clone(),
155            spec: self.spec.clone(),
156            table: self.table.dyn_clone(),
157            indexes: deep_clone_map(&self.indexes, self.table.as_ref()),
158            column_indexes: deep_clone_map(&self.column_indexes, self.table.as_ref()),
159        }
160    }
161}
162
// Id type handed out by `Database::add_counter`.
define_id!(pub CounterId, u32, "A counter accessible to actions, useful for generating unique Ids.");
// Id type handed out by `Database::add_external_function`.
define_id!(pub ExternalFunctionId, u32, "A user-defined operation that can be invoked from a query");
165
/// External functions allow external callers to manipulate database state in
/// near-arbitrary ways.
///
/// This is a useful, if low-level, interface for extending this database with
/// functionality and state not built into the core model.
///
/// Implementations must be `Send + Sync` and cloneable through
/// `dyn_clone::DynClone`, which is what lets `Box<dyn ExternalFunction>` be
/// cloned.
pub trait ExternalFunction: dyn_clone::DynClone + Send + Sync {
    /// Invoke the function with mutable access to the database. If a value is
    /// not returned, halt the execution of the current rule.
    ///
    /// `args` holds the argument values for this invocation.
    fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value>;
}
176
177/// Automatically generate an `ExternalFunction` implementation from a function.
178pub fn make_external_func<
179    F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
180>(
181    f: F,
182) -> impl ExternalFunction {
183    #[derive(Clone)]
184    struct Wrapped<F>(F);
185    impl<F> ExternalFunction for Wrapped<F>
186    where
187        F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
188    {
189        fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value> {
190            (self.0)(state, args)
191        }
192    }
193    Wrapped(f)
194}
195
/// A vectorized variant of [`ExternalFunction::invoke`] to avoid repeated dynamic dispatch.
///
/// Calls `this` for the bindings visited by `for_each_binding_with_mask!` over `mask`, `args`
/// and `bindings`, collects the results in a pooled vector, and binds that vector to `out_var`.
///
/// NOTE(review): what happens when `invoke` returns `None` (and the exact role of
/// `Value::stale`) is decided by `fill_vec`, which is defined elsewhere -- presumably the row
/// is removed from `mask` and its slot is padded with a stale value; confirm.
pub(crate) fn invoke_batch(
    this: &dyn ExternalFunction,
    state: &mut ExecutionState,
    mask: &mut Mask,
    bindings: &mut Bindings,
    args: &[QueryEntry],
    out_var: Variable,
) {
    // Take the output vector from the ambient pool set rather than allocating a fresh one.
    let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool().clone());
    let mut out = pool.get();
    // Reserve `mask.len()` slots up front.
    out.reserve(mask.len());
    for_each_binding_with_mask!(mask, args, bindings, |iter| {
        iter.fill_vec(&mut out, Value::stale, |_, args| {
            this.invoke(state, args.as_slice())
        });
    });
    bindings.insert(out_var, &out);
}
215
/// A variant of [`invoke_batch`] that overwrites the output variable,
/// rather than assigning all new values.
///
/// The values currently bound to `out_var` are taken out of `bindings`, updated in place via
/// `assign_vec_and_retain`, and then put back.
///
/// # Panics
///
/// This method will panic if `out_var` doesn't already have an appropriately-sized
/// vector bound in `bindings`.
pub(crate) fn invoke_batch_assign(
    this: &dyn ExternalFunction,
    state: &mut ExecutionState,
    mask: &mut Mask,
    bindings: &mut Bindings,
    args: &[QueryEntry],
    out_var: Variable,
) {
    // Temporarily remove the existing binding so that it can be mutated while `bindings` is
    // read for the arguments.
    let mut out = bindings.take(out_var).expect("out_var must be bound");
    for_each_binding_with_mask!(mask, args, bindings, |iter| {
        iter.assign_vec_and_retain(&mut out.vals, |_, args| this.invoke(state, &args))
    });
    // Restore the (now updated) binding.
    bindings.replace(out);
}
235
// Implements `Clone` for `Box<dyn ExternalFunction>`.
dyn_clone::clone_trait_object!(ExternalFunction);

/// The external functions registered with a database, keyed by id. Ids are handed out by
/// `push` and released by `take` (see `Database::add_external_function` and
/// `Database::free_external_function`).
pub(crate) type ExternalFunctions =
    DenseIdMapWithReuse<ExternalFunctionId, Box<dyn ExternalFunction>>;
241
/// The counters of a database: one atomic integer per [`CounterId`], so that counters can be
/// read and incremented through a shared reference.
#[derive(Default)]
pub(crate) struct Counters(DenseIdMap<CounterId, AtomicUsize>);
244
245impl Clone for Counters {
246    fn clone(&self) -> Counters {
247        let mut map = DenseIdMap::new();
248        for (k, v) in self.0.iter() {
249            // NB: we may want to experiment with Ordering::Relaxed here.
250            map.insert(k, AtomicUsize::new(v.load(Ordering::SeqCst)))
251        }
252        Counters(map)
253    }
254}
255
256impl Counters {
257    pub(crate) fn read(&self, ctr: CounterId) -> usize {
258        self.0[ctr].load(Ordering::Acquire)
259    }
260    pub(crate) fn inc(&self, ctr: CounterId) -> usize {
261        // We synchronize with `read_counter` but not with other increments.
262        // NB: we may want to experiment with Ordering::Relaxed here.
263        self.0[ctr].fetch_add(1, Ordering::Release)
264    }
265}
266
/// A collection of tables and indexes over them.
///
/// A database also owns the memory pools used by its tables.
#[derive(Clone, Default)]
pub struct Database {
    // NB: some fields are pub(crate) to allow some internal modules to avoid
    // borrowing the whole table.
    /// The tables of the database, along with their metadata and indexes.
    pub(crate) tables: DenseIdMap<TableId, TableInfo>,
    // TODO: having a single AtomicUsize per counter can lead to contention. We
    // should look into prefetching counters when creating a new ExecutionState
    // and incrementing locally. Note that the batch size shouldn't be too big
    // because we keep an array per id in the UF.
    /// The counters created through [`Database::add_counter`].
    pub(crate) counters: Counters,
    /// The external functions registered through [`Database::add_external_function`].
    pub(crate) external_functions: ExternalFunctions,
    /// Container state; exposed through [`Database::container_values`].
    container_values: ContainerValues,
    /// `notification_list` contains the list of tables that have been modified since the last call
    /// to [`Database::merge_all`].
    notification_list: NotificationList<TableId>,
    // Tracks the relative dependencies between tables during merge operations.
    deps: DependencyGraph,
    /// Base value state; exposed through [`Database::base_values`].
    base_values: BaseValues,
    /// A rough estimate of the total size of the database.
    ///
    /// This is primarily used to determine whether or not to attempt to do some operations in
    /// parallel.
    total_size_estimate: usize,
}
294
295impl Database {
    /// Create an empty Database.
    ///
    /// This is equivalent to [`Database::default`].
    ///
    /// Queries are executed using the current rayon thread pool, which defaults to the global
    /// thread pool.
    pub fn new() -> Database {
        Database::default()
    }
303
    /// Initialize a new rule set to run against this database.
    ///
    /// The returned builder borrows the database mutably for as long as it is alive.
    pub fn new_rule_set(&mut self) -> RuleSetBuilder<'_> {
        RuleSetBuilder::new(self)
    }
308
    /// Add a new external function to the database.
    ///
    /// Returns the id under which `f` was registered.
    pub fn add_external_function(
        &mut self,
        f: Box<dyn ExternalFunction + 'static>,
    ) -> ExternalFunctionId {
        self.external_functions.push(f)
    }
316
    /// Free an existing external function. Make sure not to use `id` afterwards.
    ///
    /// The function is removed from the database and dropped.
    pub fn free_external_function(&mut self, id: ExternalFunctionId) {
        self.external_functions.take(id);
    }
321
    /// Shared access to the base values stored in this database.
    pub fn base_values(&self) -> &BaseValues {
        &self.base_values
    }
325
    /// Mutable access to the base values stored in this database.
    pub fn base_values_mut(&mut self) -> &mut BaseValues {
        &mut self.base_values
    }
329
    /// Shared access to the container values stored in this database.
    pub fn container_values(&self) -> &ContainerValues {
        &self.container_values
    }
333
    /// Mutable access to the container values stored in this database.
    pub fn container_values_mut(&mut self) -> &mut ContainerValues {
        &mut self.container_values
    }
337
338    pub fn rebuild_containers(&mut self, table_id: TableId) -> bool {
339        let mut containers = mem::take(&mut self.container_values);
340        let table = &self.tables[table_id].table;
341        let res = self.with_execution_state(|state| containers.rebuild_all(table_id, table, state));
342        self.container_values = containers;
343        res
344    }
345
    /// Apply the value-level rebuild encoded by `func_id` to all the tables in `to_rebuild`.
    ///
    /// The native [`Table::apply_rebuild`] method takes a `next_ts` argument for filling in new
    /// values in a table like [`crate::SortedWritesTable`] where values in a certain column need
    /// to be inserted in sorted order; the `next_ts` argument to this method is passed to
    /// `apply_rebuild` for this purpose.
    ///
    /// Each table whose `apply_rebuild` call returns `true` is flagged in the notification
    /// list. The method finishes by calling [`Database::merge_all`] and returns its result.
    ///
    /// # Panics
    ///
    /// Panics if `func_id`, or any id in `to_rebuild`, is not present in the database. Tables
    /// are temporarily taken out of `self.tables` while they are processed, so this also covers
    /// `to_rebuild` containing `func_id`, and (on the parallel path) `to_rebuild` containing the
    /// same id twice.
    pub fn apply_rebuild(
        &mut self,
        func_id: TableId,
        to_rebuild: &[TableId],
        next_ts: Value,
    ) -> bool {
        // Take the rebuild table out of the database so it can be read while other tables are
        // mutated.
        let func = self.tables.take(func_id).unwrap();
        if parallelize_db_level_op(self.total_size_estimate) {
            // Parallel path: take out every table up front, rebuild them concurrently, then
            // put them all back.
            let mut tables = Vec::with_capacity(to_rebuild.len());
            for id in to_rebuild {
                tables.push((*id, self.tables.take(*id).unwrap()));
            }
            tables.par_iter_mut().for_each(|(id, info)| {
                if info.table.apply_rebuild(
                    func_id,
                    &func.table,
                    next_ts,
                    &mut ExecutionState::new(self.read_only_view(), Default::default()),
                ) {
                    self.notification_list.notify(*id);
                }
            });
            for (id, info) in tables {
                self.tables.insert(id, info);
            }
        } else {
            // Serial path: take out, rebuild and reinsert one table at a time.
            for id in to_rebuild {
                let mut info = self.tables.take(*id).unwrap();
                if info.table.apply_rebuild(
                    func_id,
                    &func.table,
                    next_ts,
                    &mut ExecutionState::new(self.read_only_view(), Default::default()),
                ) {
                    self.notification_list.notify(*id);
                }
                self.tables.insert(*id, info);
            }
        }
        self.tables.insert(func_id, func);
        self.merge_all()
    }
394
395    /// Run `f` with access to an `ExecutionState` mapped to this database.
396    pub fn with_execution_state<R>(&self, f: impl FnOnce(&mut ExecutionState) -> R) -> R {
397        let mut state = ExecutionState::new(self.read_only_view(), Default::default());
398        f(&mut state)
399    }
400
    /// Bundle shared references to the tables, counters, external functions, base values,
    /// container values and notification list of this database into a `DbView`.
    pub(crate) fn read_only_view(&self) -> DbView<'_> {
        DbView {
            table_info: &self.tables,
            counters: &self.counters,
            external_funcs: &self.external_functions,
            bases: &self.base_values,
            containers: &self.container_values,
            notification_list: &self.notification_list,
        }
    }
411
412    /// Estimate the size of the table. If a constraint is provided, return an
413    /// estimate of the size of the subset of the table matching the constraint.
414    pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
415        let table_info = self
416            .tables
417            .get(table)
418            .expect("table must be declared in the current database");
419        let table = &table_info.table;
420        if let Some(c) = c {
421            if let Some(sub) = table.fast_subset(&c) {
422                // In the case where a the constraint can be computed quickly,
423                // we do not filter for staleness, which may over-approximate.
424                sub.size()
425            } else {
426                table.refine_one(table.refine_live(table.all()), &c).size()
427            }
428        } else {
429            table.len()
430        }
431    }
432
    /// Create a new counter for this database.
    ///
    /// These counters can be used to generate unique ids as part of an action.
    ///
    /// The new counter starts at 0.
    pub fn add_counter(&mut self) -> CounterId {
        self.counters.0.push(AtomicUsize::new(0))
    }
439
    /// Increment the given counter and return its previous value.
    ///
    /// This only needs a shared reference, as the counters are atomics.
    pub fn inc_counter(&self, counter: CounterId) -> usize {
        self.counters.inc(counter)
    }
444
    /// Get the current value of the given counter, without modifying it.
    pub fn read_counter(&self, counter: CounterId) -> usize {
        self.counters.read(counter)
    }
449
    /// A helper for merging all pending updates. Used to write to the database after updates have
    /// been staged. Returns true if any tuples were added.
    ///
    /// Exposed for testing purposes.
    ///
    /// Useful for out-of-band insertions into the database.
    ///
    /// Merging proceeds in rounds. Each round drains the notification list; when fewer than
    /// four tables are pending, the remaining work is handed to `merge_simple` and the loop
    /// ends. Otherwise the pending tables are merged one dependency stratum at a time. Once the
    /// loop is done, every index is reset and `total_size_estimate` is recomputed.
    ///
    /// # Panics
    ///
    /// Panics if any index handle of a table is still shared when the indexes are reset (the
    /// reset goes through `Arc::get_mut`).
    pub fn merge_all(&mut self) -> bool {
        let mut ever_changed = false;
        let do_parallel = parallelize_db_level_op(self.total_size_estimate);
        let mut to_merge = IndexSet::default();
        loop {
            to_merge.clear();
            let to_merge_vec = self.notification_list.reset();
            // Few pending tables: use the simple path, which also drains any follow-up
            // notifications, and stop.
            if to_merge_vec.len() < 4 {
                ever_changed |= self.merge_simple(to_merge_vec);
                break;
            }
            for table in to_merge_vec {
                to_merge.insert(table);
            }

            let mut changed = false;
            let mut tables_merging = DenseIdMap::<
                TableId,
                (
                    // The info needed to merge this table.
                    Option<TableInfo>,
                    // Pre-allocated write buffers, according to the table's declared write
                    // dependencies.
                    DenseIdMap<TableId, Box<dyn MutationBuffer>>,
                ),
            >::with_capacity(self.tables.n_ids());
            for stratum in self.deps.strata() {
                // Initialize the write dependencies first.
                for table in stratum.intersection(&to_merge).copied() {
                    let mut bufs = DenseIdMap::default();
                    for dep in self.deps.write_deps(table) {
                        if let Some(info) = self.tables.get(dep) {
                            bufs.insert(dep, info.table.new_buffer());
                        }
                    }
                    tables_merging.insert(table, (None, bufs));
                }
                // Then initialize read dependencies (this two-phase structure is why we have an
                // Option in the tables_merging map).
                for table in stratum.intersection(&to_merge).copied() {
                    tables_merging[table].0 = Some(self.tables.unwrap_val(table));
                }
                let db = self.read_only_view();
                // `max` over booleans is a non-short-circuiting "or": every table gets merged.
                changed |= if do_parallel {
                    tables_merging
                        .par_iter_mut()
                        .map(|(_, (info, buffers))| {
                            let mut es = ExecutionState::new(db, mem::take(buffers));
                            info.as_mut().unwrap().table.merge(&mut es).added || es.changed
                        })
                        .max()
                        .unwrap_or(false)
                } else {
                    tables_merging
                        .iter_mut()
                        .map(|(_, (info, buffers))| {
                            let mut es = ExecutionState::new(db, mem::take(buffers));
                            info.as_mut().unwrap().table.merge(&mut es).added || es.changed
                        })
                        .max()
                        .unwrap_or(false)
                };
                // Return the merged tables of this stratum to the database.
                for (id, (table, _)) in tables_merging.drain() {
                    self.tables.insert(id, table.unwrap());
                }
            }
            ever_changed |= changed;
        }
        // Reset all indexes to force an update on the next access.
        let mut size_estimate = 0;
        for (_, info) in self.tables.iter_mut() {
            info.column_indexes.update(|_, ti| {
                Arc::get_mut(ti).unwrap().reset();
            });
            info.indexes.update(|_, ti| {
                Arc::get_mut(ti).unwrap().reset();
            });
            size_estimate += info.table.len();
        }
        self.total_size_estimate = size_estimate;
        ever_changed
    }
538
539    /// A "fast path" merge method that is not optimized for parallelism and does not respect read
540    /// and write dependencies. This ends up being faster than the full "strata-aware" option in
541    /// the body of `merge_all`.
542    fn merge_simple(&mut self, mut to_merge: SmallVec<[TableId; 4]>) -> bool {
543        let mut changed = false;
544        while !to_merge.is_empty() {
545            for table_id in to_merge.iter().copied() {
546                let mut info = self.tables.unwrap_val(table_id);
547                let mut es = ExecutionState::new(self.read_only_view(), Default::default());
548                changed |= info.table.merge(&mut es).added || es.changed;
549                self.tables.insert(table_id, info);
550            }
551            to_merge = self.notification_list.reset();
552        }
553        changed
554    }
555
556    /// A low-level helper for merging pending updates to a particular function.
557    ///
558    /// Callers should prefer `merge_all`, as the process of merging the data
559    /// for a particular table may cause other updates to be buffered
560    /// elesewhere. The `merge_all` method runs merges to a fixed point to avoid
561    /// surprises here.
562    pub fn merge_table(&mut self, table: TableId) -> bool {
563        let mut info = self.tables.unwrap_val(table);
564        self.total_size_estimate = self.total_size_estimate.wrapping_sub(info.table.len());
565        let table_changed = info.table.merge(&mut ExecutionState::new(
566            self.read_only_view(),
567            Default::default(),
568        ));
569        self.total_size_estimate = self.total_size_estimate.wrapping_add(info.table.len());
570        self.tables.insert(table, info);
571        table_changed.added
572    }
573
    /// Get the id of the next table to be added to the database.
    ///
    /// This can be useful for "knot tying", when tables need to reference their
    /// own id.
    ///
    /// The database is not modified by this call.
    pub fn next_table_id(&self) -> TableId {
        self.tables.next_id()
    }
581
    /// Add an unnamed table to the database and return its id.
    ///
    /// The spec stored alongside the table is the one reported by `table.spec()`.
    /// `read_deps` and `write_deps` are registered with the dependency graph that
    /// [`Database::merge_all`] consults.
    ///
    /// NOTE(review): presumably these are the tables that `table` may read from / write to
    /// while it is being merged -- confirm against `DependencyGraph::add_table`.
    pub fn add_table<T: Table + Sized + 'static>(
        &mut self,
        table: T,
        read_deps: impl IntoIterator<Item = TableId>,
        write_deps: impl IntoIterator<Item = TableId>,
    ) -> TableId {
        self.add_table_impl(table, None, read_deps, write_deps)
    }
594
    /// Like [`Database::add_table`], but also records `name` for the table; the name can be
    /// read back with [`TableInfo::name`].
    pub fn add_table_named<T: Table + Sized + 'static>(
        &mut self,
        table: T,
        name: Arc<str>,
        read_deps: impl IntoIterator<Item = TableId>,
        write_deps: impl IntoIterator<Item = TableId>,
    ) -> TableId {
        self.add_table_impl(table, Some(name), read_deps, write_deps)
    }
604
605    fn add_table_impl<T: Table + Sized + 'static>(
606        &mut self,
607        table: T,
608        name: Option<Arc<str>>,
609        read_deps: impl IntoIterator<Item = TableId>,
610        write_deps: impl IntoIterator<Item = TableId>,
611    ) -> TableId {
612        let spec = table.spec();
613        let table = WrappedTable::new(table);
614        let res = self.tables.push(TableInfo {
615            name,
616            spec,
617            table,
618            indexes: IndexCatalog::new(),
619            column_indexes: IndexCatalog::new(),
620        });
621        self.deps.add_table(res, read_deps, write_deps);
622        res
623    }
624
    /// Get direct (shared) access to the table.
    ///
    /// This method is useful for out-of-band access to database state.
    ///
    /// **NOTE:** It is legal to call [`Table::new_buffer`] on the returned table handle, and use
    /// that to stage updates to the given table via [`MutationBuffer::stage_insert`] or
    /// [`MutationBuffer::stage_remove`], however this is *likely to be a source of bugs*.
    ///
    /// Updates staged in this way will not cause `table` to be marked as having pending changes in
    /// the next call to [`Database::merge_all`]. Instead, such users should use
    /// [`Database::new_buffer`], which plumbs this signal through correctly, or better yet,
    /// perform all updates through an [`ExecutionState`] or a [`crate::RuleBuilder`]. If these
    /// options do not work, then calling [`Database::merge_table`] directly will force a merge
    /// call on the table.
    ///
    /// # Panics
    ///
    /// Panics if `table` has not been declared in this database.
    pub fn get_table(&self, table: TableId) -> &WrappedTable {
        &self
            .tables
            .get(table)
            .expect("must access a table that has been declared in this database")
            .table
    }
646
    /// Get a handle on the given table along with metadata about it.
    ///
    /// **NOTE:** See the note on [`Database::get_table`] around manually staging updates.
    ///
    /// # Panics
    ///
    /// Panics if `table` has not been declared in this database.
    pub fn get_table_info(&self, table: TableId) -> &TableInfo {
        self.tables
            .get(table)
            .expect("must access a table that has been declared in this database")
    }
656
    /// Create a new mutation buffer for the table with id `id`.
    ///
    /// This will mark the given table as potentially changed for the next round of merging.
    /// Unlike calling [`Table::new_buffer`] on a table returned from a getter, this method also
    /// triggers change notification metadata that is read by [`Database::merge_all`].
    ///
    /// # Panics
    ///
    /// Panics if `id` has not been declared in this database.
    pub fn new_buffer(&self, id: TableId) -> Box<dyn MutationBuffer> {
        // The notification happens when the buffer is created, whether or not anything ends up
        // being staged in it.
        self.notification_list.notify(id);
        self.get_table(id).new_buffer()
    }
666
    /// Split the constraints `cs` on `table` into "fast" and "slow" ones, and compute the
    /// subset of the table that matches the fast ones.
    ///
    /// The starting point is the table's own `split_fast_slow`. After that, every `EqConst`
    /// constraint left in the slow list is upgraded to fast by consulting a column index,
    /// unless its column is marked uncacheable in the table's spec.
    pub(crate) fn process_constraints(
        &self,
        table: TableId,
        cs: &[Constraint],
    ) -> ProcessedConstraints {
        let table_info = &self.tables[table];
        let (mut subset, mut fast, mut slow) = table_info.table.split_fast_slow(cs);
        // NB: the closure both filters `slow` and, as a side effect, extends `fast` and narrows
        // `subset`.
        slow.retain(|c| {
            let (col, val) = match c {
                Constraint::EqConst { col, val } => (*col, *val),
                // Only equality with a constant can be answered by a column index; everything
                // else stays slow.
                Constraint::Eq { .. }
                | Constraint::LtConst { .. }
                | Constraint::GtConst { .. }
                | Constraint::LeConst { .. }
                | Constraint::GeConst { .. } => return true,
            };
            // We are looking up by a constant: this is something we can build
            // an index for as long as the column is cacheable.
            if *table_info
                .spec
                .uncacheable_columns
                .get(col)
                .unwrap_or(&false)
            {
                return true;
            }
            // We have or will build an index: upgrade this constraint to
            // 'fast'.
            fast.push(c.clone());
            let index = get_column_index_from_tableinfo(table_info, col);
            match index.get().unwrap().get_subset(&val) {
                Some(s) => {
                    with_pool_set(|ps| subset.intersect(s, &ps.get_pool()));
                }
                None => {
                    // There are no rows matching this key! We can constrain this to nothing.
                    subset = Subset::empty();
                }
            }
            // Remove this constraint from the slow list.
            false
        });
        ProcessedConstraints { subset, fast, slow }
    }
711
    /// Get direct mutable access to the table.
    ///
    /// This method is useful for out-of-band access to database state.
    ///
    /// **NOTE:** See the warning around staging updates to handles returned through this method in
    /// the documentation for [`Database::get_table`].
    ///
    /// # Panics
    ///
    /// Panics if `id` has not been declared in this database.
    pub fn get_table_mut(&mut self, id: TableId) -> &mut dyn Table {
        &mut *self
            .tables
            .get_mut(id)
            .expect("must access a table that has been declared in this database")
            .table
    }
725
    /// Build an execution plan for `query` by delegating to `plan::plan_query`.
    ///
    /// The database itself is not consulted here, even though the method takes `&mut self`.
    pub(crate) fn plan_query(&mut self, query: Query) -> Plan {
        plan::plan_query(query)
    }
729}
730
impl Drop for Database {
    /// Clears the memory pools when a database is dropped.
    fn drop(&mut self) {
        // Clear the pool set seen by the current thread, then do the same on every thread of
        // the current rayon thread pool.
        //
        // NOTE(review): pool sets are presumably per-thread, which is why the clear is
        // broadcast -- confirm against `with_pool_set`.
        //
        // Calling mem::forget on the egraph can result in much faster execution times.
        with_pool_set(PoolSet::clear);
        rayon::broadcast(|_| with_pool_set(PoolSet::clear));
    }
}
740
741/// The core logic behind getting and updating a hash index.
742///
743/// This is in a separate function to allow us to reuse it while already
744/// borrowing a `TableInfo`.
745fn get_index_from_tableinfo(table_info: &TableInfo, cols: &[ColumnId]) -> HashIndex {
746    let index: Arc<_> = table_info.indexes.get_or_insert(cols.into(), || {
747        Arc::new(ResettableOnceLock::new(Index::new(
748            cols.to_vec(),
749            TupleIndex::new(cols.len()),
750        )))
751    });
752    index.get_or_update(|index| {
753        index.refresh(table_info.table.as_ref());
754    });
755    debug_assert!(
756        !index
757            .get()
758            .unwrap()
759            .needs_refresh(table_info.table.as_ref())
760    );
761    index
762}
763
764/// The core logic behind getting and updating a column index.
765///
766/// This is the single-column analog to [`get_index_from_tableinfo`].
767fn get_column_index_from_tableinfo(table_info: &TableInfo, col: ColumnId) -> HashColumnIndex {
768    let index: Arc<_> = table_info.column_indexes.get_or_insert(col, || {
769        Arc::new(ResettableOnceLock::new(Index::new(
770            vec![col],
771            ColumnIndex::new(),
772        )))
773    });
774    index.get_or_update(|index| {
775        index.refresh(table_info.table.as_ref());
776    });
777    debug_assert!(
778        !index
779            .get()
780            .unwrap()
781            .needs_refresh(table_info.table.as_ref())
782    );
783    index
784}