egglog_core_relations/
query.rs

1//! APIs for building a query of a database.
2
3use std::{iter::once, sync::Arc};
4
5use crate::{
6    free_join::plan::{DecomposedPlan, JoinStageBlocks, SinglePlan},
7    numeric_id::{DenseIdMap, IdVec, NumericId, define_id},
8};
9use smallvec::SmallVec;
10use thiserror::Error;
11
12use crate::{
13    BaseValueId, CounterId, ExternalFunctionId, PoolSet,
14    action::{Instr, QueryEntry, WriteVal},
15    common::HashMap,
16    free_join::{
17        ActionId, AtomId, Database, ProcessedConstraints, SubAtom, TableId, TableInfo, VarInfo,
18        Variable,
19        plan::{JoinHeader, JoinStages, Plan, PlanStrategy},
20    },
21    pool::{Pooled, with_pool_set},
22    table_spec::{ColumnId, Constraint},
23};
24
// `RuleId` indexes the rules stored in `RuleSet::plans`.
define_id!(pub RuleId, u32, "An identifier for a rule in a rule set");
26
/// Resolves variables and atoms in a rule to their string names.
///
/// Built when a rule is finished by `RuleBuilder`. Atoms only get an entry when
/// their table has a name, and variables only when they were created with a name.
// NOTE(review): the reason for `allow(dead_code)` is not visible here; presumably
// these maps are only read by debugging/reporting code — confirm.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct SymbolMap {
    /// Name of the table each atom of the rule reads from, keyed by atom.
    pub atoms: HashMap<AtomId, Arc<str>>,
    /// Names given to the rule's variables, keyed by variable.
    pub vars: HashMap<Variable, Arc<str>>,
}
34
/// A cached plan for a given rule.
///
/// Holds everything needed to re-add the rule to a fresh [`RuleSet`] without
/// re-planning it. Produced by [`RuleSet::build_cached_plan`] and consumed by
/// [`RuleSetBuilder::add_rule_from_cached_plan`].
pub struct CachedPlan {
    /// The query plan for the rule's left-hand side.
    plan: Plan,
    /// Human-readable description of the rule.
    desc: Arc<str>,
    /// Names for the rule's atoms and variables.
    symbol_map: SymbolMap,
    /// The rule's actions; cloned into every rule set this plan is added to.
    actions: ActionInfo,
}
42
/// The compiled right-hand side ("actions") of a rule.
#[derive(Debug, Clone)]
pub(crate) struct ActionInfo {
    /// Variables the actions read from the query: those marked as used on the
    /// right-hand side that are not also defined there.
    pub(crate) used_vars: SmallVec<[Variable; 4]>,
    /// The action instructions. Wrapped in an `Arc` so cloning an `ActionInfo`
    /// shares, rather than copies, the instruction list.
    pub(crate) instrs: Arc<Pooled<Vec<Instr>>>,
}
48
/// A set of rules to run against a [`Database`].
///
/// See [`Database::new_rule_set`] for more information.
#[derive(Default)]
pub struct RuleSet {
    /// The contents of the queries (i.e. the LHS of the rules) for each rule in the set, along
    /// with a description of the rule and its symbol map.
    ///
    /// Each plan refers to its rule's actions by `ActionId`; that id is what links a rule's
    /// description and plan to its actions. The current accounting logic assumes that rules
    /// and actions stand in a bijection. If we relaxed that later on, most of the core logic
    /// would still work but the accounting logic could get more complex.
    pub(crate) plans: IdVec<RuleId, (Plan, Arc<str> /* description */, SymbolMap)>,
    /// The actions (the RHS) of each rule, indexed by the `ActionId` stored in its plan.
    pub(crate) actions: DenseIdMap<ActionId, ActionInfo>,
}
64
65impl RuleSet {
66    pub fn build_cached_plan(&self, rule_id: RuleId) -> CachedPlan {
67        let (plan, desc, symbol_map) = self.plans.get(rule_id).expect("rule must exist");
68        let actions = self
69            .actions
70            .get(plan.actions())
71            .expect("action must exist")
72            .clone();
73        CachedPlan {
74            plan: plan.clone(),
75            desc: desc.clone(),
76            symbol_map: symbol_map.clone(),
77            actions,
78        }
79    }
80}
81
/// Builder for a [`RuleSet`].
///
/// There are in general two ways to add rules to a rule set:
///
/// 1. Use the QueryBuilder and RuleBuilder APIs to construct a rule from scratch.
/// 2. Use a previously cached plan and add extra constraints to it.
///
/// egglog uses this as follows: an egglog rule is first compiled to a cached plan using
/// the builder APIs at declaration time. Each time the rule is run, it is added to a
/// ruleset using the cached plan and possibly some extra constraints (e.g., timestamp).
///
/// See [`Database::new_rule_set`] for more information.
pub struct RuleSetBuilder<'outer> {
    /// The rule set being assembled; returned by `build`.
    rule_set: RuleSet,
    /// The database the rules are planned against.
    db: &'outer mut Database,
}
98
99impl<'outer> RuleSetBuilder<'outer> {
100    pub fn new(db: &'outer mut Database) -> Self {
101        Self {
102            rule_set: Default::default(),
103            db,
104        }
105    }
106
107    /// Estimate the size of the subset of the table matching the given
108    /// constraint.
109    ///
110    /// This is a wrapper around the [`Database::estimate_size`] method.
111    pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
112        self.db.estimate_size(table, c)
113    }
114
115    /// Add a rule to this rule set.
116    pub fn new_rule<'a>(&'a mut self) -> QueryBuilder<'outer, 'a> {
117        let instrs = with_pool_set(PoolSet::get);
118        QueryBuilder {
119            rsb: self,
120            instrs,
121            query: Query {
122                var_info: Default::default(),
123                atoms: Default::default(),
124                // start with an invalid ActionId
125                action: ActionId::new(u32::MAX),
126                plan_strategy: Default::default(),
127                fun_deps: Default::default(),
128                no_decomp: false,
129            },
130        }
131    }
132
133    fn reprocess_constraints(
134        &self,
135        table: TableId,
136        atom: AtomId,
137        constraints: &[Constraint],
138    ) -> Option<JoinHeader> {
139        let processed = self.db.process_constraints(table, constraints);
140        if !processed.slow.is_empty() {
141            panic!(
142                "Cached plans only support constraints with a fast pushdown. \
143                 Got: {constraints:?} for table {table:?}",
144            );
145        }
146        if processed.subset.size() == 0 {
147            return None;
148        }
149        Some(JoinHeader {
150            atom,
151            constraints: processed.fast,
152            subset: processed.subset,
153        })
154    }
155
156    fn push_extra_constraints(
157        &self,
158        headers: &mut Vec<JoinHeader>,
159        atoms: &Arc<DenseIdMap<AtomId, Atom>>,
160        extra_constraints: &[(AtomId, Constraint)],
161    ) -> Option<()> {
162        for (atom_id, constraint) in extra_constraints {
163            let atom_info = atoms.get(*atom_id).expect("atom must exist in plan");
164            let table = atom_info.table;
165            headers.push(self.reprocess_constraints(
166                table,
167                *atom_id,
168                std::slice::from_ref(constraint),
169            )?);
170        }
171        Some(())
172    }
173
174    fn reprocess_existing_headers(
175        &self,
176        headers: &mut Vec<JoinHeader>,
177        atoms: &Arc<DenseIdMap<AtomId, Atom>>,
178        existing: &[JoinHeader],
179    ) -> Option<()> {
180        for JoinHeader {
181            atom, constraints, ..
182        } in existing
183        {
184            let atom_info = atoms.get(*atom).expect("atom must exist in plan");
185            let table = atom_info.table;
186            headers.push(self.reprocess_constraints(table, *atom, constraints)?);
187        }
188        Some(())
189    }
190
191    fn get_rule_with_extra_constraints(
192        &self,
193        cached: &CachedPlan,
194        action_id: ActionId,
195        extra_constraints: &[(AtomId, Constraint)],
196    ) -> Option<Plan> {
197        match &cached.plan {
198            Plan::SinglePlan(cached_plan) => {
199                let mut headers = vec![];
200                let stages = JoinStages {
201                    instrs: cached_plan.stages.instrs.clone(),
202                };
203                self.push_extra_constraints(&mut headers, &cached_plan.atoms, extra_constraints)?;
204                self.reprocess_existing_headers(
205                    &mut headers,
206                    &cached_plan.atoms,
207                    &cached_plan.header,
208                )?;
209                Some(Plan::SinglePlan(SinglePlan {
210                    atoms: cached_plan.atoms.clone(),
211                    header: headers,
212                    stages,
213                    actions: action_id,
214                }))
215            }
216            Plan::DecomposedPlan(cached_plan) => {
217                let mut blocks = Vec::with_capacity(cached_plan.stages.blocks.len());
218                let mut headers = vec![];
219                self.push_extra_constraints(&mut headers, &cached_plan.atoms, extra_constraints)?;
220                self.reprocess_existing_headers(
221                    &mut headers,
222                    &cached_plan.atoms,
223                    &cached_plan.header,
224                )?;
225                for cached_block in cached_plan.stages.blocks.iter() {
226                    let stages = JoinStages {
227                        instrs: cached_block.0.instrs.clone(),
228                    };
229                    blocks.push((stages, cached_block.1.clone()));
230                }
231                let result_block = JoinStages {
232                    instrs: cached_plan.result_block.instrs.clone(),
233                };
234                Some(Plan::DecomposedPlan(DecomposedPlan {
235                    atoms: cached_plan.atoms.clone(),
236                    header: headers,
237                    stages: JoinStageBlocks { blocks },
238                    actions: action_id,
239                    result_block,
240                }))
241            }
242        }
243    }
244
245    /// Add a rule to this rule set based on a previously cached plan, optionally
246    /// with additional constraints applied on top.
247    ///
248    /// Returns `None` if the query is provably empty given the current database
249    /// state (i.e. some constraint narrows a table to zero matching rows), in
250    /// which case no rule or action is allocated. Returns `Some(RuleId)` otherwise.
251    ///
252    /// The primary use-case is seminaive evaluation: an egglog rule is compiled
253    /// once into a [`CachedPlan`] and then added to a fresh [`RuleSet`] each
254    /// iteration with timestamp constraints (e.g. `GeConst` on the focus atom)
255    /// that select only new tuples. If no new tuples exist for an atom, the
256    /// `None` return allows the caller to skip that variant entirely.
257    pub fn add_rule_from_cached_plan(
258        &mut self,
259        cached: &CachedPlan,
260        extra_constraints: &[(AtomId, Constraint)],
261    ) -> Option<RuleId> {
262        // Peek at the action_id without allocating it yet, so we don't break
263        // the rules<->actions bijection if the query turns out to be empty.
264        let action_id = self.rule_set.actions.next_id();
265        let plan = self.get_rule_with_extra_constraints(cached, action_id, extra_constraints)?;
266        // The query is non-empty: now commit the action and the plan.
267        let actual_action_id = self.rule_set.actions.push(cached.actions.clone());
268        debug_assert_eq!(action_id, actual_action_id);
269        Some(
270            self.rule_set
271                .plans
272                .push((plan, cached.desc.clone(), cached.symbol_map.clone())),
273        )
274    }
275
276    /// Build the ruleset.
277    pub fn build(self) -> RuleSet {
278        self.rule_set
279    }
280}
281
/// Builder for the "query" portion of the rule.
///
/// Queries specify scans or joins over the database that bind variables that
/// are accessible to rules.
pub struct QueryBuilder<'outer, 'a> {
    /// The rule-set builder the finished rule is added to.
    rsb: &'a mut RuleSetBuilder<'outer>,
    /// The query (atoms, variables, planning options) built so far.
    query: Query,
    /// Action instructions accumulated for the rule's right-hand side.
    instrs: Pooled<Vec<Instr>>,
}
291
292impl<'outer, 'a> QueryBuilder<'outer, 'a> {
293    /// Finish the query and start building the right-hand side of the rule.
294    pub fn build(self) -> RuleBuilder<'outer, 'a> {
295        RuleBuilder { qb: self }
296    }
297
298    /// Set the target plan strategy to use to execute this query.
299    pub fn set_plan_strategy(&mut self, strategy: PlanStrategy) {
300        self.query.plan_strategy = strategy;
301    }
302
303    /// If `true`, the query planner will skip tree-decomposition
304    /// for query decomposition and always use evaluate the query as a single bag.
305    pub fn set_no_decomp(&mut self, no_decomp: bool) {
306        self.query.no_decomp = no_decomp;
307    }
308
309    /// Create a new variable of the given type.
310    pub fn new_var(&mut self) -> Variable {
311        self.query.var_info.push(VarInfo {
312            occurrences: Default::default(),
313            used_in_rhs: false,
314            defined_in_rhs: false,
315            name: None,
316        })
317    }
318
319    pub fn new_var_named(&mut self, name: &str) -> Variable {
320        self.query.var_info.push(VarInfo {
321            occurrences: Default::default(),
322            used_in_rhs: false,
323            defined_in_rhs: false,
324            name: Some(name.into()),
325        })
326    }
327
328    fn mark_used<'b>(&mut self, entries: impl IntoIterator<Item = &'b QueryEntry>) {
329        for entry in entries {
330            if let QueryEntry::Var(v) = entry {
331                self.query.var_info[*v].used_in_rhs = true;
332            }
333        }
334    }
335
336    fn mark_defined(&mut self, entry: &QueryEntry) {
337        // TODO: use some of this information in query planning, e.g. dedup at match time.
338        if let QueryEntry::Var(v) = entry {
339            self.query.var_info[*v].defined_in_rhs = true;
340        }
341    }
342
343    /// Add the given atom to the query, with the given variables and constraints.
344    ///
345    /// NB: it is possible to constrain two non-equal variables to be equal
346    /// given this setup. Doing this will not cause any problems but
347    /// nevertheless is not recommended.
348    ///
349    /// The returned `AtomId` can be used to refer to this atom when adding constraints in
350    /// [`RuleSetBuilder::add_rule_from_cached_plan`].
351    ///
352    /// # Panics
353    /// Like most methods that take a [`TableId`], this method will panic if the
354    /// given table is not declared in the corresponding database.
355    pub fn add_atom<'b>(
356        &mut self,
357        table_id: TableId,
358        vars: &[QueryEntry],
359        cs: impl IntoIterator<Item = &'b Constraint>,
360    ) -> Result<AtomId, QueryError> {
361        let info = &self.rsb.db.tables[table_id];
362        let arity = info.spec.arity();
363        let check_constraint = |c: &Constraint| {
364            let process_col = |col: &ColumnId| -> Result<(), QueryError> {
365                if col.index() >= arity {
366                    Err(QueryError::InvalidConstraint {
367                        constraint: c.clone(),
368                        column: col.index(),
369                        table: table_id,
370                        arity,
371                    })
372                } else {
373                    Ok(())
374                }
375            };
376            match c {
377                Constraint::Eq { l_col, r_col } => {
378                    process_col(l_col)?;
379                    process_col(r_col)
380                }
381                Constraint::EqConst { col, .. }
382                | Constraint::LtConst { col, .. }
383                | Constraint::GtConst { col, .. }
384                | Constraint::LeConst { col, .. }
385                | Constraint::GeConst { col, .. } => process_col(col),
386            }
387        };
388        if arity != vars.len() {
389            return Err(QueryError::BadArity {
390                table: table_id,
391                expected: arity,
392                got: vars.len(),
393            });
394        }
395        let cs = Vec::from_iter(
396            cs.into_iter()
397                .cloned()
398                .chain(vars.iter().enumerate().filter_map(|(i, qe)| match qe {
399                    QueryEntry::Var(_) => None,
400                    QueryEntry::Const(c) => Some(Constraint::EqConst {
401                        col: ColumnId::from_usize(i),
402                        val: *c,
403                    }),
404                })),
405        );
406        cs.iter().try_fold((), |_, c| check_constraint(c))?;
407        let processed = self.rsb.db.process_constraints(table_id, &cs);
408        let mut atom = Atom {
409            table: table_id,
410            var_columns: Default::default(),
411            constraints: processed,
412        };
413        let next_atom = AtomId::from_usize(self.query.atoms.n_ids());
414        let mut subatoms = HashMap::<Variable, SubAtom>::default();
415        for (i, qe) in vars.iter().enumerate() {
416            let var = match qe {
417                QueryEntry::Var(var) => *var,
418                QueryEntry::Const(_) => {
419                    continue;
420                }
421            };
422            if var == Variable::placeholder() {
423                continue;
424            }
425            let col = ColumnId::from_usize(i);
426            if let Some(prev) = atom.var_columns.insert(var, col) {
427                atom.constraints.slow.push(Constraint::Eq {
428                    l_col: col,
429                    r_col: prev,
430                })
431            };
432            subatoms
433                .entry(var)
434                .or_insert_with(|| SubAtom::new(next_atom))
435                .vars
436                .push(col);
437        }
438        for (var, subatom) in subatoms {
439            self.query
440                .var_info
441                .get_mut(var)
442                .expect("all variables must be bound in current query")
443                .occurrences
444                .push(subatom);
445        }
446
447        // Add functional dependencies for this atom.
448        let get_var = |qe: &QueryEntry| match qe {
449            QueryEntry::Var(v) => Some(*v),
450            QueryEntry::Const(_) => None,
451        };
452        let antecedent = vars[..info.spec().n_keys]
453            .iter()
454            .filter_map(get_var)
455            .collect::<Vec<_>>();
456        let consequent = vars[info.spec().n_keys..]
457            .iter()
458            .filter_map(get_var)
459            .collect::<Vec<_>>();
460        self.query.fun_deps.add_dependency(antecedent, consequent);
461
462        Ok(self.query.atoms.push(atom))
463    }
464}
465
/// Errors reported while building a query or its actions.
#[derive(Debug, Error)]
pub enum QueryError {
    /// A lookup/removal supplied a number of key values that differs from the
    /// table's number of key columns.
    #[error("table {table:?} has {expected:?} keys but got {got:?}")]
    KeyArityMismatch {
        table: TableId,
        expected: usize,
        got: usize,
    },
    /// A row (or keys plus default values) did not match the table's number of columns.
    #[error("table {table:?} has {expected:?} columns but got {got:?}")]
    TableArityMismatch {
        table: TableId,
        expected: usize,
        got: usize,
    },

    /// A counter was used in a column declared to hold a base value.
    #[error(
        "counter used in column {column_id:?} of table {table:?}, which is declared as a base value"
    )]
    CounterUsedInBaseColumn {
        table: TableId,
        column_id: ColumnId,
        base: BaseValueId,
    },

    /// Two groups of values being compared element-wise have different lengths.
    #[error("attempt to compare two groups of values, one of length {l}, another of length {r}")]
    MultiComparisonMismatch { l: usize, r: usize },

    /// An atom supplied a number of entries that differs from the table's arity.
    #[error("table {table:?} expected {expected:?} columns but got {got:?}")]
    BadArity {
        table: TableId,
        expected: usize,
        got: usize,
    },

    /// A schema had the wrong number of columns.
    #[error("expected {expected:?} columns in schema but got {got:?}")]
    InvalidSchema { expected: usize, got: usize },

    /// A constraint referenced a column index at or beyond the table's arity.
    #[error(
        "constraint {constraint:?} on table {table:?} references column {column:?}, but the table has arity {arity:?}"
    )]
    InvalidConstraint {
        constraint: Constraint,
        column: usize,
        table: TableId,
        arity: usize,
    },
}
513
/// Builder for the "action" portion of the rule.
///
/// Rules can refer to the variables bound in their query to modify the database.
pub struct RuleBuilder<'outer, 'a> {
    /// The finished query; actions are appended to its instruction list.
    qb: QueryBuilder<'outer, 'a>,
}
520
521impl RuleBuilder<'_, '_> {
522    fn table_info(&self, table: TableId) -> &TableInfo {
523        self.qb.rsb.db.get_table_info(table)
524    }
525
526    /// Build the finished query.
527    pub fn build(self) -> RuleId {
528        self.build_with_description("")
529    }
530
531    fn build_symbol_map(&self) -> SymbolMap {
532        let var_info = &self.qb.query.var_info;
533        SymbolMap {
534            atoms: self
535                .qb
536                .query
537                .atoms
538                .iter()
539                .filter_map(|(id, atom)| {
540                    let name = self.table_info(atom.table).name.clone();
541                    name.map(|name| (id, name))
542                })
543                .collect(),
544            vars: var_info
545                .iter()
546                .filter_map(|(id, info)| info.name.as_ref().map(|name| (id, name.clone())))
547                .collect(),
548        }
549    }
550
551    pub fn build_with_description(mut self, desc: impl Into<String>) -> RuleId {
552        let var_info = &self.qb.query.var_info;
553        let symbol_map = self.build_symbol_map();
554        // Generate an id for our actions and slot them in.
555        let used_vars = SmallVec::from_iter(var_info.iter().filter_map(|(v, info)| {
556            if info.used_in_rhs && !info.defined_in_rhs {
557                Some(v)
558            } else {
559                None
560            }
561        }));
562        let action_id = self.qb.rsb.rule_set.actions.push(ActionInfo {
563            instrs: Arc::new(self.qb.instrs),
564            used_vars,
565        });
566        self.qb.query.action = action_id;
567        // Plan the query
568        let plan = self.qb.rsb.db.plan_query(self.qb.query);
569        let desc: String = desc.into();
570        // Add it to the ruleset.
571        self.qb
572            .rsb
573            .rule_set
574            .plans
575            .push((plan, desc.into(), symbol_map))
576    }
577
578    /// Return a variable containing the result of reading the specified counter.
579    pub fn read_counter(&mut self, counter: CounterId) -> Variable {
580        let dst = self.qb.new_var();
581        self.qb.instrs.push(Instr::ReadCounter { counter, dst });
582        self.qb.mark_defined(&dst.into());
583        dst
584    }
585
586    /// Return a variable containing the result of looking up the specified
587    /// column from the row corresponding to given keys in the given
588    /// table.
589    ///
590    /// If the key does not currently have a mapping in the table, the values
591    /// specified by `default_vals` will be inserted.
592    pub fn lookup_or_insert(
593        &mut self,
594        table: TableId,
595        args: &[QueryEntry],
596        default_vals: &[WriteVal],
597        dst_col: ColumnId,
598    ) -> Result<Variable, QueryError> {
599        let table_info = self.table_info(table);
600        self.validate_keys(table, table_info, args)?;
601        self.validate_vals(table, table_info, default_vals.iter())?;
602        let res = self.qb.new_var();
603        self.qb.instrs.push(Instr::LookupOrInsertDefault {
604            table,
605            args: args.to_vec(),
606            default: default_vals.to_vec(),
607            dst_col,
608            dst_var: res,
609        });
610        self.qb.mark_used(args);
611        self.qb
612            .mark_used(default_vals.iter().filter_map(|x| match x {
613                WriteVal::QueryEntry(qe) => Some(qe),
614                WriteVal::IncCounter(_) | WriteVal::CurrentVal(_) => None,
615            }));
616        self.qb.mark_defined(&res.into());
617        Ok(res)
618    }
619
620    /// Return a variable containing the result of looking up the specified
621    /// column from the row corresponding to given keys in the given
622    /// table.
623    ///
624    /// If the key does not currently have a mapping in the table, the variable
625    /// takes the value of `default`.
626    pub fn lookup_with_default(
627        &mut self,
628        table: TableId,
629        args: &[QueryEntry],
630        default: QueryEntry,
631        dst_col: ColumnId,
632    ) -> Result<Variable, QueryError> {
633        let table_info = self.table_info(table);
634        self.validate_keys(table, table_info, args)?;
635        let res = self.qb.new_var();
636        self.qb.instrs.push(Instr::LookupWithDefault {
637            table,
638            args: args.to_vec(),
639            dst_col,
640            dst_var: res,
641            default,
642        });
643        self.qb.mark_used(args);
644        self.qb.mark_used(&[default]);
645        self.qb.mark_defined(&res.into());
646        Ok(res)
647    }
648
649    /// Return a variable containing the result of looking up the specified
650    /// column from the row corresponding to given keys in the given
651    /// table.
652    ///
653    /// If the key does not currently have a mapping in the table, execution of
654    /// the rule is halted.
655    pub fn lookup(
656        &mut self,
657        table: TableId,
658        args: &[QueryEntry],
659        dst_col: ColumnId,
660    ) -> Result<Variable, QueryError> {
661        let table_info = self.table_info(table);
662        self.validate_keys(table, table_info, args)?;
663        let res = self.qb.new_var();
664        self.qb.instrs.push(Instr::Lookup {
665            table,
666            args: args.to_vec(),
667            dst_col,
668            dst_var: res,
669        });
670        self.qb.mark_used(args);
671        self.qb.mark_defined(&res.into());
672        Ok(res)
673    }
674
675    /// Insert the specified values into the given table.
676    pub fn insert(&mut self, table: TableId, vals: &[QueryEntry]) -> Result<(), QueryError> {
677        let table_info = self.table_info(table);
678        self.validate_row(table, table_info, vals)?;
679        self.qb.instrs.push(Instr::Insert {
680            table,
681            vals: vals.to_vec(),
682        });
683        self.qb.mark_used(vals);
684        Ok(())
685    }
686
687    /// Insert the specified values into the given table if `l` and `r` are equal.
688    pub fn insert_if_eq(
689        &mut self,
690        table: TableId,
691        l: QueryEntry,
692        r: QueryEntry,
693        vals: &[QueryEntry],
694    ) -> Result<(), QueryError> {
695        let table_info = self.table_info(table);
696        self.validate_row(table, table_info, vals)?;
697        self.qb.instrs.push(Instr::InsertIfEq {
698            table,
699            l,
700            r,
701            vals: vals.to_vec(),
702        });
703        self.qb
704            .mark_used(vals.iter().chain(once(&l)).chain(once(&r)));
705        Ok(())
706    }
707
708    /// Remove the specified entry from the given table, if it is there.
709    pub fn remove(&mut self, table: TableId, args: &[QueryEntry]) -> Result<(), QueryError> {
710        let table_info = self.table_info(table);
711        self.validate_keys(table, table_info, args)?;
712        self.qb.instrs.push(Instr::Remove {
713            table,
714            args: args.to_vec(),
715        });
716        self.qb.mark_used(args);
717        Ok(())
718    }
719
720    /// Apply the given external function to the specified arguments.
721    pub fn call_external(
722        &mut self,
723        func: ExternalFunctionId,
724        args: &[QueryEntry],
725    ) -> Result<Variable, QueryError> {
726        let res = self.qb.new_var();
727        self.qb.instrs.push(Instr::External {
728            func,
729            args: args.to_vec(),
730            dst: res,
731        });
732        self.qb.mark_used(args);
733        self.qb.mark_defined(&res.into());
734        Ok(res)
735    }
736
737    /// Look up the given key in the given table. If the lookup fails, then call the given external
738    /// function with the given arguments. Bind the result to the returned variable. If the
739    /// external function returns None (and the lookup fails) then the execution of the rule halts.
740    pub fn lookup_with_fallback(
741        &mut self,
742        table: TableId,
743        key: &[QueryEntry],
744        dst_col: ColumnId,
745        func: ExternalFunctionId,
746        func_args: &[QueryEntry],
747    ) -> Result<Variable, QueryError> {
748        let table_info = self.table_info(table);
749        self.validate_keys(table, table_info, key)?;
750        let res = self.qb.new_var();
751        self.qb.instrs.push(Instr::LookupWithFallback {
752            table,
753            table_key: key.to_vec(),
754            func,
755            func_args: func_args.to_vec(),
756            dst_var: res,
757            dst_col,
758        });
759        self.qb.mark_used(key);
760        self.qb.mark_used(func_args);
761        self.qb.mark_defined(&res.into());
762        Ok(res)
763    }
764
765    pub fn call_external_with_fallback(
766        &mut self,
767        f1: ExternalFunctionId,
768        args1: &[QueryEntry],
769        f2: ExternalFunctionId,
770        args2: &[QueryEntry],
771    ) -> Result<Variable, QueryError> {
772        let res = self.qb.new_var();
773        self.qb.instrs.push(Instr::ExternalWithFallback {
774            f1,
775            args1: args1.to_vec(),
776            f2,
777            args2: args2.to_vec(),
778            dst: res,
779        });
780        self.qb.mark_used(args1);
781        self.qb.mark_used(args2);
782        self.qb.mark_defined(&res.into());
783        Ok(res)
784    }
785
786    /// Continue execution iff the two arguments are equal.
787    pub fn assert_eq(&mut self, l: QueryEntry, r: QueryEntry) {
788        self.qb.instrs.push(Instr::AssertEq(l, r));
789        self.qb.mark_used(&[l, r]);
790    }
791
792    /// Continue execution iff the two arguments are not equal.
793    pub fn assert_ne(&mut self, l: QueryEntry, r: QueryEntry) -> Result<(), QueryError> {
794        self.qb.instrs.push(Instr::AssertNe(l, r));
795        self.qb.mark_used(&[l, r]);
796        Ok(())
797    }
798
799    /// Continue execution iff there is some `i` such that `l[i] != r[i]`.
800    ///
801    /// This is useful when doing egglog-style rebuilding.
802    pub fn assert_any_ne(&mut self, l: &[QueryEntry], r: &[QueryEntry]) -> Result<(), QueryError> {
803        if l.len() != r.len() {
804            return Err(QueryError::MultiComparisonMismatch {
805                l: l.len(),
806                r: r.len(),
807            });
808        }
809
810        let mut ops = Vec::with_capacity(l.len() + r.len());
811        ops.extend_from_slice(l);
812        ops.extend_from_slice(r);
813        self.qb.instrs.push(Instr::AssertAnyNe {
814            ops,
815            divider: l.len(),
816        });
817        self.qb.mark_used(l);
818        self.qb.mark_used(r);
819        Ok(())
820    }
821
822    fn validate_row(
823        &self,
824        table: TableId,
825        info: &TableInfo,
826        vals: &[QueryEntry],
827    ) -> Result<(), QueryError> {
828        if vals.len() != info.spec.arity() {
829            Err(QueryError::TableArityMismatch {
830                table,
831                expected: info.spec.arity(),
832                got: vals.len(),
833            })
834        } else {
835            Ok(())
836        }
837    }
838
839    fn validate_keys(
840        &self,
841        table: TableId,
842        info: &TableInfo,
843        keys: &[QueryEntry],
844    ) -> Result<(), QueryError> {
845        if keys.len() != info.spec.n_keys {
846            Err(QueryError::KeyArityMismatch {
847                table,
848                expected: info.spec.n_keys,
849                got: keys.len(),
850            })
851        } else {
852            Ok(())
853        }
854    }
855
856    fn validate_vals<'b>(
857        &self,
858        table: TableId,
859        info: &TableInfo,
860        vals: impl Iterator<Item = &'b WriteVal>,
861    ) -> Result<(), QueryError> {
862        for (i, _) in vals.enumerate() {
863            let col = i + info.spec.n_keys;
864            if col >= info.spec.arity() {
865                return Err(QueryError::TableArityMismatch {
866                    table,
867                    expected: info.spec.arity(),
868                    got: col,
869                });
870            }
871        }
872        Ok(())
873    }
874}
875
/// A single atom of a query: a scan of one table, with some of its columns bound to
/// query variables.
#[derive(Debug, Clone)]
pub(crate) struct Atom {
    /// The table this atom reads from.
    pub(crate) table: TableId,
    /// Which column each query variable occupies in this atom, and vice versa.
    pub(crate) var_columns: VarColumnMap,
    /// These constraints are an initial take at processing "fast" constraints as well as a
    /// potential list of "slow" constraints.
    ///
    /// Fast constraints get re-computed when queries are executed. In particular, this makes it
    /// possible to cache plans and add new fast constraints to them without re-planning.
    pub(crate) constraints: ProcessedConstraints,
}
887
888impl Atom {
889    pub(crate) fn vars(&self) -> impl Iterator<Item = Variable> + '_ {
890        self.var_columns.vars()
891    }
892
893    pub(crate) fn get_var(&self, col: ColumnId) -> Option<Variable> {
894        self.var_columns.get_var(col)
895    }
896
897    pub(crate) fn get_col(&self, var: Variable) -> Option<ColumnId> {
898        self.var_columns.get_col(var)
899    }
900}
901
/// A two-way mapping between query variables and the atom columns they occupy.
///
/// Both directions are stored explicitly so lookups by variable and by column
/// are each a single map access.
#[derive(Clone, Default)]
pub(crate) struct VarColumnMap {
    // variable -> column it occupies
    var_to_column: DenseIdMap<Variable, ColumnId>,
    // column -> variable bound to it
    column_to_var: DenseIdMap<ColumnId, Variable>,
}
907
908impl std::fmt::Debug for VarColumnMap {
909    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
910        let mut entries: Vec<_> = self.column_to_var.iter().collect();
911        entries.sort_by_key(|(col, _)| col.index());
912
913        f.write_str("VarColumnMap(")?;
914        for (i, (col, var)) in entries.iter().enumerate() {
915            if i > 0 {
916                f.write_str(", ")?;
917            }
918            write!(f, "{col:?} -> {var:?}")?;
919        }
920        f.write_str(")")
921    }
922}
923
924impl VarColumnMap {
925    pub(crate) fn insert(&mut self, var: Variable, col: ColumnId) -> Option<ColumnId> {
926        let prev = self.var_to_column.insert(var, col);
927        self.column_to_var.insert(col, var);
928        prev
929    }
930
931    pub(crate) fn get_col(&self, var: Variable) -> Option<ColumnId> {
932        self.var_to_column.get(var).copied()
933    }
934
935    pub(crate) fn get_var(&self, col: ColumnId) -> Option<Variable> {
936        self.column_to_var.get(col).copied()
937    }
938
939    pub(crate) fn iter(&self) -> impl Iterator<Item = (ColumnId, Variable)> + '_ {
940        self.column_to_var.iter().map(|(col, var)| (col, *var))
941    }
942
943    pub(crate) fn vars(&self) -> impl Iterator<Item = Variable> + '_ {
944        self.iter().map(|(_, var)| var)
945    }
946
947    pub(crate) fn is_empty(&self) -> bool {
948        self.var_to_column.len() == 0
949    }
950}
951
/// A functional dependency inferencer.
///
/// A functional dependency (x, y, ...) -> (u, v, ...) means that if we know
/// the values of x, y, ..., then we can determine u, v, ...
///
/// This data structure can compute the closure of a set of variables under
/// a set of functional dependencies.
#[derive(Clone, Default)]
pub(crate) struct FunDeps {
    /// List of functional dependencies (antecedent -> consequent), in insertion
    /// order. `add_dependency` never stores an entry with an empty antecedent.
    dependencies: Vec<(Vec<Variable>, Vec<Variable>)>,
}
964
965impl FunDeps {
966    /// Add a functional dependency: antecedent -> consequent.
967    pub fn add_dependency(&mut self, antecedent: Vec<Variable>, consequent: Vec<Variable>) {
968        // Don't add trivial dependencies.
969        if !antecedent.is_empty() {
970            self.dependencies.push((antecedent, consequent));
971        }
972    }
973
974    /// Returns all variables that can be determined from the input variables
975    /// using the functional dependencies.
976    pub fn closure(
977        &self,
978        variables: impl IntoIterator<Item = Variable>,
979    ) -> DenseIdMap<Variable, ()> {
980        let mut result: DenseIdMap<Variable, ()> =
981            DenseIdMap::from_iter(variables.into_iter().map(|v| (v, ())));
982        let mut changed = true;
983
984        while changed {
985            changed = false;
986            for (antecedent, consequent) in &self.dependencies {
987                // If all variables in the antecedent are in the result,
988                // add all variables in the consequent.
989                if antecedent.iter().all(|v| result.contains_key(*v)) {
990                    for v in consequent {
991                        if !result.contains_key(*v) {
992                            result.insert(*v, ());
993                            changed = true;
994                        }
995                    }
996                }
997            }
998        }
999
1000        result
1001    }
1002}
1003
1004impl std::fmt::Debug for FunDeps {
1005    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1006        use std::fmt::Write;
1007
1008        let mut deps = String::new();
1009
1010        for (i, (ant, cons)) in self.dependencies.iter().enumerate() {
1011            if i > 0 {
1012                deps.push_str("; ");
1013            }
1014
1015            deps.push('{');
1016            for (j, v) in ant.iter().enumerate() {
1017                if j > 0 {
1018                    deps.push_str(", ");
1019                }
1020                write!(&mut deps, "{v:?}")?;
1021            }
1022            deps.push('}');
1023
1024            deps.push_str(" -> ");
1025
1026            deps.push('{');
1027            for (j, v) in cons.iter().enumerate() {
1028                if j > 0 {
1029                    deps.push_str(", ");
1030                }
1031                write!(&mut deps, "{v:?}")?;
1032            }
1033            deps.push('}');
1034        }
1035
1036        write!(f, "FunDeps {{ {deps} }}")
1037    }
1038}
1039
/// A rule's query together with the metadata used to plan and run it.
pub(crate) struct Query {
    /// Per-variable information for every variable in the query.
    pub(crate) var_info: DenseIdMap<Variable, VarInfo>,
    /// The atoms making up the query, keyed by their id.
    pub(crate) atoms: DenseIdMap<AtomId, Atom>,
    /// The action associated with this query.
    // NOTE(review): presumably the action run on each match; confirm against the executor.
    pub(crate) action: ActionId,
    /// Strategy to use when planning this query.
    pub(crate) plan_strategy: PlanStrategy,
    /// Functional dependencies known to hold between this query's variables.
    pub(crate) fun_deps: FunDeps,
    /// If `true`, skip tree-decomposition during query planning and
    /// always use the single-bag fast path in
    /// [`crate::free_join::plan::tree_decompose_and_plan`]. Set via
    /// [`QueryBuilder::set_no_decomp`].
    pub(crate) no_decomp: bool,
}