1use std::{collections::BTreeMap, iter, mem, sync::Arc};
48
49use crate::{
50 TableId,
51 free_join::{ColUniqueness, ColumnCardEst, ProcessedConstraints},
52 numeric_id::{DenseIdMap, NumericId},
53 query::{FunDeps, SymbolMap},
54};
55use egglog_numeric_id::define_id;
56use fixedbitset::FixedBitSet;
57use smallvec::{SmallVec, smallvec};
58
59use crate::{
60 common::{HashMap, HashSet, IndexSet},
61 offsets::Subset,
62 pool::Pooled,
63 query::{Atom, Query, VarColumnMap},
64 table_spec::Constraint,
65};
66
67use super::{ActionId, AtomId, ColumnId, SubAtom, VarInfo, Variable};
68
69#[derive(Clone, Debug, PartialEq, Eq)]
70pub(crate) struct ScanSpec {
71 pub to_index: SubAtom,
72 pub constraints: Vec<Constraint>,
74}
75
76#[derive(Clone, Debug, PartialEq, Eq)]
77pub(crate) struct SingleScanSpec {
78 pub atom: AtomId,
79 pub column: ColumnId,
80 pub cs: Vec<Constraint>,
81}
82
83define_id!(pub(crate) MatId, u32, "An identifier for materialization within a decomposed plan.");
84
85#[derive(Clone, Debug, PartialEq, Eq)]
86pub(crate) enum MatScanMode {
87 Full,
88 KeyOnly,
89 Value(SmallVec<[Variable; 16]>),
90 Lookup(SmallVec<[Variable; 16]>),
91}
92
93pub(crate) struct JoinHeader {
96 pub atom: AtomId,
97 #[allow(unused)]
100 pub constraints: Pooled<Vec<Constraint>>,
101 pub subset: Subset,
108}
109
110impl std::fmt::Debug for JoinHeader {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("JoinHeader")
113 .field("atom", &self.atom)
114 .field("constraints", &self.constraints)
115 .field(
116 "subset",
117 &format_args!("Subset(size={})", self.subset.size()),
118 )
119 .finish()
120 }
121}
122
123impl Clone for JoinHeader {
124 fn clone(&self) -> Self {
125 JoinHeader {
126 atom: self.atom,
127 constraints: Pooled::cloned(&self.constraints),
128 subset: self.subset.clone(),
129 }
130 }
131}
132
133#[derive(Debug, Clone)]
134pub(crate) enum JoinStage {
135 Intersect {
139 var: Variable,
140 scans: SmallVec<[SingleScanSpec; 3]>,
141 },
142 FusedIntersect {
146 cover: ScanSpec,
147 bind: SmallVec<[(ColumnId, Variable); 2]>,
148 to_intersect: Vec<(ScanSpec, SmallVec<[ColumnId; 2]>)>,
150 },
151 FusedIntersectMat {
152 cover: MatId,
153 mode: MatScanMode,
154 bind: SmallVec<[(ColumnId, Variable); 2]>,
155 to_intersect: Vec<(ScanSpec, SmallVec<[ColumnId; 2]>)>,
156 },
157}
158
159fn fuse_single_scans(stages: &mut Vec<JoinStage>) {
164 let mut i = 0;
165 while i < stages.len() {
166 let cur_atom = match &stages[i] {
167 JoinStage::FusedIntersect {
168 cover,
169 to_intersect,
170 ..
171 } if to_intersect.is_empty() => cover.to_index.atom,
172 _ => {
173 i += 1;
174 continue;
175 }
176 };
177 let target = (0..i).find(|&j| {
178 matches!(
179 &stages[j],
180 JoinStage::FusedIntersect { cover, to_intersect, .. }
181 if to_intersect.is_empty() && cover.to_index.atom == cur_atom
182 )
183 });
184 let Some(j) = target else {
185 i += 1;
186 continue;
187 };
188 let JoinStage::FusedIntersect {
189 cover: cover_i,
190 bind: bind_i,
191 ..
192 } = stages.remove(i)
193 else {
194 unreachable!("checked above")
195 };
196 let JoinStage::FusedIntersect {
197 cover: cover_j,
198 bind: bind_j,
199 ..
200 } = &mut stages[j]
201 else {
202 unreachable!("checked above")
203 };
204 cover_j.to_index.vars.extend(cover_i.to_index.vars);
205 cover_j.constraints.extend(cover_i.constraints);
206 bind_j.extend(bind_i);
207 }
209}
210
211#[derive(Debug, Clone)]
212pub(crate) enum Plan {
213 SinglePlan(SinglePlan),
214 DecomposedPlan(DecomposedPlan),
215}
216impl Plan {
217 pub fn actions(&self) -> ActionId {
218 match self {
219 Plan::SinglePlan(p) => p.actions,
220 Plan::DecomposedPlan(p) => p.actions,
221 }
222 }
223
224 pub fn atoms(&self) -> Arc<DenseIdMap<AtomId, Atom>> {
225 match self {
226 Plan::SinglePlan(p) => p.atoms.clone(),
227 Plan::DecomposedPlan(p) => p.atoms.clone(),
228 }
229 }
230
231 pub(crate) fn to_report(&self, _symbol_map: &SymbolMap) -> egglog_reports::Plan {
232 match self {
233 Plan::SinglePlan(p) => p.to_report(_symbol_map),
234 Plan::DecomposedPlan(_) => {
235 todo!()
236 }
237 }
238 }
239
240 pub(crate) fn header(&self) -> &[JoinHeader] {
241 match self {
242 Plan::SinglePlan(p) => &p.header,
243 Plan::DecomposedPlan(p) => &p.header,
244 }
245 }
246}
247
248#[derive(Debug, Clone)]
249pub(crate) struct SinglePlan {
250 pub atoms: Arc<DenseIdMap<AtomId, Atom>>,
251 pub header: Vec<JoinHeader>,
252 pub stages: JoinStages,
253 pub actions: ActionId,
254}
255
256#[derive(Debug, Clone)]
257pub(crate) struct JoinStages {
258 pub instrs: Arc<Vec<JoinStage>>,
259}
260
261#[derive(Debug, Clone)]
270pub(crate) struct MatSpec {
271 pub msg_vars: SmallVec<[Variable; 16]>,
273 pub val_vars: SmallVec<[Variable; 16]>,
275}
276
277#[derive(Debug, Clone)]
278pub(crate) struct JoinStageBlocks {
279 pub blocks: Vec<(JoinStages, MatSpec)>,
281}
282
283#[derive(Debug, Clone)]
284pub(crate) struct DecomposedPlan {
285 pub atoms: Arc<DenseIdMap<AtomId, Atom>>,
286 pub header: Vec<JoinHeader>,
287 pub stages: JoinStageBlocks,
288 pub result_block: JoinStages,
289 pub actions: ActionId,
290}
291
292impl SinglePlan {
293 pub(crate) fn to_report(&self, symbol_map: &SymbolMap) -> egglog_reports::Plan {
294 use egglog_reports::{
295 Plan as ReportPlan, Scan as ReportScan, SingleScan as ReportSingleScan,
296 Stage as ReportStage,
297 };
298 const INTERNAL_PREFIX: &str = "@";
299 let get_var = |var: Variable| {
300 symbol_map
301 .vars
302 .get(&var)
303 .map(|s| s.to_string())
304 .unwrap_or_else(|| format!("{INTERNAL_PREFIX}x{var:?}"))
305 };
306 let get_atom = |atom: AtomId| {
307 symbol_map
308 .atoms
309 .get(&atom)
310 .map(|s| s.to_string())
311 .unwrap_or_else(|| format!("{INTERNAL_PREFIX}R{atom:?}"))
312 };
313 let mut stages = Vec::new();
314 for (i, stage) in self.stages.instrs.iter().enumerate() {
315 let report_stage = match stage {
316 JoinStage::Intersect { var, scans } => {
317 let var_name = get_var(*var);
318 let report_scans = scans
319 .iter()
320 .map(|scan| {
321 let atom_name = get_atom(scan.atom);
322 ReportSingleScan(
323 atom_name,
324 (var_name.clone(), scan.column.index() as i64),
325 )
326 })
327 .collect();
328 ReportStage::Intersect {
329 scans: report_scans,
330 }
331 }
332 JoinStage::FusedIntersect {
333 cover,
334 bind: _,
335 to_intersect,
336 } => {
337 let cover_atom_name = get_atom(cover.to_index.atom);
338 let cover_cols: Vec<(String, i64)> = cover
339 .to_index
340 .vars
341 .iter()
342 .map(|col| {
343 let var_name =
344 get_var(self.atoms[cover.to_index.atom].get_var(*col).unwrap());
345 (var_name, col.index() as i64)
346 })
347 .collect();
348 let report_cover = ReportScan(cover_atom_name, cover_cols);
349 let report_to_intersect = to_intersect
350 .iter()
351 .map(|(scan, key_spec)| {
352 let atom_name = get_atom(scan.to_index.atom);
353 let cols: Vec<(String, i64)> = key_spec
354 .iter()
355 .map(|col| {
356 let var_name = get_var(
357 self.atoms[scan.to_index.atom].get_var(*col).unwrap(),
358 );
359 (var_name, col.index() as i64)
360 })
361 .collect();
362 ReportScan(atom_name, cols)
363 })
364 .collect();
365 ReportStage::FusedIntersect {
366 cover: report_cover,
367 to_intersect: report_to_intersect,
368 }
369 }
370 JoinStage::FusedIntersectMat {
371 cover: _,
372 mode: _,
373 bind: _,
374 to_intersect: _,
375 } => {
376 todo!("materialization")
377 }
378 };
379 let next = if i == self.stages.instrs.len() - 1 {
380 vec![]
381 } else {
382 vec![i + 1]
383 };
384 stages.push((report_stage, None, next));
385 }
386 ReportPlan { stages }
387 }
388}
389
/// How `plan_stages` orders the join.
#[derive(Default, Copy, Clone)]
pub enum PlanStrategy {
    /// Free join: cover atoms are tried in ascending order of approximate size.
    PureSize,

    /// Free join like `PureSize`, but only atoms selected by the greedy cover
    /// computed with `BucketQueue` are tried as covers.
    MinCover,

    /// Generic join: one stage per variable (see `plan_gj`). This is the default.
    #[default]
    Gj,
}
411
412fn next_var_to_eliminate(
421 vars: &DenseIdMap<Variable, VarInfo>,
422 atoms: &DenseIdMap<AtomId, Atom>,
423 fun_deps: &FunDeps,
424 col_est: &ColumnCardEst<'_>,
425) -> Option<IndexSet<Variable>> {
426 let (_var, subquery_vars) = vars
427 .iter()
428 .map(|(var, vinfo)| {
429 let subquery_vars = atoms
430 .iter()
431 .filter(|(_, atom)| atom.get_col(var).is_some())
433 .flat_map(|(_, atom)| atom.vars());
435
436 let subquery_vars: DenseIdMap<_, ()> = subquery_vars.map(|v| (v, ())).collect();
440
441 let occ = atoms
442 .iter()
443 .filter(|(_, atom)| atom.vars().any(|v| subquery_vars.contains_key(v)))
444 .count();
445 let size_estimation = vinfo
446 .occurrences
447 .iter()
448 .filter_map(|occ| {
449 let atom = &atoms[occ.atom];
450 let table = atom.table;
451 if table.is_dummy() {
452 return None;
453 }
454 let col = atom.get_col(var).unwrap();
455 Some(col_est.col_uniqueness(table, col))
458 })
459 .fold(ColUniqueness::default(), |a, b| a.join(&b));
460 ((occ, size_estimation), var, subquery_vars)
461 })
463 .min_by_key(|a| a.0)
464 .map(|a| (a.1, a.2))?;
465 Some(IndexSet::from_iter(
466 fun_deps
467 .closure(subquery_vars.iter().map(|(var, _)| var))
468 .into_iter()
469 .map(|(var, _)| var),
470 ))
471}
472
473fn update_hypergraph(
477 subquery_vars: &IndexSet<Variable>,
478 vars: &mut DenseIdMap<Variable, VarInfo>,
479 atoms: &mut DenseIdMap<AtomId, Atom>,
480) {
481 let covering_vars: Vec<_> = subquery_vars
485 .iter()
486 .copied()
487 .filter(|&var| {
488 vars.contains_key(var)
489 && vars[var].occurrences.iter().any(|occ| {
490 atoms[occ.atom]
491 .vars()
492 .any(|ov| !subquery_vars.contains(&ov))
493 })
494 })
495 .collect();
496
497 let mut removed = Vec::new();
499 atoms.retain(|atom_id, atom| {
500 if atom.vars().all(|var| subquery_vars.contains(&var)) {
501 removed.push(atom_id);
502 false
503 } else {
504 true
505 }
506 });
507
508 for &subq_var in subquery_vars.iter() {
510 if vars.contains_key(subq_var) {
511 vars[subq_var]
512 .occurrences
513 .retain(|occ| !removed.contains(&occ.atom));
514
515 if vars[subq_var].occurrences.is_empty() {
516 vars.unwrap_val(subq_var);
517 }
518 }
519 }
520
521 let mut var_columns = VarColumnMap::default();
523 for (ix, var) in covering_vars.iter().enumerate() {
524 var_columns.insert(*var, ColumnId::from_usize(ix));
525 }
526 let fake_atom_id = atoms.push(Atom {
527 var_columns,
528 constraints: ProcessedConstraints::dummy(),
529 table: TableId::dummy(),
530 });
531
532 for (i, &covering_var) in covering_vars.iter().enumerate() {
534 vars[covering_var].occurrences.push(SubAtom {
535 atom: fake_atom_id,
536 vars: smallvec![ColumnId::from_usize(i)],
537 });
538 }
539}
540
541fn decompose_into_bags<'a>(original_ctx: &PlanningContext<'a>) -> Vec<PlanningContext<'a>> {
570 let mut atoms = original_ctx.atoms.clone();
571 let mut vars = original_ctx.vars.clone();
572
573 for (var, vinfo) in original_ctx.vars.iter() {
575 if vinfo.occurrences.is_empty() {
576 vars.take(var).unwrap();
577 }
578 }
579
580 let mut bags = Vec::new();
581
582 while let Some(subquery_vars) =
584 next_var_to_eliminate(&vars, &atoms, &original_ctx.fun_deps, &original_ctx.col_est)
585 {
586 update_hypergraph(&subquery_vars, &mut vars, &mut atoms);
589
590 let subquery_atoms: DenseIdMap<AtomId, Atom> = original_ctx
592 .atoms
593 .iter()
594 .filter(|(_, atom)| atom.vars().any(|var| subquery_vars.contains(&var)))
595 .map(|(atom_id, atom)| (atom_id, atom.clone()))
596 .collect();
597
598 let subquery_var_map = DenseIdMap::from_iter(subquery_vars.iter().map(|var| {
599 let mut var_info = original_ctx.vars[*var].clone();
600 var_info
602 .occurrences
603 .retain(|occ| subquery_atoms.contains_key(occ.atom));
604 (*var, var_info)
605 }));
606
607 bags.push(PlanningContext {
608 vars: subquery_var_map,
609 atoms: subquery_atoms,
610 fun_deps: original_ctx.fun_deps.clone(),
611 col_est: original_ctx.col_est.clone(),
612 });
613 }
614
615 assert!(
616 !atoms.iter().any(|(_, atom_info)| {
617 !atom_info.table.is_dummy() && !atom_info.var_columns.is_empty()
618 }),
619 "All atoms should be put into bags"
620 );
621
622 let mut changed = true;
624 while changed {
625 changed = false;
626 let mut pruned_bags: Vec<PlanningContext> = Vec::with_capacity(bags.len());
629 for mut bag1 in bags.into_iter() {
630 pruned_bags.retain_mut(|bag2| {
631 let leq = bag1.is_subsumed_by(bag2);
632 let geq = bag2.is_subsumed_by(&bag1);
633 if leq || geq {
634 bag1.merge_bag(bag2);
635 changed = true;
636 false
637 } else {
638 true
639 }
640 });
641 pruned_bags.push(bag1);
642 }
643
644 bags = pruned_bags;
651 let is_ear = |bag: &PlanningContext| {
652 bag.atoms.iter().any(|(_atom_id, atom)| {
653 let all_vars = original_ctx.fun_deps.closure(atom.vars());
654 bag.is_subsumed_by_vars(&all_vars)
655 })
656 || bag
660 .atoms
661 .iter()
662 .filter(|(_atom_id, atom)| bag.has_vars(atom.vars()))
663 .count()
664 == 1
665 };
666
667 let mut i = 0;
668 while i < bags.len() {
669 if !is_ear(&bags[i]) {
670 i += 1;
671 continue;
672 }
673
674 let parent = bags
676 .iter()
677 .enumerate()
678 .rev()
679 .filter(|(j, _)| *j != i)
680 .map(|(j, b)| (j, b.common_vars_with(&bags[i]).count()))
681 .collect::<Vec<_>>();
682
683 let j = parent.into_iter().max_by_key(|(_, count)| *count);
684 if j.is_none() || j.unwrap().1 == 0 {
685 i += 1;
686 continue;
687 }
688 let j = j.unwrap().0;
689
690 if i < j {
692 let bag = bags.remove(i);
693 bags[j - 1].merge_bag(&bag);
694 } else {
695 let bag = bags.remove(j);
696 bags[i - 1].merge_bag(&bag);
697 }
698 changed = true;
699 }
700 }
701 bags
702}
703
704fn topologically_sort_bags(bags: Vec<PlanningContext>) -> Vec<PlanningContext> {
718 let mut all_children_list: Vec<Vec<usize>> = vec![vec![]; bags.len()];
719 let mut best_pathwidth = vec![usize::MAX; bags.len()];
722 let mut full = vec![HashSet::default(); bags.len()];
723 let mut choice = vec![usize::MAX; bags.len()];
724 for i in 0..bags.len() {
725 let mut full_i: HashSet<AtomId> =
726 bags[i].atoms.iter().map(|(atom_id, _)| atom_id).collect();
727 for child in all_children_list[i].iter() {
728 full_i.extend(full[*child].iter().copied());
729 }
730 full[i] = full_i;
731 best_pathwidth[i] = full[i].len();
732 for chain_child in all_children_list[i].iter() {
733 let mut chain_score: HashSet<_> =
734 bags[i].atoms.iter().map(|(atom_id, _)| atom_id).collect();
735 chain_score.extend(
736 all_children_list[*chain_child]
737 .iter()
738 .filter(|child| *child != chain_child)
739 .flat_map(|child| full[*child].iter().copied()),
740 );
741 let s = chain_score.len().max(best_pathwidth[*chain_child]);
742 if s <= best_pathwidth[i] {
743 best_pathwidth[i] = s;
744 choice[i] = *chain_child;
745 }
746 }
747
748 let parent = bags
751 .iter()
752 .enumerate()
753 .skip(i + 1)
754 .map(|(j, b)| (j, b.common_vars_with(&bags[i]).count()))
755 .filter(|(_, count)| *count > 0)
756 .max_by_key(|(j, count)| (*count, -(*j as isize)));
757 if let Some((j, _count)) = parent {
758 all_children_list[j].push(i);
759 }
760 }
761
762 let mut bags_opt = bags.into_iter().map(Some).collect::<Vec<_>>();
763 let mut bags_topo = Vec::<PlanningContext>::with_capacity(bags_opt.len());
764 let mut visited = vec![false; bags_opt.len()];
765 let mut stack: Vec<(usize, Option<usize>)> = Vec::new();
769
770 for i in (0..bags_opt.len()).rev() {
773 if visited[i] {
774 continue;
775 }
776 stack.push((i, None));
777 visited[i] = true;
778
779 while let Some((bag_id, parent)) = stack.pop() {
780 let bag = mem::take(&mut bags_opt[bag_id]).unwrap();
781
782 let this;
783 if let Some(parent) = parent {
784 bags_topo[parent].merge_bag(&bag);
785 this = parent;
786 } else {
787 this = bags_topo.len();
788 }
789
790 let all_children = &mut all_children_list[bag_id];
791
792 if parent.is_some() {
793 for &i in all_children.iter() {
797 visited[i] = true;
798 stack.push((i, Some(this)));
799 }
800 } else {
801 if !all_children.is_empty() {
805 for &i in all_children[1..].iter() {
806 if i == choice[bag_id] {
807 continue;
808 }
809 visited[i] = true;
810 stack.push((i, Some(this)));
811 }
812 visited[choice[bag_id]] = true;
813 stack.push((choice[bag_id], None));
814 }
815 }
816
817 if parent.is_none() {
818 bags_topo.push(bag);
819 }
820 }
821 }
822
823 bags_topo.reverse();
824 bags_topo
825}
826
827fn count_variable_usage_per_bag(bags: &[PlanningContext]) -> DenseIdMap<Variable, usize> {
832 let mut n_used_in_bag = DenseIdMap::new();
833 for bag in bags {
834 for (var, _vinfo) in bag.vars.iter() {
835 if !n_used_in_bag.contains_key(var) {
836 n_used_in_bag.insert(var, 0);
837 }
838 n_used_in_bag[var] += 1;
839 }
840 }
841 n_used_in_bag
842}
843
844fn plan_single_bag(
854 bag: &mut PlanningContext,
855 blocks: &[(JoinStages, MatSpec)],
856 has_block_contributed: &mut [bool],
858 n_used_in_bag: &mut DenseIdMap<Variable, usize>,
859 strat: PlanStrategy,
860) -> (Vec<JoinHeader>, JoinStages, MatSpec) {
861 let mut msg_vars = smallvec![];
862 let mut val_vars = smallvec![];
863
864 for (var, vinfo) in bag.vars.iter_mut() {
866 n_used_in_bag[var] -= 1;
867 if n_used_in_bag[var] > 0 {
868 vinfo.used_in_rhs = true;
870 msg_vars.push(var);
871 } else {
872 if !vinfo.used_in_rhs
876 && blocks.iter().all(|(_, spec)| !spec.msg_vars.contains(&var))
877 && n_used_in_bag[var] == 0
878 {
879 continue;
880 }
881 val_vars.push(var);
882 vinfo.used_in_rhs = true;
883 }
884 }
885
886 let mut stripped_bag = bag.clone();
887
888 let mut prologue = None;
892 let mut epilogue = Vec::new();
893 for (i, prev_block) in blocks.iter().enumerate().rev() {
894 if prev_block.1.msg_vars.is_empty() {
895 continue;
896 }
897 if !has_block_contributed[i]
898 && prev_block
899 .1
900 .msg_vars
901 .iter()
902 .all(|var| bag.vars.contains_key(*var))
903 {
904 has_block_contributed[i] = true;
905 if prologue.is_none() {
906 let bind = prev_block
907 .1
908 .msg_vars
909 .iter()
910 .enumerate()
911 .map(|(j, var)| (ColumnId::from_usize(j), *var))
912 .collect();
913 let mut to_intersect: Vec<(ScanSpec, SmallVec<[ColumnId; 2]>)> = vec![];
914 for (col, var) in prev_block.1.msg_vars.iter().enumerate() {
915 let vinfo = &bag.vars[*var];
916 for occ in vinfo.occurrences.iter() {
917 let isect = match to_intersect
918 .iter_mut()
919 .find(|(spec, _)| spec.to_index.atom == occ.atom)
920 {
921 Some(isect) => isect,
922 None => {
923 to_intersect.push((
924 ScanSpec {
925 to_index: SubAtom {
926 atom: occ.atom,
927 vars: smallvec![],
928 },
929 constraints: vec![],
930 },
931 smallvec![],
932 ));
933 to_intersect.last_mut().unwrap()
934 }
935 };
936 isect.0.to_index.vars.extend(occ.vars.iter().copied());
937 isect
938 .1
939 .extend(occ.vars.iter().map(|_| ColumnId::from_usize(col)));
940 }
941 }
942
943 prologue = Some(JoinStage::FusedIntersectMat {
944 cover: MatId::from_usize(i),
945 mode: MatScanMode::KeyOnly,
946 bind,
947 to_intersect,
948 });
949
950 stripped_bag
951 .vars
952 .retain(|var, _vinfo| !prev_block.1.msg_vars.contains(&var));
953 } else {
954 epilogue.push(JoinStage::FusedIntersectMat {
955 cover: MatId::from_usize(i),
956 mode: MatScanMode::Lookup(prev_block.1.msg_vars.clone()),
957 bind: smallvec![],
958 to_intersect: vec![],
959 });
960 }
961 }
962 }
963
964 let (header, mut instrs) = plan_stages(&stripped_bag, strat);
965 instrs.splice(0..0, prologue);
966 instrs.extend(epilogue);
967
968 let stages = JoinStages {
969 instrs: Arc::new(instrs),
970 };
971
972 (header, stages, MatSpec { msg_vars, val_vars })
973}
974
975fn build_result_block(blocks: &[(JoinStages, MatSpec)]) -> JoinStages {
980 let mut result_block = Vec::new();
981 let mut pinned_vars = DenseIdMap::<Variable, ()>::new();
982
983 for (i, (_stages, mat_spec)) in blocks.iter().enumerate().rev() {
984 let to_bind: SmallVec<[(ColumnId, Variable); 2]> = mat_spec
985 .val_vars
986 .iter()
987 .copied()
988 .enumerate()
989 .filter(|(_, var)| !pinned_vars.contains_key(*var))
990 .map(|(i, var)| (ColumnId::from_usize(i), var))
991 .collect();
992
993 if to_bind.is_empty() {
994 continue;
995 }
996
997 for (_, var) in to_bind.iter() {
998 pinned_vars.insert(*var, ());
999 }
1000
1001 result_block.push(JoinStage::FusedIntersectMat {
1002 cover: MatId::from_usize(i),
1003 mode: if i == blocks.len() - 1 {
1004 MatScanMode::Full
1005 } else {
1006 MatScanMode::Value(mat_spec.msg_vars.clone())
1007 },
1008 bind: to_bind,
1009 to_intersect: vec![],
1010 });
1011 }
1012
1013 JoinStages {
1014 instrs: Arc::new(result_block),
1015 }
1016}
1017
1018#[allow(unused)]
1030fn fuse_last_stage(
1031 mut blocks: Vec<(JoinStages, MatSpec)>,
1032 result_block: JoinStages,
1033) -> (Vec<(JoinStages, MatSpec)>, JoinStages) {
1034 if blocks.is_empty() {
1035 return (blocks, result_block);
1036 }
1037
1038 let last_block = blocks.pop().unwrap();
1039 assert!(last_block.1.msg_vars.is_empty());
1040 if !matches!(
1041 result_block.instrs[0],
1042 JoinStage::FusedIntersectMat {
1043 cover,
1044 mode: MatScanMode::Full,
1045 ..
1046 } if cover == MatId::from_usize(blocks.len()
1047 )) {
1048 return (blocks, result_block);
1050 }
1051
1052 let mut last_block = last_block.0;
1054 let mut instrs = Arc::unwrap_or_clone(last_block.instrs);
1055 instrs.extend(result_block.instrs[1..].iter().cloned());
1056 last_block.instrs = Arc::new(instrs);
1057
1058 (blocks, last_block)
1059}
1060
1061fn loop_lifting(stages: JoinStages) -> JoinStages {
1071 let mut instrs = Arc::unwrap_or_clone(stages.instrs);
1072 for i in 1..instrs.len() {
1073 if let JoinStage::FusedIntersectMat {
1074 cover: _,
1075 mode: MatScanMode::Lookup(vars),
1076 bind,
1077 to_intersect,
1078 } = &instrs[i]
1079 {
1080 assert!(bind.is_empty() && to_intersect.is_empty());
1081 let vars = vars.clone();
1082 let mut j = i;
1083 while j > 0 {
1084 if matches!(
1085 &instrs[j - 1], JoinStage::FusedIntersect { bind, .. } | JoinStage::FusedIntersectMat { bind, ..}
1086 if bind.iter().all(|(_, var)| !vars.contains(var))
1087 ) || matches!(&instrs[j - 1], JoinStage::Intersect { var, .. } if !vars.contains(var))
1088 {
1089 instrs.swap(j - 1, j);
1090 j -= 1;
1091 } else {
1092 break;
1093 }
1094 }
1095 }
1096 }
1097 JoinStages {
1098 instrs: Arc::new(instrs),
1099 }
1100}
1101
1102pub(crate) fn tree_decompose_and_plan(
1104 ctx: PlanningContext,
1105 strat: PlanStrategy,
1106 actions: ActionId,
1107 no_decomp: bool,
1108) -> Plan {
1109 macro_rules! fast_path {
1110 () => {{
1111 let (header, instrs) = plan_stages(&ctx, strat);
1112 let stages = JoinStages {
1113 instrs: Arc::new(instrs),
1114 };
1115
1116 Plan::SinglePlan(SinglePlan {
1117 atoms: Arc::new(ctx.atoms),
1118 header,
1119 stages,
1120 actions,
1121 })
1122 }};
1123 }
1124 if no_decomp || ctx.atoms.len() <= 2 {
1125 return fast_path!();
1126 }
1127
1128 let bags = decompose_into_bags(&ctx);
1130 if bags.len() <= 1 {
1131 return fast_path!();
1133 }
1134
1135 let mut bags = topologically_sort_bags(bags);
1137
1138 if bags.len() <= 1 {
1139 return fast_path!();
1140 }
1141
1142 let mut n_used_in_bag = count_variable_usage_per_bag(&bags);
1144 let mut has_block_contributed = vec![false; bags.len()];
1145
1146 let mut blocks = Vec::new();
1148 let mut header = vec![];
1149 for bag in bags.iter_mut() {
1150 let (bag_header, stages, mat_spec) = plan_single_bag(
1151 bag,
1152 &blocks,
1153 &mut has_block_contributed,
1154 &mut n_used_in_bag,
1155 strat,
1156 );
1157 blocks.push((stages, mat_spec));
1158 header.extend(bag_header);
1159 }
1160
1161 let result_block = build_result_block(&blocks);
1163
1164 let blocks = blocks
1169 .into_iter()
1170 .map(|(stages, mat_spec)| (loop_lifting(stages), mat_spec))
1171 .collect::<Vec<_>>();
1172 let result_block = loop_lifting(result_block);
1173
1174 Plan::DecomposedPlan(DecomposedPlan {
1175 atoms: Arc::new(ctx.atoms),
1176 header,
1177 stages: JoinStageBlocks { blocks },
1178 result_block,
1179 actions,
1180 })
1181}
1182
1183pub(crate) fn plan_query<'a>(query: Query, col_est: ColumnCardEst<'a>) -> Plan {
1184 let atoms = query.atoms;
1185 let ctx = PlanningContext {
1186 vars: query.var_info,
1187 atoms,
1188 fun_deps: Arc::new(query.fun_deps),
1189 col_est,
1190 };
1191 tree_decompose_and_plan(ctx, query.plan_strategy, query.action, query.no_decomp)
1192}
1193
1194#[derive(Debug)]
1201struct StageInfo {
1202 cover: SubAtom,
1203 vars: SmallVec<[Variable; 1]>,
1204 filters: Vec<(
1205 SubAtom, SmallVec<[ColumnId; 2]>, )>,
1208}
1209
1210#[derive(Debug, Clone)]
1212pub(crate) struct PlanningContext<'a> {
1213 vars: DenseIdMap<Variable, VarInfo>,
1214 atoms: DenseIdMap<AtomId, Atom>,
1215 fun_deps: Arc<FunDeps>,
1216 col_est: ColumnCardEst<'a>,
1217}
1218
1219impl<'a> PlanningContext<'a> {
1220 fn is_subsumed_by(&self, bag2: &PlanningContext<'a>) -> bool {
1221 self.is_subsumed_by_vars(&bag2.vars)
1222 }
1223
1224 fn is_subsumed_by_vars<I>(&self, bag2: &DenseIdMap<Variable, I>) -> bool {
1225 self.vars.iter().all(|(var, _)| bag2.contains_key(var))
1226 }
1227
1228 fn merge_bag(&mut self, bag2: &PlanningContext<'a>) {
1229 for (var, vinfo) in bag2.vars.iter() {
1230 if self.vars.contains_key(var) {
1231 for new_occ in vinfo.occurrences.iter().cloned() {
1232 if !self.vars[var]
1233 .occurrences
1234 .iter()
1235 .any(|occ| occ.atom == new_occ.atom)
1236 {
1237 self.vars[var].occurrences.push(new_occ);
1238 }
1239 }
1240 } else {
1241 self.vars.insert(var, vinfo.clone());
1242 }
1243 }
1244 for (atom_id, atom) in bag2.atoms.iter() {
1245 if !self.atoms.contains_key(atom_id) {
1247 self.atoms.insert(atom_id, atom.clone());
1248 }
1249 }
1250 }
1251
1252 fn common_vars_with<'b>(
1253 &'b self,
1254 other: &'b PlanningContext<'a>,
1255 ) -> impl Iterator<Item = Variable> + 'b {
1256 self.vars
1257 .iter()
1258 .filter(|(var, _)| other.vars.contains_key(*var))
1259 .map(|(var, _)| var)
1260 }
1261
1262 fn has_vars(&self, mut vars: impl Iterator<Item = Variable>) -> bool {
1263 vars.all(|var| self.vars.contains_key(var))
1264 }
1265}
1266
1267type VarSet = FixedBitSet;
1268type AtomSet = FixedBitSet;
1269
1270#[derive(Clone)]
1272pub(crate) struct PlanningState {
1273 used_vars: VarSet,
1274 constrained_atoms: AtomSet,
1275}
1276
1277impl PlanningState {
1278 fn new(n_vars: usize, n_atoms: usize) -> Self {
1279 Self {
1280 used_vars: VarSet::with_capacity(n_vars),
1281 constrained_atoms: AtomSet::with_capacity(n_atoms),
1282 }
1283 }
1284
1285 fn mark_var_used(&mut self, var: Variable) {
1286 self.used_vars.insert(var.index());
1287 }
1288
1289 fn is_var_used(&self, var: Variable) -> bool {
1290 self.used_vars.contains(var.index())
1291 }
1292
1293 fn mark_atom_constrained(&mut self, atom: AtomId) {
1294 self.constrained_atoms.insert(atom.index());
1295 }
1296
1297 fn is_atom_constrained(&self, atom: AtomId) -> bool {
1298 self.constrained_atoms.contains(atom.index())
1299 }
1300}
1301
1302struct BucketQueue<'a> {
1305 var_info: &'a DenseIdMap<Variable, VarInfo>,
1306 cover: VarSet,
1307 atom_info: DenseIdMap<AtomId, VarSet>,
1308 sizes: BTreeMap<usize, IndexSet<AtomId>>,
1309}
1310
1311impl<'a> BucketQueue<'a> {
1312 fn new(var_info: &'a DenseIdMap<Variable, VarInfo>, atoms: &DenseIdMap<AtomId, Atom>) -> Self {
1313 let cover = VarSet::with_capacity(var_info.n_ids());
1314 let mut atom_info = DenseIdMap::with_capacity(atoms.n_ids());
1315 let mut sizes = BTreeMap::<usize, IndexSet<AtomId>>::new();
1316 for (id, atom) in atoms.iter() {
1317 let mut bitset = VarSet::with_capacity(var_info.n_ids());
1318 for var in atom.vars() {
1319 bitset.insert(var.index());
1320 }
1321 sizes.entry(bitset.count_ones(..)).or_default().insert(id);
1322 atom_info.insert(id, bitset);
1323 }
1324 BucketQueue {
1325 var_info,
1326 cover,
1327 atom_info,
1328 sizes,
1329 }
1330 }
1331
1332 fn pop_min(&mut self) -> Option<AtomId> {
1336 let (_, atoms) = self.sizes.iter_mut().next_back()?;
1338 let res = atoms.pop().unwrap();
1339 let vars = self.atom_info[res].clone();
1340 for new_var in vars.difference(&self.cover).map(Variable::from_usize) {
1344 for subatom in &self.var_info[new_var].occurrences {
1345 let cur_set = &mut self.atom_info[subatom.atom];
1346 let old_size = cur_set.count_ones(..);
1347 cur_set.difference_with(&vars);
1348 let new_size = cur_set.count_ones(..);
1349 if old_size == new_size {
1350 continue;
1351 }
1352 if let Some(old_size_set) = self.sizes.get_mut(&old_size) {
1353 old_size_set.swap_remove(&subatom.atom);
1354 if old_size_set.is_empty() {
1355 self.sizes.remove(&old_size);
1356 }
1357 }
1358 if new_size > 0 {
1359 self.sizes.entry(new_size).or_default().insert(subatom.atom);
1360 }
1361 }
1362 }
1363 self.cover.union_with(&vars);
1364 Some(res)
1365 }
1366}
1367
1368fn plan_headers<'a, 'b>(
1371 ctx: &'b PlanningContext<'a>,
1372) -> (
1373 Vec<JoinHeader>,
1374 DenseIdMap<
1375 AtomId,
1376 (
1377 usize, &'b Pooled<Vec<Constraint>>,
1379 ),
1380 >,
1381) {
1382 let mut header = Vec::new();
1383 let mut remaining_constraints: DenseIdMap<AtomId, (usize, &Pooled<Vec<Constraint>>)> =
1384 Default::default();
1385
1386 for (atom, atom_info) in ctx.atoms.iter() {
1387 remaining_constraints.insert(
1388 atom,
1389 (
1390 atom_info.constraints.approx_size(),
1391 &atom_info.constraints.slow,
1392 ),
1393 );
1394 if !atom_info.constraints.fast.is_empty() {
1395 header.push(JoinHeader {
1396 atom,
1397 constraints: Pooled::cloned(&atom_info.constraints.fast),
1398 subset: atom_info.constraints.subset.clone(),
1399 });
1400 }
1401 }
1402
1403 (header, remaining_constraints)
1404}
1405
1406fn plan_stages(ctx: &PlanningContext, strat: PlanStrategy) -> (Vec<JoinHeader>, Vec<JoinStage>) {
1410 let (header, remaining_constraints) = plan_headers(ctx);
1411 let mut instrs = Vec::new();
1412 let mut state = PlanningState::new(ctx.vars.n_ids(), ctx.atoms.n_ids());
1413
1414 match strat {
1415 PlanStrategy::PureSize | PlanStrategy::MinCover => {
1416 plan_free_join(ctx, &mut state, strat, &remaining_constraints, &mut instrs)
1417 }
1418 PlanStrategy::Gj => plan_gj(ctx, &mut state, &remaining_constraints, &mut instrs),
1419 };
1420
1421 (header, instrs)
1422}
1423
1424fn plan_free_join(
1426 ctx: &PlanningContext,
1427 state: &mut PlanningState,
1428 strat: PlanStrategy,
1429 remaining_constraints: &DenseIdMap<AtomId, (usize, &Pooled<Vec<Constraint>>)>,
1430 stages: &mut Vec<JoinStage>,
1431) {
1432 let mut size_info = Vec::<(AtomId, usize)>::new();
1433
1434 match strat {
1435 PlanStrategy::PureSize => {
1436 for (atom, (size, _)) in remaining_constraints.iter() {
1437 size_info.push((atom, *size));
1438 }
1439 }
1440 PlanStrategy::MinCover => {
1441 let mut eligible_covers = HashSet::default();
1442 let mut queue = BucketQueue::new(&ctx.vars, &ctx.atoms);
1443 while let Some(atom) = queue.pop_min() {
1444 eligible_covers.insert(atom);
1445 }
1446 for (atom, (size, _)) in remaining_constraints
1447 .iter()
1448 .filter(|(atom, _)| eligible_covers.contains(atom))
1449 {
1450 size_info.push((atom, *size));
1451 }
1452 }
1453 PlanStrategy::Gj => unreachable!(),
1454 };
1455
1456 size_info.sort_by_key(|(_, size)| *size);
1457 let mut atoms = size_info.iter().map(|(atom, _)| *atom);
1458
1459 while let Some(info) = get_next_freejoin_stage(ctx, state, &mut atoms) {
1460 let stage = compile_stage(ctx, state, info);
1461 stages.push(stage);
1462 }
1463}
1464
1465fn get_next_freejoin_stage(
1468 ctx: &PlanningContext,
1469 state: &mut PlanningState,
1470 ordering: &mut impl Iterator<Item = AtomId>,
1471) -> Option<StageInfo> {
1472 let mut scratch_subatom: HashMap<AtomId, SmallVec<[ColumnId; 2]>> = Default::default();
1473
1474 loop {
1475 let mut covered = false;
1476 let atom = ordering.next()?;
1477 let atom_info = &ctx.atoms[atom];
1478 let mut cover = SubAtom::new(atom);
1479 let mut vars = SmallVec::<[Variable; 1]>::new();
1480
1481 for (ix, var) in atom_info.var_columns.iter() {
1482 if state.is_var_used(var) {
1483 continue;
1484 }
1485 covered = true;
1487 state.mark_var_used(var);
1488 vars.push(var);
1489 cover.vars.push(ix);
1490
1491 for subatom in ctx.vars[var].occurrences.iter() {
1492 if subatom.atom == atom {
1493 continue;
1494 }
1495 scratch_subatom
1496 .entry(subatom.atom)
1497 .or_default()
1498 .extend(subatom.vars.iter().copied());
1499 }
1500 }
1501
1502 if !covered {
1503 continue;
1505 }
1506
1507 let mut filters = Vec::new();
1508 for (atom, cols) in scratch_subatom.drain() {
1509 let mut form_key = SmallVec::<[ColumnId; 2]>::new();
1510 for var_ix in &cols {
1511 let var = ctx.atoms[atom].get_var(*var_ix).unwrap();
1512 let cover_col = vars.iter().position(|v| *v == var).unwrap();
1514 form_key.push(ColumnId::from_usize(cover_col));
1515 }
1516 filters.push((SubAtom { atom, vars: cols }, form_key));
1517 }
1518
1519 return Some(StageInfo {
1520 cover,
1521 vars,
1522 filters,
1523 });
1524 }
1525}
1526
1527fn plan_gj(
1533 ctx: &PlanningContext,
1534 state: &mut PlanningState,
1535 _remaining_constraints: &DenseIdMap<AtomId, (usize, &Pooled<Vec<Constraint>>)>,
1536 stages: &mut Vec<JoinStage>,
1537) {
1538 let mut planned_vars = Vec::with_capacity(ctx.vars.n_ids());
1539 let mut atoms_hit = AtomSet::with_capacity(ctx.atoms.n_ids());
1540 for (var, var_info) in ctx.vars.iter() {
1541 let n_occs = var_info.occurrences.len();
1542 if n_occs == 0 {
1543 continue;
1545 }
1546 if n_occs == 1 && !var_info.used_in_rhs {
1547 continue;
1549 }
1550 for subatom in var_info.occurrences.iter() {
1551 atoms_hit.set(subatom.atom.index(), true);
1552 }
1553 planned_vars.push(var);
1554 }
1555 for (var, var_info) in ctx.vars.iter() {
1556 if var_info.occurrences.len() == 1 && !var_info.used_in_rhs {
1557 let subatom = &var_info.occurrences[0];
1560 if !atoms_hit.contains(subatom.atom.index()) {
1561 atoms_hit.set(subatom.atom.index(), true);
1562 planned_vars.push(var);
1563 }
1564 }
1565 }
1566 for var in planned_vars {
1567 let occ = ctx.vars[var].occurrences[0].clone();
1568 let mut info = StageInfo {
1569 cover: occ,
1570 vars: smallvec![var],
1571 filters: Default::default(),
1572 };
1573 for occ in &ctx.vars[var].occurrences[1..] {
1574 info.filters
1575 .push((occ.clone(), smallvec![ColumnId::new(0); occ.vars.len()]));
1576 }
1577
1578 stages.push(compile_stage(ctx, state, info));
1579 }
1580 fuse_single_scans(stages);
1581}
1582
1583fn compile_stage(
1585 ctx: &PlanningContext,
1586 state: &mut PlanningState,
1587 StageInfo {
1588 cover,
1589 vars,
1590 filters,
1591 }: StageInfo,
1592) -> JoinStage {
1593 fn take_atom_constraints_if_new(
1594 ctx: &PlanningContext,
1595 state: &mut PlanningState,
1596 atom: AtomId,
1597 ) -> Vec<Constraint> {
1598 if state.is_atom_constrained(atom) {
1599 Default::default()
1600 } else {
1601 state.mark_atom_constrained(atom);
1602 ctx.atoms[atom].constraints.slow.clone()
1603 }
1604 }
1605
1606 if vars.len() == 1 && !filters.is_empty() {
1608 let scans = SmallVec::<[SingleScanSpec; 3]>::from_iter(
1609 iter::once(&cover)
1610 .chain(filters.iter().map(|(x, _)| x))
1611 .map(|subatom| {
1612 let atom = subatom.atom;
1613 SingleScanSpec {
1614 atom,
1615 column: subatom.vars[0],
1616 cs: take_atom_constraints_if_new(ctx, state, atom),
1617 }
1618 }),
1619 );
1620
1621 return JoinStage::Intersect {
1622 var: vars[0],
1623 scans,
1624 };
1625 }
1626
1627 let atom = cover.atom;
1629
1630 let cover_spec = ScanSpec {
1631 to_index: cover,
1632 constraints: take_atom_constraints_if_new(ctx, state, atom),
1633 };
1634
1635 let mut bind = SmallVec::new();
1636 for var in vars {
1637 bind.push((ctx.atoms[atom].get_col(var).unwrap(), var));
1638 }
1639
1640 let mut to_intersect = Vec::with_capacity(filters.len());
1641 for (subatom, key_spec) in filters {
1642 let atom = subatom.atom;
1643 let scan = ScanSpec {
1644 to_index: subatom,
1645 constraints: take_atom_constraints_if_new(ctx, state, atom),
1646 };
1647 to_intersect.push((scan, key_spec));
1648 }
1649
1650 JoinStage::FusedIntersect {
1651 cover: cover_spec,
1652 bind,
1653 to_intersect,
1654 }
1655}