1use std::{
8 any::Any,
9 marker::PhantomData,
10 ops::{Deref, DerefMut},
11};
12
13use crate::numeric_id::{DenseIdMap, NumericId, define_id};
14use smallvec::SmallVec;
15
16use crate::{
17 QueryEntry, TableId, Variable,
18 action::{
19 Bindings, ExecutionState,
20 mask::{Mask, MaskIter, ValueSource},
21 },
22 common::Value,
23 hash_index::{ColumnIndex, IndexBase, TupleIndex},
24 offsets::{RowId, Subset, SubsetRef},
25 pool::{PoolSet, Pooled, with_pool_set},
26 row_buffer::{RowBuffer, TaggedRowBuffer},
27};
28
29define_id!(pub ColumnId, u32, "a particular column in a table");
30define_id!(
31 pub Generation,
32 u64,
33 "the current version of a table -- used to invalidate any existing RowIds"
34);
35define_id!(
36 pub Offset,
37 u64,
38 "an opaque offset token -- used to encode iterations over a table (within a generation). These always start at 0."
39);
40
41#[derive(Clone, Debug, PartialEq, Eq)]
43pub struct TableVersion {
44 pub major: Generation,
46 pub minor: Offset,
50 }
52
53#[derive(Clone)]
54pub struct TableSpec {
55 pub n_keys: usize,
57
58 pub n_vals: usize,
62
63 pub uncacheable_columns: DenseIdMap<ColumnId, bool>,
68
69 pub allows_delete: bool,
74}
75
76impl TableSpec {
77 pub fn arity(&self) -> usize {
79 self.n_keys + self.n_vals
80 }
81}
82
83#[derive(Eq, PartialEq, Copy, Clone)]
85pub struct TableChange {
86 pub added: bool,
88 pub removed: bool,
90}
91
92#[derive(Clone, Debug, PartialEq, Eq)]
94pub enum Constraint {
95 Eq { l_col: ColumnId, r_col: ColumnId },
96 EqConst { col: ColumnId, val: Value },
97 LtConst { col: ColumnId, val: Value },
98 GtConst { col: ColumnId, val: Value },
99 LeConst { col: ColumnId, val: Value },
100 GeConst { col: ColumnId, val: Value },
101}
102
103pub trait Rebuilder: Send + Sync {
112 fn hint_col(&self) -> Option<ColumnId>;
115 fn rebuild_val(&self, val: Value) -> Value;
116 fn rebuild_buf(
118 &self,
119 buf: &RowBuffer,
120 start: RowId,
121 end: RowId,
122 out: &mut TaggedRowBuffer,
123 exec_state: &mut ExecutionState,
124 );
125 fn rebuild_subset(
127 &self,
128 other: WrappedTableRef,
129 subset: SubsetRef,
130 out: &mut TaggedRowBuffer,
131 exec_state: &mut ExecutionState,
132 );
133 fn rebuild_slice(&self, vals: &mut [Value]) -> bool;
135}
136
137pub struct Row {
139 pub id: RowId,
141 pub vals: Pooled<Vec<Value>>,
143}
144
145pub trait Table: Any + Send + Sync {
147 fn dyn_clone(&self) -> Box<dyn Table>;
150
151 fn rebuilder<'a>(&'a self, _cols: &[ColumnId]) -> Option<Box<dyn Rebuilder + 'a>> {
153 None
154 }
155
156 fn apply_rebuild(
165 &mut self,
166 _table_id: TableId,
167 _table: &WrappedTable,
168 _next_ts: Value,
169 _exec_state: &mut ExecutionState,
170 ) -> bool {
171 false
173 }
174
175 fn as_any(&self) -> &dyn Any;
180
181 fn spec(&self) -> TableSpec;
186
187 fn clear(&mut self);
190
191 fn all(&self) -> Subset;
195
196 fn len(&self) -> usize;
202
203 fn is_empty(&self) -> bool {
205 self.len() == 0
206 }
207
208 fn version(&self) -> TableVersion;
211
212 fn updates_since(&self, offset: Offset) -> Subset;
214
215 fn scan_generic_bounded(
224 &self,
225 subset: SubsetRef,
226 start: Offset,
227 n: usize,
228 cs: &[Constraint],
229 f: impl FnMut(RowId, &[Value]),
230 ) -> Option<Offset>
231 where
232 Self: Sized;
233
234 fn scan_generic(&self, subset: SubsetRef, mut f: impl FnMut(RowId, &[Value]))
239 where
240 Self: Sized,
241 {
242 let mut cur = Offset::new(0);
243 while let Some(next) = self.scan_generic_bounded(subset, cur, usize::MAX, &[], |id, row| {
244 f(id, row);
245 }) {
246 cur = next;
247 }
248 }
249
250 fn refine_live(&self, subset: Subset) -> Subset {
252 self.refine_one(
254 subset,
255 &Constraint::LtConst {
256 col: ColumnId::new_const(0),
257 val: Value::stale(),
258 },
259 )
260 }
261
262 fn refine_one(&self, subset: Subset, c: &Constraint) -> Subset {
266 self.refine(subset, std::slice::from_ref(c))
267 }
268
269 fn refine(&self, subset: Subset, cs: &[Constraint]) -> Subset {
273 cs.iter()
274 .fold(subset, |subset, c| self.refine_one(subset, c))
275 }
276
277 fn fast_subset(&self, _: &Constraint) -> Option<Subset> {
284 None
285 }
286
287 fn split_fast_slow(
291 &self,
292 cs: &[Constraint],
293 ) -> (
294 Subset, Pooled<Vec<Constraint>>, Pooled<Vec<Constraint>>, ) {
298 with_pool_set(|ps| {
299 let mut fast = ps.get::<Vec<Constraint>>();
300 let mut slow = ps.get::<Vec<Constraint>>();
301 let mut subset = self.all();
302 for c in cs {
303 if let Some(sub) = self.fast_subset(c) {
304 subset.intersect(sub.as_ref(), &ps.get_pool());
305 fast.push(c.clone());
306 } else {
307 slow.push(c.clone());
308 }
309 }
310 (subset, fast, slow)
311 })
312 }
313
314 fn get_row(&self, key: &[Value]) -> Option<Row>;
321
322 fn get_row_column(&self, key: &[Value], col: ColumnId) -> Option<Value> {
328 self.get_row(key).map(|row| row.vals[col.index()])
329 }
330
331 fn merge(&mut self, exec_state: &mut ExecutionState) -> TableChange;
334
335 fn new_buffer(&self) -> Box<dyn MutationBuffer>;
339}
340
341pub trait MutationBuffer: Any + Send + Sync {
347 fn stage_insert(&mut self, row: &[Value]);
351
352 fn stage_remove(&mut self, key: &[Value]);
356
357 fn fresh_handle(&self) -> Box<dyn MutationBuffer>;
359}
360
361struct WrapperImpl<T>(PhantomData<T>);
362
363pub(crate) fn wrapper<T: Table>() -> Box<dyn TableWrapper> {
364 Box::new(WrapperImpl::<T>(PhantomData))
365}
366
367impl<T: Table> TableWrapper for WrapperImpl<T> {
368 fn dyn_clone(&self) -> Box<dyn TableWrapper> {
369 Box::new(Self(PhantomData))
370 }
371 fn scan_bounded(
372 &self,
373 table: &dyn Table,
374 subset: SubsetRef,
375 start: Offset,
376 n: usize,
377 out: &mut TaggedRowBuffer,
378 ) -> Option<Offset> {
379 let table = table.as_any().downcast_ref::<T>().unwrap();
380 table.scan_generic_bounded(subset, start, n, &[], |row_id, row| {
381 out.add_row(row_id, row);
382 })
383 }
384 fn group_by_col(&self, table: &dyn Table, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
385 let table = table.as_any().downcast_ref::<T>().unwrap();
386 let mut res = ColumnIndex::new();
387 table.scan_generic(subset, |row_id, row| {
388 res.add_row(&[row[col.index()]], row_id);
389 });
390 res
391 }
392 fn group_by_key(&self, table: &dyn Table, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
393 let table = table.as_any().downcast_ref::<T>().unwrap();
394 let mut res = TupleIndex::new(cols.len());
395 match cols {
396 [] => {}
397 [col] => table.scan_generic(subset, |row_id, row| {
398 res.add_row(&[row[col.index()]], row_id);
399 }),
400 [x, y] => table.scan_generic(subset, |row_id, row| {
401 res.add_row(&[row[x.index()], row[y.index()]], row_id);
402 }),
403 [x, y, z] => table.scan_generic(subset, |row_id, row| {
404 res.add_row(&[row[x.index()], row[y.index()], row[z.index()]], row_id);
405 }),
406 _ => {
407 let mut scratch = SmallVec::<[Value; 8]>::new();
408 table.scan_generic(subset, |row_id, row| {
409 for col in cols {
410 scratch.push(row[col.index()]);
411 }
412 res.add_row(&scratch, row_id);
413 scratch.clear();
414 });
415 }
416 }
417 res
418 }
419 fn scan_project(
420 &self,
421 table: &dyn Table,
422 subset: SubsetRef,
423 cols: &[ColumnId],
424 start: Offset,
425 n: usize,
426 cs: &[Constraint],
427 out: &mut TaggedRowBuffer,
428 ) -> Option<Offset> {
429 let table = table.as_any().downcast_ref::<T>().unwrap();
430 match cols {
431 [] => None,
432 [col] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
433 out.add_row(id, &[row[col.index()]]);
434 }),
435 [x, y] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
436 out.add_row(id, &[row[x.index()], row[y.index()]]);
437 }),
438 [x, y, z] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
439 out.add_row(id, &[row[x.index()], row[y.index()], row[z.index()]]);
440 }),
441 _ => {
442 let mut scratch = SmallVec::<[Value; 8]>::with_capacity(cols.len());
443 table.scan_generic_bounded(subset, start, n, cs, |id, row| {
444 for col in cols {
445 scratch.push(row[col.index()]);
446 }
447 out.add_row(id, &scratch);
448 scratch.clear();
449 })
450 }
451 }
452 }
453
454 fn lookup_row_vectorized(
455 &self,
456 table: &dyn Table,
457 mask: &mut Mask,
458 bindings: &mut Bindings,
459 args: &[QueryEntry],
460 col: ColumnId,
461 out_var: Variable,
462 ) {
463 let table = table.as_any().downcast_ref::<T>().unwrap();
464 let mut out = with_pool_set(PoolSet::get::<Vec<Value>>);
465 for_each_binding_with_mask!(mask, args, bindings, |iter| {
466 iter.fill_vec(&mut out, Value::stale, |_, args| {
467 table.get_row_column(args.as_slice(), col)
468 })
469 });
470 bindings.insert(out_var, &out);
471 }
472
473 fn lookup_with_default_vectorized(
474 &self,
475 table: &dyn Table,
476 mask: &mut Mask,
477 bindings: &mut Bindings,
478 args: &[QueryEntry],
479 col: ColumnId,
480 default: QueryEntry,
481 out_var: Variable,
482 ) {
483 let table = table.as_any().downcast_ref::<T>().unwrap();
484 let mut out = with_pool_set(|ps| ps.get::<Vec<Value>>());
485 for_each_binding_with_mask!(mask, args, bindings, |iter| {
486 match default {
487 QueryEntry::Var(default) => iter.zip(&bindings[default]).fill_vec(
488 &mut out,
489 Value::stale,
490 |_, (args, default)| {
491 Some(
492 table
493 .get_row_column(args.as_slice(), col)
494 .unwrap_or(*default),
495 )
496 },
497 ),
498 QueryEntry::Const(default) => iter.fill_vec(&mut out, Value::stale, |_, args| {
499 Some(
500 table
501 .get_row_column(args.as_slice(), col)
502 .unwrap_or(default),
503 )
504 }),
505 }
506 });
507 bindings.insert(out_var, &out);
508 }
509}
510
511pub struct WrappedTable {
519 inner: Box<dyn Table>,
520 wrapper: Box<dyn TableWrapper>,
521}
522
523impl WrappedTable {
524 pub(crate) fn new<T: Table>(inner: T) -> Self {
525 let wrapper = wrapper::<T>();
526 let inner = Box::new(inner);
527 Self { inner, wrapper }
528 }
529
530 pub fn dyn_clone(&self) -> Self {
532 WrappedTable {
533 inner: self.inner.dyn_clone(),
534 wrapper: self.wrapper.dyn_clone(),
535 }
536 }
537
538 pub(crate) fn as_ref(&self) -> WrappedTableRef<'_> {
539 WrappedTableRef {
540 inner: &*self.inner,
541 wrapper: &*self.wrapper,
542 }
543 }
544
545 pub fn scan_bounded(
549 &self,
550 subset: SubsetRef,
551 start: Offset,
552 n: usize,
553 out: &mut TaggedRowBuffer,
554 ) -> Option<Offset> {
555 self.as_ref().scan_bounded(subset, start, n, out)
556 }
557
558 pub(crate) fn group_by_col(&self, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
560 self.as_ref().group_by_col(subset, col)
561 }
562
563 pub(crate) fn group_by_key(&self, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
565 self.as_ref().group_by_key(subset, cols)
566 }
567
568 pub fn scan_project(
571 &self,
572 subset: SubsetRef,
573 cols: &[ColumnId],
574 start: Offset,
575 n: usize,
576 cs: &[Constraint],
577 out: &mut TaggedRowBuffer,
578 ) -> Option<Offset> {
579 self.as_ref().scan_project(subset, cols, start, n, cs, out)
580 }
581
582 pub fn scan(&self, subset: SubsetRef) -> TaggedRowBuffer {
584 self.as_ref().scan(subset)
585 }
586
587 pub fn len(&self) -> usize {
589 self.inner.len()
590 }
591
592 pub fn is_empty(&self) -> bool {
594 self.inner.is_empty()
595 }
596
597 pub(crate) fn lookup_row_vectorized(
598 &self,
599 mask: &mut Mask,
600 bindings: &mut Bindings,
601 args: &[QueryEntry],
602 col: ColumnId,
603 out_var: Variable,
604 ) {
605 self.as_ref()
606 .lookup_row_vectorized(mask, bindings, args, col, out_var)
607 }
608
609 #[allow(clippy::too_many_arguments)]
610 pub(crate) fn lookup_with_default_vectorized(
611 &self,
612 mask: &mut Mask,
613 bindings: &mut Bindings,
614 args: &[QueryEntry],
615 col: ColumnId,
616 default: QueryEntry,
617 out_var: Variable,
618 ) {
619 self.as_ref()
620 .lookup_with_default_vectorized(mask, bindings, args, col, default, out_var)
621 }
622}
623
624impl Deref for WrappedTable {
625 type Target = dyn Table;
626
627 fn deref(&self) -> &Self::Target {
628 &*self.inner
629 }
630}
631
632impl DerefMut for WrappedTable {
633 fn deref_mut(&mut self) -> &mut Self::Target {
634 &mut *self.inner
635 }
636}
637
638pub(crate) trait TableWrapper: Send + Sync {
639 fn dyn_clone(&self) -> Box<dyn TableWrapper>;
640 fn scan_bounded(
641 &self,
642 table: &dyn Table,
643 subset: SubsetRef,
644 start: Offset,
645 n: usize,
646 out: &mut TaggedRowBuffer,
647 ) -> Option<Offset>;
648 fn group_by_col(&self, table: &dyn Table, subset: SubsetRef, col: ColumnId) -> ColumnIndex;
649 fn group_by_key(&self, table: &dyn Table, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex;
650
651 #[allow(clippy::too_many_arguments)]
652 fn scan_project(
653 &self,
654 table: &dyn Table,
655 subset: SubsetRef,
656 cols: &[ColumnId],
657 start: Offset,
658 n: usize,
659 cs: &[Constraint],
660 out: &mut TaggedRowBuffer,
661 ) -> Option<Offset>;
662
663 fn scan(&self, table: &dyn Table, subset: SubsetRef) -> TaggedRowBuffer {
664 let arity = table.spec().arity();
665 let mut buf = TaggedRowBuffer::new(arity);
666 assert!(
667 self.scan_bounded(table, subset, Offset::new(0), usize::MAX, &mut buf)
668 .is_none()
669 );
670 buf
671 }
672
673 #[allow(clippy::too_many_arguments)]
674 fn lookup_row_vectorized(
675 &self,
676 table: &dyn Table,
677 mask: &mut Mask,
678 bindings: &mut Bindings,
679 args: &[QueryEntry],
680 col: ColumnId,
681 out_var: Variable,
682 );
683
684 #[allow(clippy::too_many_arguments)]
685 fn lookup_with_default_vectorized(
686 &self,
687 table: &dyn Table,
688 mask: &mut Mask,
689 bindings: &mut Bindings,
690 args: &[QueryEntry],
691 col: ColumnId,
692 default: QueryEntry,
693 out_var: Variable,
694 );
695}
696
697#[derive(Clone, Copy)]
701pub struct WrappedTableRef<'a> {
702 inner: &'a dyn Table,
703 wrapper: &'a dyn TableWrapper,
704}
705
706impl WrappedTableRef<'_> {
707 pub(crate) fn with_wrapper<T: Table, R>(
708 inner: &T,
709 f: impl for<'a> FnOnce(WrappedTableRef<'a>) -> R,
710 ) -> R {
711 let wrapper = WrapperImpl::<T>(PhantomData);
712 f(WrappedTableRef {
713 inner,
714 wrapper: &wrapper,
715 })
716 }
717
718 pub fn scan_bounded(
722 &self,
723 subset: SubsetRef,
724 start: Offset,
725 n: usize,
726 out: &mut TaggedRowBuffer,
727 ) -> Option<Offset> {
728 self.wrapper.scan_bounded(self.inner, subset, start, n, out)
729 }
730
731 pub(crate) fn group_by_col(&self, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
733 self.wrapper.group_by_col(self.inner, subset, col)
734 }
735
736 pub(crate) fn group_by_key(&self, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
738 self.wrapper.group_by_key(self.inner, subset, cols)
739 }
740
741 pub fn scan_project(
744 &self,
745 subset: SubsetRef,
746 cols: &[ColumnId],
747 start: Offset,
748 n: usize,
749 cs: &[Constraint],
750 out: &mut TaggedRowBuffer,
751 ) -> Option<Offset> {
752 self.wrapper
753 .scan_project(self.inner, subset, cols, start, n, cs, out)
754 }
755
756 pub fn scan(&self, subset: SubsetRef) -> TaggedRowBuffer {
758 self.wrapper.scan(self.inner, subset)
759 }
760
761 pub fn len(&self) -> usize {
763 self.inner.len()
764 }
765
766 pub(crate) fn lookup_row_vectorized(
767 &self,
768 mask: &mut Mask,
769 bindings: &mut Bindings,
770 args: &[QueryEntry],
771 col: ColumnId,
772 out_var: Variable,
773 ) {
774 self.wrapper
775 .lookup_row_vectorized(self.inner, mask, bindings, args, col, out_var);
776 }
777
778 #[allow(clippy::too_many_arguments)]
779 pub(crate) fn lookup_with_default_vectorized(
780 &self,
781 mask: &mut Mask,
782 bindings: &mut Bindings,
783 args: &[QueryEntry],
784 col: ColumnId,
785 default: QueryEntry,
786 out_var: Variable,
787 ) {
788 self.wrapper.lookup_with_default_vectorized(
789 self.inner, mask, bindings, args, col, default, out_var,
790 );
791 }
792}
793
794impl Deref for WrappedTableRef<'_> {
795 type Target = dyn Table;
796
797 fn deref(&self) -> &Self::Target {
798 self.inner
799 }
800}