egglog_core_relations/free_join/
plan.rs

1//! This module defines query optimization for egglog. The main entry point is `plan_query`, which takes a `Query` and produces a `Plan`.
2//!
3//! At a high level, the query planner has two phases: **(hyper)tree decomposition** and **join planning for each bag**.
4//! Both phases are very subtle, and heuristics are heavily used for good performance.
5//!
6//! # (Hyper)tree Decomposition
7//!
8//! A conjunctive query can be viewed as a hypergraph where variables are vertices and atoms (relations) are hyperedges.
9//! The idea of tree decomposition is to break this hypergraph into a tree of overlapping subqueries called *bags*,
10//! each of which is cheaper to evaluate independently. This is the classical idea behind tree decomposition and the
11//! Yannakakis algorithm.
12//!
13//! The decomposition proceeds via *variable elimination*: we iteratively pick a variable `v` and eliminate the neighborhood
14//! `N(v)` (which also includes `v`) from the hypergraph, and add back a hyperedge consisting of `N(v) - {v}`, until
15//! there are no variables left. Each elimination step gives us a bag. A min-fill heuristic
16//! (`next_var_to_eliminate`) guides the order of elimination to keep bags small. After all variables are eliminated,
17//! redundant bags are pruned: bags subsumed by another (all their variables are covered) are merged, and "ears"
18//! are merged into their parent.
19//!
20//! We then topologically sort the bags and decide which variables are "message variables" and which are private to the bag.
21//! The materialized result of each bag has its output keyed on the *message variables* it shares with
22//! its parent, and the parent uses that materialization to prune its own search space.
23//!
24//! When the query hypergraph is a single connected component with no beneficial decomposition, the planner falls back to
25//! a `SinglePlan` with no materialization steps.
26//!
27//! # Join Planning for a Single Bag
28//!
29//! Once each bag (subquery) is isolated, the planner generates a sequence of `JoinStage` instructions that enumerate
30//! all satisfying tuples for that bag. Two heuristics are supported:
31//!
32//! - **Generic Join** (`PlanStrategy::Gj`): The classic worst-case optimal join algorithm. Each stage picks one variable
33//!   and intersects the columns of atoms that correspond to this variable (`JoinStage::Intersect`).
34//!
35//! - **Free Join** (`PlanStrategy::PureSize` / `PlanStrategy::MinCover`): From Remy's paper. The planning algorithm
36//!   does the following: Each stage it selects a *cover* — a (sub)atom whose columns span the variables being bound in that step — and
37//!   uses it to probe all other atoms that share those variables (`JoinStage::FusedIntersect`). When the cover is an
38//!   entire atom and there is only one relation to probe, this degenerates to a hash join; when covers are single-column
39//!   scans it ~ recovers generic join*.
40//!
41//!   *: although this is not worst-case optimal because it does not necessarily picks the smallest side to scan.
42//!
43//! Both strategies produce a flat list of `JoinStage` instructions that are fused where possible (`JoinStage::fuse`) to
44//! reduce the number of passes over the data. A `JoinHeader` is prepended to each plan to apply constant constraints and
45//! pre-filter the driving relation before the main join loop begins.
46//!
47use std::{collections::BTreeMap, iter, mem, sync::Arc};
48
49use crate::{
50    TableId,
51    free_join::{ColUniqueness, ColumnCardEst, ProcessedConstraints},
52    numeric_id::{DenseIdMap, NumericId},
53    query::{FunDeps, SymbolMap},
54};
55use egglog_numeric_id::define_id;
56use fixedbitset::FixedBitSet;
57use smallvec::{SmallVec, smallvec};
58
59use crate::{
60    common::{HashMap, HashSet, IndexSet},
61    offsets::Subset,
62    pool::Pooled,
63    query::{Atom, Query, VarColumnMap},
64    table_spec::Constraint,
65};
66
67use super::{ActionId, AtomId, ColumnId, SubAtom, VarInfo, Variable};
68
69#[derive(Clone, Debug, PartialEq, Eq)]
70pub(crate) struct ScanSpec {
71    pub to_index: SubAtom,
72    // Only yield rows where the given constraints match.
73    pub constraints: Vec<Constraint>,
74}
75
76#[derive(Clone, Debug, PartialEq, Eq)]
77pub(crate) struct SingleScanSpec {
78    pub atom: AtomId,
79    pub column: ColumnId,
80    pub cs: Vec<Constraint>,
81}
82
83define_id!(pub(crate) MatId, u32, "An identifier for materialization within a decomposed plan.");
84
85#[derive(Clone, Debug, PartialEq, Eq)]
86pub(crate) enum MatScanMode {
87    Full,
88    KeyOnly,
89    Value(SmallVec<[Variable; 16]>),
90    Lookup(SmallVec<[Variable; 16]>),
91}
92
93/// Join headers evaluate constraints on a single atom; they prune the search space before the rest
94/// of the join plan is executed.
95pub(crate) struct JoinHeader {
96    pub atom: AtomId,
97    /// We currently aren't using these at all. The plan is to use this to
98    /// dedup plan stages later (it also helps for debugging).
99    #[allow(unused)]
100    pub constraints: Pooled<Vec<Constraint>>,
101    /// A pre-computed table subset that we can use to filter the table,
102    /// given these constaints.
103    ///
104    /// Why use the constraints at all? Because we want to use them to
105    /// discover common plan nodes from different queries (subsets can be
106    /// large).
107    pub subset: Subset,
108}
109
110impl std::fmt::Debug for JoinHeader {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct("JoinHeader")
113            .field("atom", &self.atom)
114            .field("constraints", &self.constraints)
115            .field(
116                "subset",
117                &format_args!("Subset(size={})", self.subset.size()),
118            )
119            .finish()
120    }
121}
122
123impl Clone for JoinHeader {
124    fn clone(&self) -> Self {
125        JoinHeader {
126            atom: self.atom,
127            constraints: Pooled::cloned(&self.constraints),
128            subset: self.subset.clone(),
129        }
130    }
131}
132
133#[derive(Debug, Clone)]
134pub(crate) enum JoinStage {
135    /// `Intersect` takes a variable and intersects a set of atoms
136    /// on that variable.
137    /// This corresponds to the classic generic join algorithm.
138    Intersect {
139        var: Variable,
140        scans: SmallVec<[SingleScanSpec; 3]>,
141    },
142    /// `FusedIntersect` takes a "cover" (sub)atom and use it to probe other (sub)atoms.
143    /// This corresponds to the free join algorithm, or when to_intersect.len() == 1 and cover is
144    /// the entire atom, a hash join.
145    FusedIntersect {
146        cover: ScanSpec,
147        bind: SmallVec<[(ColumnId, Variable); 2]>,
148        // to_intersect.1 is the index into the cover atom.
149        to_intersect: Vec<(ScanSpec, SmallVec<[ColumnId; 2]>)>,
150    },
151    FusedIntersectMat {
152        cover: MatId,
153        mode: MatScanMode,
154        bind: SmallVec<[(ColumnId, Variable); 2]>,
155        to_intersect: Vec<(ScanSpec, SmallVec<[ColumnId; 2]>)>,
156    },
157}
158
159/// Merge every `FusedIntersect { to_intersect: [] }` into the first earlier such stage on the
160/// same cover atom, so each atom contributes at most one single-scan stage. Same-atom no-
161/// `to_intersect` covers can always be merged: their projections share an atom and any
162/// per-atom constraints are produced exactly once by `take_atom_constraints_if_new`.
163fn fuse_single_scans(stages: &mut Vec<JoinStage>) {
164    let mut i = 0;
165    while i < stages.len() {
166        let cur_atom = match &stages[i] {
167            JoinStage::FusedIntersect {
168                cover,
169                to_intersect,
170                ..
171            } if to_intersect.is_empty() => cover.to_index.atom,
172            _ => {
173                i += 1;
174                continue;
175            }
176        };
177        let target = (0..i).find(|&j| {
178            matches!(
179                &stages[j],
180                JoinStage::FusedIntersect { cover, to_intersect, .. }
181                    if to_intersect.is_empty() && cover.to_index.atom == cur_atom
182            )
183        });
184        let Some(j) = target else {
185            i += 1;
186            continue;
187        };
188        let JoinStage::FusedIntersect {
189            cover: cover_i,
190            bind: bind_i,
191            ..
192        } = stages.remove(i)
193        else {
194            unreachable!("checked above")
195        };
196        let JoinStage::FusedIntersect {
197            cover: cover_j,
198            bind: bind_j,
199            ..
200        } = &mut stages[j]
201        else {
202            unreachable!("checked above")
203        };
204        cover_j.to_index.vars.extend(cover_i.to_index.vars);
205        cover_j.constraints.extend(cover_i.constraints);
206        bind_j.extend(bind_i);
207        // Don't advance `i`: stages.remove shifts later entries down by one.
208    }
209}
210
211#[derive(Debug, Clone)]
212pub(crate) enum Plan {
213    SinglePlan(SinglePlan),
214    DecomposedPlan(DecomposedPlan),
215}
216impl Plan {
217    pub fn actions(&self) -> ActionId {
218        match self {
219            Plan::SinglePlan(p) => p.actions,
220            Plan::DecomposedPlan(p) => p.actions,
221        }
222    }
223
224    pub fn atoms(&self) -> Arc<DenseIdMap<AtomId, Atom>> {
225        match self {
226            Plan::SinglePlan(p) => p.atoms.clone(),
227            Plan::DecomposedPlan(p) => p.atoms.clone(),
228        }
229    }
230
231    pub(crate) fn to_report(&self, _symbol_map: &SymbolMap) -> egglog_reports::Plan {
232        match self {
233            Plan::SinglePlan(p) => p.to_report(_symbol_map),
234            Plan::DecomposedPlan(_) => {
235                todo!()
236            }
237        }
238    }
239
240    pub(crate) fn header(&self) -> &[JoinHeader] {
241        match self {
242            Plan::SinglePlan(p) => &p.header,
243            Plan::DecomposedPlan(p) => &p.header,
244        }
245    }
246}
247
248#[derive(Debug, Clone)]
249pub(crate) struct SinglePlan {
250    pub atoms: Arc<DenseIdMap<AtomId, Atom>>,
251    pub header: Vec<JoinHeader>,
252    pub stages: JoinStages,
253    pub actions: ActionId,
254}
255
256#[derive(Debug, Clone)]
257pub(crate) struct JoinStages {
258    pub instrs: Arc<Vec<JoinStage>>,
259}
260
261/// Specification of the materialization of the intermediate results, as required by tree decomposition.
262/// A materialization has two parts. The message variables are variables that are passed to and joined with later stages,
263/// and the value/private variables are variables that only occur in the current (and maybe previous) bags.
264///
265/// A materialization thus looks like a map from values of the message variables to sets of values of the private variables,
266/// and when we evaluate other bags, only the keys (message variables) are looked up or enumerated. This is because
267/// the private variables are not relevant to the evaluation of other bags. A key idea of tree decomposition is to separate
268/// independent parts of a query and make sure they are evaluated independently.
269#[derive(Debug, Clone)]
270pub(crate) struct MatSpec {
271    // Variables that are used by later stages
272    pub msg_vars: SmallVec<[Variable; 16]>,
273    // Variables that are not used by later stages.
274    pub val_vars: SmallVec<[Variable; 16]>,
275}
276
277#[derive(Debug, Clone)]
278pub(crate) struct JoinStageBlocks {
279    // each block is a list of instructions and how to yield
280    pub blocks: Vec<(JoinStages, MatSpec)>,
281}
282
283#[derive(Debug, Clone)]
284pub(crate) struct DecomposedPlan {
285    pub atoms: Arc<DenseIdMap<AtomId, Atom>>,
286    pub header: Vec<JoinHeader>,
287    pub stages: JoinStageBlocks,
288    pub result_block: JoinStages,
289    pub actions: ActionId,
290}
291
292impl SinglePlan {
293    pub(crate) fn to_report(&self, symbol_map: &SymbolMap) -> egglog_reports::Plan {
294        use egglog_reports::{
295            Plan as ReportPlan, Scan as ReportScan, SingleScan as ReportSingleScan,
296            Stage as ReportStage,
297        };
298        const INTERNAL_PREFIX: &str = "@";
299        let get_var = |var: Variable| {
300            symbol_map
301                .vars
302                .get(&var)
303                .map(|s| s.to_string())
304                .unwrap_or_else(|| format!("{INTERNAL_PREFIX}x{var:?}"))
305        };
306        let get_atom = |atom: AtomId| {
307            symbol_map
308                .atoms
309                .get(&atom)
310                .map(|s| s.to_string())
311                .unwrap_or_else(|| format!("{INTERNAL_PREFIX}R{atom:?}"))
312        };
313        let mut stages = Vec::new();
314        for (i, stage) in self.stages.instrs.iter().enumerate() {
315            let report_stage = match stage {
316                JoinStage::Intersect { var, scans } => {
317                    let var_name = get_var(*var);
318                    let report_scans = scans
319                        .iter()
320                        .map(|scan| {
321                            let atom_name = get_atom(scan.atom);
322                            ReportSingleScan(
323                                atom_name,
324                                (var_name.clone(), scan.column.index() as i64),
325                            )
326                        })
327                        .collect();
328                    ReportStage::Intersect {
329                        scans: report_scans,
330                    }
331                }
332                JoinStage::FusedIntersect {
333                    cover,
334                    bind: _,
335                    to_intersect,
336                } => {
337                    let cover_atom_name = get_atom(cover.to_index.atom);
338                    let cover_cols: Vec<(String, i64)> = cover
339                        .to_index
340                        .vars
341                        .iter()
342                        .map(|col| {
343                            let var_name =
344                                get_var(self.atoms[cover.to_index.atom].get_var(*col).unwrap());
345                            (var_name, col.index() as i64)
346                        })
347                        .collect();
348                    let report_cover = ReportScan(cover_atom_name, cover_cols);
349                    let report_to_intersect = to_intersect
350                        .iter()
351                        .map(|(scan, key_spec)| {
352                            let atom_name = get_atom(scan.to_index.atom);
353                            let cols: Vec<(String, i64)> = key_spec
354                                .iter()
355                                .map(|col| {
356                                    let var_name = get_var(
357                                        self.atoms[scan.to_index.atom].get_var(*col).unwrap(),
358                                    );
359                                    (var_name, col.index() as i64)
360                                })
361                                .collect();
362                            ReportScan(atom_name, cols)
363                        })
364                        .collect();
365                    ReportStage::FusedIntersect {
366                        cover: report_cover,
367                        to_intersect: report_to_intersect,
368                    }
369                }
370                JoinStage::FusedIntersectMat {
371                    cover: _,
372                    mode: _,
373                    bind: _,
374                    to_intersect: _,
375                } => {
376                    todo!("materialization")
377                }
378            };
379            let next = if i == self.stages.instrs.len() - 1 {
380                vec![]
381            } else {
382                vec![i + 1]
383            };
384            stages.push((report_stage, None, next));
385        }
386        ReportPlan { stages }
387    }
388}
389
390/// The algorithm used to produce a join plan.
391#[derive(Default, Copy, Clone)]
392pub enum PlanStrategy {
393    /// Free Join: Iteratively pick the smallest atom as the cover for the next
394    /// stage, until all subatoms have been visited.
395    PureSize,
396
397    /// Free Join: Pick an approximate minimal set of covers, then order those
398    /// covers in increasing order of size.
399    ///
400    /// This is similar to PureSize but we first limit the potential atoms that
401    /// can act as covers so as to minimize the total number of stages in the
402    /// plan. This is only an approximate minimum: the problem of finding the
403    /// exact minimum ("set cover") is NP-hard.
404    MinCover,
405
406    /// Generate a plan for the classic Generic Join algorithm, constraining a
407    /// single variable per stage.
408    #[default]
409    Gj,
410}
411
412/// Pick the next variable to eliminate and computes its neighborhood.
413///
414/// Each time, we pick a variable that has the least number of occurrences and find its neighborhood* (i.e.,
415/// the set of variables that share an atom with it). We pick the neighborhood based on the "min-fill" heuristic,
416/// which tries to eliminate neighborhood that would introduce the least number of new hyperedges.
417/// A hyperedge is introduced during variable elimination if two variables that don't share an atom before are in the same neighborhood.
418///
419/// *: We find the closure of the neighborhood under functional dependencies, since these variables are "for free".
420fn next_var_to_eliminate(
421    vars: &DenseIdMap<Variable, VarInfo>,
422    atoms: &DenseIdMap<AtomId, Atom>,
423    fun_deps: &FunDeps,
424    col_est: &ColumnCardEst<'_>,
425) -> Option<IndexSet<Variable>> {
426    let (_var, subquery_vars) = vars
427        .iter()
428        .map(|(var, vinfo)| {
429            let subquery_vars = atoms
430                .iter()
431                // every atom that contains this variable
432                .filter(|(_, atom)| atom.get_col(var).is_some())
433                // every variable of those atoms
434                .flat_map(|(_, atom)| atom.vars());
435
436            // Optimization: use functional dependencies to find all variables inferred by the
437            // current neightborhood.
438            // let subquery_vars = fun_deps.closure(subquery_vars);
439            let subquery_vars: DenseIdMap<_, ()> = subquery_vars.map(|v| (v, ())).collect();
440
441            let occ = atoms
442                .iter()
443                .filter(|(_, atom)| atom.vars().any(|v| subquery_vars.contains_key(v)))
444                .count();
445            let size_estimation = vinfo
446                .occurrences
447                .iter()
448                .filter_map(|occ| {
449                    let atom = &atoms[occ.atom];
450                    let table = atom.table;
451                    if table.is_dummy() {
452                        return None;
453                    }
454                    let col = atom.get_col(var).unwrap();
455                    // TODO: plan header before query decomposition so we know the exact
456                    // subset we are handling
457                    Some(col_est.col_uniqueness(table, col))
458                })
459                .fold(ColUniqueness::default(), |a, b| a.join(&b));
460            ((occ, size_estimation), var, subquery_vars)
461            // (occ, var, subquery_vars)
462        })
463        .min_by_key(|a| a.0)
464        .map(|a| (a.1, a.2))?;
465    Some(IndexSet::from_iter(
466        fun_deps
467            .closure(subquery_vars.iter().map(|(var, _)| var))
468            .into_iter()
469            .map(|(var, _)| var),
470    ))
471}
472
473/// It updates the hypergraph with the given bag of variables by:
474/// 1. Remove atoms that only contain variables in the bag and remove those atoms from variable's occurrences,
475/// 2. Add a covering hyperedge that contains every non-private variable.
476fn update_hypergraph(
477    subquery_vars: &IndexSet<Variable>,
478    vars: &mut DenseIdMap<Variable, VarInfo>,
479    atoms: &mut DenseIdMap<AtomId, Atom>,
480) {
481    // Build the covering hyperedge before we remove from the hypergraph
482
483    // Find variables that occur not just in the subquery
484    let covering_vars: Vec<_> = subquery_vars
485        .iter()
486        .copied()
487        .filter(|&var| {
488            vars.contains_key(var)
489                && vars[var].occurrences.iter().any(|occ| {
490                    atoms[occ.atom]
491                        .vars()
492                        .any(|ov| !subquery_vars.contains(&ov))
493                })
494        })
495        .collect();
496
497    // Remove atoms from the hypergraph
498    let mut removed = Vec::new();
499    atoms.retain(|atom_id, atom| {
500        if atom.vars().all(|var| subquery_vars.contains(&var)) {
501            removed.push(atom_id);
502            false
503        } else {
504            true
505        }
506    });
507
508    // Update occurrences to reflect removed atoms
509    for &subq_var in subquery_vars.iter() {
510        if vars.contains_key(subq_var) {
511            vars[subq_var]
512                .occurrences
513                .retain(|occ| !removed.contains(&occ.atom));
514
515            if vars[subq_var].occurrences.is_empty() {
516                vars.unwrap_val(subq_var);
517            }
518        }
519    }
520
521    // Add the covering atom to the hypergraph
522    let mut var_columns = VarColumnMap::default();
523    for (ix, var) in covering_vars.iter().enumerate() {
524        var_columns.insert(*var, ColumnId::from_usize(ix));
525    }
526    let fake_atom_id = atoms.push(Atom {
527        var_columns,
528        constraints: ProcessedConstraints::dummy(),
529        table: TableId::dummy(),
530    });
531
532    // Update variable occurrences to include the covering atom
533    for (i, &covering_var) in covering_vars.iter().enumerate() {
534        vars[covering_var].occurrences.push(SubAtom {
535            atom: fake_atom_id,
536            vars: smallvec![ColumnId::from_usize(i)],
537        });
538    }
539}
540
541/// This function does tree decomposition. At a high level, it takes a bag (equivalently, a `PlanningContext`, a subquery, a hypergraph,
542/// or a set of variables + atoms), and returns a list of bags that forms a tree decomposition.
543///
544/// Recall that a bag is equivalent to a hypergraph, where vertices = variables and hyperedges = atoms.
545///
546/// The algorithm is based on the classical variable elimination, where it iteratively removes neighborhoods until no variables are left.
547/// More specifically, it iteratively
548///
549/// 1. Select a variable `v` and its neighborhood `N(v)`, based on the "min-fill" heuristic. (`next_var_to_eliminate`)
550/// 2. Remove the neighborhood from the working hypergraph. (`update_hypergraph`)
551/// 3. Add a covering atom that contains variables `N(v) - {v}` to the working hypergraph. (`update_hypergraph`)
552/// 4. Step 1-3 gives us a set of variables `N(v)`. We need to construct a subquery from it. This step is a bit subtle.
553///
554///    For example, consider the rectangle query `R(x, y), S(y, z), T(z, w), U(w, x)`. Let's say we pick variable `x` to eliminate.
555///    The neighborhood `N(x)` of `x` is {x, y, z}. A naive approach is to subquery would be `R(x, y), S(y, z)`, but this query can have size quadratic,
556///    even when the final output size is small. The issue here is `x` and `z` are not fully constrained in this subquery.
557///    Another approach is to include every atom that contains variables in `N(x)`, but this gives us the entire query as the subquery for that rectangle query,
558///    which is also not ideal because the rectangle query should be broken into two bags.
559///
560///    The solution is to include every atom that contains variables in `N(x)`, but only keep the variables in `N(x)` in those atoms. For the rectangle example,
561///    this would be `R(x, y), S(y, z), T(z, -), U(-, x)`, where `-` means we don't expand this variable during evaluation. As a result, the produced PlanningContext
562///    may have atoms whose variables are not in `PlanningContext::vars`. The query planner for a single bag handles this correctly.
563///
564/// Now we have collected a list of bags, but they are very redundant. (Remember the variable elimination loop is run |vars| steps, because each iteration eliminates
565/// only one variable.) We need to prune these bags. See the comments in the code for details.
566///
567/// Another invariant we maintain is higher-indexed bags are heavier (closer to the root of the tree decomposition), so they will be evaluated later and constrained
568/// by evaluation of earlier bags.
569fn decompose_into_bags<'a>(original_ctx: &PlanningContext<'a>) -> Vec<PlanningContext<'a>> {
570    let mut atoms = original_ctx.atoms.clone();
571    let mut vars = original_ctx.vars.clone();
572
573    // Prune variables with no occurrences
574    for (var, vinfo) in original_ctx.vars.iter() {
575        if vinfo.occurrences.is_empty() {
576            vars.take(var).unwrap();
577        }
578    }
579
580    let mut bags = Vec::new();
581
582    // Variable elimination loop
583    while let Some(subquery_vars) =
584        next_var_to_eliminate(&vars, &atoms, &original_ctx.fun_deps, &original_ctx.col_est)
585    {
586        // Create a fake covering atom to bridge back to the main query
587        // Remove hyperedges that only contain subquery variables.
588        update_hypergraph(&subquery_vars, &mut vars, &mut atoms);
589
590        // Collect atoms that only contain subquery variables.
591        let subquery_atoms: DenseIdMap<AtomId, Atom> = original_ctx
592            .atoms
593            .iter()
594            .filter(|(_, atom)| atom.vars().any(|var| subquery_vars.contains(&var)))
595            .map(|(atom_id, atom)| (atom_id, atom.clone()))
596            .collect();
597
598        let subquery_var_map = DenseIdMap::from_iter(subquery_vars.iter().map(|var| {
599            let mut var_info = original_ctx.vars[*var].clone();
600            // NB: used_in_rhs is handled in [`plan_single_bag`]
601            var_info
602                .occurrences
603                .retain(|occ| subquery_atoms.contains_key(occ.atom));
604            (*var, var_info)
605        }));
606
607        bags.push(PlanningContext {
608            vars: subquery_var_map,
609            atoms: subquery_atoms,
610            fun_deps: original_ctx.fun_deps.clone(),
611            col_est: original_ctx.col_est.clone(),
612        });
613    }
614
615    assert!(
616        !atoms.iter().any(|(_, atom_info)| {
617            !atom_info.table.is_dummy() && !atom_info.var_columns.is_empty()
618        }),
619        "All atoms should be put into bags"
620    );
621
622    // Iteratively prune the query
623    let mut changed = true;
624    while changed {
625        changed = false;
626        // Pruning 1: Remove bags that are subsumed by others. A bag is subsumed by another bag if all of its variables are contained in the other bag,
627        // so the output of this bag must be a subset of the bigger bag.
628        let mut pruned_bags: Vec<PlanningContext> = Vec::with_capacity(bags.len());
629        for mut bag1 in bags.into_iter() {
630            pruned_bags.retain_mut(|bag2| {
631                let leq = bag1.is_subsumed_by(bag2);
632                let geq = bag2.is_subsumed_by(&bag1);
633                if leq || geq {
634                    bag1.merge_bag(bag2);
635                    changed = true;
636                    false
637                } else {
638                    true
639                }
640            });
641            pruned_bags.push(bag1);
642        }
643
644        // Pruning 2: Find "ears" and merge them with other bags. A bag is an ear if one of its atoms covers all of its variables, i.e., it only has one useful
645        // relation. We can safely remove an ear if it shares variables with only one bag - in this case, that bag is necessarily the parent in the tree decomposition.
646        //
647        // Why removing ears? Let's say an ear has the form R(x, y, z) with message variable {x}. The evaluation of its parent will already intersect on `x` with `R(x, y, z)`,
648        // so if `y` and `z` are expanded at the innermost loop of the evaluation, this does not incur any overhead. Versus if we keep this ear as a separate bag,
649        // we would need to first build a map x -> (y, z) only to enumerate each x to get the corresponding (y, z) values.
650        bags = pruned_bags;
651        let is_ear = |bag: &PlanningContext| {
652            bag.atoms.iter().any(|(_atom_id, atom)| {
653                let all_vars = original_ctx.fun_deps.closure(atom.vars());
654                bag.is_subsumed_by_vars(&all_vars)
655            })
656            // HACK: this weird condition says if there's exactly one atom whose variables are all wanted, then we can also treat it as an ear,
657            // because other atoms in the bag are likely added only to constrain the bag. This is approximately what a bag is, but not really.
658            // However, removing this condition makes some benchmark much worse...
659            || bag
660                .atoms
661                .iter()
662                .filter(|(_atom_id, atom)| bag.has_vars(atom.vars()))
663                .count()
664                == 1
665        };
666
667        let mut i = 0;
668        while i < bags.len() {
669            if !is_ear(&bags[i]) {
670                i += 1;
671                continue;
672            }
673
674            // Find the bag that shares the most variables with this ear bag, and merge the ear bag into it.
675            let parent = bags
676                .iter()
677                .enumerate()
678                .rev()
679                .filter(|(j, _)| *j != i)
680                .map(|(j, b)| (j, b.common_vars_with(&bags[i]).count()))
681                .collect::<Vec<_>>();
682
683            let j = parent.into_iter().max_by_key(|(_, count)| *count);
684            if j.is_none() || j.unwrap().1 == 0 {
685                i += 1;
686                continue;
687            }
688            let j = j.unwrap().0;
689
690            // Invariant: bigger-numbered bags are heavier and should stay at the root of the tree
691            if i < j {
692                let bag = bags.remove(i);
693                bags[j - 1].merge_bag(&bag);
694            } else {
695                let bag = bags.remove(j);
696                bags[i - 1].merge_bag(&bag);
697            }
698            changed = true;
699        }
700    }
701    bags
702}
703
704/// Topologically sorts bags based on variable dependencies, and merges bags so
705/// that the final result is a *chain*. This means `plan_single_bag` only ever
706/// needs a single prologue per bag and never an epilogue. This is because the
707/// epilogues do not participate in joins and are checked only after the main
708/// join loop, so they can easily lead to cartesian products.
709///
710/// At every DFS node we pick one child as the chain continuation. Every other reachable bag —
711/// siblings *and* their entire sub-trees — gets absorbed into the current chain node. The
712/// continuation is picked in a way that minimizes the maximum number of atoms in a bag, i.e.,
713/// the pathwidth.
714///
715/// The pathwidth of a path decomposition is the maximum bag size (minus one) over all bags,
716/// and the size of a bag is measured as the number of atoms in the bag.
717fn topologically_sort_bags(bags: Vec<PlanningContext>) -> Vec<PlanningContext> {
718    let mut all_children_list: Vec<Vec<usize>> = vec![vec![]; bags.len()];
719    // best_pathwidth[i] = the best pathwidth of the chain if we pick bag i
720    // to be the chain child.
721    let mut best_pathwidth = vec![usize::MAX; bags.len()];
722    let mut full = vec![HashSet::default(); bags.len()];
723    let mut choice = vec![usize::MAX; bags.len()];
724    for i in 0..bags.len() {
725        let mut full_i: HashSet<AtomId> =
726            bags[i].atoms.iter().map(|(atom_id, _)| atom_id).collect();
727        for child in all_children_list[i].iter() {
728            full_i.extend(full[*child].iter().copied());
729        }
730        full[i] = full_i;
731        best_pathwidth[i] = full[i].len();
732        for chain_child in all_children_list[i].iter() {
733            let mut chain_score: HashSet<_> =
734                bags[i].atoms.iter().map(|(atom_id, _)| atom_id).collect();
735            chain_score.extend(
736                all_children_list[*chain_child]
737                    .iter()
738                    .filter(|child| *child != chain_child)
739                    .flat_map(|child| full[*child].iter().copied()),
740            );
741            let s = chain_score.len().max(best_pathwidth[*chain_child]);
742            if s <= best_pathwidth[i] {
743                best_pathwidth[i] = s;
744                choice[i] = *chain_child;
745            }
746        }
747
748        // Find the parent of this bag, which must be the lowerest-numbered bag
749        // that shares the most variables with it.
750        let parent = bags
751            .iter()
752            .enumerate()
753            .skip(i + 1)
754            .map(|(j, b)| (j, b.common_vars_with(&bags[i]).count()))
755            .filter(|(_, count)| *count > 0)
756            .max_by_key(|(j, count)| (*count, -(*j as isize)));
757        if let Some((j, _count)) = parent {
758            all_children_list[j].push(i);
759        }
760    }
761
762    let mut bags_opt = bags.into_iter().map(Some).collect::<Vec<_>>();
763    let mut bags_topo = Vec::<PlanningContext>::with_capacity(bags_opt.len());
764    let mut visited = vec![false; bags_opt.len()];
765    // Stack entries: (bag_id, parent). `parent` is None for chain nodes (the bag is
766    // pushed to `bags_topo` as a new standalone entry) and Some(idx) for nodes being
767    // absorbed into `bags_topo[idx]`.
768    let mut stack: Vec<(usize, Option<usize>)> = Vec::new();
769
770    // Starting from the last, since early bags are more likely to be leaves and we don't
771    // want a leafy bag to be a root.
772    for i in (0..bags_opt.len()).rev() {
773        if visited[i] {
774            continue;
775        }
776        stack.push((i, None));
777        visited[i] = true;
778
779        while let Some((bag_id, parent)) = stack.pop() {
780            let bag = mem::take(&mut bags_opt[bag_id]).unwrap();
781
782            let this;
783            if let Some(parent) = parent {
784                bags_topo[parent].merge_bag(&bag);
785                this = parent;
786            } else {
787                this = bags_topo.len();
788            }
789
790            let all_children = &mut all_children_list[bag_id];
791
792            if parent.is_some() {
793                // This bag is being absorbed into `bags_topo[this]`. To keep the
794                // result a chain, every descendant of this bag is also absorbed —
795                // none of them get to spawn a new chain node.
796                for &i in all_children.iter() {
797                    visited[i] = true;
798                    stack.push((i, Some(this)));
799                }
800            } else {
801                // This bag is a chain node. The child that minimizes pathwidth continues the
802                // chain; the rest (and all their descendants, via the branch above)
803                // are absorbed into this chain node.
804                if !all_children.is_empty() {
805                    for &i in all_children[1..].iter() {
806                        if i == choice[bag_id] {
807                            continue;
808                        }
809                        visited[i] = true;
810                        stack.push((i, Some(this)));
811                    }
812                    visited[choice[bag_id]] = true;
813                    stack.push((choice[bag_id], None));
814                }
815            }
816
817            if parent.is_none() {
818                bags_topo.push(bag);
819            }
820        }
821    }
822
823    bags_topo.reverse();
824    bags_topo
825}
826
827/// Counts how many bags each variable appears in.
828///
829/// This is used to determine whether a variable should be passed as a message
830/// variable (if used in later bags) or a value variable (if only used in the current bag).
831fn count_variable_usage_per_bag(bags: &[PlanningContext]) -> DenseIdMap<Variable, usize> {
832    let mut n_used_in_bag = DenseIdMap::new();
833    for bag in bags {
834        for (var, _vinfo) in bag.vars.iter() {
835            if !n_used_in_bag.contains_key(var) {
836                n_used_in_bag.insert(var, 0);
837            }
838            n_used_in_bag[var] += 1;
839        }
840    }
841    n_used_in_bag
842}
843
844/// Plans the execution stages for a single bag.
845///
846/// This involves:
847/// - Dividing variables into message variables (passed to later stages) and value variables
848/// - Planning join stages within the bag
849/// - Adding prologue and epilogue instructions so that the bag is constrained by previous materializations.
850///
851/// This function also sets the `used_in_rhs` field for variables. A variable is not used in RHS during the planning
852/// of a bag if it's not used in later bags.
853fn plan_single_bag(
854    bag: &mut PlanningContext,
855    blocks: &[(JoinStages, MatSpec)],
856    // If this bag has been used to prune its parent
857    has_block_contributed: &mut [bool],
858    n_used_in_bag: &mut DenseIdMap<Variable, usize>,
859    strat: PlanStrategy,
860) -> (Vec<JoinHeader>, JoinStages, MatSpec) {
861    let mut msg_vars = smallvec![];
862    let mut val_vars = smallvec![];
863
864    // Classify variables as message or value variables
865    for (var, vinfo) in bag.vars.iter_mut() {
866        n_used_in_bag[var] -= 1;
867        if n_used_in_bag[var] > 0 {
868            // If this is a public variable, then we need to pass it on anyway
869            vinfo.used_in_rhs = true;
870            msg_vars.push(var);
871        } else {
872            // If this variable is not used in later and previous bag,
873            // and it is not used in the right hand side,
874            // this variable doesn't need to be expanded.
875            if !vinfo.used_in_rhs
876                && blocks.iter().all(|(_, spec)| !spec.msg_vars.contains(&var))
877                && n_used_in_bag[var] == 0
878            {
879                continue;
880            }
881            val_vars.push(var);
882            vinfo.used_in_rhs = true;
883        }
884    }
885
886    let mut stripped_bag = bag.clone();
887
888    // Add prologue and epilogue instructions to look up previous materialized bags
889    // These are constraints from children blocks. If there's only one such block, it can be the header.
890    // Otherwise, they have to be epilogue instructions doing filtering at the end, which is less efficient.
891    let mut prologue = None;
892    let mut epilogue = Vec::new();
893    for (i, prev_block) in blocks.iter().enumerate().rev() {
894        if prev_block.1.msg_vars.is_empty() {
895            continue;
896        }
897        if !has_block_contributed[i]
898            && prev_block
899                .1
900                .msg_vars
901                .iter()
902                .all(|var| bag.vars.contains_key(*var))
903        {
904            has_block_contributed[i] = true;
905            if prologue.is_none() {
906                let bind = prev_block
907                    .1
908                    .msg_vars
909                    .iter()
910                    .enumerate()
911                    .map(|(j, var)| (ColumnId::from_usize(j), *var))
912                    .collect();
913                let mut to_intersect: Vec<(ScanSpec, SmallVec<[ColumnId; 2]>)> = vec![];
914                for (col, var) in prev_block.1.msg_vars.iter().enumerate() {
915                    let vinfo = &bag.vars[*var];
916                    for occ in vinfo.occurrences.iter() {
917                        let isect = match to_intersect
918                            .iter_mut()
919                            .find(|(spec, _)| spec.to_index.atom == occ.atom)
920                        {
921                            Some(isect) => isect,
922                            None => {
923                                to_intersect.push((
924                                    ScanSpec {
925                                        to_index: SubAtom {
926                                            atom: occ.atom,
927                                            vars: smallvec![],
928                                        },
929                                        constraints: vec![],
930                                    },
931                                    smallvec![],
932                                ));
933                                to_intersect.last_mut().unwrap()
934                            }
935                        };
936                        isect.0.to_index.vars.extend(occ.vars.iter().copied());
937                        isect
938                            .1
939                            .extend(occ.vars.iter().map(|_| ColumnId::from_usize(col)));
940                    }
941                }
942
943                prologue = Some(JoinStage::FusedIntersectMat {
944                    cover: MatId::from_usize(i),
945                    mode: MatScanMode::KeyOnly,
946                    bind,
947                    to_intersect,
948                });
949
950                stripped_bag
951                    .vars
952                    .retain(|var, _vinfo| !prev_block.1.msg_vars.contains(&var));
953            } else {
954                epilogue.push(JoinStage::FusedIntersectMat {
955                    cover: MatId::from_usize(i),
956                    mode: MatScanMode::Lookup(prev_block.1.msg_vars.clone()),
957                    bind: smallvec![],
958                    to_intersect: vec![],
959                });
960            }
961        }
962    }
963
964    let (header, mut instrs) = plan_stages(&stripped_bag, strat);
965    instrs.splice(0..0, prologue);
966    instrs.extend(epilogue);
967
968    let stages = JoinStages {
969        instrs: Arc::new(instrs),
970    };
971
972    (header, stages, MatSpec { msg_vars, val_vars })
973}
974
975/// Builds the final result block that collects results from all materialized bags.
976///
977/// This performs a bottom-up pass through the materialized bags, binding value
978/// variables and gathering results. Each block is scanned at most once.
979fn build_result_block(blocks: &[(JoinStages, MatSpec)]) -> JoinStages {
980    let mut result_block = Vec::new();
981    let mut pinned_vars = DenseIdMap::<Variable, ()>::new();
982
983    for (i, (_stages, mat_spec)) in blocks.iter().enumerate().rev() {
984        let to_bind: SmallVec<[(ColumnId, Variable); 2]> = mat_spec
985            .val_vars
986            .iter()
987            .copied()
988            .enumerate()
989            .filter(|(_, var)| !pinned_vars.contains_key(*var))
990            .map(|(i, var)| (ColumnId::from_usize(i), var))
991            .collect();
992
993        if to_bind.is_empty() {
994            continue;
995        }
996
997        for (_, var) in to_bind.iter() {
998            pinned_vars.insert(*var, ());
999        }
1000
1001        result_block.push(JoinStage::FusedIntersectMat {
1002            cover: MatId::from_usize(i),
1003            mode: if i == blocks.len() - 1 {
1004                MatScanMode::Full
1005            } else {
1006                MatScanMode::Value(mat_spec.msg_vars.clone())
1007            },
1008            bind: to_bind,
1009            to_intersect: vec![],
1010        });
1011    }
1012
1013    JoinStages {
1014        instrs: Arc::new(result_block),
1015    }
1016}
1017
1018/// The last stage and the result block have the following structure:
1019///
1020/// for ...
1021///    yield [] -> x1, x2, ... as Mn
1022///
1023/// For x1, x2, ... in Mn:
1024///   ...
1025///
1026/// This can be fused into one loop
1027///
1028/// This is currently not used because somehow iterating the materialized RowBuffer is much faster than iterating the table
1029#[allow(unused)]
1030fn fuse_last_stage(
1031    mut blocks: Vec<(JoinStages, MatSpec)>,
1032    result_block: JoinStages,
1033) -> (Vec<(JoinStages, MatSpec)>, JoinStages) {
1034    if blocks.is_empty() {
1035        return (blocks, result_block);
1036    }
1037
1038    let last_block = blocks.pop().unwrap();
1039    assert!(last_block.1.msg_vars.is_empty());
1040    if !matches!(
1041        result_block.instrs[0],
1042        JoinStage::FusedIntersectMat {
1043            cover,
1044            mode: MatScanMode::Full,
1045            ..
1046        } if cover == MatId::from_usize(blocks.len()
1047    )) {
1048        // If the first stage of the result block does not scan the last materialization
1049        return (blocks, result_block);
1050    }
1051
1052    // Fuse the instructions
1053    let mut last_block = last_block.0;
1054    let mut instrs = Arc::unwrap_or_clone(last_block.instrs);
1055    instrs.extend(result_block.instrs[1..].iter().cloned());
1056    last_block.instrs = Arc::new(instrs);
1057
1058    (blocks, last_block)
1059}
1060
1061/// Eagerly lift materialization lookups up
1062///
1063/// For example, in the following, looking up of `r` can be lifted up before `z`
1064///
1065/// for x in R isec S:
1066///  R = R[x]; S = S[x]
1067///  for z in R:
1068///   if r in Mat[x]:
1069///     yield
1070fn loop_lifting(stages: JoinStages) -> JoinStages {
1071    let mut instrs = Arc::unwrap_or_clone(stages.instrs);
1072    for i in 1..instrs.len() {
1073        if let JoinStage::FusedIntersectMat {
1074            cover: _,
1075            mode: MatScanMode::Lookup(vars),
1076            bind,
1077            to_intersect,
1078        } = &instrs[i]
1079        {
1080            assert!(bind.is_empty() && to_intersect.is_empty());
1081            let vars = vars.clone();
1082            let mut j = i;
1083            while j > 0 {
1084                if matches!(
1085                    &instrs[j - 1], JoinStage::FusedIntersect { bind, .. } | JoinStage::FusedIntersectMat { bind, ..}
1086                        if bind.iter().all(|(_, var)| !vars.contains(var))
1087                ) || matches!(&instrs[j - 1], JoinStage::Intersect { var, .. } if !vars.contains(var))
1088                {
1089                    instrs.swap(j - 1, j);
1090                    j -= 1;
1091                } else {
1092                    break;
1093                }
1094            }
1095        }
1096    }
1097    JoinStages {
1098        instrs: Arc::new(instrs),
1099    }
1100}
1101
1102/// This is the main entry point for query optimization using tree decomposition.
1103pub(crate) fn tree_decompose_and_plan(
1104    ctx: PlanningContext,
1105    strat: PlanStrategy,
1106    actions: ActionId,
1107    no_decomp: bool,
1108) -> Plan {
1109    macro_rules! fast_path {
1110        () => {{
1111            let (header, instrs) = plan_stages(&ctx, strat);
1112            let stages = JoinStages {
1113                instrs: Arc::new(instrs),
1114            };
1115
1116            Plan::SinglePlan(SinglePlan {
1117                atoms: Arc::new(ctx.atoms),
1118                header,
1119                stages,
1120                actions,
1121            })
1122        }};
1123    }
1124    if no_decomp || ctx.atoms.len() <= 2 {
1125        return fast_path!();
1126    }
1127
1128    // Step 1: Decompose the query into tree-structured bags
1129    let bags = decompose_into_bags(&ctx);
1130    if bags.len() <= 1 {
1131        // Don't do Yannakakis if it's just one bag
1132        return fast_path!();
1133    }
1134
1135    // Step 2: Sort bags topologically and merge leafy bags with their parents
1136    let mut bags = topologically_sort_bags(bags);
1137
1138    if bags.len() <= 1 {
1139        return fast_path!();
1140    }
1141
1142    // Step 3: Count variable usage across bags. Used for deciding if a variable is public (i.e., message variables) or private.
1143    let mut n_used_in_bag = count_variable_usage_per_bag(&bags);
1144    let mut has_block_contributed = vec![false; bags.len()];
1145
1146    // Step 4: Plan each bag and create materialization blocks
1147    let mut blocks = Vec::new();
1148    let mut header = vec![];
1149    for bag in bags.iter_mut() {
1150        let (bag_header, stages, mat_spec) = plan_single_bag(
1151            bag,
1152            &blocks,
1153            &mut has_block_contributed,
1154            &mut n_used_in_bag,
1155            strat,
1156        );
1157        blocks.push((stages, mat_spec));
1158        header.extend(bag_header);
1159    }
1160
1161    // Step 5: Build the final result block
1162    let result_block = build_result_block(&blocks);
1163
1164    // Optimization the avoids the last materialization
1165    // let (blocks, result_block) = fuse_last_stage(blocks, result_block);
1166
1167    // Lifting variables
1168    let blocks = blocks
1169        .into_iter()
1170        .map(|(stages, mat_spec)| (loop_lifting(stages), mat_spec))
1171        .collect::<Vec<_>>();
1172    let result_block = loop_lifting(result_block);
1173
1174    Plan::DecomposedPlan(DecomposedPlan {
1175        atoms: Arc::new(ctx.atoms),
1176        header,
1177        stages: JoinStageBlocks { blocks },
1178        result_block,
1179        actions,
1180    })
1181}
1182
1183pub(crate) fn plan_query<'a>(query: Query, col_est: ColumnCardEst<'a>) -> Plan {
1184    let atoms = query.atoms;
1185    let ctx = PlanningContext {
1186        vars: query.var_info,
1187        atoms,
1188        fun_deps: Arc::new(query.fun_deps),
1189        col_est,
1190    };
1191    tree_decompose_and_plan(ctx, query.plan_strategy, query.action, query.no_decomp)
1192}
1193
1194/// StageInfo is an intermediate stage used to describe the ordering of
1195/// operations. One of these contains enough information to "expand" it to a
1196/// JoinStage, but it still contains variable information.
1197///
1198/// This separation makes it easier for us to iterate with different planning
1199/// algorithms while sharing the same "backend" that generates a concrete plan.
1200#[derive(Debug)]
1201struct StageInfo {
1202    cover: SubAtom,
1203    vars: SmallVec<[Variable; 1]>,
1204    filters: Vec<(
1205        SubAtom,                 /* the subatom to index */
1206        SmallVec<[ColumnId; 2]>, /* how to build a key for that index from the cover atom */
1207    )>,
1208}
1209
1210/// Immutable context for query planning containing references to query metadata.
1211#[derive(Debug, Clone)]
1212pub(crate) struct PlanningContext<'a> {
1213    vars: DenseIdMap<Variable, VarInfo>,
1214    atoms: DenseIdMap<AtomId, Atom>,
1215    fun_deps: Arc<FunDeps>,
1216    col_est: ColumnCardEst<'a>,
1217}
1218
1219impl<'a> PlanningContext<'a> {
1220    fn is_subsumed_by(&self, bag2: &PlanningContext<'a>) -> bool {
1221        self.is_subsumed_by_vars(&bag2.vars)
1222    }
1223
1224    fn is_subsumed_by_vars<I>(&self, bag2: &DenseIdMap<Variable, I>) -> bool {
1225        self.vars.iter().all(|(var, _)| bag2.contains_key(var))
1226    }
1227
1228    fn merge_bag(&mut self, bag2: &PlanningContext<'a>) {
1229        for (var, vinfo) in bag2.vars.iter() {
1230            if self.vars.contains_key(var) {
1231                for new_occ in vinfo.occurrences.iter().cloned() {
1232                    if !self.vars[var]
1233                        .occurrences
1234                        .iter()
1235                        .any(|occ| occ.atom == new_occ.atom)
1236                    {
1237                        self.vars[var].occurrences.push(new_occ);
1238                    }
1239                }
1240            } else {
1241                self.vars.insert(var, vinfo.clone());
1242            }
1243        }
1244        for (atom_id, atom) in bag2.atoms.iter() {
1245            // atoms don't need to be merged
1246            if !self.atoms.contains_key(atom_id) {
1247                self.atoms.insert(atom_id, atom.clone());
1248            }
1249        }
1250    }
1251
1252    fn common_vars_with<'b>(
1253        &'b self,
1254        other: &'b PlanningContext<'a>,
1255    ) -> impl Iterator<Item = Variable> + 'b {
1256        self.vars
1257            .iter()
1258            .filter(|(var, _)| other.vars.contains_key(*var))
1259            .map(|(var, _)| var)
1260    }
1261
1262    fn has_vars(&self, mut vars: impl Iterator<Item = Variable>) -> bool {
1263        vars.all(|var| self.vars.contains_key(var))
1264    }
1265}
1266
1267type VarSet = FixedBitSet;
1268type AtomSet = FixedBitSet;
1269
1270/// Mutable state tracked during query planning.
1271#[derive(Clone)]
1272pub(crate) struct PlanningState {
1273    used_vars: VarSet,
1274    constrained_atoms: AtomSet,
1275}
1276
1277impl PlanningState {
1278    fn new(n_vars: usize, n_atoms: usize) -> Self {
1279        Self {
1280            used_vars: VarSet::with_capacity(n_vars),
1281            constrained_atoms: AtomSet::with_capacity(n_atoms),
1282        }
1283    }
1284
1285    fn mark_var_used(&mut self, var: Variable) {
1286        self.used_vars.insert(var.index());
1287    }
1288
1289    fn is_var_used(&self, var: Variable) -> bool {
1290        self.used_vars.contains(var.index())
1291    }
1292
1293    fn mark_atom_constrained(&mut self, atom: AtomId) {
1294        self.constrained_atoms.insert(atom.index());
1295    }
1296
1297    fn is_atom_constrained(&self, atom: AtomId) -> bool {
1298        self.constrained_atoms.contains(atom.index())
1299    }
1300}
1301
1302/// Datastructure used to greedily solve the set cover problem for a given free
1303/// join plan.
1304struct BucketQueue<'a> {
1305    var_info: &'a DenseIdMap<Variable, VarInfo>,
1306    cover: VarSet,
1307    atom_info: DenseIdMap<AtomId, VarSet>,
1308    sizes: BTreeMap<usize, IndexSet<AtomId>>,
1309}
1310
1311impl<'a> BucketQueue<'a> {
1312    fn new(var_info: &'a DenseIdMap<Variable, VarInfo>, atoms: &DenseIdMap<AtomId, Atom>) -> Self {
1313        let cover = VarSet::with_capacity(var_info.n_ids());
1314        let mut atom_info = DenseIdMap::with_capacity(atoms.n_ids());
1315        let mut sizes = BTreeMap::<usize, IndexSet<AtomId>>::new();
1316        for (id, atom) in atoms.iter() {
1317            let mut bitset = VarSet::with_capacity(var_info.n_ids());
1318            for var in atom.vars() {
1319                bitset.insert(var.index());
1320            }
1321            sizes.entry(bitset.count_ones(..)).or_default().insert(id);
1322            atom_info.insert(id, bitset);
1323        }
1324        BucketQueue {
1325            var_info,
1326            cover,
1327            atom_info,
1328            sizes,
1329        }
1330    }
1331
1332    /// Return the atom with the largest number of uncovered variables. A
1333    /// variable is "covered" if a previous call to `pop_min` returned an atom
1334    /// referencing that variable.
1335    fn pop_min(&mut self) -> Option<AtomId> {
1336        // Pick an arbitrary atom from the smallest bucket.
1337        let (_, atoms) = self.sizes.iter_mut().next_back()?;
1338        let res = atoms.pop().unwrap();
1339        let vars = self.atom_info[res].clone();
1340        // For each variable that we added to the cover, remove it from the
1341        // entries in atom_info referencing it and update `sizes` to reflect the
1342        // new ordering.
1343        for new_var in vars.difference(&self.cover).map(Variable::from_usize) {
1344            for subatom in &self.var_info[new_var].occurrences {
1345                let cur_set = &mut self.atom_info[subatom.atom];
1346                let old_size = cur_set.count_ones(..);
1347                cur_set.difference_with(&vars);
1348                let new_size = cur_set.count_ones(..);
1349                if old_size == new_size {
1350                    continue;
1351                }
1352                if let Some(old_size_set) = self.sizes.get_mut(&old_size) {
1353                    old_size_set.swap_remove(&subatom.atom);
1354                    if old_size_set.is_empty() {
1355                        self.sizes.remove(&old_size);
1356                    }
1357                }
1358                if new_size > 0 {
1359                    self.sizes.entry(new_size).or_default().insert(subatom.atom);
1360                }
1361            }
1362        }
1363        self.cover.union_with(&vars);
1364        Some(res)
1365    }
1366}
1367
1368/// Build join headers from fast constraints and compute remaining constraints for planning.
1369/// Returns (headers, remaining_constraints) tuple.
1370fn plan_headers<'a, 'b>(
1371    ctx: &'b PlanningContext<'a>,
1372) -> (
1373    Vec<JoinHeader>,
1374    DenseIdMap<
1375        AtomId,
1376        (
1377            usize, /* The approx size of the subset matching the constraints. */
1378            &'b Pooled<Vec<Constraint>>,
1379        ),
1380    >,
1381) {
1382    let mut header = Vec::new();
1383    let mut remaining_constraints: DenseIdMap<AtomId, (usize, &Pooled<Vec<Constraint>>)> =
1384        Default::default();
1385
1386    for (atom, atom_info) in ctx.atoms.iter() {
1387        remaining_constraints.insert(
1388            atom,
1389            (
1390                atom_info.constraints.approx_size(),
1391                &atom_info.constraints.slow,
1392            ),
1393        );
1394        if !atom_info.constraints.fast.is_empty() {
1395            header.push(JoinHeader {
1396                atom,
1397                constraints: Pooled::cloned(&atom_info.constraints.fast),
1398                subset: atom_info.constraints.subset.clone(),
1399            });
1400        }
1401    }
1402
1403    (header, remaining_constraints)
1404}
1405
1406/// Plan query execution stages using the specified strategy.
1407/// Returns (header, instructions) tuple that can be assembled into a Plan by the caller.
1408/// It does not directly return the plan because the caller may want to further modify the stages.
1409fn plan_stages(ctx: &PlanningContext, strat: PlanStrategy) -> (Vec<JoinHeader>, Vec<JoinStage>) {
1410    let (header, remaining_constraints) = plan_headers(ctx);
1411    let mut instrs = Vec::new();
1412    let mut state = PlanningState::new(ctx.vars.n_ids(), ctx.atoms.n_ids());
1413
1414    match strat {
1415        PlanStrategy::PureSize | PlanStrategy::MinCover => {
1416            plan_free_join(ctx, &mut state, strat, &remaining_constraints, &mut instrs)
1417        }
1418        PlanStrategy::Gj => plan_gj(ctx, &mut state, &remaining_constraints, &mut instrs),
1419    };
1420
1421    (header, instrs)
1422}
1423
1424/// Plan free join queries using pure size or minimal cover strategy.
1425fn plan_free_join(
1426    ctx: &PlanningContext,
1427    state: &mut PlanningState,
1428    strat: PlanStrategy,
1429    remaining_constraints: &DenseIdMap<AtomId, (usize, &Pooled<Vec<Constraint>>)>,
1430    stages: &mut Vec<JoinStage>,
1431) {
1432    let mut size_info = Vec::<(AtomId, usize)>::new();
1433
1434    match strat {
1435        PlanStrategy::PureSize => {
1436            for (atom, (size, _)) in remaining_constraints.iter() {
1437                size_info.push((atom, *size));
1438            }
1439        }
1440        PlanStrategy::MinCover => {
1441            let mut eligible_covers = HashSet::default();
1442            let mut queue = BucketQueue::new(&ctx.vars, &ctx.atoms);
1443            while let Some(atom) = queue.pop_min() {
1444                eligible_covers.insert(atom);
1445            }
1446            for (atom, (size, _)) in remaining_constraints
1447                .iter()
1448                .filter(|(atom, _)| eligible_covers.contains(atom))
1449            {
1450                size_info.push((atom, *size));
1451            }
1452        }
1453        PlanStrategy::Gj => unreachable!(),
1454    };
1455
1456    size_info.sort_by_key(|(_, size)| *size);
1457    let mut atoms = size_info.iter().map(|(atom, _)| *atom);
1458
1459    while let Some(info) = get_next_freejoin_stage(ctx, state, &mut atoms) {
1460        let stage = compile_stage(ctx, state, info);
1461        stages.push(stage);
1462    }
1463}
1464
1465/// Generate the next free join stage by picking an atom from the ordering.
1466/// Returns the stage info and updated state, or None if all atoms are covered.
1467fn get_next_freejoin_stage(
1468    ctx: &PlanningContext,
1469    state: &mut PlanningState,
1470    ordering: &mut impl Iterator<Item = AtomId>,
1471) -> Option<StageInfo> {
1472    let mut scratch_subatom: HashMap<AtomId, SmallVec<[ColumnId; 2]>> = Default::default();
1473
1474    loop {
1475        let mut covered = false;
1476        let atom = ordering.next()?;
1477        let atom_info = &ctx.atoms[atom];
1478        let mut cover = SubAtom::new(atom);
1479        let mut vars = SmallVec::<[Variable; 1]>::new();
1480
1481        for (ix, var) in atom_info.var_columns.iter() {
1482            if state.is_var_used(var) {
1483                continue;
1484            }
1485            // This atom is not completely covered by previous stages.
1486            covered = true;
1487            state.mark_var_used(var);
1488            vars.push(var);
1489            cover.vars.push(ix);
1490
1491            for subatom in ctx.vars[var].occurrences.iter() {
1492                if subatom.atom == atom {
1493                    continue;
1494                }
1495                scratch_subatom
1496                    .entry(subatom.atom)
1497                    .or_default()
1498                    .extend(subatom.vars.iter().copied());
1499            }
1500        }
1501
1502        if !covered {
1503            // Search the next atom.
1504            continue;
1505        }
1506
1507        let mut filters = Vec::new();
1508        for (atom, cols) in scratch_subatom.drain() {
1509            let mut form_key = SmallVec::<[ColumnId; 2]>::new();
1510            for var_ix in &cols {
1511                let var = ctx.atoms[atom].get_var(*var_ix).unwrap();
1512                // form_key is an index _into the subatom forming the cover_.
1513                let cover_col = vars.iter().position(|v| *v == var).unwrap();
1514                form_key.push(ColumnId::from_usize(cover_col));
1515            }
1516            filters.push((SubAtom { atom, vars: cols }, form_key));
1517        }
1518
1519        return Some(StageInfo {
1520            cover,
1521            vars,
1522            filters,
1523        });
1524    }
1525}
1526
1527/// Plan generic join queries (one variable per stage).
1528///
1529/// Variables are visited in their natural id order. Runtime `sort_plan_by_size` reorders stages
1530/// anyway, so static ordering only needs to be deterministic; [`fuse_single_scans`] collapses
1531/// any same-atom single-scans afterwards regardless of where they ended up.
1532fn plan_gj(
1533    ctx: &PlanningContext,
1534    state: &mut PlanningState,
1535    _remaining_constraints: &DenseIdMap<AtomId, (usize, &Pooled<Vec<Constraint>>)>,
1536    stages: &mut Vec<JoinStage>,
1537) {
1538    let mut planned_vars = Vec::with_capacity(ctx.vars.n_ids());
1539    let mut atoms_hit = AtomSet::with_capacity(ctx.atoms.n_ids());
1540    for (var, var_info) in ctx.vars.iter() {
1541        let n_occs = var_info.occurrences.len();
1542        if n_occs == 0 {
1543            // No occurrences: ignore (may be bound on the RHS or simply unused).
1544            continue;
1545        }
1546        if n_occs == 1 && !var_info.used_in_rhs {
1547            // Skip for now; we'll plan it below only if its atom is otherwise unmentioned.
1548            continue;
1549        }
1550        for subatom in var_info.occurrences.iter() {
1551            atoms_hit.set(subatom.atom.index(), true);
1552        }
1553        planned_vars.push(var);
1554    }
1555    for (var, var_info) in ctx.vars.iter() {
1556        if var_info.occurrences.len() == 1 && !var_info.used_in_rhs {
1557            // The variable looks "unused" but we still need to touch its atom if nothing else
1558            // has so the join visits every relation.
1559            let subatom = &var_info.occurrences[0];
1560            if !atoms_hit.contains(subatom.atom.index()) {
1561                atoms_hit.set(subatom.atom.index(), true);
1562                planned_vars.push(var);
1563            }
1564        }
1565    }
1566    for var in planned_vars {
1567        let occ = ctx.vars[var].occurrences[0].clone();
1568        let mut info = StageInfo {
1569            cover: occ,
1570            vars: smallvec![var],
1571            filters: Default::default(),
1572        };
1573        for occ in &ctx.vars[var].occurrences[1..] {
1574            info.filters
1575                .push((occ.clone(), smallvec![ColumnId::new(0); occ.vars.len()]));
1576        }
1577
1578        stages.push(compile_stage(ctx, state, info));
1579    }
1580    fuse_single_scans(stages);
1581}
1582
1583/// Compile a stage info into a concrete join stage, updating constraint state.
1584fn compile_stage(
1585    ctx: &PlanningContext,
1586    state: &mut PlanningState,
1587    StageInfo {
1588        cover,
1589        vars,
1590        filters,
1591    }: StageInfo,
1592) -> JoinStage {
1593    fn take_atom_constraints_if_new(
1594        ctx: &PlanningContext,
1595        state: &mut PlanningState,
1596        atom: AtomId,
1597    ) -> Vec<Constraint> {
1598        if state.is_atom_constrained(atom) {
1599            Default::default()
1600        } else {
1601            state.mark_atom_constrained(atom);
1602            ctx.atoms[atom].constraints.slow.clone()
1603        }
1604    }
1605
1606    // Only do this if it's a join of more than one relations
1607    if vars.len() == 1 && !filters.is_empty() {
1608        let scans = SmallVec::<[SingleScanSpec; 3]>::from_iter(
1609            iter::once(&cover)
1610                .chain(filters.iter().map(|(x, _)| x))
1611                .map(|subatom| {
1612                    let atom = subatom.atom;
1613                    SingleScanSpec {
1614                        atom,
1615                        column: subatom.vars[0],
1616                        cs: take_atom_constraints_if_new(ctx, state, atom),
1617                    }
1618                }),
1619        );
1620
1621        return JoinStage::Intersect {
1622            var: vars[0],
1623            scans,
1624        };
1625    }
1626
1627    // FusedIntersect case
1628    let atom = cover.atom;
1629
1630    let cover_spec = ScanSpec {
1631        to_index: cover,
1632        constraints: take_atom_constraints_if_new(ctx, state, atom),
1633    };
1634
1635    let mut bind = SmallVec::new();
1636    for var in vars {
1637        bind.push((ctx.atoms[atom].get_col(var).unwrap(), var));
1638    }
1639
1640    let mut to_intersect = Vec::with_capacity(filters.len());
1641    for (subatom, key_spec) in filters {
1642        let atom = subatom.atom;
1643        let scan = ScanSpec {
1644            to_index: subatom,
1645            constraints: take_atom_constraints_if_new(ctx, state, atom),
1646        };
1647        to_intersect.push((scan, key_spec));
1648    }
1649
1650    JoinStage::FusedIntersect {
1651        cover: cover_spec,
1652        bind,
1653        to_intersect,
1654    }
1655}