1use std::{
3 mem,
4 sync::{
5 Arc,
6 atomic::{AtomicUsize, Ordering},
7 },
8};
9
10use crate::{
11 common::IndexSet,
12 hash_index::IndexCatalog,
13 numeric_id::{DenseIdMap, DenseIdMapWithReuse, NumericId, define_id},
14};
15use egglog_concurrency::{NotificationList, ResettableOnceLock};
16use rayon::prelude::*;
17use smallvec::SmallVec;
18
19use crate::{
20 BaseValues, ContainerRebuildSummary, ContainerValues, PoolSet, QueryEntry, TupleIndex, Value,
21 action::{
22 Bindings, DbView,
23 mask::{Mask, MaskIter, ValueSource},
24 },
25 dependency_graph::DependencyGraph,
26 hash_index::{ColumnIndex, Index, IndexBase},
27 offsets::Subset,
28 parallel_heuristics::parallelize_db_level_op,
29 pool::{Pool, Pooled, with_pool_set},
30 query::{Query, RuleSetBuilder},
31 table_spec::{
32 ColumnId, Constraint, MutationBuffer, Table, TableSpec, WrappedTable, WrappedTableRef,
33 },
34};
35
36use self::plan::Plan;
37use crate::action::ExecutionState;
38
39pub(crate) mod execute;
40pub(crate) mod frame_update;
41pub(crate) mod plan;
42
43define_id!(
44 pub AtomId,
45 u32,
46 "A component of a query consisting of a function and a list of variables or constants"
47);
48define_id!(pub Variable, u32, "a variable in a query", pretty "Var");
49
50impl Variable {
51 pub fn placeholder() -> Variable {
52 Variable::new(!0)
53 }
54}
55
56define_id!(pub TableId, u32, "a table in the database");
57
58impl TableId {
59 pub fn dummy() -> TableId {
60 TableId::new(u32::MAX)
61 }
62
63 pub fn is_dummy(&self) -> bool {
64 self.rep == u32::MAX
65 }
66}
67
68define_id!(pub(crate) ActionId, u32, "an identifier picking out the RHS of a rule");
69
70#[derive(Debug)]
71pub(crate) struct ProcessedConstraints {
72 pub(crate) subset: Subset,
75 pub(crate) fast: Pooled<Vec<Constraint>>,
77 pub(crate) slow: Pooled<Vec<Constraint>>,
79}
80
81impl Clone for ProcessedConstraints {
82 fn clone(&self) -> Self {
83 ProcessedConstraints {
84 subset: self.subset.clone(),
85 fast: Pooled::cloned(&self.fast),
86 slow: Pooled::cloned(&self.slow),
87 }
88 }
89}
90
91impl ProcessedConstraints {
92 fn approx_size(&self) -> usize {
94 self.subset.size()
95 }
96
97 pub(crate) fn dummy() -> ProcessedConstraints {
98 ProcessedConstraints {
99 subset: Subset::empty(),
100 fast: Pooled::new(Vec::new()),
101 slow: Pooled::new(Vec::new()),
102 }
103 }
104}
105
106#[derive(Clone, Debug, PartialEq, Eq)]
107pub(crate) struct SubAtom {
108 pub(crate) atom: AtomId,
109 pub(crate) vars: SmallVec<[ColumnId; 2]>,
110}
111
112impl SubAtom {
113 pub(crate) fn new(atom: AtomId) -> SubAtom {
114 SubAtom {
115 atom,
116 vars: Default::default(),
117 }
118 }
119}
120
121#[derive(Debug, Clone)]
122pub(crate) struct VarInfo {
123 pub(crate) occurrences: Vec<SubAtom>,
124 pub(crate) used_in_rhs: bool,
127 pub(crate) defined_in_rhs: bool,
128 pub(crate) name: Option<Arc<str>>,
129}
130
131pub(crate) type HashIndex = Arc<ResettableOnceLock<Index<TupleIndex>>>;
132pub(crate) type HashColumnIndex = Arc<ResettableOnceLock<Index<ColumnIndex>>>;
133
134pub struct TableInfo {
135 pub(crate) name: Option<Arc<str>>,
136 pub(crate) spec: TableSpec,
137 pub(crate) table: WrappedTable,
138 pub(crate) indexes: IndexCatalog<SmallVec<[ColumnId; 4]>, HashIndex>,
139 pub(crate) column_indexes: IndexCatalog<ColumnId, HashColumnIndex>,
140}
141
142impl TableInfo {
143 pub fn table(&self) -> &WrappedTable {
144 &self.table
145 }
146
147 pub fn name(&self) -> Option<&str> {
148 self.name.as_deref()
149 }
150
151 pub fn spec(&self) -> &TableSpec {
152 &self.spec
153 }
154}
155
156impl Clone for TableInfo {
157 fn clone(&self) -> Self {
158 fn deep_clone_map<K: Clone + std::hash::Hash + Eq, TI: IndexBase + Clone>(
159 map: &IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>>,
160 table: WrappedTableRef,
161 ) -> IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>> {
162 map.map(|table_ref| {
163 let (k, v) = table_ref;
164 let v: Index<TI> = v
165 .get_or_update(|index| {
166 index.refresh(table);
167 })
168 .clone();
169 (k.clone(), Arc::new(ResettableOnceLock::new(v)))
170 })
171 }
172 TableInfo {
173 name: self.name.clone(),
174 spec: self.spec.clone(),
175 table: self.table.dyn_clone(),
176 indexes: deep_clone_map(&self.indexes, self.table.as_ref()),
177 column_indexes: deep_clone_map(&self.column_indexes, self.table.as_ref()),
178 }
179 }
180}
181
182define_id!(pub CounterId, u32, "A counter accessible to actions, useful for generating unique Ids.");
183define_id!(pub ExternalFunctionId, u32, "A user-defined operation that can be invoked from a query");
184
185pub trait ExternalFunction: dyn_clone::DynClone + Send + Sync {
191 fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value>;
194}
195
196pub fn make_external_func<
198 F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
199>(
200 f: F,
201) -> impl ExternalFunction {
202 #[derive(Clone)]
203 struct Wrapped<F>(F);
204 impl<F> ExternalFunction for Wrapped<F>
205 where
206 F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
207 {
208 fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value> {
209 (self.0)(state, args)
210 }
211 }
212 Wrapped(f)
213}
214
215pub(crate) fn invoke_batch(
217 this: &dyn ExternalFunction,
218 state: &mut ExecutionState,
219 mask: &mut Mask,
220 bindings: &mut Bindings,
221 args: &[QueryEntry],
222 out_var: Variable,
223) {
224 let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool());
225 let mut out = pool.get();
226 out.reserve(mask.len());
227 for_each_binding_with_mask!(mask, args, bindings, |iter| {
228 iter.fill_vec(&mut out, Value::stale, |_, args| {
229 this.invoke(state, args.as_slice())
230 });
231 });
232 bindings.insert(out_var, &out);
233}
234
235pub(crate) fn invoke_batch_assign(
241 this: &dyn ExternalFunction,
242 state: &mut ExecutionState,
243 mask: &mut Mask,
244 bindings: &mut Bindings,
245 args: &[QueryEntry],
246 out_var: Variable,
247) {
248 let mut out = bindings.take(out_var).expect("out_var must be bound");
249 for_each_binding_with_mask!(mask, args, bindings, |iter| {
250 iter.assign_vec_and_retain(&mut out.vals, |_, args| this.invoke(state, &args))
251 });
252 bindings.replace(out);
253}
254
255dyn_clone::clone_trait_object!(ExternalFunction);
257
258pub(crate) type ExternalFunctions =
259 DenseIdMapWithReuse<ExternalFunctionId, Box<dyn ExternalFunction>>;
260
261#[derive(Default)]
262pub(crate) struct Counters(DenseIdMap<CounterId, AtomicUsize>);
263
264impl Clone for Counters {
265 fn clone(&self) -> Counters {
266 let mut map = DenseIdMap::new();
267 for (k, v) in self.0.iter() {
268 map.insert(k, AtomicUsize::new(v.load(Ordering::SeqCst)));
270 }
271 Counters(map)
272 }
273}
274
275impl Counters {
276 pub(crate) fn read(&self, ctr: CounterId) -> usize {
277 self.0[ctr].load(Ordering::Acquire)
278 }
279 pub(crate) fn inc(&self, ctr: CounterId) -> usize {
280 self.0[ctr].fetch_add(1, Ordering::Release)
283 }
284}
285
286#[derive(Clone, Default)]
290pub struct Database {
291 pub(crate) tables: DenseIdMap<TableId, TableInfo>,
294 pub(crate) counters: Counters,
299 pub(crate) external_functions: ExternalFunctions,
300 container_values: ContainerValues,
301 notification_list: NotificationList<TableId>,
304 deps: DependencyGraph,
306 base_values: BaseValues,
307 total_size_estimate: usize,
312}
313
314impl Database {
315 pub fn new() -> Database {
320 Database::default()
321 }
322
323 pub fn new_rule_set(&mut self) -> RuleSetBuilder<'_> {
325 RuleSetBuilder::new(self)
326 }
327
328 pub fn add_external_function(
330 &mut self,
331 f: Box<dyn ExternalFunction + 'static>,
332 ) -> ExternalFunctionId {
333 self.external_functions.push(f)
334 }
335
336 pub fn free_external_function(&mut self, id: ExternalFunctionId) {
338 self.external_functions.take(id);
339 }
340
341 pub fn base_values(&self) -> &BaseValues {
342 &self.base_values
343 }
344
345 pub fn base_values_mut(&mut self) -> &mut BaseValues {
346 &mut self.base_values
347 }
348
349 pub fn container_values(&self) -> &ContainerValues {
350 &self.container_values
351 }
352
353 pub fn container_values_mut(&mut self) -> &mut ContainerValues {
354 &mut self.container_values
355 }
356
357 pub fn rebuild_containers(&mut self, table_id: TableId) -> ContainerRebuildSummary {
358 let mut containers = mem::take(&mut self.container_values);
359 let table = &self.tables[table_id].table;
360 let res = self.with_execution_state(|state| containers.rebuild_all(table_id, table, state));
361 self.container_values = containers;
362 res
363 }
364
365 pub fn apply_rebuild(
372 &mut self,
373 func_id: TableId,
374 to_rebuild: &[TableId],
375 next_ts: Value,
376 ) -> bool {
377 let func = self.tables.take(func_id).unwrap();
378 self.run_on_tables(to_rebuild, |_, info, view| {
379 info.table.apply_rebuild(
380 func_id,
381 &func.table,
382 next_ts,
383 &mut ExecutionState::new(*view, Default::default()),
384 )
385 });
386 self.tables.insert(func_id, func);
387 self.merge_all()
388 }
389
390 pub fn refresh_rows_for_values(
391 &mut self,
392 to_refresh: &[TableId],
393 dirty_ids: &[Value],
394 next_ts: Value,
395 ) -> bool {
396 if dirty_ids.is_empty() {
397 return false;
398 }
399 self.run_on_tables(to_refresh, |_, info, _| {
407 info.table.refresh_rows_for_values(dirty_ids, next_ts)
408 });
409 self.merge_all()
410 }
411
412 fn run_on_tables(
413 &mut self,
414 table_ids: &[TableId],
415 run: impl for<'a> Fn(TableId, &mut TableInfo, &DbView<'a>) -> bool + Sync,
416 ) {
417 if parallelize_db_level_op(self.total_size_estimate) {
418 let mut tables = Vec::with_capacity(table_ids.len());
419 for id in table_ids {
420 tables.push((*id, self.tables.take(*id).unwrap()));
421 }
422 let view = self.read_only_view();
423 tables.par_iter_mut().for_each(|(id, info)| {
424 if run(*id, info, &view) {
425 self.notification_list.notify(*id);
426 }
427 });
428 for (id, info) in tables {
429 self.tables.insert(id, info);
430 }
431 } else {
432 for id in table_ids {
433 let mut info = self.tables.take(*id).unwrap();
434 let changed = {
435 let view = self.read_only_view();
436 run(*id, &mut info, &view)
437 };
438 if changed {
439 self.notification_list.notify(*id);
440 }
441 self.tables.insert(*id, info);
442 }
443 }
444 }
445
446 pub fn with_execution_state<R>(&self, f: impl FnOnce(&mut ExecutionState) -> R) -> R {
448 let mut state = ExecutionState::new(self.read_only_view(), Default::default());
449 f(&mut state)
450 }
451
452 pub fn with_execution_state_tracked<R>(
456 &self,
457 f: impl FnOnce(&mut ExecutionState) -> R,
458 ) -> (R, bool) {
459 let mut state = ExecutionState::new(self.read_only_view(), Default::default());
460 let result = f(&mut state);
461 (result, state.changed)
462 }
463
464 pub(crate) fn read_only_view(&self) -> DbView<'_> {
465 DbView {
466 table_info: &self.tables,
467 counters: &self.counters,
468 external_funcs: &self.external_functions,
469 bases: &self.base_values,
470 containers: &self.container_values,
471 notification_list: &self.notification_list,
472 }
473 }
474
475 pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
478 let table_info = self
479 .tables
480 .get(table)
481 .expect("table must be declared in the current database");
482 let table = &table_info.table;
483 if let Some(c) = c {
484 if let Some(sub) = table.fast_subset(&c) {
485 sub.size()
488 } else {
489 table.refine_one(table.refine_live(table.all()), &c).size()
490 }
491 } else {
492 table.len()
493 }
494 }
495
496 pub fn add_counter(&mut self) -> CounterId {
500 self.counters.0.push(AtomicUsize::new(0))
501 }
502
503 pub fn inc_counter(&self, counter: CounterId) -> usize {
505 self.counters.inc(counter)
506 }
507
508 pub fn read_counter(&self, counter: CounterId) -> usize {
510 self.counters.read(counter)
511 }
512
513 pub fn merge_all(&mut self) -> bool {
520 let mut ever_changed = false;
521 let do_parallel = parallelize_db_level_op(self.total_size_estimate);
522 let mut to_merge = IndexSet::default();
523 loop {
524 to_merge.clear();
525 let to_merge_vec = self.notification_list.reset();
526 if to_merge_vec.len() < 4 {
527 ever_changed |= self.merge_simple(to_merge_vec);
528 break;
529 }
530 for table in to_merge_vec {
531 to_merge.insert(table);
532 }
533
534 let mut changed = false;
535 let mut tables_merging = DenseIdMap::<
536 TableId,
537 (
538 Option<TableInfo>,
540 DenseIdMap<TableId, Box<dyn MutationBuffer>>,
543 ),
544 >::with_capacity(self.tables.n_ids());
545 for stratum in self.deps.strata() {
546 for table in stratum.intersection(&to_merge).copied() {
548 let mut bufs = DenseIdMap::default();
549 for dep in self.deps.write_deps(table) {
550 if let Some(info) = self.tables.get(dep) {
551 bufs.insert(dep, info.table.new_buffer());
552 }
553 }
554 tables_merging.insert(table, (None, bufs));
555 }
556 for table in stratum.intersection(&to_merge).copied() {
559 tables_merging[table].0 = Some(self.tables.unwrap_val(table));
560 }
561 let db = self.read_only_view();
562 changed |= if do_parallel {
563 tables_merging
564 .par_iter_mut()
565 .map(|(_, (info, buffers))| {
566 let mut es = ExecutionState::new(db, mem::take(buffers));
567 info.as_mut().unwrap().table.merge(&mut es).added || es.changed
568 })
569 .max()
570 .unwrap_or(false)
571 } else {
572 tables_merging
573 .iter_mut()
574 .map(|(_, (info, buffers))| {
575 let mut es = ExecutionState::new(db, mem::take(buffers));
576 info.as_mut().unwrap().table.merge(&mut es).added || es.changed
577 })
578 .max()
579 .unwrap_or(false)
580 };
581 for (id, (table, _)) in tables_merging.drain() {
582 self.tables.insert(id, table.unwrap());
583 }
584 }
585 ever_changed |= changed;
586 }
587 let mut size_estimate = 0;
589 for (_, info) in self.tables.iter_mut() {
590 info.column_indexes.update(|_, ti| {
591 Arc::get_mut(ti).unwrap().reset();
592 });
593 info.indexes.update(|_, ti| {
594 Arc::get_mut(ti).unwrap().reset();
595 });
596 size_estimate += info.table.len();
597 }
598 self.total_size_estimate = size_estimate;
599 ever_changed
600 }
601
602 fn merge_simple(&mut self, mut to_merge: SmallVec<[TableId; 4]>) -> bool {
606 let mut changed = false;
607 while !to_merge.is_empty() {
608 for table_id in to_merge.iter().copied() {
609 let mut info = self.tables.unwrap_val(table_id);
610 let mut es = ExecutionState::new(self.read_only_view(), Default::default());
611 changed |= info.table.merge(&mut es).added || es.changed;
612 self.tables.insert(table_id, info);
613 }
614 to_merge = self.notification_list.reset();
615 }
616 changed
617 }
618
619 pub fn merge_table(&mut self, table: TableId) -> bool {
626 let mut info = self.tables.unwrap_val(table);
627 self.total_size_estimate = self.total_size_estimate.wrapping_sub(info.table.len());
628 let table_changed = info.table.merge(&mut ExecutionState::new(
629 self.read_only_view(),
630 Default::default(),
631 ));
632 self.total_size_estimate = self.total_size_estimate.wrapping_add(info.table.len());
633 self.tables.insert(table, info);
634 table_changed.added
635 }
636
637 pub fn next_table_id(&self) -> TableId {
642 self.tables.next_id()
643 }
644
645 pub fn add_table<T: Table + Sized + 'static>(
650 &mut self,
651 table: T,
652 read_deps: impl IntoIterator<Item = TableId>,
653 write_deps: impl IntoIterator<Item = TableId>,
654 ) -> TableId {
655 self.add_table_impl(table, None, read_deps, write_deps)
656 }
657
658 pub fn add_table_named<T: Table + Sized + 'static>(
659 &mut self,
660 table: T,
661 name: Arc<str>,
662 read_deps: impl IntoIterator<Item = TableId>,
663 write_deps: impl IntoIterator<Item = TableId>,
664 ) -> TableId {
665 self.add_table_impl(table, Some(name), read_deps, write_deps)
666 }
667
668 fn add_table_impl<T: Table + Sized + 'static>(
669 &mut self,
670 table: T,
671 name: Option<Arc<str>>,
672 read_deps: impl IntoIterator<Item = TableId>,
673 write_deps: impl IntoIterator<Item = TableId>,
674 ) -> TableId {
675 let spec = table.spec();
676 let table = WrappedTable::new(table);
677 let res = self.tables.push(TableInfo {
678 name,
679 spec,
680 table,
681 indexes: IndexCatalog::new(),
682 column_indexes: IndexCatalog::new(),
683 });
684 self.deps.add_table(res, read_deps, write_deps);
685 res
686 }
687
688 pub fn get_table(&self, table: TableId) -> &WrappedTable {
703 &self
704 .tables
705 .get(table)
706 .expect("must access a table that has been declared in this database")
707 .table
708 }
709
710 pub fn get_table_info(&self, table: TableId) -> &TableInfo {
715 self.tables
716 .get(table)
717 .expect("must access a table that has been declared in this database")
718 }
719
720 pub fn new_buffer(&self, id: TableId) -> Box<dyn MutationBuffer> {
726 self.notification_list.notify(id);
727 self.get_table(id).new_buffer()
728 }
729
730 pub(crate) fn process_constraints(
731 &self,
732 table: TableId,
733 cs: &[Constraint],
734 ) -> ProcessedConstraints {
735 let table_info = &self.tables[table];
736 let (mut subset, mut fast, mut slow) = table_info.table.split_fast_slow(cs);
737 slow.retain(|c| {
738 let (col, val) = match c {
739 Constraint::EqConst { col, val } => (*col, *val),
740 Constraint::Eq { .. }
741 | Constraint::LtConst { .. }
742 | Constraint::GtConst { .. }
743 | Constraint::LeConst { .. }
744 | Constraint::GeConst { .. } => return true,
745 };
746 if *table_info
749 .spec
750 .uncacheable_columns
751 .get(col)
752 .unwrap_or(&false)
753 {
754 return true;
755 }
756 fast.push(c.clone());
759 let index = get_column_index_from_tableinfo(table_info, col);
760 match index.get().unwrap().get_subset(&val) {
761 Some(s) => {
762 with_pool_set(|ps| subset.intersect(s, &ps.get_pool()));
763 }
764 None => {
765 subset = Subset::empty();
767 }
768 }
769 false
771 });
772 ProcessedConstraints { subset, fast, slow }
773 }
774
775 pub fn get_table_mut(&mut self, id: TableId) -> &mut dyn Table {
782 &mut *self
783 .tables
784 .get_mut(id)
785 .expect("must access a table that has been declared in this database")
786 .table
787 }
788
789 pub fn clear_table(&mut self, table: TableId) {
807 let info = self
808 .tables
809 .get_mut(table)
810 .expect("must access a table that has been declared in this database");
811 let prev_len = info.table.len();
812 info.table.clear();
813 info.column_indexes.update(|_, ti| {
818 if let Some(arc) = Arc::get_mut(ti) {
819 arc.reset();
820 }
821 });
822 info.indexes.update(|_, ti| {
823 if let Some(arc) = Arc::get_mut(ti) {
824 arc.reset();
825 }
826 });
827 self.total_size_estimate = self.total_size_estimate.wrapping_sub(prev_len);
828 }
829
830 pub(crate) fn plan_query(&mut self, query: Query) -> Plan {
831 plan::plan_query(query, ColumnCardEst::new(self))
832 }
833}
834
835impl Drop for Database {
836 fn drop(&mut self) {
837 with_pool_set(PoolSet::clear);
841 rayon::broadcast(|_| with_pool_set(PoolSet::clear));
842 }
843}
844
845fn get_index_from_tableinfo(table_info: &TableInfo, cols: &[ColumnId]) -> HashIndex {
850 let index: Arc<_> = table_info.indexes.get_or_insert(cols.into(), || {
851 Arc::new(ResettableOnceLock::new(Index::new(
852 cols.to_vec(),
853 TupleIndex::new(cols.len()),
854 )))
855 });
856 index.get_or_update(|index| {
857 index.refresh(table_info.table.as_ref());
858 });
859 debug_assert!(
860 !index
861 .get()
862 .unwrap()
863 .needs_refresh(table_info.table.as_ref())
864 );
865 index
866}
867
868fn get_column_index_from_tableinfo(table_info: &TableInfo, col: ColumnId) -> HashColumnIndex {
872 let index: Arc<_> = table_info.column_indexes.get_or_insert(col, || {
873 Arc::new(ResettableOnceLock::new(Index::new(
874 vec![col],
875 ColumnIndex::new(),
876 )))
877 });
878 index.get_or_update(|index| {
879 index.refresh(table_info.table.as_ref());
880 });
881 debug_assert!(
882 !index
883 .get()
884 .unwrap()
885 .needs_refresh(table_info.table.as_ref())
886 );
887 index
888}
889
890#[derive(Clone)]
891pub struct ColumnCardEst<'a> {
892 db: &'a Database,
893}
894
895impl ColumnCardEst<'_> {
896 pub fn new(db: &Database) -> ColumnCardEst<'_> {
897 ColumnCardEst { db }
898 }
899
900 pub fn col_uniqueness(&self, table: TableId, col: ColumnId) -> ColUniqueness {
901 let col_idx = get_column_index_from_tableinfo(&self.db.tables[table], col);
902 let table = &self.db.tables[table].table;
903 ColUniqueness {
904 col_size: col_idx.get().unwrap().len(),
905 table_size: table.len(),
906 }
907 }
908}
909
910impl std::fmt::Debug for ColumnCardEst<'_> {
911 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
912 f.debug_struct("ColumnCardEst").finish_non_exhaustive()
913 }
914}
915
916#[derive(Copy, Clone, Debug)]
928pub struct ColUniqueness {
929 table_size: usize,
930 col_size: usize,
931}
932
933impl Default for ColUniqueness {
934 fn default() -> ColUniqueness {
935 ColUniqueness {
936 table_size: 1,
937 col_size: 1,
938 }
939 }
940}
941
942impl ColUniqueness {
943 #[allow(dead_code)] fn scale(&self, subset_size: usize) -> ColUniqueness {
945 if self.table_size == 0 || subset_size == 0 {
946 return ColUniqueness {
947 table_size: 0,
948 col_size: 0,
949 };
950 }
951 ColUniqueness {
952 table_size: subset_size,
953 col_size: self.col_size.saturating_mul(subset_size) / self.table_size,
954 }
955 }
956 fn join(&self, other: &ColUniqueness) -> ColUniqueness {
957 ColUniqueness {
958 table_size: self.table_size.saturating_mul(other.table_size),
959 col_size: self.col_size.max(other.col_size),
960 }
961 }
962
963 #[allow(dead_code)] fn col_size(&self) -> usize {
965 self.col_size
966 }
967}
968
969impl PartialEq for ColUniqueness {
970 fn eq(&self, other: &Self) -> bool {
971 self.cmp(other) == std::cmp::Ordering::Equal
972 }
973}
974
975impl Eq for ColUniqueness {}
976
977impl PartialOrd for ColUniqueness {
978 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
979 Some(self.cmp(other))
980 }
981}
982
983impl Ord for ColUniqueness {
984 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
985 (self.table_size.saturating_mul(other.col_size))
986 .cmp(&(other.table_size.saturating_mul(self.col_size)))
987 }
988}