egglog_core_relations/
query.rs

1//! APIs for building a query of a database.
2
3use std::{iter::once, sync::Arc};
4
5use crate::{
6    free_join::plan::{DecomposedPlan, JoinStageBlocks, SinglePlan},
7    numeric_id::{DenseIdMap, IdVec, NumericId, define_id},
8};
9use smallvec::SmallVec;
10use thiserror::Error;
11
12use crate::{
13    BaseValueId, CounterId, ExternalFunctionId, PoolSet,
14    action::{Instr, QueryEntry, WriteVal},
15    common::HashMap,
16    free_join::{
17        ActionId, AtomId, Database, ProcessedConstraints, SubAtom, TableId, TableInfo, VarInfo,
18        Variable,
19        plan::{JoinHeader, JoinStages, Plan, PlanStrategy},
20    },
21    pool::{Pooled, with_pool_set},
22    table_spec::{ColumnId, Constraint},
23};
24
25define_id!(pub RuleId, u32, "An identifier for a rule in a rule set");
26
27/// Resolves variables and atoms in a rule to their string names.
28#[allow(dead_code)]
29#[derive(Debug, Clone)]
30pub struct SymbolMap {
31    pub atoms: HashMap<AtomId, Arc<str>>,
32    pub vars: HashMap<Variable, Arc<str>>,
33}
34
35/// A cached plan for a given rule.
36pub struct CachedPlan {
37    plan: Plan,
38    desc: Arc<str>,
39    symbol_map: SymbolMap,
40    actions: ActionInfo,
41}
42
43#[derive(Debug, Clone)]
44pub(crate) struct ActionInfo {
45    pub(crate) used_vars: SmallVec<[Variable; 4]>,
46    pub(crate) instrs: Arc<Pooled<Vec<Instr>>>,
47}
48
49/// A set of rules to run against a [`Database`].
50///
51/// See [`Database::new_rule_set`] for more information.
52#[derive(Default)]
53pub struct RuleSet {
54    /// The contents of the queries (i.e. the LHS of the rules) for each rule in the set, along
55    /// with a description of the rule.
56    ///
57    /// The action here is used to map between rule descriptions and plans, which contain ActionIds. The current
58    /// accounting logic assumes that rules and actions stand in a bijection. If we relaxed that
59    /// later on, most of the core logic would still work but the accounting logic could get more
60    /// complex.
61    pub(crate) plans: IdVec<RuleId, (Plan, Arc<str> /* description */, SymbolMap)>,
62    pub(crate) actions: DenseIdMap<ActionId, ActionInfo>,
63}
64
65impl RuleSet {
66    pub fn build_cached_plan(&self, rule_id: RuleId) -> CachedPlan {
67        let (plan, desc, symbol_map) = self.plans.get(rule_id).expect("rule must exist");
68        let actions = self
69            .actions
70            .get(plan.actions())
71            .expect("action must exist")
72            .clone();
73        CachedPlan {
74            plan: plan.clone(),
75            desc: desc.clone(),
76            symbol_map: symbol_map.clone(),
77            actions,
78        }
79    }
80}
81
82/// Builder for a [`RuleSet`].
83///
84/// There are in general two ways to add rules to a rule set:
85///
86/// 1. Use the QueryBuilder and RuleBuilder APIs to construct a rule from scratch.
87/// 2. Use a previously cached plan and add extra constraints to it.
88///
89/// The pattern this is used by egglog is as follows: An egglog rule is first compiled to a cached
90/// plan using builder patterns at declaration time, and each time the rule is run, it is added to
91/// a ruleset using the cached plan and possibly some extra constraints (e.g., timestamp).
92///
93/// See [`Database::new_rule_set`] for more information.
94pub struct RuleSetBuilder<'outer> {
95    rule_set: RuleSet,
96    db: &'outer mut Database,
97}
98
99impl<'outer> RuleSetBuilder<'outer> {
100    pub fn new(db: &'outer mut Database) -> Self {
101        Self {
102            rule_set: Default::default(),
103            db,
104        }
105    }
106
107    /// Estimate the size of the subset of the table matching the given
108    /// constraint.
109    ///
110    /// This is a wrapper around the [`Database::estimate_size`] method.
111    pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
112        self.db.estimate_size(table, c)
113    }
114
115    /// Add a rule to this rule set.
116    pub fn new_rule<'a>(&'a mut self) -> QueryBuilder<'outer, 'a> {
117        let instrs = with_pool_set(PoolSet::get);
118        QueryBuilder {
119            rsb: self,
120            instrs,
121            query: Query {
122                var_info: Default::default(),
123                atoms: Default::default(),
124                // start with an invalid ActionId
125                action: ActionId::new(u32::MAX),
126                plan_strategy: Default::default(),
127                fun_deps: Default::default(),
128            },
129        }
130    }
131
132    fn reprocess_constraints(
133        &self,
134        table: TableId,
135        atom: AtomId,
136        constraints: &[Constraint],
137    ) -> Option<JoinHeader> {
138        let processed = self.db.process_constraints(table, constraints);
139        if !processed.slow.is_empty() {
140            panic!(
141                "Cached plans only support constraints with a fast pushdown. \
142                 Got: {constraints:?} for table {table:?}",
143            );
144        }
145        if processed.subset.size() == 0 {
146            return None;
147        }
148        Some(JoinHeader {
149            atom,
150            constraints: processed.fast,
151            subset: processed.subset,
152        })
153    }
154
155    fn push_extra_constraints(
156        &self,
157        headers: &mut Vec<JoinHeader>,
158        atoms: &Arc<DenseIdMap<AtomId, Atom>>,
159        extra_constraints: &[(AtomId, Constraint)],
160    ) -> Option<()> {
161        for (atom_id, constraint) in extra_constraints {
162            let atom_info = atoms.get(*atom_id).expect("atom must exist in plan");
163            let table = atom_info.table;
164            headers.push(self.reprocess_constraints(
165                table,
166                *atom_id,
167                std::slice::from_ref(constraint),
168            )?);
169        }
170        Some(())
171    }
172
173    fn reprocess_existing_headers(
174        &self,
175        headers: &mut Vec<JoinHeader>,
176        atoms: &Arc<DenseIdMap<AtomId, Atom>>,
177        existing: &[JoinHeader],
178    ) -> Option<()> {
179        for JoinHeader {
180            atom, constraints, ..
181        } in existing
182        {
183            let atom_info = atoms.get(*atom).expect("atom must exist in plan");
184            let table = atom_info.table;
185            headers.push(self.reprocess_constraints(table, *atom, constraints)?);
186        }
187        Some(())
188    }
189
190    fn get_rule_with_extra_constraints(
191        &self,
192        cached: &CachedPlan,
193        action_id: ActionId,
194        extra_constraints: &[(AtomId, Constraint)],
195    ) -> Option<Plan> {
196        match &cached.plan {
197            Plan::SinglePlan(cached_plan) => {
198                let mut headers = vec![];
199                let stages = JoinStages {
200                    instrs: cached_plan.stages.instrs.clone(),
201                };
202                self.push_extra_constraints(&mut headers, &cached_plan.atoms, extra_constraints)?;
203                self.reprocess_existing_headers(
204                    &mut headers,
205                    &cached_plan.atoms,
206                    &cached_plan.header,
207                )?;
208                Some(Plan::SinglePlan(SinglePlan {
209                    atoms: cached_plan.atoms.clone(),
210                    header: headers,
211                    stages,
212                    actions: action_id,
213                }))
214            }
215            Plan::DecomposedPlan(cached_plan) => {
216                let mut blocks = Vec::with_capacity(cached_plan.stages.blocks.len());
217                let mut headers = vec![];
218                self.push_extra_constraints(&mut headers, &cached_plan.atoms, extra_constraints)?;
219                self.reprocess_existing_headers(
220                    &mut headers,
221                    &cached_plan.atoms,
222                    &cached_plan.header,
223                )?;
224                for cached_block in cached_plan.stages.blocks.iter() {
225                    let stages = JoinStages {
226                        instrs: cached_block.0.instrs.clone(),
227                    };
228                    blocks.push((stages, cached_block.1.clone()));
229                }
230                let result_block = JoinStages {
231                    instrs: cached_plan.result_block.instrs.clone(),
232                };
233                Some(Plan::DecomposedPlan(DecomposedPlan {
234                    atoms: cached_plan.atoms.clone(),
235                    header: headers,
236                    stages: JoinStageBlocks { blocks },
237                    actions: action_id,
238                    result_block,
239                }))
240            }
241        }
242    }
243
244    /// Add a rule to this rule set based on a previously cached plan, optionally
245    /// with additional constraints applied on top.
246    ///
247    /// Returns `None` if the query is provably empty given the current database
248    /// state (i.e. some constraint narrows a table to zero matching rows), in
249    /// which case no rule or action is allocated. Returns `Some(RuleId)` otherwise.
250    ///
251    /// The primary use-case is seminaive evaluation: an egglog rule is compiled
252    /// once into a [`CachedPlan`] and then added to a fresh [`RuleSet`] each
253    /// iteration with timestamp constraints (e.g. `GeConst` on the focus atom)
254    /// that select only new tuples. If no new tuples exist for an atom, the
255    /// `None` return allows the caller to skip that variant entirely.
256    pub fn add_rule_from_cached_plan(
257        &mut self,
258        cached: &CachedPlan,
259        extra_constraints: &[(AtomId, Constraint)],
260    ) -> Option<RuleId> {
261        // Peek at the action_id without allocating it yet, so we don't break
262        // the rules<->actions bijection if the query turns out to be empty.
263        let action_id = self.rule_set.actions.next_id();
264        let plan = self.get_rule_with_extra_constraints(cached, action_id, extra_constraints)?;
265        // The query is non-empty: now commit the action and the plan.
266        let actual_action_id = self.rule_set.actions.push(cached.actions.clone());
267        debug_assert_eq!(action_id, actual_action_id);
268        Some(
269            self.rule_set
270                .plans
271                .push((plan, cached.desc.clone(), cached.symbol_map.clone())),
272        )
273    }
274
275    /// Build the ruleset.
276    pub fn build(self) -> RuleSet {
277        self.rule_set
278    }
279}
280
281/// Builder for the "query" portion of the rule.
282///
283/// Queries specify scans or joins over the database that bind variables that
284/// are accessible to rules.
285pub struct QueryBuilder<'outer, 'a> {
286    rsb: &'a mut RuleSetBuilder<'outer>,
287    query: Query,
288    instrs: Pooled<Vec<Instr>>,
289}
290
291impl<'outer, 'a> QueryBuilder<'outer, 'a> {
292    /// Finish the query and start building the right-hand side of the rule.
293    pub fn build(self) -> RuleBuilder<'outer, 'a> {
294        RuleBuilder { qb: self }
295    }
296
297    /// Set the target plan strategy to use to execute this query.
298    pub fn set_plan_strategy(&mut self, strategy: PlanStrategy) {
299        self.query.plan_strategy = strategy;
300    }
301
302    /// Create a new variable of the given type.
303    pub fn new_var(&mut self) -> Variable {
304        self.query.var_info.push(VarInfo {
305            occurrences: Default::default(),
306            used_in_rhs: false,
307            defined_in_rhs: false,
308            name: None,
309        })
310    }
311
312    pub fn new_var_named(&mut self, name: &str) -> Variable {
313        self.query.var_info.push(VarInfo {
314            occurrences: Default::default(),
315            used_in_rhs: false,
316            defined_in_rhs: false,
317            name: Some(name.into()),
318        })
319    }
320
321    fn mark_used<'b>(&mut self, entries: impl IntoIterator<Item = &'b QueryEntry>) {
322        for entry in entries {
323            if let QueryEntry::Var(v) = entry {
324                self.query.var_info[*v].used_in_rhs = true;
325            }
326        }
327    }
328
329    fn mark_defined(&mut self, entry: &QueryEntry) {
330        // TODO: use some of this information in query planning, e.g. dedup at match time.
331        if let QueryEntry::Var(v) = entry {
332            self.query.var_info[*v].defined_in_rhs = true;
333        }
334    }
335
336    /// Add the given atom to the query, with the given variables and constraints.
337    ///
338    /// NB: it is possible to constrain two non-equal variables to be equal
339    /// given this setup. Doing this will not cause any problems but
340    /// nevertheless is not recommended.
341    ///
342    /// The returned `AtomId` can be used to refer to this atom when adding constraints in
343    /// [`RuleSetBuilder::add_rule_from_cached_plan`].
344    ///
345    /// # Panics
346    /// Like most methods that take a [`TableId`], this method will panic if the
347    /// given table is not declared in the corresponding database.
348    pub fn add_atom<'b>(
349        &mut self,
350        table_id: TableId,
351        vars: &[QueryEntry],
352        cs: impl IntoIterator<Item = &'b Constraint>,
353    ) -> Result<AtomId, QueryError> {
354        let info = &self.rsb.db.tables[table_id];
355        let arity = info.spec.arity();
356        let check_constraint = |c: &Constraint| {
357            let process_col = |col: &ColumnId| -> Result<(), QueryError> {
358                if col.index() >= arity {
359                    Err(QueryError::InvalidConstraint {
360                        constraint: c.clone(),
361                        column: col.index(),
362                        table: table_id,
363                        arity,
364                    })
365                } else {
366                    Ok(())
367                }
368            };
369            match c {
370                Constraint::Eq { l_col, r_col } => {
371                    process_col(l_col)?;
372                    process_col(r_col)
373                }
374                Constraint::EqConst { col, .. }
375                | Constraint::LtConst { col, .. }
376                | Constraint::GtConst { col, .. }
377                | Constraint::LeConst { col, .. }
378                | Constraint::GeConst { col, .. } => process_col(col),
379            }
380        };
381        if arity != vars.len() {
382            return Err(QueryError::BadArity {
383                table: table_id,
384                expected: arity,
385                got: vars.len(),
386            });
387        }
388        let cs = Vec::from_iter(
389            cs.into_iter()
390                .cloned()
391                .chain(vars.iter().enumerate().filter_map(|(i, qe)| match qe {
392                    QueryEntry::Var(_) => None,
393                    QueryEntry::Const(c) => Some(Constraint::EqConst {
394                        col: ColumnId::from_usize(i),
395                        val: *c,
396                    }),
397                })),
398        );
399        cs.iter().try_fold((), |_, c| check_constraint(c))?;
400        let processed = self.rsb.db.process_constraints(table_id, &cs);
401        let mut atom = Atom {
402            table: table_id,
403            var_columns: Default::default(),
404            constraints: processed,
405        };
406        let next_atom = AtomId::from_usize(self.query.atoms.n_ids());
407        let mut subatoms = HashMap::<Variable, SubAtom>::default();
408        for (i, qe) in vars.iter().enumerate() {
409            let var = match qe {
410                QueryEntry::Var(var) => *var,
411                QueryEntry::Const(_) => {
412                    continue;
413                }
414            };
415            if var == Variable::placeholder() {
416                continue;
417            }
418            let col = ColumnId::from_usize(i);
419            if let Some(prev) = atom.var_columns.insert(var, col) {
420                atom.constraints.slow.push(Constraint::Eq {
421                    l_col: col,
422                    r_col: prev,
423                })
424            };
425            subatoms
426                .entry(var)
427                .or_insert_with(|| SubAtom::new(next_atom))
428                .vars
429                .push(col);
430        }
431        for (var, subatom) in subatoms {
432            self.query
433                .var_info
434                .get_mut(var)
435                .expect("all variables must be bound in current query")
436                .occurrences
437                .push(subatom);
438        }
439
440        // Add functional dependencies for this atom.
441        let get_var = |qe: &QueryEntry| match qe {
442            QueryEntry::Var(v) => Some(*v),
443            QueryEntry::Const(_) => None,
444        };
445        let antecedent = vars[..info.spec().n_keys]
446            .iter()
447            .filter_map(get_var)
448            .collect::<Vec<_>>();
449        let consequent = vars[info.spec().n_keys..]
450            .iter()
451            .filter_map(get_var)
452            .collect::<Vec<_>>();
453        self.query.fun_deps.add_dependency(antecedent, consequent);
454
455        Ok(self.query.atoms.push(atom))
456    }
457}
458
459#[derive(Debug, Error)]
460pub enum QueryError {
461    #[error("table {table:?} has {expected:?} keys but got {got:?}")]
462    KeyArityMismatch {
463        table: TableId,
464        expected: usize,
465        got: usize,
466    },
467    #[error("table {table:?} has {expected:?} columns but got {got:?}")]
468    TableArityMismatch {
469        table: TableId,
470        expected: usize,
471        got: usize,
472    },
473
474    #[error(
475        "counter used in column {column_id:?} of table {table:?}, which is declared as a base value"
476    )]
477    CounterUsedInBaseColumn {
478        table: TableId,
479        column_id: ColumnId,
480        base: BaseValueId,
481    },
482
483    #[error("attempt to compare two groups of values, one of length {l}, another of length {r}")]
484    MultiComparisonMismatch { l: usize, r: usize },
485
486    #[error("table {table:?} expected {expected:?} columns but got {got:?}")]
487    BadArity {
488        table: TableId,
489        expected: usize,
490        got: usize,
491    },
492
493    #[error("expected {expected:?} columns in schema but got {got:?}")]
494    InvalidSchema { expected: usize, got: usize },
495
496    #[error(
497        "constraint {constraint:?} on table {table:?} references column {column:?}, but the table has arity {arity:?}"
498    )]
499    InvalidConstraint {
500        constraint: Constraint,
501        column: usize,
502        table: TableId,
503        arity: usize,
504    },
505}
506
507/// Builder for the "action" portion of the rule.
508///
509/// Rules can refer to the variables bound in their query to modify the database.
510pub struct RuleBuilder<'outer, 'a> {
511    qb: QueryBuilder<'outer, 'a>,
512}
513
514impl RuleBuilder<'_, '_> {
515    fn table_info(&self, table: TableId) -> &TableInfo {
516        self.qb.rsb.db.get_table_info(table)
517    }
518
519    /// Build the finished query.
520    pub fn build(self) -> RuleId {
521        self.build_with_description("")
522    }
523
524    fn build_symbol_map(&self) -> SymbolMap {
525        let var_info = &self.qb.query.var_info;
526        SymbolMap {
527            atoms: self
528                .qb
529                .query
530                .atoms
531                .iter()
532                .filter_map(|(id, atom)| {
533                    let name = self.table_info(atom.table).name.clone();
534                    name.map(|name| (id, name))
535                })
536                .collect(),
537            vars: var_info
538                .iter()
539                .filter_map(|(id, info)| info.name.as_ref().map(|name| (id, name.clone())))
540                .collect(),
541        }
542    }
543
544    pub fn build_with_description(mut self, desc: impl Into<String>) -> RuleId {
545        let var_info = &self.qb.query.var_info;
546        let symbol_map = self.build_symbol_map();
547        // Generate an id for our actions and slot them in.
548        let used_vars = SmallVec::from_iter(var_info.iter().filter_map(|(v, info)| {
549            if info.used_in_rhs && !info.defined_in_rhs {
550                Some(v)
551            } else {
552                None
553            }
554        }));
555        let action_id = self.qb.rsb.rule_set.actions.push(ActionInfo {
556            instrs: Arc::new(self.qb.instrs),
557            used_vars,
558        });
559        self.qb.query.action = action_id;
560        // Plan the query
561        let plan = self.qb.rsb.db.plan_query(self.qb.query);
562        let desc: String = desc.into();
563        // Add it to the ruleset.
564        self.qb
565            .rsb
566            .rule_set
567            .plans
568            .push((plan, desc.into(), symbol_map))
569    }
570
571    /// Return a variable containing the result of reading the specified counter.
572    pub fn read_counter(&mut self, counter: CounterId) -> Variable {
573        let dst = self.qb.new_var();
574        self.qb.instrs.push(Instr::ReadCounter { counter, dst });
575        self.qb.mark_defined(&dst.into());
576        dst
577    }
578
579    /// Return a variable containing the result of looking up the specified
580    /// column from the row corresponding to given keys in the given
581    /// table.
582    ///
583    /// If the key does not currently have a mapping in the table, the values
584    /// specified by `default_vals` will be inserted.
585    pub fn lookup_or_insert(
586        &mut self,
587        table: TableId,
588        args: &[QueryEntry],
589        default_vals: &[WriteVal],
590        dst_col: ColumnId,
591    ) -> Result<Variable, QueryError> {
592        let table_info = self.table_info(table);
593        self.validate_keys(table, table_info, args)?;
594        self.validate_vals(table, table_info, default_vals.iter())?;
595        let res = self.qb.new_var();
596        self.qb.instrs.push(Instr::LookupOrInsertDefault {
597            table,
598            args: args.to_vec(),
599            default: default_vals.to_vec(),
600            dst_col,
601            dst_var: res,
602        });
603        self.qb.mark_used(args);
604        self.qb
605            .mark_used(default_vals.iter().filter_map(|x| match x {
606                WriteVal::QueryEntry(qe) => Some(qe),
607                WriteVal::IncCounter(_) | WriteVal::CurrentVal(_) => None,
608            }));
609        self.qb.mark_defined(&res.into());
610        Ok(res)
611    }
612
613    /// Return a variable containing the result of looking up the specified
614    /// column from the row corresponding to given keys in the given
615    /// table.
616    ///
617    /// If the key does not currently have a mapping in the table, the variable
618    /// takes the value of `default`.
619    pub fn lookup_with_default(
620        &mut self,
621        table: TableId,
622        args: &[QueryEntry],
623        default: QueryEntry,
624        dst_col: ColumnId,
625    ) -> Result<Variable, QueryError> {
626        let table_info = self.table_info(table);
627        self.validate_keys(table, table_info, args)?;
628        let res = self.qb.new_var();
629        self.qb.instrs.push(Instr::LookupWithDefault {
630            table,
631            args: args.to_vec(),
632            dst_col,
633            dst_var: res,
634            default,
635        });
636        self.qb.mark_used(args);
637        self.qb.mark_used(&[default]);
638        self.qb.mark_defined(&res.into());
639        Ok(res)
640    }
641
642    /// Return a variable containing the result of looking up the specified
643    /// column from the row corresponding to given keys in the given
644    /// table.
645    ///
646    /// If the key does not currently have a mapping in the table, execution of
647    /// the rule is halted.
648    pub fn lookup(
649        &mut self,
650        table: TableId,
651        args: &[QueryEntry],
652        dst_col: ColumnId,
653    ) -> Result<Variable, QueryError> {
654        let table_info = self.table_info(table);
655        self.validate_keys(table, table_info, args)?;
656        let res = self.qb.new_var();
657        self.qb.instrs.push(Instr::Lookup {
658            table,
659            args: args.to_vec(),
660            dst_col,
661            dst_var: res,
662        });
663        self.qb.mark_used(args);
664        self.qb.mark_defined(&res.into());
665        Ok(res)
666    }
667
668    /// Insert the specified values into the given table.
669    pub fn insert(&mut self, table: TableId, vals: &[QueryEntry]) -> Result<(), QueryError> {
670        let table_info = self.table_info(table);
671        self.validate_row(table, table_info, vals)?;
672        self.qb.instrs.push(Instr::Insert {
673            table,
674            vals: vals.to_vec(),
675        });
676        self.qb.mark_used(vals);
677        Ok(())
678    }
679
680    /// Insert the specified values into the given table if `l` and `r` are equal.
681    pub fn insert_if_eq(
682        &mut self,
683        table: TableId,
684        l: QueryEntry,
685        r: QueryEntry,
686        vals: &[QueryEntry],
687    ) -> Result<(), QueryError> {
688        let table_info = self.table_info(table);
689        self.validate_row(table, table_info, vals)?;
690        self.qb.instrs.push(Instr::InsertIfEq {
691            table,
692            l,
693            r,
694            vals: vals.to_vec(),
695        });
696        self.qb
697            .mark_used(vals.iter().chain(once(&l)).chain(once(&r)));
698        Ok(())
699    }
700
701    /// Remove the specified entry from the given table, if it is there.
702    pub fn remove(&mut self, table: TableId, args: &[QueryEntry]) -> Result<(), QueryError> {
703        let table_info = self.table_info(table);
704        self.validate_keys(table, table_info, args)?;
705        self.qb.instrs.push(Instr::Remove {
706            table,
707            args: args.to_vec(),
708        });
709        self.qb.mark_used(args);
710        Ok(())
711    }
712
713    /// Apply the given external function to the specified arguments.
714    pub fn call_external(
715        &mut self,
716        func: ExternalFunctionId,
717        args: &[QueryEntry],
718    ) -> Result<Variable, QueryError> {
719        let res = self.qb.new_var();
720        self.qb.instrs.push(Instr::External {
721            func,
722            args: args.to_vec(),
723            dst: res,
724        });
725        self.qb.mark_used(args);
726        self.qb.mark_defined(&res.into());
727        Ok(res)
728    }
729
730    /// Look up the given key in the given table. If the lookup fails, then call the given external
731    /// function with the given arguments. Bind the result to the returned variable. If the
732    /// external function returns None (and the lookup fails) then the execution of the rule halts.
733    pub fn lookup_with_fallback(
734        &mut self,
735        table: TableId,
736        key: &[QueryEntry],
737        dst_col: ColumnId,
738        func: ExternalFunctionId,
739        func_args: &[QueryEntry],
740    ) -> Result<Variable, QueryError> {
741        let table_info = self.table_info(table);
742        self.validate_keys(table, table_info, key)?;
743        let res = self.qb.new_var();
744        self.qb.instrs.push(Instr::LookupWithFallback {
745            table,
746            table_key: key.to_vec(),
747            func,
748            func_args: func_args.to_vec(),
749            dst_var: res,
750            dst_col,
751        });
752        self.qb.mark_used(key);
753        self.qb.mark_used(func_args);
754        self.qb.mark_defined(&res.into());
755        Ok(res)
756    }
757
758    pub fn call_external_with_fallback(
759        &mut self,
760        f1: ExternalFunctionId,
761        args1: &[QueryEntry],
762        f2: ExternalFunctionId,
763        args2: &[QueryEntry],
764    ) -> Result<Variable, QueryError> {
765        let res = self.qb.new_var();
766        self.qb.instrs.push(Instr::ExternalWithFallback {
767            f1,
768            args1: args1.to_vec(),
769            f2,
770            args2: args2.to_vec(),
771            dst: res,
772        });
773        self.qb.mark_used(args1);
774        self.qb.mark_used(args2);
775        self.qb.mark_defined(&res.into());
776        Ok(res)
777    }
778
779    /// Continue execution iff the two arguments are equal.
780    pub fn assert_eq(&mut self, l: QueryEntry, r: QueryEntry) {
781        self.qb.instrs.push(Instr::AssertEq(l, r));
782        self.qb.mark_used(&[l, r]);
783    }
784
785    /// Continue execution iff the two arguments are not equal.
786    pub fn assert_ne(&mut self, l: QueryEntry, r: QueryEntry) -> Result<(), QueryError> {
787        self.qb.instrs.push(Instr::AssertNe(l, r));
788        self.qb.mark_used(&[l, r]);
789        Ok(())
790    }
791
792    /// Continue execution iff there is some `i` such that `l[i] != r[i]`.
793    ///
794    /// This is useful when doing egglog-style rebuilding.
795    pub fn assert_any_ne(&mut self, l: &[QueryEntry], r: &[QueryEntry]) -> Result<(), QueryError> {
796        if l.len() != r.len() {
797            return Err(QueryError::MultiComparisonMismatch {
798                l: l.len(),
799                r: r.len(),
800            });
801        }
802
803        let mut ops = Vec::with_capacity(l.len() + r.len());
804        ops.extend_from_slice(l);
805        ops.extend_from_slice(r);
806        self.qb.instrs.push(Instr::AssertAnyNe {
807            ops,
808            divider: l.len(),
809        });
810        self.qb.mark_used(l);
811        self.qb.mark_used(r);
812        Ok(())
813    }
814
815    fn validate_row(
816        &self,
817        table: TableId,
818        info: &TableInfo,
819        vals: &[QueryEntry],
820    ) -> Result<(), QueryError> {
821        if vals.len() != info.spec.arity() {
822            Err(QueryError::TableArityMismatch {
823                table,
824                expected: info.spec.arity(),
825                got: vals.len(),
826            })
827        } else {
828            Ok(())
829        }
830    }
831
832    fn validate_keys(
833        &self,
834        table: TableId,
835        info: &TableInfo,
836        keys: &[QueryEntry],
837    ) -> Result<(), QueryError> {
838        if keys.len() != info.spec.n_keys {
839            Err(QueryError::KeyArityMismatch {
840                table,
841                expected: info.spec.n_keys,
842                got: keys.len(),
843            })
844        } else {
845            Ok(())
846        }
847    }
848
849    fn validate_vals<'b>(
850        &self,
851        table: TableId,
852        info: &TableInfo,
853        vals: impl Iterator<Item = &'b WriteVal>,
854    ) -> Result<(), QueryError> {
855        for (i, _) in vals.enumerate() {
856            let col = i + info.spec.n_keys;
857            if col >= info.spec.arity() {
858                return Err(QueryError::TableArityMismatch {
859                    table,
860                    expected: info.spec.arity(),
861                    got: col,
862                });
863            }
864        }
865        Ok(())
866    }
867}
868
869#[derive(Debug, Clone)]
870pub(crate) struct Atom {
871    pub(crate) table: TableId,
872    pub(crate) var_columns: VarColumnMap,
873    /// These constraints are an initial take at processing "fast" constraints as well as a
874    /// potential list of "slow" constraints.
875    ///
876    /// Fast constraints get re-computed when queries are executed. In particular, this makes it
877    /// possible to cache plans and add new fast constraints to them without re-planning.
878    pub(crate) constraints: ProcessedConstraints,
879}
880
881impl Atom {
882    pub(crate) fn vars(&self) -> impl Iterator<Item = Variable> + '_ {
883        self.var_columns.vars()
884    }
885
886    pub(crate) fn get_var(&self, col: ColumnId) -> Option<Variable> {
887        self.var_columns.get_var(col)
888    }
889
890    pub(crate) fn get_col(&self, var: Variable) -> Option<ColumnId> {
891        self.var_columns.get_col(var)
892    }
893}
894
895#[derive(Clone, Default)]
896pub(crate) struct VarColumnMap {
897    var_to_column: DenseIdMap<Variable, ColumnId>,
898    column_to_var: DenseIdMap<ColumnId, Variable>,
899}
900
901impl std::fmt::Debug for VarColumnMap {
902    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
903        let mut entries: Vec<_> = self.column_to_var.iter().collect();
904        entries.sort_by_key(|(col, _)| col.index());
905
906        f.write_str("VarColumnMap(")?;
907        for (i, (col, var)) in entries.iter().enumerate() {
908            if i > 0 {
909                f.write_str(", ")?;
910            }
911            write!(f, "{col:?} -> {var:?}")?;
912        }
913        f.write_str(")")
914    }
915}
916
917impl VarColumnMap {
918    pub(crate) fn insert(&mut self, var: Variable, col: ColumnId) -> Option<ColumnId> {
919        let prev = self.var_to_column.insert(var, col);
920        self.column_to_var.insert(col, var);
921        prev
922    }
923
924    pub(crate) fn get_col(&self, var: Variable) -> Option<ColumnId> {
925        self.var_to_column.get(var).copied()
926    }
927
928    pub(crate) fn get_var(&self, col: ColumnId) -> Option<Variable> {
929        self.column_to_var.get(col).copied()
930    }
931
932    pub(crate) fn iter(&self) -> impl Iterator<Item = (ColumnId, Variable)> + '_ {
933        self.column_to_var.iter().map(|(col, var)| (col, *var))
934    }
935
936    pub(crate) fn vars(&self) -> impl Iterator<Item = Variable> + '_ {
937        self.iter().map(|(_, var)| var)
938    }
939
940    pub(crate) fn is_empty(&self) -> bool {
941        self.var_to_column.len() == 0
942    }
943}
944
945/// A functional dependency inferencer.
946///
947/// A functional dependency (x, y, ...) -> (u, v, ...) means that if we know
948/// the values of x, y, ..., then we can determine u, v, ...
949///
950/// This data structure can compute the closure of a set of variables under
951/// a set of functional dependencies.
952#[derive(Clone, Default)]
953pub(crate) struct FunDeps {
954    /// List of functional dependencies (antecedent -> consequent)
955    dependencies: Vec<(Vec<Variable>, Vec<Variable>)>,
956}
957
958impl FunDeps {
959    /// Add a functional dependency: antecedent -> consequent.
960    pub fn add_dependency(&mut self, antecedent: Vec<Variable>, consequent: Vec<Variable>) {
961        // Don't add trivial dependencies.
962        if !antecedent.is_empty() {
963            self.dependencies.push((antecedent, consequent));
964        }
965    }
966
967    /// Returns all variables that can be determined from the input variables
968    /// using the functional dependencies.
969    pub fn closure(
970        &self,
971        variables: impl IntoIterator<Item = Variable>,
972    ) -> DenseIdMap<Variable, ()> {
973        let mut result: DenseIdMap<Variable, ()> =
974            DenseIdMap::from_iter(variables.into_iter().map(|v| (v, ())));
975        let mut changed = true;
976
977        while changed {
978            changed = false;
979            for (antecedent, consequent) in &self.dependencies {
980                // If all variables in the antecedent are in the result,
981                // add all variables in the consequent.
982                if antecedent.iter().all(|v| result.contains_key(*v)) {
983                    for v in consequent {
984                        if !result.contains_key(*v) {
985                            result.insert(*v, ());
986                            changed = true;
987                        }
988                    }
989                }
990            }
991        }
992
993        result
994    }
995}
996
997impl std::fmt::Debug for FunDeps {
998    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
999        use std::fmt::Write;
1000
1001        let mut deps = String::new();
1002
1003        for (i, (ant, cons)) in self.dependencies.iter().enumerate() {
1004            if i > 0 {
1005                deps.push_str("; ");
1006            }
1007
1008            deps.push('{');
1009            for (j, v) in ant.iter().enumerate() {
1010                if j > 0 {
1011                    deps.push_str(", ");
1012                }
1013                write!(&mut deps, "{v:?}")?;
1014            }
1015            deps.push('}');
1016
1017            deps.push_str(" -> ");
1018
1019            deps.push('{');
1020            for (j, v) in cons.iter().enumerate() {
1021                if j > 0 {
1022                    deps.push_str(", ");
1023                }
1024                write!(&mut deps, "{v:?}")?;
1025            }
1026            deps.push('}');
1027        }
1028
1029        write!(f, "FunDeps {{ {deps} }}")
1030    }
1031}
1032
1033pub(crate) struct Query {
1034    pub(crate) var_info: DenseIdMap<Variable, VarInfo>,
1035    pub(crate) atoms: DenseIdMap<AtomId, Atom>,
1036    pub(crate) action: ActionId,
1037    pub(crate) plan_strategy: PlanStrategy,
1038    pub(crate) fun_deps: FunDeps,
1039}