egglog_core_relations/action/
mod.rs

1//! Instructions that are executed on the results of a query.
2//!
3//! This allows us to execute the "right-hand-side" of a rule. The
4//! implementation here is optimized to execute on a batch of rows at a time.
5use std::{
6    ops::Deref,
7    sync::{
8        Arc,
9        atomic::{AtomicBool, Ordering},
10    },
11};
12
13use crate::{
14    common::HashMap,
15    free_join::{invoke_batch, invoke_batch_assign},
16    numeric_id::{DenseIdMap, NumericId},
17};
18use egglog_concurrency::NotificationList;
19use smallvec::SmallVec;
20
21use crate::{
22    BaseValues, ContainerValues, ExternalFunctionId, WrappedTable,
23    common::Value,
24    free_join::{CounterId, Counters, ExternalFunctions, TableId, TableInfo, Variable},
25    pool::{Clear, Pooled, with_pool_set},
26    table_spec::{ColumnId, MutationBuffer},
27};
28
29use self::mask::{Mask, MaskIter, ValueSource};
30
31#[macro_use]
32pub(crate) mod mask;
33
34#[cfg(test)]
35mod tests;
36
/// A representation of a value within a query or rule.
///
/// A `QueryEntry` is either a variable bound in a join, or an untyped constant.
#[derive(Copy, Clone, Debug)]
pub enum QueryEntry {
    /// A variable, identified by its [`Variable`] id.
    Var(Variable),
    /// A constant [`Value`].
    Const(Value),
}
45
46impl From<Variable> for QueryEntry {
47    fn from(var: Variable) -> Self {
48        QueryEntry::Var(var)
49    }
50}
51
52impl From<Value> for QueryEntry {
53    fn from(val: Value) -> Self {
54        QueryEntry::Const(val)
55    }
56}
57
/// A value that can be written to a table in an action.
#[derive(Debug, Clone, Copy)]
pub enum WriteVal {
    /// A variable or a constant.
    QueryEntry(QueryEntry),
    /// A fresh value from the given counter.
    IncCounter(CounterId),
    /// The value already present at the given index of the row currently being built.
    CurrentVal(usize),
}
68
69impl<T> From<T> for WriteVal
70where
71    T: Into<QueryEntry>,
72{
73    fn from(val: T) -> Self {
74        WriteVal::QueryEntry(val.into())
75    }
76}
77
78impl From<CounterId> for WriteVal {
79    fn from(ctr: CounterId) -> Self {
80        WriteVal::IncCounter(ctr)
81    }
82}
83
/// A value that can be written to the database during a merge action.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MergeVal {
    /// A fresh value from the given counter; the counter is incremented when the value is
    /// materialized.
    Counter(CounterId),
    /// A standard constant value.
    Constant(Value),
}
92
93impl From<CounterId> for MergeVal {
94    fn from(ctr: CounterId) -> Self {
95        MergeVal::Counter(ctr)
96    }
97}
98
99impl From<Value> for MergeVal {
100    fn from(val: Value) -> Self {
101        MergeVal::Constant(val)
102    }
103}
104
/// Bindings store a sequence of values for a given set of variables.
///
/// The intent of bindings is to store a sequence of mappings from [`Variable`] to [`Value`], in a
/// struct-of-arrays style that is better laid out for processing bindings in batches.
pub(crate) struct Bindings {
    /// The number of values currently stored for each bound variable.
    matches: usize,
    /// The maximum number of calls to `push` that we can receive before we clear the
    /// [`Bindings`].
    // This is used to preallocate chunks of the flat `data` vector.
    max_batch_size: usize,
    /// Flat storage holding the values of every bound variable.
    data: Pooled<Vec<Value>>,
    /// Points into `data`. `data[var_offsets[var]..var_offsets[var] + matches]` contains the
    /// values for `var`.
    var_offsets: DenseIdMap<Variable, usize>,
}
119
120impl std::ops::Index<Variable> for Bindings {
121    type Output = [Value];
122    fn index(&self, var: Variable) -> &[Value] {
123        self.get(var).unwrap()
124    }
125}
126
127impl std::fmt::Debug for Bindings {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        let mut table = f.debug_map();
130        for (var, start) in self.var_offsets.iter() {
131            table.entry(&var, &&self.data[*start..*start + self.matches]);
132        }
133        table.finish()
134    }
135}
136
137impl Bindings {
138    pub(crate) fn new(max_batch_size: usize) -> Self {
139        Bindings {
140            matches: 0,
141            max_batch_size,
142            data: Default::default(),
143            var_offsets: DenseIdMap::new(),
144        }
145    }
146    fn assert_invariant(&self) {
147        #[cfg(debug_assertions)]
148        {
149            assert!(self.matches <= self.max_batch_size);
150            for (var, start) in self.var_offsets.iter() {
151                assert!(
152                    start + self.matches <= self.data.len(),
153                    "Variable {:?} starts at {}, but data only has {} elements",
154                    var,
155                    start,
156                    self.data.len()
157                );
158            }
159        }
160    }
161
162    pub(crate) fn clear(&mut self) {
163        self.matches = 0;
164        self.var_offsets.clear();
165        self.data.clear();
166        self.assert_invariant();
167    }
168
169    fn get(&self, var: Variable) -> Option<&[Value]> {
170        let start = self.var_offsets.get(var)?;
171        Some(&self.data[*start..*start + self.matches])
172    }
173
174    fn add_mapping(&mut self, var: Variable, vals: &[Value]) {
175        let start = self.data.len();
176        self.data.extend_from_slice(vals);
177        // We have a flat representation of the data, meaning that writing more than
178        // `max_batch_size` values to `var` could overwrite values for a different variable, which
179        // would produce some mysterious results that are hard to debug.
180        debug_assert!(vals.len() <= self.max_batch_size);
181        if vals.len() < self.max_batch_size {
182            let target_len = self.data.len() + self.max_batch_size - vals.len();
183            self.data.resize(target_len, Value::stale());
184        }
185        self.var_offsets.insert(var, start);
186    }
187
188    pub(crate) fn insert(&mut self, var: Variable, vals: &[Value]) {
189        if self.var_offsets.n_ids() == 0 {
190            self.matches = vals.len();
191        } else {
192            assert_eq!(self.matches, vals.len());
193        }
194        self.add_mapping(var, vals);
195        self.assert_invariant();
196    }
197
    /// Push a new set of bindings for the given variables.
    ///
    /// The first call (when there are no matches yet) creates a region for every variable in
    /// `map`; later calls only write the values for `used_vars`.
    ///
    /// # Safety
    /// This method assumes that all calls to `push`:
    /// * Have a mapping for every member of `used_vars`.
    /// * Are passed the same `used_vars`.
    ///
    /// It is unsafe to avoid bounds-checking. This method is called extremely frequently and the
    /// overhead of bounds-checking is noticeable.
    ///
    /// # Panics
    /// Panics if the bindings already hold `max_batch_size` matches.
    pub(crate) unsafe fn push(
        &mut self,
        map: &DenseIdMap<Variable, Value>,
        used_vars: &[Variable],
    ) {
        if self.matches != 0 {
            assert!(self.matches < self.max_batch_size);
            #[cfg(debug_assertions)]
            {
                for var in used_vars {
                    assert!(
                        self.var_offsets.get(*var).is_some(),
                        "Variable {:?} not found in bindings {:?}",
                        var,
                        self.var_offsets
                    );
                }
            }
            for var in used_vars {
                let var = var.index();
                // Safe version: this degrades some benchmarks by ~6%
                // let start = self.var_offsets.raw()[var].unwrap();
                // self.data[start + self.matches] = map.raw()[var].unwrap();
                // SAFETY: the caller guarantees (see `# Safety`) that every member of
                // `used_vars` is present in `map` and was registered in `var_offsets` by the
                // first `push`. `add_mapping` pads each variable's region to `max_batch_size`
                // entries and the assertion above ensures `matches < max_batch_size`, so
                // `start + self.matches` stays inside `data`.
                unsafe {
                    let start = self.var_offsets.raw().get_unchecked(var).unwrap_unchecked();
                    *self.data.get_unchecked_mut(start + self.matches) =
                        map.raw().get_unchecked(var).unwrap_unchecked();
                }
            }
        } else {
            // First row of the batch: allocate a padded region for every variable in `map`.
            for (var, val) in map.iter() {
                self.add_mapping(var, &[*val]);
            }
        }

        self.matches += 1;
        self.assert_invariant();
    }
245
246    /// A method that removes the bindings for the given variable and allows for its values to be
247    /// used independently from the [`Bindings`] struct. This is helpful when an operation needs to
248    /// mutably borrow the values for one value while reading the values for another.
249    ///
250    /// To add the values back, use [`Bindings::replace`].
251    pub(crate) fn take(&mut self, var: Variable) -> Option<ExtractedBinding> {
252        let mut vals: Pooled<Vec<Value>> = with_pool_set(|ps| ps.get());
253        vals.extend_from_slice(self.get(var)?);
254        let start = self.var_offsets.take(var)?;
255        Some(ExtractedBinding {
256            var,
257            offset: start,
258            vals,
259        })
260    }
261
262    /// Replace a binding extracted with [`Bindings::take`].
263    ///
264    /// # Panics
265    /// This method will panic if the length of the values in `bdg` does not match the current
266    /// number of matches in `Bindings`. It may panic if `bdg` was extracted from a different
267    /// [`Bindings`] than the one it is being replaced in.
268    pub(crate) fn replace(&mut self, bdg: ExtractedBinding) {
269        // Replace the binding with the new values.
270        let ExtractedBinding {
271            var,
272            offset,
273            mut vals,
274        } = bdg;
275        assert_eq!(vals.len(), self.matches);
276        self.data
277            .splice(offset..offset + self.matches, vals.drain(..));
278        self.var_offsets.insert(var, offset);
279    }
280}
281
/// A binding that has been extracted from a [`Bindings`] struct via the [`Bindings::take`] method.
///
/// This allows a variable's values to be read and modified while the [`Bindings`] struct is
/// borrowed elsewhere. The variable is not readable through the [`Bindings`] again until
/// [`Bindings::replace`] is called.
pub(crate) struct ExtractedBinding {
    /// The variable whose values were extracted.
    var: Variable,
    /// The offset into the originating [`Bindings`]' data at which the variable's values start.
    offset: usize,
    /// A copy of the variable's values at the time of extraction.
    pub(crate) vals: Pooled<Vec<Value>>,
}
291
/// A cache of rows that were constructed for a `(table, key)` pair during execution, used to
/// hand the same row back to later lookups of the same key.
#[derive(Default)]
pub(crate) struct PredictedVals {
    // The key/value types are spelled out inline rather than through an alias.
    #[allow(clippy::type_complexity)]
    data: HashMap<(TableId, SmallVec<[Value; 3]>), Pooled<Vec<Value>>>,
}
297
298impl Clear for PredictedVals {
299    fn reuse(&self) -> bool {
300        self.data.capacity() > 0
301    }
302    fn clear(&mut self) {
303        self.data.clear()
304    }
305    fn bytes(&self) -> usize {
306        self.data.capacity()
307            * (std::mem::size_of::<(TableId, SmallVec<[Value; 3]>)>()
308                + std::mem::size_of::<Pooled<Vec<Value>>>())
309    }
310}
311
312impl PredictedVals {
313    pub(crate) fn get_val(
314        &mut self,
315        table: TableId,
316        key: &[Value],
317        default: impl FnOnce() -> Pooled<Vec<Value>>,
318    ) -> impl Deref<Target = Pooled<Vec<Value>>> + '_ {
319        self.data
320            .entry((table, SmallVec::from_slice(key)))
321            .or_insert_with(default)
322    }
323}
324
/// A copyable bundle of shared references to the parts of the database that an
/// [`ExecutionState`] reads from.
#[derive(Copy, Clone)]
pub(crate) struct DbView<'a> {
    /// Per-table metadata, including the tables themselves.
    pub(crate) table_info: &'a DenseIdMap<TableId, TableInfo>,
    /// The counters used to generate fresh values.
    pub(crate) counters: &'a Counters,
    /// The registered external functions.
    pub(crate) external_funcs: &'a ExternalFunctions,
    pub(crate) bases: &'a BaseValues,
    pub(crate) containers: &'a ContainerValues,
    /// Notified with a table's id whenever a mutation is staged for that table.
    pub(crate) notification_list: &'a NotificationList<TableId>,
}
334
/// A handle on a database that may be in the process of running a rule.
///
/// An ExecutionState grants immutable access to (much of) the database, and also provides a
/// limited API to mutate database contents.
///
/// A few important notes:
///
/// ## Some tables may be missing
/// Callers external to this crate cannot construct an `ExecutionState` directly. Depending on the
/// context, some tables may not be available. In particular, when running [`crate::Table::merge`]
/// operations, only a table's read-side dependencies are available for reading (sim. for writing).
/// This allows tables that do not need access to one another to be merged in parallel.
///
/// When executing a rule, ExecutionState has access to all tables.
///
/// ## Limited Mutability
/// Callers can only stage insertions and deletions to tables. These changes are not applied until
/// the next call to `merge` on the underlying table.
///
/// ## Predicted Values
/// ExecutionStates provide a means of synchronizing the results of a pending write across
/// different executions of a rule. This is particularly important in the case where the result of
/// an operation (such as "lookup or insert new id" operations) is a fresh id. A common
/// ExecutionState ensures that future lookups will see the same id (even across calls to
/// [`ExecutionState::clone`]).
// NOTE(review): the `Clone` impl in this file starts the clone with an empty `predicted` cache;
// confirm how predictions stay consistent across clones as the paragraph above claims.
pub struct ExecutionState<'a> {
    /// Cache of rows predicted for pending insertions.
    pub(crate) predicted: PredictedVals,
    /// Read-only view of the database.
    pub(crate) db: DbView<'a>,
    /// Per-table buffers holding the staged insertions and removals.
    buffers: MutationBuffers<'a>,
    /// Whether any mutations have been staged via this ExecutionState.
    pub(crate) changed: bool,
    /// Atomic flag for early stopping of rule execution.
    /// This flag is shared across all handles (clones) of this ExecutionState.
    stop_match: Arc<AtomicBool>,
}
370
/// A basic wrapper around a map from table id to a mutation buffer for that table that also
/// tracks if a table has been modified.
struct MutationBuffers<'a> {
    /// Notified with the table id every time a mutation is staged for that table.
    notify_list: &'a NotificationList<TableId>,
    /// The mutation buffers created so far, indexed by table.
    buffers: DenseIdMap<TableId, Box<dyn MutationBuffer>>,
}
377
378impl Clone for MutationBuffers<'_> {
379    fn clone(&self) -> Self {
380        let mut res = MutationBuffers::new(self.notify_list, Default::default());
381        for (id, buf) in self.buffers.iter() {
382            res.buffers.insert(id, buf.fresh_handle());
383        }
384        res
385    }
386}
387
388impl<'a> MutationBuffers<'a> {
389    fn new(
390        notify_list: &'a NotificationList<TableId>,
391        buffers: DenseIdMap<TableId, Box<dyn MutationBuffer>>,
392    ) -> MutationBuffers<'a> {
393        MutationBuffers {
394            notify_list,
395            buffers,
396        }
397    }
398    fn lazy_init(&mut self, table_id: TableId, f: impl FnOnce() -> Box<dyn MutationBuffer>) {
399        self.buffers.get_or_insert(table_id, f);
400    }
401    fn stage_insert(&mut self, table_id: TableId, row: &[Value]) {
402        self.buffers[table_id].stage_insert(row);
403        self.notify_list.notify(table_id);
404    }
405
406    fn stage_remove(&mut self, table_id: TableId, key: &[Value]) {
407        self.buffers[table_id].stage_remove(key);
408        self.notify_list.notify(table_id);
409    }
410}
411
412impl Clone for ExecutionState<'_> {
413    fn clone(&self) -> Self {
414        ExecutionState {
415            predicted: Default::default(),
416            db: self.db,
417            buffers: self.buffers.clone(),
418            changed: false,
419            stop_match: Arc::clone(&self.stop_match),
420        }
421    }
422}
423
424impl<'a> ExecutionState<'a> {
425    pub(crate) fn new(
426        db: DbView<'a>,
427        buffers: DenseIdMap<TableId, Box<dyn MutationBuffer>>,
428    ) -> Self {
429        ExecutionState {
430            predicted: Default::default(),
431            db,
432            buffers: MutationBuffers::new(db.notification_list, buffers),
433            changed: false,
434            stop_match: Arc::new(AtomicBool::new(false)),
435        }
436    }
437
438    /// Stage an insertion of the given row into `table`.
439    ///
440    /// If you are using `egglog`, consider using `egglog_bridge::TableAction`.
441    pub fn stage_insert(&mut self, table: TableId, row: &[Value]) {
442        self.buffers
443            .lazy_init(table, || self.db.table_info[table].table.new_buffer());
444        self.buffers.stage_insert(table, row);
445        self.changed = true;
446    }
447
448    /// Stage a removal of the given row from `table` if it is present.
449    ///
450    /// If you are using `egglog`, consider using `egglog_bridge::TableAction`.
451    pub fn stage_remove(&mut self, table: TableId, key: &[Value]) {
452        self.buffers
453            .lazy_init(table, || self.db.table_info[table].table.new_buffer());
454        self.buffers.stage_remove(table, key);
455        self.changed = true;
456    }
457
458    /// Call an external function.
459    pub fn call_external_func(
460        &mut self,
461        func: ExternalFunctionId,
462        args: &[Value],
463    ) -> Option<Value> {
464        self.db.external_funcs[func].invoke(self, args)
465    }
466
    /// Increment the counter `ctr` and return the `usize` reported by the underlying
    /// [`Counters`].
    pub fn inc_counter(&self, ctr: CounterId) -> usize {
        self.db.counters.inc(ctr)
    }
470
    /// Read the counter `ctr` from the underlying [`Counters`].
    pub fn read_counter(&self, ctr: CounterId) -> usize {
        self.db.counters.read(ctr)
    }
474
475    /// Iterate over the identifiers of all tables visible to this execution state.
476    pub fn table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
477        self.db.table_info.iter().map(|(id, _)| id)
478    }
479
480    /// Get an immutable reference to the table with id `table`.
481    /// Dangerous: Reading from a table during action execution may break the semi-naive evaluation
482    pub fn get_table(&self, table: TableId) -> &'a WrappedTable {
483        &self.db.table_info[table].table
484    }
485
486    /// Get the human-readable name for a table, if one exists.
487    pub fn table_name(&self, table: TableId) -> Option<&'a str> {
488        self.db.table_info[table].name()
489    }
490
491    pub fn base_values(&self) -> &BaseValues {
492        self.db.bases
493    }
494
    /// Get the [`ContainerValues`] of the underlying database.
    pub fn container_values(&self) -> &'a ContainerValues {
        self.db.containers
    }
498
499    /// Get the _current_ value for a given key in `table`, or otherwise insert
500    /// the unique _next_ value.
501    ///
502    /// Insertions into tables are not performed immediately, but rules and
503    /// merge functions sometimes need to get the result of an insertion. For
504    /// such cases, executions keep a cache of "predicted" values for a given
505    /// mapping that manage the insertions, etc.
506    ///
507    /// If you are using `egglog`, consider using `egglog_bridge::TableAction`.
508    pub fn predict_val(
509        &mut self,
510        table: TableId,
511        key: &[Value],
512        vals: impl ExactSizeIterator<Item = MergeVal>,
513    ) -> Pooled<Vec<Value>> {
514        if let Some(row) = self.db.table_info[table].table.get_row(key) {
515            return row.vals;
516        }
517        Pooled::cloned(
518            self.predicted
519                .get_val(table, key, || {
520                    Self::construct_new_row(
521                        &self.db,
522                        &mut self.buffers,
523                        &mut self.changed,
524                        table,
525                        key,
526                        vals,
527                    )
528                })
529                .deref(),
530        )
531    }
532
533    fn construct_new_row(
534        db: &DbView,
535        buffers: &mut MutationBuffers,
536        changed: &mut bool,
537        table: TableId,
538        key: &[Value],
539        vals: impl ExactSizeIterator<Item = MergeVal>,
540    ) -> Pooled<Vec<Value>> {
541        with_pool_set(|ps| {
542            let mut new = ps.get::<Vec<Value>>();
543            new.reserve(key.len() + vals.len());
544            new.extend_from_slice(key);
545            for val in vals {
546                new.push(match val {
547                    MergeVal::Counter(ctr) => Value::from_usize(db.counters.inc(ctr)),
548                    MergeVal::Constant(c) => c,
549                })
550            }
551            buffers.lazy_init(table, || db.table_info[table].table.new_buffer());
552            buffers.stage_insert(table, &new);
553            *changed = true;
554            new
555        })
556    }
557
558    /// A variant of [`ExecutionState::predict_val`] that avoids materializing the full row, and
559    /// instead only returns the value in the given column.
560    pub fn predict_col(
561        &mut self,
562        table: TableId,
563        key: &[Value],
564        vals: impl ExactSizeIterator<Item = MergeVal>,
565        col: ColumnId,
566    ) -> Value {
567        if let Some(val) = self.db.table_info[table].table.get_row_column(key, col) {
568            return val;
569        }
570        self.predicted.get_val(table, key, || {
571            Self::construct_new_row(
572                &self.db,
573                &mut self.buffers,
574                &mut self.changed,
575                table,
576                key,
577                vals,
578            )
579        })[col.index()]
580    }
581
    /// Trigger early stopping by setting the stop_match flag.
    /// This causes rule execution to halt at the next opportunity.
    ///
    /// The flag is shared with every clone of this `ExecutionState`, so the request is visible
    /// to all of them.
    ///
    /// Uses Release ordering to ensure all prior writes are visible to threads that observe this flag.
    pub fn trigger_early_stop(&self) {
        self.stop_match.store(true, Ordering::Release);
    }
589
    /// Check if early stopping has been requested, either through this handle or through any
    /// clone sharing the same flag.
    ///
    /// Uses Acquire ordering to ensure we see all writes that happened before the flag was set.
    pub fn should_stop(&self) -> bool {
        self.stop_match.load(Ordering::Acquire)
    }
596}
597
598impl ExecutionState<'_> {
599    /// Returns the number of matches that make it to the end of the instructions
600    pub(crate) fn run_instrs(&mut self, instrs: &[Instr], bindings: &mut Bindings) -> usize {
601        if bindings.var_offsets.next_id().rep() == 0 {
602            // If we have no variables, we want to run the rules once.
603            bindings.matches = 1;
604        }
605
606        // Vectorized execution for larger batch sizes
607        let mut mask = with_pool_set(|ps| Mask::new(0..bindings.matches, ps));
608        for instr in instrs {
609            if mask.is_empty() {
610                return 0;
611            }
612            self.run_instr(&mut mask, instr, bindings);
613        }
614        mask.count_ones()
615    }
616    fn run_instr(&mut self, mask: &mut Mask, inst: &Instr, bindings: &mut Bindings) {
617        fn assert_impl(
618            bindings: &mut Bindings,
619            mask: &mut Mask,
620            l: &QueryEntry,
621            r: &QueryEntry,
622            op: impl Fn(Value, Value) -> bool,
623        ) {
624            match (l, r) {
625                (QueryEntry::Var(v1), QueryEntry::Var(v2)) => {
626                    mask.iter(&bindings[*v1])
627                        .zip(&bindings[*v2])
628                        .retain(|(v1, v2)| op(*v1, *v2));
629                }
630                (QueryEntry::Var(v), QueryEntry::Const(c))
631                | (QueryEntry::Const(c), QueryEntry::Var(v)) => {
632                    mask.iter(&bindings[*v]).retain(|v| op(*v, *c));
633                }
634                (QueryEntry::Const(c1), QueryEntry::Const(c2)) => {
635                    if !op(*c1, *c2) {
636                        mask.clear();
637                    }
638                }
639            }
640        }
641
642        match inst {
643            Instr::LookupOrInsertDefault {
644                table: table_id,
645                args,
646                default,
647                dst_col,
648                dst_var,
649            } => {
650                let pool = with_pool_set(|ps| ps.get_pool::<Vec<Value>>().clone());
651                self.buffers.lazy_init(*table_id, || {
652                    self.db.table_info[*table_id].table.new_buffer()
653                });
654                let table = &self.db.table_info[*table_id].table;
655                // Do two passes over the current vector. First, do a round of lookups. Then, for
656                // any offsets where the lookup failed, insert the default value.
657                let mut mask_copy = mask.clone();
658                table.lookup_row_vectorized(&mut mask_copy, bindings, args, *dst_col, *dst_var);
659                mask_copy.symmetric_difference(mask);
660                if mask_copy.is_empty() {
661                    return;
662                }
663                let mut out = bindings.take(*dst_var).unwrap();
664                for_each_binding_with_mask!(mask_copy, args.as_slice(), bindings, |iter| {
665                    iter.assign_vec(&mut out.vals, |offset, key| {
666                        // First, check if the entry is already in the table:
667                        // if let Some(row) = table.get_row_column(&key, *dst_col) {
668                        //     return row;
669                        // }
670                        // If not, insert the default value.
671                        //
672                        // We avoid doing this more than once by using the
673                        // `predicted` map.
674                        let prediction_key = (
675                            *table_id,
676                            SmallVec::<[Value; 3]>::from_slice(key.as_slice()),
677                        );
678                        let buffers = &mut self.buffers;
679                        // Bind some mutable references because the closure passed
680                        // to or_insert_with is `move`.
681                        let ctrs = &self.db.counters;
682                        let bindings = &bindings;
683                        let pool = pool.clone();
684                        let row =
685                            self.predicted
686                                .data
687                                .entry(prediction_key)
688                                .or_insert_with(move || {
689                                    let mut row = pool.get();
690                                    row.extend_from_slice(key.as_slice());
691                                    // Extend the key with the default values.
692                                    row.reserve(default.len());
693                                    for val in default {
694                                        let val = match val {
695                                            WriteVal::QueryEntry(QueryEntry::Const(c)) => *c,
696                                            WriteVal::QueryEntry(QueryEntry::Var(v)) => {
697                                                bindings[*v][offset]
698                                            }
699                                            WriteVal::IncCounter(ctr) => {
700                                                Value::from_usize(ctrs.inc(*ctr))
701                                            }
702                                            WriteVal::CurrentVal(ix) => row[*ix],
703                                        };
704                                        row.push(val)
705                                    }
706                                    // Insert it into the table.
707                                    buffers.stage_insert(*table_id, &row);
708                                    row
709                                });
710                        row[dst_col.index()]
711                    });
712                });
713                bindings.replace(out);
714            }
715            Instr::LookupWithDefault {
716                table,
717                args,
718                dst_col,
719                dst_var,
720                default,
721            } => {
722                let table = &self.db.table_info[*table].table;
723                table.lookup_with_default_vectorized(
724                    mask, bindings, args, *dst_col, *default, *dst_var,
725                );
726            }
727            Instr::Lookup {
728                table,
729                args,
730                dst_col,
731                dst_var,
732            } => {
733                let table = &self.db.table_info[*table].table;
734                table.lookup_row_vectorized(mask, bindings, args, *dst_col, *dst_var);
735            }
736
737            Instr::LookupWithFallback {
738                table: table_id,
739                table_key,
740                func,
741                func_args,
742                dst_col,
743                dst_var,
744            } => {
745                let table = &self.db.table_info[*table_id].table;
746                let mut lookup_result = mask.clone();
747                table.lookup_row_vectorized(
748                    &mut lookup_result,
749                    bindings,
750                    table_key,
751                    *dst_col,
752                    *dst_var,
753                );
754                let mut to_call_func = lookup_result.clone();
755                to_call_func.symmetric_difference(mask);
756                if to_call_func.is_empty() {
757                    return;
758                }
759
760                // Call the given external function on all entries where the lookup failed.
761                invoke_batch_assign(
762                    self.db.external_funcs[*func].as_ref(),
763                    self,
764                    &mut to_call_func,
765                    bindings,
766                    func_args,
767                    *dst_var,
768                );
769                // The new mask should be the lanes where the lookup succeeded or where `func`
770                // succeeded.
771                lookup_result.union(&to_call_func);
772                *mask = lookup_result;
773            }
774            Instr::Insert { table, vals } => {
775                for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
776                    iter.for_each(|vals| {
777                        self.stage_insert(*table, vals.as_slice());
778                    })
779                });
780            }
781            Instr::InsertIfEq { table, l, r, vals } => match (l, r) {
782                (QueryEntry::Var(v1), QueryEntry::Var(v2)) => {
783                    for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
784                        iter.zip(&bindings[*v1])
785                            .zip(&bindings[*v2])
786                            .for_each(|((vals, v1), v2)| {
787                                if v1 == v2 {
788                                    self.stage_insert(*table, &vals);
789                                }
790                            })
791                    })
792                }
793                (QueryEntry::Var(v), QueryEntry::Const(c))
794                | (QueryEntry::Const(c), QueryEntry::Var(v)) => {
795                    for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
796                        iter.zip(&bindings[*v]).for_each(|(vals, cond)| {
797                            if cond == c {
798                                self.stage_insert(*table, &vals);
799                            }
800                        })
801                    })
802                }
803                (QueryEntry::Const(c1), QueryEntry::Const(c2)) => {
804                    if c1 == c2 {
805                        for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| iter
806                            .for_each(|vals| {
807                                self.stage_insert(*table, &vals);
808                            }))
809                    }
810                }
811            },
812            Instr::Remove { table, args } => {
813                for_each_binding_with_mask!(mask, args.as_slice(), bindings, |iter| {
814                    iter.for_each(|args| {
815                        self.stage_remove(*table, args.as_slice());
816                    })
817                });
818            }
819            Instr::External { func, args, dst } => {
820                invoke_batch(
821                    self.db.external_funcs[*func].as_ref(),
822                    self,
823                    mask,
824                    bindings,
825                    args,
826                    *dst,
827                );
828            }
829            Instr::ExternalWithFallback {
830                f1,
831                args1,
832                f2,
833                args2,
834                dst,
835            } => {
836                let mut f1_result = mask.clone();
837                invoke_batch(
838                    self.db.external_funcs[*f1].as_ref(),
839                    self,
840                    &mut f1_result,
841                    bindings,
842                    args1,
843                    *dst,
844                );
845                let mut to_call_f2 = f1_result.clone();
846                to_call_f2.symmetric_difference(mask);
847                if to_call_f2.is_empty() {
848                    return;
849                }
850                // Call the given external function on all entries where the first call failed.
851                invoke_batch_assign(
852                    self.db.external_funcs[*f2].as_ref(),
853                    self,
854                    &mut to_call_f2,
855                    bindings,
856                    args2,
857                    *dst,
858                );
859                // The new mask should be the lanes where either `f1` or `f2` succeeded.
860                f1_result.union(&to_call_f2);
861                *mask = f1_result;
862            }
863            Instr::AssertAnyNe { ops, divider } => {
864                for_each_binding_with_mask!(mask, ops.as_slice(), bindings, |iter| {
865                    iter.retain(|vals| {
866                        vals[0..*divider]
867                            .iter()
868                            .zip(&vals[*divider..])
869                            .any(|(l, r)| l != r)
870                    })
871                })
872            }
873            Instr::AssertEq(l, r) => assert_impl(bindings, mask, l, r, |l, r| l == r),
874            Instr::AssertNe(l, r) => assert_impl(bindings, mask, l, r, |l, r| l != r),
875            Instr::ReadCounter { counter, dst } => {
876                let mut vals = with_pool_set(|ps| ps.get::<Vec<Value>>());
877                let ctr_val = Value::from_usize(self.read_counter(*counter));
878                vals.resize(bindings.matches, ctr_val);
879                bindings.insert(*dst, &vals);
880            }
881        }
882    }
883}
884
/// A single instruction from the "right-hand side" of a rule.
///
/// Instructions are executed over a whole batch of bindings at a time, under a
/// mask of the rows that are still active. Instructions that can fail for an
/// individual row (e.g. `AssertAnyNe` and `ExternalWithFallback`) narrow that
/// mask rather than aborting the whole batch.
885#[derive(Debug, Clone)]
886pub(crate) enum Instr {
887    /// Look up the value of the given table, inserting a new entry with a
888    /// default value if it is not there.
    ///
    /// NOTE(review): here and in the other lookup variants, `dst_col`
    /// presumably names the column of the matching row whose value is bound to
    /// `dst_var` — confirm against the executor.
889    LookupOrInsertDefault {
890        table: TableId,
891        args: Vec<QueryEntry>,
892        default: Vec<WriteVal>,
893        dst_col: ColumnId,
894        dst_var: Variable,
895    },
896
897    /// Look up the value of the given table; if the value is not there, use the
898    /// given default.
899    LookupWithDefault {
900        table: TableId,
901        args: Vec<QueryEntry>,
902        dst_col: ColumnId,
903        dst_var: Variable,
904        default: QueryEntry,
905    },
906
907    /// Look up a value of the given table, halting execution if it is not
908    /// there.
909    Lookup {
910        table: TableId,
911        args: Vec<QueryEntry>,
912        dst_col: ColumnId,
913        dst_var: Variable,
914    },
915
916    /// Look up the given key in the table: if the value is not present in the given table, then
917    /// call the given external function with the given arguments. If the external function returns
918    /// a value, that value is returned in the given `dst_var`. If the lookup fails and the
919    /// external function does not return a value, then execution is halted.
920    LookupWithFallback {
921        table: TableId,
922        table_key: Vec<QueryEntry>,
923        func: ExternalFunctionId,
924        func_args: Vec<QueryEntry>,
925        dst_col: ColumnId,
926        dst_var: Variable,
927    },
928
929    /// Insert the given return value with the provided arguments into the
930    /// table.
    ///
    /// NOTE(review): `vals` presumably holds the arguments followed by the
    /// return value, forming one row — confirm against the executor.
931    Insert {
932        table: TableId,
933        vals: Vec<QueryEntry>,
934    },
935
936    /// Insert `vals` into `table` if `l` and `r` are equal.
    ///
    /// `l` and `r` may each be a variable or a constant. The insert is staged
    /// once for every active row on which the comparison holds; when both
    /// sides are constants the comparison is made once for the whole batch.
937    InsertIfEq {
938        table: TableId,
939        l: QueryEntry,
940        r: QueryEntry,
941        vals: Vec<QueryEntry>,
942    },
943
944    /// Remove the entry corresponding to `args` in `table`.
    ///
    /// The removal is staged once per active row of the batch.
945    Remove {
946        table: TableId,
947        args: Vec<QueryEntry>,
948    },
949
950    /// Bind the result of the external function to a variable.
    ///
    /// `func` is invoked on the batch with `args`, and its result is bound to
    /// `dst`.
951    External {
952        func: ExternalFunctionId,
953        args: Vec<QueryEntry>,
954        dst: Variable,
955    },
956
957    /// Bind the result of the external function to a variable. If the first external function
958    /// fails, then use the second external function. If both fail, execution is halted (as in a
959    /// single failure of `External`).
    ///
    /// `f2` (with `args2`) is only invoked for the rows on which `f1` (with `args1`) failed; a
    /// row stays active if either call succeeded for it.
960    ExternalWithFallback {
961        f1: ExternalFunctionId,
962        args1: Vec<QueryEntry>,
963        f2: ExternalFunctionId,
964        args2: Vec<QueryEntry>,
965        dst: Variable,
966    },
967
968    /// Continue execution iff the two entries (variables or constants) are equal.
969    AssertEq(QueryEntry, QueryEntry),
970
971    /// Continue execution iff the two entries (variables or constants) are not equal.
972    AssertNe(QueryEntry, QueryEntry),
973
974    /// For the two slices: ops[0..divider] and ops[divider..], continue
975    /// execution iff there is at least one pair of values at the same offset
976    /// that are not equal.
    ///
    /// Pairs are formed by zipping the two slices, so if they differ in length
    /// the surplus entries of the longer one are never compared.
977    AssertAnyNe {
978        ops: Vec<QueryEntry>,
979        divider: usize,
980    },
981
982    /// Read the value of a counter and write it to the given variable.
    ///
    /// The counter is read once per instruction, and that single value is
    /// bound for every row of the batch.
983    ReadCounter {
984        /// The counter to broadcast.
985        counter: CounterId,
986        /// The variable to write the value to.
987        dst: Variable,
988    },
989}