1use std::{
3 mem,
4 sync::{
5 Arc,
6 atomic::{AtomicUsize, Ordering},
7 },
8};
9
10use crate::{
11 common::IndexSet,
12 hash_index::IndexCatalog,
13 numeric_id::{DenseIdMap, DenseIdMapWithReuse, NumericId, define_id},
14};
15use egglog_concurrency::{NotificationList, ResettableOnceLock};
16use rayon::prelude::*;
17use smallvec::SmallVec;
18
19use crate::{
20 BaseValues, ContainerValues, PoolSet, QueryEntry, TupleIndex, Value,
21 action::{
22 Bindings, DbView,
23 mask::{Mask, MaskIter, ValueSource},
24 },
25 dependency_graph::DependencyGraph,
26 hash_index::{ColumnIndex, Index, IndexBase},
27 offsets::Subset,
28 parallel_heuristics::parallelize_db_level_op,
29 pool::{Pool, Pooled, with_pool_set},
30 query::{Query, RuleSetBuilder},
31 table_spec::{
32 ColumnId, Constraint, MutationBuffer, Table, TableSpec, WrappedTable, WrappedTableRef,
33 },
34};
35
36use self::plan::Plan;
37use crate::action::ExecutionState;
38
39pub(crate) mod execute;
40pub(crate) mod frame_update;
41pub(crate) mod plan;
42
// Identifies one atom of a query: per the doc string below, a function applied to a list of
// variables or constants.
define_id!(
    pub AtomId,
    u32,
    "A component of a query consisting of a function and a list of variables or constants"
);
// Identifies a variable in a query. The all-ones id (`!0`) is reserved as a sentinel by
// `Variable::placeholder`.
define_id!(pub Variable, u32, "a variable in a query");
49
50impl Variable {
51 pub fn placeholder() -> Variable {
52 Variable::new(!0)
53 }
54}
55
// Identifies a table registered with a `Database`; ids are handed out by `Database::add_table`
// and `Database::add_table_named`.
define_id!(pub TableId, u32, "a table in the database");
// Identifies the right-hand side (the actions) of a rule; crate-internal.
define_id!(pub(crate) ActionId, u32, "an identifier picking out the RHS of a rule");
58
59#[derive(Debug)]
60pub(crate) struct ProcessedConstraints {
61 pub(crate) subset: Subset,
64 pub(crate) fast: Pooled<Vec<Constraint>>,
66 pub(crate) slow: Pooled<Vec<Constraint>>,
68}
69
70impl Clone for ProcessedConstraints {
71 fn clone(&self) -> Self {
72 ProcessedConstraints {
73 subset: self.subset.clone(),
74 fast: Pooled::cloned(&self.fast),
75 slow: Pooled::cloned(&self.slow),
76 }
77 }
78}
79
80impl ProcessedConstraints {
81 fn approx_size(&self) -> usize {
83 self.subset.size()
84 }
85}
86
87#[derive(Clone, Debug, PartialEq, Eq)]
88pub(crate) struct SubAtom {
89 pub(crate) atom: AtomId,
90 pub(crate) vars: SmallVec<[ColumnId; 2]>,
91}
92
93impl SubAtom {
94 pub(crate) fn new(atom: AtomId) -> SubAtom {
95 SubAtom {
96 atom,
97 vars: Default::default(),
98 }
99 }
100}
101
/// Metadata about one variable of a rule.
///
/// NOTE(review): the field descriptions below are inferred from the field names; confirm
/// against the code that populates this struct.
#[derive(Debug)]
pub(crate) struct VarInfo {
    /// The atoms (and the columns within them) in which the variable appears.
    pub(crate) occurrences: Vec<SubAtom>,
    /// Whether the variable is referenced by the rule's right-hand side.
    pub(crate) used_in_rhs: bool,
    /// Whether the variable is bound by the right-hand side rather than by the query.
    pub(crate) defined_in_rhs: bool,
    /// Optional human-readable name for the variable.
    pub(crate) name: Option<Arc<str>>,
}
111
/// A shared, lazily built index over several columns of a table. The lock is filled or
/// brought up to date on demand via `get_or_update`, and cleared with `reset` at the end of
/// `Database::merge_all`.
pub(crate) type HashIndex = Arc<ResettableOnceLock<Index<TupleIndex>>>;
/// Like [`HashIndex`], but built over a single column (see
/// `get_column_index_from_tableinfo`).
pub(crate) type HashColumnIndex = Arc<ResettableOnceLock<Index<ColumnIndex>>>;
114
115pub struct TableInfo {
116 pub(crate) name: Option<Arc<str>>,
117 pub(crate) spec: TableSpec,
118 pub(crate) table: WrappedTable,
119 pub(crate) indexes: IndexCatalog<SmallVec<[ColumnId; 4]>, HashIndex>,
120 pub(crate) column_indexes: IndexCatalog<ColumnId, HashColumnIndex>,
121}
122
123impl TableInfo {
124 pub fn table(&self) -> &WrappedTable {
125 &self.table
126 }
127
128 pub fn name(&self) -> Option<&str> {
129 self.name.as_deref()
130 }
131
132 pub fn spec(&self) -> &TableSpec {
133 &self.spec
134 }
135}
136
137impl Clone for TableInfo {
138 fn clone(&self) -> Self {
139 fn deep_clone_map<K: Clone + std::hash::Hash + Eq, TI: IndexBase + Clone>(
140 map: &IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>>,
141 table: WrappedTableRef,
142 ) -> IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>> {
143 map.map(|table_ref| {
144 let (k, v) = table_ref;
145 let v: Index<TI> = v
146 .get_or_update(|index| {
147 index.refresh(table);
148 })
149 .clone();
150 (k.clone(), Arc::new(ResettableOnceLock::new(v)))
151 })
152 }
153 TableInfo {
154 name: self.name.clone(),
155 spec: self.spec.clone(),
156 table: self.table.dyn_clone(),
157 indexes: deep_clone_map(&self.indexes, self.table.as_ref()),
158 column_indexes: deep_clone_map(&self.column_indexes, self.table.as_ref()),
159 }
160 }
161}
162
// Counters are created with `Database::add_counter` and updated with `Database::inc_counter`.
define_id!(pub CounterId, u32, "A counter accessible to actions, useful for generating unique Ids.");
// External functions are registered with `Database::add_external_function`.
define_id!(pub ExternalFunctionId, u32, "A user-defined operation that can be invoked from a query");
165
/// A user-defined operation that can be invoked from queries and actions.
///
/// Implementors must be cloneable behind a trait object (`DynClone`, so a
/// `Box<dyn ExternalFunction>` is `Clone`) and shareable across threads (`Send + Sync`).
/// Closures can be adapted with [`make_external_func`].
pub trait ExternalFunction: dyn_clone::DynClone + Send + Sync {
    /// Calls the function on `args` with access to `state`.
    ///
    /// Returns `None` when no value is produced for these arguments. NOTE(review): batch
    /// callers (`invoke_batch`, `invoke_batch_assign`) presumably drop such rows from their
    /// mask; confirm.
    fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value>;
}
176
177pub fn make_external_func<
179 F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
180>(
181 f: F,
182) -> impl ExternalFunction {
183 #[derive(Clone)]
184 struct Wrapped<F>(F);
185 impl<F> ExternalFunction for Wrapped<F>
186 where
187 F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
188 {
189 fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value> {
190 (self.0)(state, args)
191 }
192 }
193 Wrapped(f)
194}
195
/// Evaluates `this` on every row of `bindings` selected by `mask`, and binds the resulting
/// column to `out_var`.
///
/// Argument values for each row are resolved from `args` by `for_each_binding_with_mask!`.
/// Results are gathered into a pooled `Vec<Value>`, pre-sized to `mask.len()`, via
/// `fill_vec`, with `Value::stale` as the filler value. NOTE(review): the filler is presumably
/// used for rows where `invoke` returns `None`. Confirm how `fill_vec` updates `mask` for those
/// rows.
pub(crate) fn invoke_batch(
    this: &dyn ExternalFunction,
    state: &mut ExecutionState,
    mask: &mut Mask,
    bindings: &mut Bindings,
    args: &[QueryEntry],
    out_var: Variable,
) {
    // Borrow an output buffer from the thread's pool set instead of allocating a fresh Vec.
    let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool().clone());
    let mut out = pool.get();
    out.reserve(mask.len());
    for_each_binding_with_mask!(mask, args, bindings, |iter| {
        iter.fill_vec(&mut out, Value::stale, |_, args| {
            this.invoke(state, args.as_slice())
        });
    });
    bindings.insert(out_var, &out);
}
215
/// Like [`invoke_batch`], but writes results into the column already bound to `out_var`
/// instead of creating a new one.
///
/// The existing column is taken out of `bindings`, updated through `assign_vec_and_retain`
/// using the result of `this.invoke` for each row selected by `mask`, and then put back with
/// `bindings.replace`. NOTE(review): "retain" presumably means rows whose `invoke` returns
/// `None` are removed from `mask`; confirm.
///
/// # Panics
///
/// Panics if `out_var` is not already bound in `bindings`.
pub(crate) fn invoke_batch_assign(
    this: &dyn ExternalFunction,
    state: &mut ExecutionState,
    mask: &mut Mask,
    bindings: &mut Bindings,
    args: &[QueryEntry],
    out_var: Variable,
) {
    let mut out = bindings.take(out_var).expect("out_var must be bound");
    for_each_binding_with_mask!(mask, args, bindings, |iter| {
        iter.assign_vec_and_retain(&mut out.vals, |_, args| this.invoke(state, &args))
    });
    bindings.replace(out);
}
235
// Implements `Clone` for `Box<dyn ExternalFunction>` via the `DynClone` supertrait. This is
// what lets `ExternalFunctions`, and therefore `Database`, derive `Clone`.
dyn_clone::clone_trait_object!(ExternalFunction);

/// Registered external functions keyed by id. NOTE(review): `DenseIdMapWithReuse` presumably
/// recycles the ids of entries removed by `Database::free_external_function`; confirm.
pub(crate) type ExternalFunctions =
    DenseIdMapWithReuse<ExternalFunctionId, Box<dyn ExternalFunction>>;
241
242#[derive(Default)]
243pub(crate) struct Counters(DenseIdMap<CounterId, AtomicUsize>);
244
245impl Clone for Counters {
246 fn clone(&self) -> Counters {
247 let mut map = DenseIdMap::new();
248 for (k, v) in self.0.iter() {
249 map.insert(k, AtomicUsize::new(v.load(Ordering::SeqCst)))
251 }
252 Counters(map)
253 }
254}
255
256impl Counters {
257 pub(crate) fn read(&self, ctr: CounterId) -> usize {
258 self.0[ctr].load(Ordering::Acquire)
259 }
260 pub(crate) fn inc(&self, ctr: CounterId) -> usize {
261 self.0[ctr].fetch_add(1, Ordering::Release)
264 }
265}
266
/// A collection of tables plus the state shared by queries, actions, and merges: counters,
/// external functions, base and container values, and the bookkeeping used by
/// [`Database::merge_all`].
#[derive(Clone, Default)]
pub struct Database {
    /// Every registered table, keyed by id.
    pub(crate) tables: DenseIdMap<TableId, TableInfo>,
    /// Atomic counters created by `add_counter`.
    pub(crate) counters: Counters,
    /// External functions registered with `add_external_function`.
    pub(crate) external_functions: ExternalFunctions,
    /// Container values; rebuilt per table by `rebuild_containers`.
    container_values: ContainerValues,
    /// Tables queued for merging. Entries are added with `notify` (e.g. by `new_buffer` and
    /// `apply_rebuild`) and drained with `reset` by `merge_all` / `merge_simple`.
    notification_list: NotificationList<TableId>,
    /// Read/write dependencies between tables. `merge_all` walks `deps.strata()` in order and
    /// allocates mutation buffers for each merging table's `write_deps`.
    deps: DependencyGraph,
    /// Base values, exposed through `base_values` / `base_values_mut`.
    base_values: BaseValues,
    /// Sum of all table lengths as of the last `merge_all`, adjusted by `merge_table`. Used to
    /// decide whether database-wide operations run in parallel.
    total_size_estimate: usize,
}
294
295impl Database {
296 pub fn new() -> Database {
301 Database::default()
302 }
303
304 pub fn new_rule_set(&mut self) -> RuleSetBuilder<'_> {
306 RuleSetBuilder::new(self)
307 }
308
309 pub fn add_external_function(
311 &mut self,
312 f: Box<dyn ExternalFunction + 'static>,
313 ) -> ExternalFunctionId {
314 self.external_functions.push(f)
315 }
316
317 pub fn free_external_function(&mut self, id: ExternalFunctionId) {
319 self.external_functions.take(id);
320 }
321
322 pub fn base_values(&self) -> &BaseValues {
323 &self.base_values
324 }
325
326 pub fn base_values_mut(&mut self) -> &mut BaseValues {
327 &mut self.base_values
328 }
329
330 pub fn container_values(&self) -> &ContainerValues {
331 &self.container_values
332 }
333
334 pub fn container_values_mut(&mut self) -> &mut ContainerValues {
335 &mut self.container_values
336 }
337
338 pub fn rebuild_containers(&mut self, table_id: TableId) -> bool {
339 let mut containers = mem::take(&mut self.container_values);
340 let table = &self.tables[table_id].table;
341 let res = self.with_execution_state(|state| containers.rebuild_all(table_id, table, state));
342 self.container_values = containers;
343 res
344 }
345
346 pub fn apply_rebuild(
353 &mut self,
354 func_id: TableId,
355 to_rebuild: &[TableId],
356 next_ts: Value,
357 ) -> bool {
358 let func = self.tables.take(func_id).unwrap();
359 if parallelize_db_level_op(self.total_size_estimate) {
360 let mut tables = Vec::with_capacity(to_rebuild.len());
361 for id in to_rebuild {
362 tables.push((*id, self.tables.take(*id).unwrap()));
363 }
364 tables.par_iter_mut().for_each(|(id, info)| {
365 if info.table.apply_rebuild(
366 func_id,
367 &func.table,
368 next_ts,
369 &mut ExecutionState::new(self.read_only_view(), Default::default()),
370 ) {
371 self.notification_list.notify(*id);
372 }
373 });
374 for (id, info) in tables {
375 self.tables.insert(id, info);
376 }
377 } else {
378 for id in to_rebuild {
379 let mut info = self.tables.take(*id).unwrap();
380 if info.table.apply_rebuild(
381 func_id,
382 &func.table,
383 next_ts,
384 &mut ExecutionState::new(self.read_only_view(), Default::default()),
385 ) {
386 self.notification_list.notify(*id);
387 }
388 self.tables.insert(*id, info);
389 }
390 }
391 self.tables.insert(func_id, func);
392 self.merge_all()
393 }
394
395 pub fn with_execution_state<R>(&self, f: impl FnOnce(&mut ExecutionState) -> R) -> R {
397 let mut state = ExecutionState::new(self.read_only_view(), Default::default());
398 f(&mut state)
399 }
400
401 pub(crate) fn read_only_view(&self) -> DbView<'_> {
402 DbView {
403 table_info: &self.tables,
404 counters: &self.counters,
405 external_funcs: &self.external_functions,
406 bases: &self.base_values,
407 containers: &self.container_values,
408 notification_list: &self.notification_list,
409 }
410 }
411
412 pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
415 let table_info = self
416 .tables
417 .get(table)
418 .expect("table must be declared in the current database");
419 let table = &table_info.table;
420 if let Some(c) = c {
421 if let Some(sub) = table.fast_subset(&c) {
422 sub.size()
425 } else {
426 table.refine_one(table.refine_live(table.all()), &c).size()
427 }
428 } else {
429 table.len()
430 }
431 }
432
433 pub fn add_counter(&mut self) -> CounterId {
437 self.counters.0.push(AtomicUsize::new(0))
438 }
439
440 pub fn inc_counter(&self, counter: CounterId) -> usize {
442 self.counters.inc(counter)
443 }
444
445 pub fn read_counter(&self, counter: CounterId) -> usize {
447 self.counters.read(counter)
448 }
449
450 pub fn merge_all(&mut self) -> bool {
457 let mut ever_changed = false;
458 let do_parallel = parallelize_db_level_op(self.total_size_estimate);
459 let mut to_merge = IndexSet::default();
460 loop {
461 to_merge.clear();
462 let to_merge_vec = self.notification_list.reset();
463 if to_merge_vec.len() < 4 {
464 ever_changed |= self.merge_simple(to_merge_vec);
465 break;
466 }
467 for table in to_merge_vec {
468 to_merge.insert(table);
469 }
470
471 let mut changed = false;
472 let mut tables_merging = DenseIdMap::<
473 TableId,
474 (
475 Option<TableInfo>,
477 DenseIdMap<TableId, Box<dyn MutationBuffer>>,
480 ),
481 >::with_capacity(self.tables.n_ids());
482 for stratum in self.deps.strata() {
483 for table in stratum.intersection(&to_merge).copied() {
485 let mut bufs = DenseIdMap::default();
486 for dep in self.deps.write_deps(table) {
487 if let Some(info) = self.tables.get(dep) {
488 bufs.insert(dep, info.table.new_buffer());
489 }
490 }
491 tables_merging.insert(table, (None, bufs));
492 }
493 for table in stratum.intersection(&to_merge).copied() {
496 tables_merging[table].0 = Some(self.tables.unwrap_val(table));
497 }
498 let db = self.read_only_view();
499 changed |= if do_parallel {
500 tables_merging
501 .par_iter_mut()
502 .map(|(_, (info, buffers))| {
503 let mut es = ExecutionState::new(db, mem::take(buffers));
504 info.as_mut().unwrap().table.merge(&mut es).added || es.changed
505 })
506 .max()
507 .unwrap_or(false)
508 } else {
509 tables_merging
510 .iter_mut()
511 .map(|(_, (info, buffers))| {
512 let mut es = ExecutionState::new(db, mem::take(buffers));
513 info.as_mut().unwrap().table.merge(&mut es).added || es.changed
514 })
515 .max()
516 .unwrap_or(false)
517 };
518 for (id, (table, _)) in tables_merging.drain() {
519 self.tables.insert(id, table.unwrap());
520 }
521 }
522 ever_changed |= changed;
523 }
524 let mut size_estimate = 0;
526 for (_, info) in self.tables.iter_mut() {
527 info.column_indexes.update(|_, ti| {
528 Arc::get_mut(ti).unwrap().reset();
529 });
530 info.indexes.update(|_, ti| {
531 Arc::get_mut(ti).unwrap().reset();
532 });
533 size_estimate += info.table.len();
534 }
535 self.total_size_estimate = size_estimate;
536 ever_changed
537 }
538
539 fn merge_simple(&mut self, mut to_merge: SmallVec<[TableId; 4]>) -> bool {
543 let mut changed = false;
544 while !to_merge.is_empty() {
545 for table_id in to_merge.iter().copied() {
546 let mut info = self.tables.unwrap_val(table_id);
547 let mut es = ExecutionState::new(self.read_only_view(), Default::default());
548 changed |= info.table.merge(&mut es).added || es.changed;
549 self.tables.insert(table_id, info);
550 }
551 to_merge = self.notification_list.reset();
552 }
553 changed
554 }
555
556 pub fn merge_table(&mut self, table: TableId) -> bool {
563 let mut info = self.tables.unwrap_val(table);
564 self.total_size_estimate = self.total_size_estimate.wrapping_sub(info.table.len());
565 let table_changed = info.table.merge(&mut ExecutionState::new(
566 self.read_only_view(),
567 Default::default(),
568 ));
569 self.total_size_estimate = self.total_size_estimate.wrapping_add(info.table.len());
570 self.tables.insert(table, info);
571 table_changed.added
572 }
573
574 pub fn next_table_id(&self) -> TableId {
579 self.tables.next_id()
580 }
581
582 pub fn add_table<T: Table + Sized + 'static>(
587 &mut self,
588 table: T,
589 read_deps: impl IntoIterator<Item = TableId>,
590 write_deps: impl IntoIterator<Item = TableId>,
591 ) -> TableId {
592 self.add_table_impl(table, None, read_deps, write_deps)
593 }
594
595 pub fn add_table_named<T: Table + Sized + 'static>(
596 &mut self,
597 table: T,
598 name: Arc<str>,
599 read_deps: impl IntoIterator<Item = TableId>,
600 write_deps: impl IntoIterator<Item = TableId>,
601 ) -> TableId {
602 self.add_table_impl(table, Some(name), read_deps, write_deps)
603 }
604
605 fn add_table_impl<T: Table + Sized + 'static>(
606 &mut self,
607 table: T,
608 name: Option<Arc<str>>,
609 read_deps: impl IntoIterator<Item = TableId>,
610 write_deps: impl IntoIterator<Item = TableId>,
611 ) -> TableId {
612 let spec = table.spec();
613 let table = WrappedTable::new(table);
614 let res = self.tables.push(TableInfo {
615 name,
616 spec,
617 table,
618 indexes: IndexCatalog::new(),
619 column_indexes: IndexCatalog::new(),
620 });
621 self.deps.add_table(res, read_deps, write_deps);
622 res
623 }
624
625 pub fn get_table(&self, table: TableId) -> &WrappedTable {
640 &self
641 .tables
642 .get(table)
643 .expect("must access a table that has been declared in this database")
644 .table
645 }
646
647 pub fn get_table_info(&self, table: TableId) -> &TableInfo {
652 self.tables
653 .get(table)
654 .expect("must access a table that has been declared in this database")
655 }
656
657 pub fn new_buffer(&self, id: TableId) -> Box<dyn MutationBuffer> {
663 self.notification_list.notify(id);
664 self.get_table(id).new_buffer()
665 }
666
667 pub(crate) fn process_constraints(
668 &self,
669 table: TableId,
670 cs: &[Constraint],
671 ) -> ProcessedConstraints {
672 let table_info = &self.tables[table];
673 let (mut subset, mut fast, mut slow) = table_info.table.split_fast_slow(cs);
674 slow.retain(|c| {
675 let (col, val) = match c {
676 Constraint::EqConst { col, val } => (*col, *val),
677 Constraint::Eq { .. }
678 | Constraint::LtConst { .. }
679 | Constraint::GtConst { .. }
680 | Constraint::LeConst { .. }
681 | Constraint::GeConst { .. } => return true,
682 };
683 if *table_info
686 .spec
687 .uncacheable_columns
688 .get(col)
689 .unwrap_or(&false)
690 {
691 return true;
692 }
693 fast.push(c.clone());
696 let index = get_column_index_from_tableinfo(table_info, col);
697 match index.get().unwrap().get_subset(&val) {
698 Some(s) => {
699 with_pool_set(|ps| subset.intersect(s, &ps.get_pool()));
700 }
701 None => {
702 subset = Subset::empty();
704 }
705 }
706 false
708 });
709 ProcessedConstraints { subset, fast, slow }
710 }
711
712 pub fn get_table_mut(&mut self, id: TableId) -> &mut dyn Table {
719 &mut *self
720 .tables
721 .get_mut(id)
722 .expect("must access a table that has been declared in this database")
723 .table
724 }
725
726 pub(crate) fn plan_query(&mut self, query: Query) -> Plan {
727 plan::plan_query(query)
728 }
729}
730
impl Drop for Database {
    /// Clears pooled allocations when a database is dropped.
    fn drop(&mut self) {
        // Clear the pool set reachable from the dropping thread first...
        with_pool_set(PoolSet::clear);
        // ...then run the same clear on every thread of the current rayon thread pool, since
        // parallel merges/rebuilds may have populated those threads' pools too.
        // NOTE(review): this assumes pool sets are per-thread; confirm against `with_pool_set`.
        rayon::broadcast(|_| with_pool_set(PoolSet::clear));
    }
}
740
741fn get_index_from_tableinfo(table_info: &TableInfo, cols: &[ColumnId]) -> HashIndex {
746 let index: Arc<_> = table_info.indexes.get_or_insert(cols.into(), || {
747 Arc::new(ResettableOnceLock::new(Index::new(
748 cols.to_vec(),
749 TupleIndex::new(cols.len()),
750 )))
751 });
752 index.get_or_update(|index| {
753 index.refresh(table_info.table.as_ref());
754 });
755 debug_assert!(
756 !index
757 .get()
758 .unwrap()
759 .needs_refresh(table_info.table.as_ref())
760 );
761 index
762}
763
764fn get_column_index_from_tableinfo(table_info: &TableInfo, col: ColumnId) -> HashColumnIndex {
768 let index: Arc<_> = table_info.column_indexes.get_or_insert(col, || {
769 Arc::new(ResettableOnceLock::new(Index::new(
770 vec![col],
771 ColumnIndex::new(),
772 )))
773 });
774 index.get_or_update(|index| {
775 index.refresh(table_info.table.as_ref());
776 });
777 debug_assert!(
778 !index
779 .get()
780 .unwrap()
781 .needs_refresh(table_info.table.as_ref())
782 );
783 index
784}