1use std::{
8 any::Any,
9 marker::PhantomData,
10 ops::{Deref, DerefMut},
11};
12
13use crate::numeric_id::{DenseIdMap, NumericId, define_id};
14use smallvec::SmallVec;
15
16use crate::{
17 QueryEntry, TableId, Variable,
18 action::{
19 Bindings, ExecutionState,
20 mask::{Mask, MaskIter, ValueSource},
21 },
22 common::Value,
23 hash_index::{ColumnIndex, IndexBase, TupleIndex},
24 offsets::{RowId, Subset, SubsetRef},
25 pool::{PoolSet, Pooled, with_pool_set},
26 row_buffer::{RowBuffer, RowSink, TaggedRowBuffer},
27};
28
29define_id!(pub ColumnId, u32, "a particular column in a table", pretty "Col");
30define_id!(
31 pub Generation,
32 u64,
33 "the current version of a table -- used to invalidate any existing RowIds"
34);
35define_id!(
36 pub Offset,
37 u64,
38 "an opaque offset token -- used to encode iterations over a table (within a generation). These always start at 0."
39);
40
41#[derive(Clone, Debug, PartialEq, Eq)]
43pub struct TableVersion {
44 pub major: Generation,
46 pub minor: Offset,
50 }
52
53#[derive(Clone)]
54pub struct TableSpec {
55 pub n_keys: usize,
57
58 pub n_vals: usize,
62
63 pub uncacheable_columns: DenseIdMap<ColumnId, bool>,
68
69 pub allows_delete: bool,
74}
75
76impl TableSpec {
77 pub fn arity(&self) -> usize {
79 self.n_keys + self.n_vals
80 }
81}
82
83#[derive(Eq, PartialEq, Copy, Clone)]
85pub struct TableChange {
86 pub added: bool,
88 pub removed: bool,
90}
91
92#[derive(Clone, Debug, PartialEq, Eq)]
94pub enum Constraint {
95 Eq { l_col: ColumnId, r_col: ColumnId },
96 EqConst { col: ColumnId, val: Value },
97 LtConst { col: ColumnId, val: Value },
98 GtConst { col: ColumnId, val: Value },
99 LeConst { col: ColumnId, val: Value },
100 GeConst { col: ColumnId, val: Value },
101}
102
103pub trait Rebuilder: Send + Sync {
112 fn hint_col(&self) -> Option<ColumnId>;
115 fn rebuild_val(&self, val: Value) -> Value;
116 fn rebuild_buf(
118 &self,
119 buf: &RowBuffer,
120 start: RowId,
121 end: RowId,
122 out: &mut TaggedRowBuffer,
123 exec_state: &mut ExecutionState,
124 );
125 fn rebuild_subset(
127 &self,
128 other: WrappedTableRef,
129 subset: SubsetRef,
130 out: &mut TaggedRowBuffer,
131 exec_state: &mut ExecutionState,
132 );
133 fn rebuild_slice(&self, vals: &mut [Value]) -> bool;
135}
136
137pub struct Row {
139 pub id: RowId,
141 pub vals: Pooled<Vec<Value>>,
143}
144
145pub trait Table: Any + Send + Sync {
147 fn dyn_clone(&self) -> Box<dyn Table>;
150
151 fn rebuilder<'a>(&'a self, _cols: &[ColumnId]) -> Option<Box<dyn Rebuilder + 'a>> {
153 None
154 }
155
156 fn apply_rebuild(
165 &mut self,
166 _table_id: TableId,
167 _table: &WrappedTable,
168 _next_ts: Value,
169 _exec_state: &mut ExecutionState,
170 ) -> bool {
171 false
173 }
174
175 fn refresh_rows_for_values(&mut self, _dirty_ids: &[Value], _next_ts: Value) -> bool {
188 false
189 }
190
191 fn as_any(&self) -> &dyn Any;
196
197 fn spec(&self) -> TableSpec;
202
203 fn clear(&mut self);
206
207 fn all(&self) -> Subset;
211
212 fn len(&self) -> usize;
218
219 fn is_empty(&self) -> bool {
221 self.len() == 0
222 }
223
224 fn version(&self) -> TableVersion;
227
228 fn updates_since(&self, offset: Offset) -> Subset;
230
231 fn scan_generic_bounded(
240 &self,
241 subset: SubsetRef,
242 start: Offset,
243 n: usize,
244 cs: &[Constraint],
245 f: impl FnMut(RowId, &[Value]),
246 ) -> Option<Offset>
247 where
248 Self: Sized;
249
250 fn scan_generic(&self, subset: SubsetRef, mut f: impl FnMut(RowId, &[Value]))
255 where
256 Self: Sized,
257 {
258 let mut cur = Offset::new(0);
259 while let Some(next) = self.scan_generic_bounded(subset, cur, usize::MAX, &[], |id, row| {
260 f(id, row);
261 }) {
262 cur = next;
263 }
264 }
265
266 fn has_stale_rows(&self) -> bool {
270 true
271 }
272
273 fn refine_live(&self, subset: Subset) -> Subset {
275 self.refine_one(
277 subset,
278 &Constraint::LtConst {
279 col: ColumnId::new_const(0),
280 val: Value::stale(),
281 },
282 )
283 }
284
285 fn refine_one(&self, subset: Subset, c: &Constraint) -> Subset {
289 self.refine(subset, std::slice::from_ref(c))
290 }
291
292 fn refine(&self, subset: Subset, cs: &[Constraint]) -> Subset {
296 cs.iter()
297 .fold(subset, |subset, c| self.refine_one(subset, c))
298 }
299
300 fn fast_subset(&self, _: &Constraint) -> Option<Subset> {
307 None
308 }
309
310 fn split_fast_slow(
314 &self,
315 cs: &[Constraint],
316 ) -> (
317 Subset, Pooled<Vec<Constraint>>, Pooled<Vec<Constraint>>, ) {
321 with_pool_set(|ps| {
322 let mut fast = ps.get::<Vec<Constraint>>();
323 let mut slow = ps.get::<Vec<Constraint>>();
324 let mut subset = self.all();
325 for c in cs {
326 if let Some(sub) = self.fast_subset(c) {
327 subset.intersect(sub.as_ref(), &ps.get_pool());
328 fast.push(c.clone());
329 } else {
330 slow.push(c.clone());
331 }
332 }
333 (subset, fast, slow)
334 })
335 }
336
337 fn get_row(&self, key: &[Value]) -> Option<Row>;
344
345 fn get_row_column(&self, key: &[Value], col: ColumnId) -> Option<Value> {
351 self.get_row(key).map(|row| row.vals[col.index()])
352 }
353
354 fn merge(&mut self, exec_state: &mut ExecutionState) -> TableChange;
357
358 fn new_buffer(&self) -> Box<dyn MutationBuffer>;
362}
363
364pub trait MutationBuffer: Any + Send + Sync {
370 fn stage_insert(&mut self, row: &[Value]);
374
375 fn stage_remove(&mut self, key: &[Value]);
379
380 fn fresh_handle(&self) -> Box<dyn MutationBuffer>;
382}
383
384struct WrapperImpl<T>(PhantomData<T>);
385
386pub(crate) fn wrapper<T: Table>() -> Box<dyn TableWrapper> {
387 Box::new(WrapperImpl::<T>(PhantomData))
388}
389
390impl<T: Table> TableWrapper for WrapperImpl<T> {
391 fn dyn_clone(&self) -> Box<dyn TableWrapper> {
392 Box::new(Self(PhantomData))
393 }
394 fn scan_bounded(
395 &self,
396 table: &dyn Table,
397 subset: SubsetRef,
398 start: Offset,
399 n: usize,
400 out: &mut TaggedRowBuffer,
401 ) -> Option<Offset> {
402 let table = table.as_any().downcast_ref::<T>().unwrap();
403 table.scan_generic_bounded(subset, start, n, &[], |row_id, row| {
404 out.add_row(row_id, row);
405 })
406 }
407 fn group_by_col(&self, table: &dyn Table, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
408 let wrapped = WrappedTableRef {
409 inner: table,
410 wrapper: self,
411 };
412 ColumnIndex::build_for_subset(wrapped, subset, col)
413 }
414 fn group_by_key(&self, table: &dyn Table, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
415 let table = table.as_any().downcast_ref::<T>().unwrap();
416 let mut res = TupleIndex::new(cols.len());
417 match cols {
418 [] => {}
419 [col] => table.scan_generic(subset, |row_id, row| {
420 res.add_row(&[row[col.index()]], row_id);
421 }),
422 [x, y] => table.scan_generic(subset, |row_id, row| {
423 res.add_row(&[row[x.index()], row[y.index()]], row_id);
424 }),
425 [x, y, z] => table.scan_generic(subset, |row_id, row| {
426 res.add_row(&[row[x.index()], row[y.index()], row[z.index()]], row_id);
427 }),
428 _ => {
429 let mut scratch = SmallVec::<[Value; 8]>::new();
430 table.scan_generic(subset, |row_id, row| {
431 for col in cols {
432 scratch.push(row[col.index()]);
433 }
434 res.add_row(&scratch, row_id);
435 scratch.clear();
436 });
437 }
438 }
439 res
440 }
441 fn for_each_col(
442 &self,
443 table: &dyn Table,
444 subset: SubsetRef,
445 col: ColumnId,
446 f: &mut dyn FnMut(RowId, Value),
447 ) {
448 let table = table.as_any().downcast_ref::<T>().unwrap();
449 let col_idx = col.index();
450 table.scan_generic(subset, |row_id, row| {
451 f(row_id, row[col_idx]);
452 });
453 }
454
455 fn scan_project(
456 &self,
457 table: &dyn Table,
458 subset: SubsetRef,
459 cols: &[ColumnId],
460 start: Offset,
461 n: usize,
462 cs: &[Constraint],
463 out: &mut dyn RowSink,
464 ) -> Option<Offset> {
465 let table = table.as_any().downcast_ref::<T>().unwrap();
466 match cols {
467 [] => None,
468 [col] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
469 out.add_row(id, &[row[col.index()]]);
470 }),
471 [x, y] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
472 out.add_row(id, &[row[x.index()], row[y.index()]]);
473 }),
474 [x, y, z] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
475 out.add_row(id, &[row[x.index()], row[y.index()], row[z.index()]]);
476 }),
477 _ => {
478 let mut scratch = SmallVec::<[Value; 8]>::with_capacity(cols.len());
479 table.scan_generic_bounded(subset, start, n, cs, |id, row| {
480 for col in cols {
481 scratch.push(row[col.index()]);
482 }
483 out.add_row(id, &scratch);
484 scratch.clear();
485 })
486 }
487 }
488 }
489
490 fn lookup_row_vectorized(
491 &self,
492 table: &dyn Table,
493 mask: &mut Mask,
494 bindings: &mut Bindings,
495 args: &[QueryEntry],
496 col: ColumnId,
497 out_var: Variable,
498 ) {
499 let table = table.as_any().downcast_ref::<T>().unwrap();
500 let mut out = with_pool_set(PoolSet::get::<Vec<Value>>);
501 for_each_binding_with_mask!(mask, args, bindings, |iter| {
502 iter.fill_vec(&mut out, Value::stale, |_, args| {
503 table.get_row_column(args.as_slice(), col)
504 })
505 });
506 bindings.insert(out_var, &out);
507 }
508
509 fn lookup_with_default_vectorized(
510 &self,
511 table: &dyn Table,
512 mask: &mut Mask,
513 bindings: &mut Bindings,
514 args: &[QueryEntry],
515 col: ColumnId,
516 default: QueryEntry,
517 out_var: Variable,
518 ) {
519 let table = table.as_any().downcast_ref::<T>().unwrap();
520 let mut out = with_pool_set(|ps| ps.get::<Vec<Value>>());
521 for_each_binding_with_mask!(mask, args, bindings, |iter| {
522 match default {
523 QueryEntry::Var(default) => iter.zip(&bindings[default]).fill_vec(
524 &mut out,
525 Value::stale,
526 |_, (args, default)| {
527 Some(
528 table
529 .get_row_column(args.as_slice(), col)
530 .unwrap_or(*default),
531 )
532 },
533 ),
534 QueryEntry::Const(default) => iter.fill_vec(&mut out, Value::stale, |_, args| {
535 Some(
536 table
537 .get_row_column(args.as_slice(), col)
538 .unwrap_or(default),
539 )
540 }),
541 }
542 });
543 bindings.insert(out_var, &out);
544 }
545}
546
547pub struct WrappedTable {
555 inner: Box<dyn Table>,
556 wrapper: Box<dyn TableWrapper>,
557}
558
559impl WrappedTable {
560 pub(crate) fn new<T: Table>(inner: T) -> Self {
561 let wrapper = wrapper::<T>();
562 let inner = Box::new(inner);
563 Self { inner, wrapper }
564 }
565
566 pub fn dyn_clone(&self) -> Self {
568 WrappedTable {
569 inner: self.inner.dyn_clone(),
570 wrapper: self.wrapper.dyn_clone(),
571 }
572 }
573
574 pub(crate) fn as_ref(&self) -> WrappedTableRef<'_> {
575 WrappedTableRef {
576 inner: &*self.inner,
577 wrapper: &*self.wrapper,
578 }
579 }
580
581 pub fn scan_bounded(
585 &self,
586 subset: SubsetRef,
587 start: Offset,
588 n: usize,
589 out: &mut TaggedRowBuffer,
590 ) -> Option<Offset> {
591 self.as_ref().scan_bounded(subset, start, n, out)
592 }
593
594 pub(crate) fn group_by_col(&self, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
596 self.as_ref().group_by_col(subset, col)
597 }
598
599 pub(crate) fn group_by_key(&self, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
601 self.as_ref().group_by_key(subset, cols)
602 }
603
604 pub fn scan_project(
607 &self,
608 subset: SubsetRef,
609 cols: &[ColumnId],
610 start: Offset,
611 n: usize,
612 cs: &[Constraint],
613 out: &mut dyn RowSink,
614 ) -> Option<Offset> {
615 self.as_ref().scan_project(subset, cols, start, n, cs, out)
616 }
617
618 pub fn scan(&self, subset: SubsetRef) -> TaggedRowBuffer {
620 self.as_ref().scan(subset)
621 }
622
623 pub fn len(&self) -> usize {
625 self.inner.len()
626 }
627
628 pub fn is_empty(&self) -> bool {
630 self.inner.is_empty()
631 }
632
633 pub(crate) fn lookup_row_vectorized(
634 &self,
635 mask: &mut Mask,
636 bindings: &mut Bindings,
637 args: &[QueryEntry],
638 col: ColumnId,
639 out_var: Variable,
640 ) {
641 self.as_ref()
642 .lookup_row_vectorized(mask, bindings, args, col, out_var)
643 }
644
645 #[allow(clippy::too_many_arguments)]
646 pub(crate) fn lookup_with_default_vectorized(
647 &self,
648 mask: &mut Mask,
649 bindings: &mut Bindings,
650 args: &[QueryEntry],
651 col: ColumnId,
652 default: QueryEntry,
653 out_var: Variable,
654 ) {
655 self.as_ref()
656 .lookup_with_default_vectorized(mask, bindings, args, col, default, out_var)
657 }
658}
659
660impl Deref for WrappedTable {
661 type Target = dyn Table;
662
663 fn deref(&self) -> &Self::Target {
664 &*self.inner
665 }
666}
667
668impl DerefMut for WrappedTable {
669 fn deref_mut(&mut self) -> &mut Self::Target {
670 &mut *self.inner
671 }
672}
673
674pub(crate) trait TableWrapper: Send + Sync {
675 fn dyn_clone(&self) -> Box<dyn TableWrapper>;
676 fn scan_bounded(
677 &self,
678 table: &dyn Table,
679 subset: SubsetRef,
680 start: Offset,
681 n: usize,
682 out: &mut TaggedRowBuffer,
683 ) -> Option<Offset>;
684 fn group_by_col(&self, table: &dyn Table, subset: SubsetRef, col: ColumnId) -> ColumnIndex;
685 fn group_by_key(&self, table: &dyn Table, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex;
686
687 fn for_each_col(
691 &self,
692 table: &dyn Table,
693 subset: SubsetRef,
694 col: ColumnId,
695 f: &mut dyn FnMut(RowId, Value),
696 );
697
698 #[allow(clippy::too_many_arguments)]
699 fn scan_project(
700 &self,
701 table: &dyn Table,
702 subset: SubsetRef,
703 cols: &[ColumnId],
704 start: Offset,
705 n: usize,
706 cs: &[Constraint],
707 out: &mut dyn RowSink,
708 ) -> Option<Offset>;
709
710 fn scan(&self, table: &dyn Table, subset: SubsetRef) -> TaggedRowBuffer {
711 let arity = table.spec().arity();
712 let mut buf = TaggedRowBuffer::new(arity);
713 assert!(
714 self.scan_bounded(table, subset, Offset::new(0), usize::MAX, &mut buf)
715 .is_none()
716 );
717 buf
718 }
719
720 #[allow(clippy::too_many_arguments)]
721 fn lookup_row_vectorized(
722 &self,
723 table: &dyn Table,
724 mask: &mut Mask,
725 bindings: &mut Bindings,
726 args: &[QueryEntry],
727 col: ColumnId,
728 out_var: Variable,
729 );
730
731 #[allow(clippy::too_many_arguments)]
732 fn lookup_with_default_vectorized(
733 &self,
734 table: &dyn Table,
735 mask: &mut Mask,
736 bindings: &mut Bindings,
737 args: &[QueryEntry],
738 col: ColumnId,
739 default: QueryEntry,
740 out_var: Variable,
741 );
742}
743
744#[derive(Clone, Copy)]
748pub struct WrappedTableRef<'a> {
749 inner: &'a dyn Table,
750 wrapper: &'a dyn TableWrapper,
751}
752
753impl WrappedTableRef<'_> {
754 pub(crate) fn with_wrapper<T: Table, R>(
755 inner: &T,
756 f: impl for<'a> FnOnce(WrappedTableRef<'a>) -> R,
757 ) -> R {
758 let wrapper = WrapperImpl::<T>(PhantomData);
759 f(WrappedTableRef {
760 inner,
761 wrapper: &wrapper,
762 })
763 }
764
765 pub fn scan_bounded(
769 &self,
770 subset: SubsetRef,
771 start: Offset,
772 n: usize,
773 out: &mut TaggedRowBuffer,
774 ) -> Option<Offset> {
775 self.wrapper.scan_bounded(self.inner, subset, start, n, out)
776 }
777
778 pub(crate) fn group_by_col(&self, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
780 self.wrapper.group_by_col(self.inner, subset, col)
781 }
782
783 pub(crate) fn group_by_key(&self, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
785 self.wrapper.group_by_key(self.inner, subset, cols)
786 }
787
788 pub(crate) fn for_each_col(
792 &self,
793 subset: SubsetRef,
794 col: ColumnId,
795 f: &mut dyn FnMut(RowId, Value),
796 ) {
797 self.wrapper.for_each_col(self.inner, subset, col, f);
798 }
799
800 pub fn scan_project(
803 &self,
804 subset: SubsetRef,
805 cols: &[ColumnId],
806 start: Offset,
807 n: usize,
808 cs: &[Constraint],
809 out: &mut dyn RowSink,
810 ) -> Option<Offset> {
811 self.wrapper
812 .scan_project(self.inner, subset, cols, start, n, cs, out)
813 }
814
815 pub fn scan(&self, subset: SubsetRef) -> TaggedRowBuffer {
817 self.wrapper.scan(self.inner, subset)
818 }
819
820 pub fn len(&self) -> usize {
822 self.inner.len()
823 }
824
825 pub(crate) fn lookup_row_vectorized(
826 &self,
827 mask: &mut Mask,
828 bindings: &mut Bindings,
829 args: &[QueryEntry],
830 col: ColumnId,
831 out_var: Variable,
832 ) {
833 self.wrapper
834 .lookup_row_vectorized(self.inner, mask, bindings, args, col, out_var);
835 }
836
837 #[allow(clippy::too_many_arguments)]
838 pub(crate) fn lookup_with_default_vectorized(
839 &self,
840 mask: &mut Mask,
841 bindings: &mut Bindings,
842 args: &[QueryEntry],
843 col: ColumnId,
844 default: QueryEntry,
845 out_var: Variable,
846 ) {
847 self.wrapper.lookup_with_default_vectorized(
848 self.inner, mask, bindings, args, col, default, out_var,
849 );
850 }
851}
852
853impl Deref for WrappedTableRef<'_> {
854 type Target = dyn Table;
855
856 fn deref(&self) -> &Self::Target {
857 self.inner
858 }
859}