egglog_core_relations/table/
mod.rs

1//! A generic table implementation supporting sorted writes.
2//!
3//! The primary difference between this table and the `Function` implementation
4//! in egglog is that high level concepts like "timestamp" and "merge function"
5//! are abstracted away from the core functionality of the table.
6
7use std::{
8    any::Any,
9    cmp,
10    hash::Hasher,
11    mem,
12    sync::{
13        Arc, Weak,
14        atomic::{AtomicUsize, Ordering},
15    },
16};
17
18use crate::numeric_id::{DenseIdMap, NumericId};
19use crossbeam_queue::SegQueue;
20use hashbrown::HashTable;
21use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
22use rustc_hash::FxHasher;
23use sharded_hash_table::ShardedHashTable;
24
25use crate::{
26    Pooled, TableChange, TableId,
27    action::ExecutionState,
28    common::{HashMap, ShardData, ShardId, SubsetTracker, Value},
29    hash_index::{ColumnIndex, Index},
30    offsets::{OffsetRange, Offsets, RowId, Subset, SubsetRef},
31    parallel_heuristics::parallelize_table_op,
32    pool::with_pool_set,
33    row_buffer::{ParallelRowBufWriter, RowBuffer},
34    table_spec::{
35        ColumnId, Constraint, Generation, MutationBuffer, Offset, Row, Table, TableSpec,
36        TableVersion,
37    },
38};
39
40mod rebuild;
41mod sharded_hash_table;
42#[cfg(test)]
43mod tests;
44
45// NB: Having this type def lets us switch between 64 and 32 bits of hashcode.
46//
47// We should consider just using u64 everywhere though. Hashbrown doesn't play nicely with 32-bit
48// hashcodes because it uses both the high and low bits of a 64-bit code.
49
50type HashCode = u64;
51
/// A pointer to a row in the table.
#[derive(Clone, Debug)]
pub(crate) struct TableEntry {
    // Hash of the row's key columns, stored so probes can compare hashes
    // before touching row data.
    hashcode: HashCode,
    // The row this entry points to in the table's row storage.
    row: RowId,
}
58
impl TableEntry {
    /// Return the stored hashcode widened to the `u64` that `hashbrown`
    /// expects.
    fn hashcode(&self) -> u64 {
        // We keep the cast here to make it easy to switch to HashCode=u32.
        #[allow(clippy::unnecessary_cast)]
        {
            self.hashcode as u64
        }
    }
}
68
/// The core data for a table.
///
/// This type is a thin wrapper around `RowBuffer`. The big difference is that
/// it keeps track of how many stale rows are present.
#[derive(Clone)]
struct Rows {
    // The rows themselves, including ones marked stale.
    data: RowBuffer,
    // Spare buffer allocated with the same arity as `data`.
    // NOTE(review): not used within this module view — presumably scratch
    // space for compaction elsewhere; confirm before relying on it.
    scratch: RowBuffer,
    // Count of rows in `data` currently marked stale; `len()` subtracts this.
    stale_rows: usize,
}
79
80impl Rows {
81    fn new(data: RowBuffer) -> Rows {
82        let arity = data.arity();
83        Rows {
84            data,
85            scratch: RowBuffer::new(arity),
86            stale_rows: 0,
87        }
88    }
89    fn clear(&mut self) {
90        self.data.clear();
91        self.stale_rows = 0;
92    }
93    fn next_row(&self) -> RowId {
94        RowId::from_usize(self.data.len())
95    }
96    fn set_stale(&mut self, row: RowId) {
97        if !self.data.set_stale(row) {
98            self.stale_rows += 1;
99        }
100    }
101
102    fn get_row(&self, row: RowId) -> Option<&[Value]> {
103        let row = self.data.get_row(row);
104        if row[0].is_stale() { None } else { Some(row) }
105    }
106
107    /// A variant of `get_row` without bounds-checking on `row`.
108    unsafe fn get_row_unchecked(&self, row: RowId) -> Option<&[Value]> {
109        let row = unsafe { self.data.get_row_unchecked(row) };
110        if row[0].is_stale() { None } else { Some(row) }
111    }
112
113    fn add_row(&mut self, row: &[Value]) -> RowId {
114        if row[0].is_stale() {
115            self.stale_rows += 1;
116        }
117        self.data.add_row(row)
118    }
119
120    fn remove_stale(&mut self, remap: impl FnMut(&[Value], RowId, RowId)) {
121        self.data.remove_stale(remap);
122        self.stale_rows = 0;
123    }
124}
125
126/// The type of closures that are used to merge values in a [`SortedWritesTable`].
127///
128/// The first argument grants access to database using an [`ExecutionState`], the second argument
129/// is the current value of the tuple. The third argument is the new, or "incoming" value of the
130/// tuple. The fourth argument is a mutable reference to a vector that will be used to store the
131/// output of the merge function _if_ it changes the value of the tuple. If it does not, then the
132/// merge function should return `false`.
133pub type MergeFn =
134    dyn Fn(&mut ExecutionState, &[Value], &[Value], &mut Vec<Value>) -> bool + Send + Sync;
135
pub struct SortedWritesTable {
    // Major version component; bumped whenever the table is cleared.
    generation: Generation,
    // Row storage, including a count of stale rows.
    data: Rows,
    // Maps hashed primary keys to row ids in `data`.
    hash: ShardedHashTable<TableEntry>,

    // Number of leading columns that form the primary key.
    n_keys: usize,
    // Total number of columns per row.
    n_columns: usize,
    // Optional column whose values must be nondecreasing across insertions.
    sort_by: Option<ColumnId>,
    // For each distinct sort value (in order), the first row id at which it
    // appears; consulted by `fast_subset` via binary search.
    offsets: Vec<(Value, RowId)>,

    // Staged insertions/removals, flushed into the table during `merge`.
    pending_state: Arc<PendingState>,
    // Conflict resolution invoked when a row is inserted with an existing key.
    merge: Arc<MergeFn>,
    // Columns tracked for rebuilds (consumed by the `rebuild` module).
    to_rebuild: Vec<ColumnId>,
    // Index over `to_rebuild` columns, rebuilt rather than cloned on `clone`.
    rebuild_index: Index<ColumnIndex>,
    // Used to manage incremental rebuilds.
    subset_tracker: SubsetTracker,
}
153
// Hand-written rather than derived: `pending_state` must be deep-copied and
// `rebuild_index`/`subset_tracker` are recreated fresh for the clone.
impl Clone for SortedWritesTable {
    fn clone(&self) -> SortedWritesTable {
        SortedWritesTable {
            generation: self.generation,
            data: self.data.clone(),
            hash: self.hash.clone(),
            n_keys: self.n_keys,
            n_columns: self.n_columns,
            sort_by: self.sort_by,
            offsets: self.offsets.clone(),
            // Staged mutations must not be shared between clones: give the
            // new table its own deep copy of the pending state.
            pending_state: Arc::new(self.pending_state.deep_copy()),
            merge: self.merge.clone(),
            to_rebuild: self.to_rebuild.clone(),
            // Start the clone with a fresh (empty) rebuild index rather than
            // copying the existing one.
            rebuild_index: Index::new(self.to_rebuild.clone(), ColumnIndex::new()),
            subset_tracker: Default::default(),
        }
    }
}
172
/// A variant of [`RowBuffer`] that can handle arity 0.
///
/// We use this to handle empty keys, where the deletion API needs to handle "row buffers of empty
/// rows". The goal here is to keep most of the API RowBuffer-centric and avoid complicating the
/// code too much: actual code that was optimized to handle arity 0 would look a bit different.
#[derive(Clone)]
enum ArbitraryRowBuffer {
    // Arity >= 1: delegate to a real `RowBuffer`.
    NonEmpty(RowBuffer),
    // Arity 0: every row is the empty slice, so only a count is needed.
    Empty { rows: usize },
}
183
184impl ArbitraryRowBuffer {
185    fn new(arity: usize) -> ArbitraryRowBuffer {
186        if arity == 0 {
187            ArbitraryRowBuffer::Empty { rows: 0 }
188        } else {
189            ArbitraryRowBuffer::NonEmpty(RowBuffer::new(arity))
190        }
191    }
192
193    fn add_row(&mut self, row: &[Value]) {
194        match self {
195            ArbitraryRowBuffer::NonEmpty(buf) => {
196                buf.add_row(row);
197            }
198            ArbitraryRowBuffer::Empty { rows } => {
199                *rows += 1;
200            }
201        }
202    }
203
204    fn len(&self) -> usize {
205        match self {
206            ArbitraryRowBuffer::NonEmpty(buf) => buf.len(),
207            ArbitraryRowBuffer::Empty { rows } => *rows,
208        }
209    }
210
211    fn for_each(&self, mut f: impl FnMut(&[Value])) {
212        match self {
213            ArbitraryRowBuffer::NonEmpty(buf) => {
214                for row in buf.iter() {
215                    f(row);
216                }
217            }
218            ArbitraryRowBuffer::Empty { rows } => {
219                for _ in 0..*rows {
220                    f(&[]);
221                }
222            }
223        }
224    }
225}
226
// Per-handle staging area for insertions and removals, pre-sharded to match
// the owning table's hash table. Contents are flushed to `state` on drop.
struct Buffer {
    // Full rows staged for insertion, keyed by destination shard.
    pending_rows: DenseIdMap<ShardId, RowBuffer>,
    // Keys staged for removal, keyed by destination shard.
    pending_removals: DenseIdMap<ShardId, ArbitraryRowBuffer>,
    // Back-pointer to the owning table's pending state; a `Weak` so a live
    // buffer does not keep a dropped table's state alive.
    state: Weak<PendingState>,
    // Total number of columns per row.
    n_cols: u32,
    // Number of key columns (the prefix of each row used for hashing).
    n_keys: u32,
    // Shard-routing parameters copied from the table's hash table.
    shard_data: ShardData,
}
235
236impl MutationBuffer for Buffer {
237    fn stage_insert(&mut self, row: &[Value]) {
238        let (shard, _) = hash_code(self.shard_data, row, self.n_keys as _);
239        self.pending_rows
240            .get_or_insert(shard, || RowBuffer::new(self.n_cols as _))
241            .add_row(row);
242    }
243    fn stage_remove(&mut self, key: &[Value]) {
244        let (shard, _) = hash_code(self.shard_data, key, self.n_keys as _);
245        self.pending_removals
246            .get_or_insert(shard, || ArbitraryRowBuffer::new(self.n_keys as _))
247            .add_row(key);
248    }
249    fn fresh_handle(&self) -> Box<dyn MutationBuffer> {
250        Box::new(Buffer {
251            pending_rows: Default::default(),
252            pending_removals: Default::default(),
253            state: self.state.clone(),
254            n_cols: self.n_cols,
255            n_keys: self.n_keys,
256            shard_data: self.shard_data,
257        })
258    }
259}
260
261impl Drop for Buffer {
262    fn drop(&mut self) {
263        if let Some(state) = self.state.upgrade() {
264            let mut rows = 0;
265            for shard_id in 0..self.pending_rows.n_ids() {
266                let shard = ShardId::from_usize(shard_id);
267                let Some(buf) = self.pending_rows.take(shard) else {
268                    continue;
269                };
270                rows += buf.len();
271                state.pending_rows[shard].push(buf);
272            }
273            state.total_rows.fetch_add(rows, Ordering::Relaxed);
274
275            let mut rows = 0;
276            for shard_id in 0..self.pending_removals.n_ids() {
277                let shard = ShardId::from_usize(shard_id);
278                let Some(buf) = self.pending_removals.take(shard) else {
279                    continue;
280                };
281                rows += buf.len();
282                state.pending_removals[shard].push(buf);
283            }
284            state.total_removals.fetch_add(rows, Ordering::Relaxed);
285        }
286    }
287}
288
impl Table for SortedWritesTable {
    fn dyn_clone(&self) -> Box<dyn Table> {
        Box::new(self.clone())
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    /// Remove all rows; bumps the major version if the table was nonempty.
    fn clear(&mut self) {
        // Staged-but-unflushed mutations are discarded along with contents.
        self.pending_state.clear();
        if self.data.data.len() == 0 {
            return;
        }
        self.offsets.clear();
        self.data.clear();
        self.hash.clear();
        // Advance the generation so stale references to the old contents can
        // be detected via `version()`.
        self.generation = Generation::from_usize(self.version().major.index() + 1);
    }

    fn spec(&self) -> TableSpec {
        TableSpec {
            n_keys: self.n_keys,
            // All non-key columns are value columns.
            n_vals: self.n_columns - self.n_keys,
            uncacheable_columns: Default::default(),
            allows_delete: true,
        }
    }

    /// Rebuild rows in this table against `table`; delegates to the
    /// `rebuild` module.
    fn apply_rebuild(
        &mut self,
        table_id: TableId,
        table: &crate::WrappedTable,
        next_ts: Value,
        exec_state: &mut ExecutionState,
    ) -> bool {
        self.do_rebuild(table_id, table, next_ts, exec_state)
    }

    fn version(&self) -> TableVersion {
        TableVersion {
            major: self.generation,
            // Minor version advances with every row appended to `data`.
            minor: Offset::from_usize(self.data.next_row().index()),
        }
    }

    /// Rows added since `offset`. Valid because row storage is append-only
    /// within a generation; stale rows are skipped at scan time.
    fn updates_since(&self, offset: Offset) -> Subset {
        Subset::Dense(OffsetRange::new(
            RowId::from_usize(offset.index()),
            self.data.next_row(),
        ))
    }

    fn all(&self) -> Subset {
        Subset::Dense(OffsetRange::new(RowId::new(0), self.data.next_row()))
    }

    /// Number of live (non-stale) rows.
    fn len(&self) -> usize {
        self.data.data.len() - self.data.stale_rows
    }

    fn scan_generic(&self, subset: SubsetRef, mut f: impl FnMut(RowId, &[Value]))
    where
        Self: Sized,
    {
        let Some((_low, hi)) = subset.bounds() else {
            // Empty subset
            return;
        };
        assert!(
            hi.index() <= self.data.data.len(),
            "{} vs. {}",
            hi.index(),
            self.data.data.len()
        );
        // SAFETY: subsets are sorted, low must be at most hi, and hi is less
        // than the length of the table.
        subset.offsets(|row| unsafe {
            if let Some(vals) = self.data.get_row_unchecked(row) {
                f(row, vals)
            }
        })
    }

    /// Scan up to `n` rows of `subset` starting at `start`, filtered by `cs`,
    /// returning the offset to resume from (or `None` when exhausted).
    fn scan_generic_bounded(
        &self,
        subset: SubsetRef,
        start: Offset,
        n: usize,
        cs: &[Constraint],
        mut f: impl FnMut(RowId, &[Value]),
    ) -> Option<Offset>
    where
        Self: Sized,
    {
        if cs.is_empty() {
            // No constraints: only skip stale rows.
            subset
                .iter_bounded(start.index(), start.index() + n, |row| {
                    let Some(entry) = self.data.get_row(row) else {
                        return;
                    };
                    f(row, entry);
                })
                .map(Offset::from_usize)
        } else {
            // Constrained: `get_if` yields the row only when all of `cs` hold.
            subset
                .iter_bounded(start.index(), start.index() + n, |row| {
                    let Some(entry) = self.get_if(cs, row) else {
                        return;
                    };
                    f(row, entry);
                })
                .map(Offset::from_usize)
        }
    }

    /// Answer constraints on the sort column via binary search over
    /// `offsets` instead of a scan.
    ///
    /// `binary_search_sort_val` appears to return `Ok((found, bound))` for
    /// the half-open row range whose sort column equals `val`, or `Err(next)`
    /// with the first row id whose sort value exceeds `val` — TODO confirm
    /// against its definition (not in this view).
    fn fast_subset(&self, constraint: &Constraint) -> Option<Subset> {
        let sort_by = self.sort_by?;
        match constraint {
            Constraint::Eq { .. } => None,
            Constraint::EqConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((found, bound)) => Some(Subset::Dense(OffsetRange::new(found, bound))),
                        Err(_) => Some(Subset::empty()),
                    }
                } else {
                    None
                }
            }
            Constraint::LtConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((found, _)) => {
                            Some(Subset::Dense(OffsetRange::new(RowId::new(0), found)))
                        }
                        Err(next) => Some(Subset::Dense(OffsetRange::new(RowId::new(0), next))),
                    }
                } else {
                    None
                }
            }
            Constraint::GtConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((_, bound)) => {
                            Some(Subset::Dense(OffsetRange::new(bound, self.data.next_row())))
                        }
                        Err(next) => {
                            Some(Subset::Dense(OffsetRange::new(next, self.data.next_row())))
                        }
                    }
                } else {
                    None
                }
            }
            Constraint::LeConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((_, bound)) => {
                            Some(Subset::Dense(OffsetRange::new(RowId::new(0), bound)))
                        }
                        Err(next) => Some(Subset::Dense(OffsetRange::new(RowId::new(0), next))),
                    }
                } else {
                    None
                }
            }
            Constraint::GeConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((found, _)) => {
                            Some(Subset::Dense(OffsetRange::new(found, self.data.next_row())))
                        }
                        Err(next) => {
                            Some(Subset::Dense(OffsetRange::new(next, self.data.next_row())))
                        }
                    }
                } else {
                    None
                }
            }
        }
    }

    fn refine_one(&self, mut subset: Subset, c: &Constraint) -> Subset {
        // NB: we aren't using any of the `fast_subset` tricks here. We may want
        // to if the higher-level implementations end up using it directly.
        subset.retain(|row| self.eval(std::slice::from_ref(c), row));
        subset
    }

    /// Hand out a staging buffer that shards mutations the same way this
    /// table's hash table does.
    fn new_buffer(&self) -> Box<dyn MutationBuffer> {
        let n_shards = self.hash.shard_data().n_shards();
        Box::new(Buffer {
            pending_rows: DenseIdMap::with_capacity(n_shards),
            pending_removals: DenseIdMap::with_capacity(n_shards),
            // Weak reference: an outstanding buffer must not keep a dropped
            // table's pending state alive.
            state: Arc::downgrade(&self.pending_state),
            n_keys: u32::try_from(self.n_keys).expect("n_keys should fit in u32"),
            n_cols: u32::try_from(self.n_columns).expect("n_columns should fit in u32"),
            shard_data: self.hash.shard_data(),
        })
    }

    /// Flush staged mutations: removals first, then insertions, then an
    /// opportunistic rehash/compaction.
    fn merge(&mut self, exec_state: &mut ExecutionState) -> TableChange {
        let removed = self.do_delete();
        let added = self.do_insert(exec_state);
        self.maybe_rehash();
        TableChange { removed, added }
    }

    /// Look up the full row for `key` (the first `n_keys` columns).
    fn get_row(&self, key: &[Value]) -> Option<Row> {
        let id = get_entry(key, self.n_keys, &self.hash, |row| {
            &self.data.get_row(row).unwrap()[0..self.n_keys] == key
        })?;
        let mut vals = with_pool_set(|ps| ps.get::<Vec<Value>>());
        vals.extend_from_slice(self.data.get_row(id).unwrap());
        Some(Row { id, vals })
    }

    /// Look up a single column of the row for `key`.
    fn get_row_column(&self, key: &[Value], col: ColumnId) -> Option<Value> {
        let id = get_entry(key, self.n_keys, &self.hash, |row| {
            &self.data.get_row(row).unwrap()[0..self.n_keys] == key
        })?;
        Some(self.data.get_row(id).unwrap()[col.index()])
    }
}
514
515impl SortedWritesTable {
516    /// Create a new [`SortedWritesTable`] with the given number of keys,
517    /// columns, and an optional sort column.
518    ///
519    /// The `merge_fn` is used to evaluate conflicts when more than one row is
520    /// inserted with the same primary key. The old and new proposed values are
521    /// passed as the second and third arguments, respectively, with the
522    /// function filling the final argument with the contents of the new row.
523    /// The return value indicates whether or not the contents of the vector
524    /// should be used.
525    ///
526    /// Merge functions can access the database via [`ExecutionState`].
    pub fn new(
        n_keys: usize,
        n_columns: usize,
        sort_by: Option<ColumnId>,
        to_rebuild: Vec<ColumnId>,
        merge_fn: Box<MergeFn>,
    ) -> Self {
        let hash = ShardedHashTable::<TableEntry>::default();
        // Mutation buffers route rows to shards themselves, so the pending
        // state must use the same shard parameters as the hash table.
        let shard_data = hash.shard_data();
        let rebuild_index = Index::new(to_rebuild.clone(), ColumnIndex::new());
        SortedWritesTable {
            generation: Generation::new(0),
            data: Rows::new(RowBuffer::new(n_columns)),
            hash,
            n_keys,
            n_columns,
            sort_by,
            offsets: Default::default(),
            pending_state: Arc::new(PendingState::new(shard_data)),
            merge: merge_fn.into(),
            to_rebuild,
            rebuild_index,
            subset_tracker: Default::default(),
        }
    }
552
    /// Flush all pending removals, in parallel.
    ///
    /// One rayon task runs per shard with queued removals; since each shard
    /// owns a disjoint set of keys (asserted below), tasks never touch the
    /// same row. Returns `true` if any row was removed.
    fn parallel_delete(&mut self) -> bool {
        let shard_data = self.hash.shard_data();
        let stale_delta: usize = self
            .hash
            .mut_shards()
            .par_iter_mut()
            .enumerate()
            // Skip shards with no queued removals.
            .filter_map(|(shard_id, shard)| {
                let shard_id = ShardId::from_usize(shard_id);
                if self.pending_state.pending_removals[shard_id].is_empty() {
                    return None;
                }
                Some((shard_id, shard))
            })
            .map(|(shard_id, shard)| {
                let queue = &self.pending_state.pending_removals[shard_id];
                let mut marked_stale = 0;
                while let Some(buf) = queue.pop() {
                    buf.for_each(|to_remove| {
                        let (actual_shard, hc) = hash_code(shard_data, to_remove, self.n_keys);
                        // Buffers pre-shard their keys; a mismatch here would
                        // mean two tasks could race on the same row.
                        assert_eq!(actual_shard, shard_id);
                        if let Ok(entry) = shard.find_entry(hc, |entry| {
                            entry.hashcode == (hc as _)
                                && &self.data.get_row(entry.row).unwrap()[0..self.n_keys]
                                    == to_remove
                        }) {
                            let (ent, _) = entry.remove();
                            // SAFETY: The safety requirements of
                            // `set_stale_shared` are that there are no
                            // concurrent accesses to `row`. No other threads
                            // can access this row within this method because
                            // different `shards` partition the space
                            // (guaranteed by the assertion above), and we
                            // launch at most one thread per shard.
                            marked_stale +=
                                unsafe { !self.data.data.set_stale_shared(ent.row) } as usize;
                        }
                    });
                }
                marked_stale
            })
            .sum();
        // Update the stale count with the total marked stale.
        self.data.stale_rows += stale_delta;
        stale_delta > 0
    }
600    fn serial_delete(&mut self) -> bool {
601        let shard_data = self.hash.shard_data();
602        let mut changed = false;
603        self.hash
604            .mut_shards()
605            .iter_mut()
606            .enumerate()
607            .for_each(|(shard_id, shard)| {
608                let shard_id = ShardId::from_usize(shard_id);
609                let queue = &self.pending_state.pending_removals[shard_id];
610                while let Some(buf) = queue.pop() {
611                    buf.for_each(|to_remove| {
612                        let (actual_shard, hc) = hash_code(shard_data, to_remove, self.n_keys);
613                        assert_eq!(actual_shard, shard_id);
614                        if let Ok(entry) = shard.find_entry(hc, |entry| {
615                            entry.hashcode == (hc as _)
616                                && &self.data.get_row(entry.row).unwrap()[0..self.n_keys]
617                                    == to_remove
618                        }) {
619                            let (ent, _) = entry.remove();
620                            self.data.set_stale(ent.row);
621                            changed = true;
622                        }
623                    })
624                }
625            });
626        changed
627    }
628
629    fn do_delete(&mut self) -> bool {
630        let total = self.pending_state.total_removals.swap(0, Ordering::Relaxed);
631
632        if parallelize_table_op(total) {
633            self.parallel_delete()
634        } else {
635            self.serial_delete()
636        }
637    }
638
639    fn do_insert(&mut self, exec_state: &mut ExecutionState) -> bool {
640        let total = self.pending_state.total_rows.swap(0, Ordering::Relaxed);
641        self.data.data.reserve(total);
642        if parallelize_table_op(total) {
643            if let Some(col) = self.sort_by {
644                self.parallel_insert(
645                    exec_state,
646                    SortChecker {
647                        col,
648                        current: None,
649                        baseline: self.offsets.last().map(|(v, _)| *v),
650                    },
651                )
652            } else {
653                self.parallel_insert(exec_state, ())
654            }
655        } else {
656            self.serial_insert(exec_state)
657        }
658    }
659
660    fn serial_insert(&mut self, exec_state: &mut ExecutionState) -> bool {
661        let mut changed = false;
662        let n_keys = self.n_keys;
663        let mut scratch = with_pool_set(|ps| ps.get::<Vec<Value>>());
664        for (_outer_shard, queue) in self.pending_state.pending_rows.iter() {
665            if let Some(sort_by) = self.sort_by {
666                while let Some(buf) = queue.pop() {
667                    for query in buf.non_stale() {
668                        let key = &query[0..n_keys];
669                        let entry = get_entry_mut(query, n_keys, &mut self.hash, |row| {
670                            let Some(row) = self.data.get_row(row) else {
671                                return false;
672                            };
673                            &row[0..n_keys] == key
674                        });
675
676                        if let Some(row) = entry {
677                            // First case: overwriting an existing value. Apply merge
678                            // function. Insert new row and update hash table if merge
679                            // changes anything.
680                            let cur = self
681                                .data
682                                .get_row(*row)
683                                .expect("table should not point to stale entry");
684                            if (self.merge)(exec_state, cur, query, &mut scratch) {
685                                let sort_val = query[sort_by.index()];
686                                let new = self.data.add_row(&scratch);
687                                if let Some(largest) = self.offsets.last().map(|(v, _)| *v) {
688                                    assert!(
689                                        sort_val >= largest,
690                                        "inserting row that violates sort order ({sort_val:?} vs. {largest:?})"
691                                    );
692                                    if sort_val > largest {
693                                        self.offsets.push((sort_val, new));
694                                    }
695                                } else {
696                                    self.offsets.push((sort_val, new));
697                                }
698                                self.data.set_stale(*row);
699                                *row = new;
700                                changed = true;
701                            }
702                            scratch.clear();
703                        } else {
704                            let sort_val = query[sort_by.index()];
705                            // New value: update invariants.
706                            let new = self.data.add_row(query);
707                            if let Some(largest) = self.offsets.last().map(|(v, _)| *v) {
708                                assert!(
709                                    sort_val >= largest,
710                                    "inserting row that violates sort order {sort_val:?} vs. {largest:?}"
711                                );
712                                if sort_val > largest {
713                                    self.offsets.push((sort_val, new));
714                                }
715                            } else {
716                                self.offsets.push((sort_val, new));
717                            }
718                            let (shard, hc) = hash_code(self.hash.shard_data(), query, self.n_keys);
719                            debug_assert_eq!(shard, _outer_shard);
720                            self.hash.mut_shards()[shard.index()].insert_unique(
721                                hc as _,
722                                TableEntry {
723                                    hashcode: hc as _,
724                                    row: new,
725                                },
726                                TableEntry::hashcode,
727                            );
728                            changed = true;
729                        }
730                    }
731                }
732            } else {
733                // Simplified variant without the sorting constraint.
734                while let Some(buf) = queue.pop() {
735                    for query in buf.non_stale() {
736                        let key = &query[0..n_keys];
737                        let entry = get_entry_mut(query, n_keys, &mut self.hash, |row| {
738                            let Some(row) = self.data.get_row(row) else {
739                                return false;
740                            };
741                            &row[0..n_keys] == key
742                        });
743
744                        if let Some(row) = entry {
745                            let cur = self
746                                .data
747                                .get_row(*row)
748                                .expect("table should not point to stale entry");
749                            if (self.merge)(exec_state, cur, query, &mut scratch) {
750                                let new = self.data.add_row(&scratch);
751                                self.data.set_stale(*row);
752                                *row = new;
753                                changed = true;
754                            }
755                            scratch.clear();
756                        } else {
757                            // New value: update invariants.
758                            let new = self.data.add_row(query);
759                            let (shard, hc) = hash_code(self.hash.shard_data(), query, self.n_keys);
760                            debug_assert_eq!(shard, _outer_shard);
761                            self.hash.mut_shards()[shard.index()].insert_unique(
762                                hc as _,
763                                TableEntry {
764                                    hashcode: hc as _,
765                                    row: new,
766                                },
767                                TableEntry::hashcode,
768                            );
769                            changed = true;
770                        }
771                    }
772                }
773            };
774        }
775        changed
776    }
777
    /// Drain the pre-sharded pending-row queues and apply them to the table in
    /// parallel, one logical task per shard.
    ///
    /// Returns true if any row was inserted or changed by a merge. The
    /// `checker` enforces (for sorted tables) that all concurrently inserted
    /// rows share a single sort key that does not precede the table's current
    /// maximum; per-shard checker states are merged via `C::check_global`
    /// before updating `offsets`.
    fn parallel_insert<C: OrderingChecker>(
        &mut self,
        exec_state: &ExecutionState,
        checker: C,
    ) -> bool {
        // Cap on the number of rows staged per shard before flushing to the
        // shared row buffer; bounds the memory held in `staged` at one time.
        const BATCH_SIZE: usize = 1 << 18;
        // Parallel insert uses one giant parallel foreach. We have updates
        // pre-sharded, and one logical thread can process updates for each
        // shard independently. Updates happen in three phases, which comments
        // describe below.
        let shard_data = self.hash.shard_data();
        let n_keys = self.n_keys;
        let n_cols = self.n_columns;
        // First row id that this round of insertions will occupy; used when
        // updating `offsets` at the end.
        let next_offset = RowId::from_usize(self.data.data.len());
        let row_writer = self.data.data.parallel_writer();
        let pending_adds = self
            .hash
            .mut_shards()
            .par_iter_mut()
            .enumerate()
            .map(|(shard_id, shard)| {
                let shard_id = ShardId::from_usize(shard_id);
                let mut checker = checker.clone();
                let mut exec_state = exec_state.clone();
                let mut scratch = with_pool_set(|ps| ps.get::<Vec<Value>>());
                let queue = &self.pending_state.pending_rows[shard_id];
                let mut marked_stale = 0usize;
                let mut staged = StagedOutputs::new(n_keys, n_cols, BATCH_SIZE);
                let mut changed = false;
                // The core flush loop: We call once `staged` reaches `BATCH_SIZE` or
                // when we're done.
                macro_rules! flush_staged_outputs {
                    () => {{
                        // Phase 2: Write the staged rows to the row writer. This only
                        // works due to the `ParallelRowBufWriter` machinery.
                        let (start_row, stale) = staged.write_output(&row_writer);
                        marked_stale += stale;
                        // Phase 3: With the values buffered in the row buffer, we can
                        // write them back to the shard, pointed to the correct rows.

                        // In the serial implementation, we do phases 2 and 3 inline with
                        // processing the incoming mutation, but separating them out
                        // this way allows us to do a single write to the shared row
                        // buffer, rather than one per row, which would cause
                        // contention.
                        let mut cur_row = start_row;
                        let read_handle = row_writer.read_handle();
                        for row in staged.rows() {
                            // Rows merged away inside `staged` are tombstoned;
                            // they still occupy a slot in the buffer, so we
                            // advance `cur_row` past them.
                            if row.first().map(Value::is_stale).unwrap_or(false) {
                                cur_row = cur_row.inc();
                                continue;
                            }
                            use hashbrown::hash_table::Entry;
                            checker.check_local(row);
                            changed = true;
                            let key = &row[0..n_keys];
                            let (_actual_shard, hc) = hash_code(shard_data, row, n_keys);
                            #[cfg(any(debug_assertions, test))]
                            {
                                unsafe {
                                    // read the value we wrote at this row and
                                    // check that it matches.
                                    assert_eq!(read_handle.get_row_unchecked(cur_row), row);
                                }
                            }
                            debug_assert_eq!(_actual_shard, shard_id);
                            match shard.entry(
                                hc,
                                // SAFETY: `ent` must point to a valid row
                                |ent| unsafe {
                                    ent.hashcode == hc as HashCode
                                        && &read_handle.get_row_unchecked(ent.row)[0..n_keys] == key
                                },
                                TableEntry::hashcode,
                            ) {
                                Entry::Occupied(mut occ) => {
                                    // SAFETY: `occ` must point to a valid row: we only insert valid rows
                                    // into the map.
                                    let cur = unsafe { read_handle.get_row_unchecked(occ.get().row) };

                                    // SAFETY: The safety requirements of
                                    // `set_stale_shared` are that there are no
                                    // concurrent accesses to `row`. We have
                                    // exclusive access to any row whose hash matches this
                                    // shard.
                                    if (self.merge)(&mut exec_state, cur, row, &mut scratch) {
                                        // The merge produced a new value: retire
                                        // the old row and point the entry at the
                                        // freshly written one.
                                        unsafe {
                                            let _was_stale = read_handle.set_stale_shared(occ.get().row);
                                            debug_assert!(!_was_stale);
                                        }
                                        occ.get_mut().row = cur_row;
                                        changed = true;
                                    } else {
                                        // Mark the new row as stale: we didn't end up needing it.
                                        unsafe {
                                            let _was_stale = read_handle.set_stale_shared(cur_row);
                                            debug_assert!(!_was_stale);
                                        }
                                    }
                                    marked_stale += 1;
                                    scratch.clear();
                                }
                                Entry::Vacant(v) => {
                                    changed = true;
                                    v.insert(TableEntry {
                                        hashcode: hc as HashCode,
                                        row: cur_row,
                                    });
                                }
                            }

                            cur_row = cur_row.inc();
                        }
                        staged.clear();
                    }};
                }
                // Phase 1: process all incoming updates:
                // * Add new values to `staged`.
                // * Remove entries in `shard` and mark them as stale in
                //   `data` if they will be overwritten.
                while let Some(buf) = queue.pop() {
                    // We create a read_handle once per batch to avoid blocking
                    // too many threads if someone needs to resize the row
                    // writer.
                    for row in buf.non_stale() {
                        staged.insert(row, |cur, new, out| {
                            (self.merge)(&mut exec_state, cur, new, out)
                        });
                        if staged.len() >= BATCH_SIZE {
                            flush_staged_outputs!();
                        }
                    }
                }
                flush_staged_outputs!();
                // Per-shard results: final checker state, number of rows this
                // shard marked stale, and whether anything changed.
                (checker, marked_stale, changed)
            })
            .collect_vec_list();
        self.data.data = row_writer.finish();
        // Now we just need to reset our invariants.

        // Confirm none of the writes violated sort order and update the
        // `offsets` vector.
        let checker = C::check_global(pending_adds.iter().flatten().map(|(checker, _, _)| checker));
        checker.update_offsets(next_offset, &mut self.offsets);

        // Update the staleness counters.
        self.data.stale_rows += pending_adds
            .iter()
            .flatten()
            .map(|(_, stale, _)| *stale)
            .sum::<usize>();

        // Register any changes.
        pending_adds
            .iter()
            .flatten()
            .any(|(_, _, changed)| *changed)
    }
936
937    fn binary_search_sort_val(&self, val: Value) -> Result<(RowId, RowId), RowId> {
938        debug_assert!(
939            self.offsets.windows(2).all(|x| x[0].1 < x[1].1),
940            "{:?}",
941            self.offsets
942        );
943
944        debug_assert!(
945            self.offsets.windows(2).all(|x| x[0].0 < x[1].0),
946            "{:?}",
947            self.offsets
948        );
949        match self.offsets.binary_search_by_key(&val, |(v, _)| *v) {
950            Ok(got) => Ok((
951                self.offsets[got].1,
952                self.offsets
953                    .get(got + 1)
954                    .map(|(_, r)| *r)
955                    .unwrap_or(self.data.next_row()),
956            )),
957            Err(next) => Err(self
958                .offsets
959                .get(next)
960                .map(|(_, id)| *id)
961                .unwrap_or(self.data.next_row())),
962        }
963    }
    /// Returns true if the row at `row` is live and satisfies every
    /// constraint in `cs`.
    fn eval(&self, cs: &[Constraint], row: RowId) -> bool {
        self.get_if(cs, row).is_some()
    }
967
968    fn get_if(&self, cs: &[Constraint], row: RowId) -> Option<&[Value]> {
969        let row = self.data.get_row(row)?;
970        let mut res = true;
971        for constraint in cs {
972            match constraint {
973                Constraint::Eq { l_col, r_col } => res &= row[l_col.index()] == row[r_col.index()],
974                Constraint::EqConst { col, val } => res &= row[col.index()] == *val,
975                Constraint::LtConst { col, val } => res &= row[col.index()] < *val,
976                Constraint::GtConst { col, val } => res &= row[col.index()] > *val,
977                Constraint::LeConst { col, val } => res &= row[col.index()] <= *val,
978                Constraint::GeConst { col, val } => res &= row[col.index()] >= *val,
979            }
980        }
981        if res { Some(row) } else { None }
982    }
983
984    fn maybe_rehash(&mut self) {
985        if self.data.stale_rows <= cmp::max(16, self.data.data.len() / 2) {
986            return;
987        }
988
989        if parallelize_table_op(self.data.data.len()) {
990            self.parallel_rehash();
991        } else {
992            self.rehash();
993        }
994    }
    /// Compact the table in parallel, dropping stale rows while preserving
    /// the sort order of live ones.
    ///
    /// Falls back to the serial `rehash` for unsorted tables. The parallel
    /// path works "hash-first": it counts, per (sort value, shard) pair, how
    /// many live rows exist, converts those counts into target start offsets
    /// in `scratch`, then has each shard copy its rows into place while
    /// repointing its hash entries. The `scratch` and `data` buffers are
    /// swapped at the end.
    fn parallel_rehash(&mut self) {
        use rayon::prelude::*;
        // Parallel rehashes go "hash-first" rather than "rows-first".
        //
        // We iterate over each shard and then write out new contents to a fresh row, in parallel.
        let Some(sort_by) = self.sort_by else {
            // Just do a serial rehash for now. We currently do not have a use-case for parallel
            // compaction of unsorted tables.
            //
            // Implementing parallel compaction for an unsorted table is much easier: each shard
            // can write to a contiguous chunk of the `scratch` buffer, with the offsets being
            // pre-chunked based on the size of each shard.
            self.rehash();
            return;
        };
        // Compaction moves rows, so any previously handed-out version is invalidated.
        self.generation = self.generation.inc();
        assert!(!self.offsets.is_empty());
        // Per-sort-value statistics: how many live rows carry `value`, and
        // how those rows are distributed across shards.
        struct TimestampStats {
            value: Value,
            count: usize,
            histogram: Pooled<DenseIdMap<ShardId, usize>>,
        }
        impl Default for TimestampStats {
            fn default() -> TimestampStats {
                TimestampStats {
                    value: Value::stale(),
                    count: 0,
                    histogram: with_pool_set(|ps| ps.get()),
                }
            }
        }
        let mut results = Vec::<TimestampStats>::with_capacity(self.offsets.len());
        results.resize_with(self.offsets.len() - 1, Default::default);
        // Use a macro rather than a lambda to avoid borrow issues.
        //
        // Scans rows in [$start_row, $end_row) and tallies live rows per shard.
        macro_rules! compute_hist {
            ($start_val: expr, $start_row: expr, $end_row: expr) => {{
                let mut histogram: Pooled<DenseIdMap<ShardId, usize>> =
                    with_pool_set(|ps| ps.get());
                let mut cur_row = $start_row;
                let mut count = 0;
                while cur_row < $end_row {
                    if let Some(row) = self.data.get_row(cur_row) {
                        count += 1;
                        let (shard, _) = hash_code(self.hash.shard_data(), row, self.n_keys);
                        *histogram.get_or_default(shard) += 1;
                    }
                    cur_row = cur_row.inc();
                }
                TimestampStats {
                    value: $start_val,
                    count,
                    histogram,
                }
            }};
        }
        let mut last: TimestampStats = Default::default();
        rayon::join(
            || {
                // This closure handles computing all timestamps but the last one.
                self.offsets
                    .windows(2)
                    .zip(results.iter_mut())
                    .par_bridge()
                    .for_each(|(xs, res)| {
                        let [(start_val, start_row), (_, end_row)] = xs else {
                            unreachable!()
                        };
                        *res = compute_hist!(*start_val, *start_row, *end_row);
                    })
            },
            || {
                // And here we handle the final one.
                let (start_val, start_row) = self.offsets.last().unwrap();
                let end_row = self.data.next_row();
                last = compute_hist!(*start_val, *start_row, end_row);
            },
        );
        results.push(last);
        // Now we need to compute cumulative statistics on the row layouts here.
        // We do this serially as we currently don't have a ton of use for cases with thousands
        // of timestamps or more. There are well-known parallel algorithms for computing these
        // cumulative statistics in parallel, but they aren't currently all that well-suited
        // for rayon at the moment.
        let mut prev_count = 0;
        self.offsets.clear();
        for stats in results.iter_mut() {
            // Sort values whose rows are all stale disappear entirely.
            if stats.count == 0 {
                continue;
            }
            self.offsets
                .push((stats.value, RowId::from_usize(prev_count)));
            // Turn the per-shard counts into a running prefix sum: after this
            // loop each histogram entry holds the start row for that shard's
            // slice of this sort value's rows in the compacted buffer.
            let mut inner = prev_count;
            for (_, count) in stats.histogram.iter_mut() {
                // Each entry in the histogram now points to the start row for that shard's
                // rows for a given timestamp.
                let tmp = *count;
                *count = inner;
                inner += tmp;
            }
            prev_count += stats.count;
            debug_assert_eq!(inner, prev_count)
        }

        // Now the part with some unsafe code.
        // We will iterate over each shard and use the statistics in `results` to guide where
        // each row will go.
        //
        // This involves doing unsynchronized writes to the table (ptr::copy_nonoverlapping)
        // followed by a set_len. The safety of these operations relies on the fact that:
        // * No one grabs a reference to the interior of `scratch` until these operations have
        //   finished.
        // * `scratch` does not overlap `data`.
        // * The sharding function completely partitions the set of objects in the table: one
        //   shard's writes will never stomp on those of another.

        self.data.scratch.clear();
        self.data.scratch.reserve(prev_count);
        self.hash
            .mut_shards()
            .par_iter_mut()
            .with_max_len(1)
            .enumerate()
            .for_each(|(shard_id, shard)| {
                let shard_id = ShardId::from_usize(shard_id);
                let scratch_ptr = self.data.scratch.raw_rows();
                // Cursor per sort value: the next row in `scratch` that this
                // shard should write for that value.
                let mut progress =
                    HashMap::<Value /* timestamp */, RowId /* next row */>::default();
                progress.reserve(results.len());
                for stats in &results {
                    let Some(start) = stats.histogram.get(shard_id) else {
                        continue;
                    };
                    progress.insert(stats.value, RowId::from_usize(*start));
                }
                for TableEntry { row: row_id, .. } in shard.iter_mut() {
                    let row = self
                        .data
                        .get_row(*row_id)
                        .expect("shard should not map to a stale value");
                    let val = row[sort_by.index()];
                    let next = progress[&val];
                    // SAFETY: see above longer comment.
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            row.as_ptr(),
                            scratch_ptr.add(next.index() * self.n_columns) as *mut Value,
                            self.n_columns,
                        )
                    }
                    // Repoint the hash entry at the row's new home and bump
                    // the cursor for this sort value.
                    *row_id = next;
                    progress.insert(val, next.inc());
                }
            });
        // SAFETY: see above longer comment.
        unsafe { self.data.scratch.set_len(prev_count) };
        mem::swap(&mut self.data.data, &mut self.data.scratch);
        self.data.stale_rows = 0;
    }
1153    fn rehash_impl(
1154        sort_by: Option<ColumnId>,
1155        n_keys: usize,
1156        rows: &mut Rows,
1157        offsets: &mut Vec<(Value, RowId)>,
1158        hash: &mut ShardedHashTable<TableEntry>,
1159    ) {
1160        if let Some(sort_by) = sort_by {
1161            offsets.clear();
1162            rows.remove_stale(|row, old, new| {
1163                let stale_entry = get_entry_mut(row, n_keys, hash, |x| x == old)
1164                    .expect("non-stale entry not mapped in hash");
1165                *stale_entry = new;
1166                let sort_col = row[sort_by.index()];
1167                if let Some((max, _)) = offsets.last() {
1168                    if sort_col > *max {
1169                        offsets.push((sort_col, new));
1170                    }
1171                } else {
1172                    offsets.push((sort_col, new));
1173                }
1174            })
1175        } else {
1176            rows.remove_stale(|row, old, new| {
1177                let stale_entry = get_entry_mut(row, n_keys, hash, |x| x == old)
1178                    .expect("non-stale entry not mapped in hash");
1179                *stale_entry = new;
1180            })
1181        }
1182    }
1183
    /// Serially compact the table: drop stale rows and repoint hash entries.
    ///
    /// Bumps the generation first, since compaction moves rows and so
    /// invalidates previously handed-out row ids.
    fn rehash(&mut self) {
        self.generation = self.generation.inc();
        Self::rehash_impl(
            self.sort_by,
            self.n_keys,
            &mut self.data,
            &mut self.offsets,
            &mut self.hash,
        )
    }
1194}
1195
1196fn get_entry(
1197    row: &[Value],
1198    n_keys: usize,
1199    table: &ShardedHashTable<TableEntry>,
1200    test: impl Fn(RowId) -> bool,
1201) -> Option<RowId> {
1202    let (shard, hash) = hash_code(table.shard_data(), row, n_keys);
1203    table
1204        .get_shard(shard)
1205        .find(hash, |ent| {
1206            ent.hashcode == hash as HashCode && test(ent.row)
1207        })
1208        .map(|ent| ent.row)
1209}
1210
1211fn get_entry_mut<'a>(
1212    row: &[Value],
1213    n_keys: usize,
1214    table: &'a mut ShardedHashTable<TableEntry>,
1215    test: impl Fn(RowId) -> bool,
1216) -> Option<&'a mut RowId> {
1217    let (shard, hash) = hash_code(table.shard_data(), row, n_keys);
1218    table.mut_shards()[shard.index()]
1219        .find_mut(hash, |ent| {
1220            ent.hashcode == hash as HashCode && test(ent.row)
1221        })
1222        .map(|ent| &mut ent.row)
1223}
1224
1225fn hash_code(shard_data: ShardData, row: &[Value], n_keys: usize) -> (ShardId, u64) {
1226    let mut hasher = FxHasher::default();
1227    for val in &row[0..n_keys] {
1228        hasher.write_usize(val.index());
1229    }
1230    let full_code = hasher.finish();
1231    // We keep this cast here to allow for experimenting with HashCode=u32.
1232    #[allow(clippy::unnecessary_cast)]
1233    (shard_data.shard_id(full_code), full_code as HashCode as u64)
1234}
1235
/// A simple struct for packaging up pending mutations to a `SortedWritesTable`.
struct PendingState {
    // Buffered insertions, one queue per shard of the table's hash.
    pending_rows: DenseIdMap<ShardId, SegQueue<RowBuffer>>,
    // Buffered removals, one queue per shard.
    pending_removals: DenseIdMap<ShardId, SegQueue<ArbitraryRowBuffer>>,
    // Running totals across all shards, updated atomically by writers.
    total_removals: AtomicUsize,
    total_rows: AtomicUsize,
}
1243
1244impl PendingState {
1245    fn new(shard_data: ShardData) -> PendingState {
1246        let n_shards = shard_data.n_shards();
1247        let mut pending_rows = DenseIdMap::with_capacity(n_shards);
1248        let mut pending_removals = DenseIdMap::with_capacity(n_shards);
1249        for i in 0..n_shards {
1250            pending_rows.insert(ShardId::from_usize(i), SegQueue::default());
1251            pending_removals.insert(ShardId::from_usize(i), SegQueue::default());
1252        }
1253
1254        PendingState {
1255            pending_rows,
1256            pending_removals,
1257            total_removals: AtomicUsize::new(0),
1258            total_rows: AtomicUsize::new(0),
1259        }
1260    }
1261    fn clear(&self) {
1262        for (_, queue) in self.pending_rows.iter() {
1263            while queue.pop().is_some() {}
1264        }
1265
1266        for (_, queue) in self.pending_removals.iter() {
1267            while queue.pop().is_some() {}
1268        }
1269    }
1270
1271    /// This is only really used in debugging, but it's annoying enough to write
1272    /// that it may help to have around.
1273    ///
1274    /// We also, however, use it in the clone impl (which should only be called when pending state
1275    /// is empty).
1276    fn deep_copy(&self) -> PendingState {
1277        let mut pending_rows = DenseIdMap::new();
1278        let mut pending_removals = DenseIdMap::new();
1279        fn drain_queue<T>(queue: &SegQueue<T>) -> Vec<T> {
1280            let mut res = Vec::new();
1281            while let Some(x) = queue.pop() {
1282                res.push(x);
1283            }
1284            res
1285        }
1286        for (shard, queue) in self.pending_rows.iter() {
1287            let contents = drain_queue(queue);
1288            let new_queue = SegQueue::default();
1289            for x in contents {
1290                new_queue.push(x.clone());
1291                queue.push(x);
1292            }
1293            pending_rows.insert(shard, new_queue);
1294        }
1295
1296        for (shard, queue) in self.pending_removals.iter() {
1297            let contents = drain_queue(queue);
1298            let new_queue = SegQueue::default();
1299            for x in contents {
1300                new_queue.push(x.clone());
1301                queue.push(x);
1302            }
1303            pending_removals.insert(shard, new_queue);
1304        }
1305
1306        PendingState {
1307            pending_rows,
1308            pending_removals,
1309            total_removals: AtomicUsize::new(self.total_removals.load(Ordering::Acquire)),
1310            total_rows: AtomicUsize::new(self.total_rows.load(Ordering::Acquire)),
1311        }
1312    }
1313}
1314
/// A trait that encapsulates the logic of potentially checking that written
/// columns appear in sorted order.
///
/// For rows that are sorted by a column, an OrderingChecker asserts that all
/// new rows have the same value in that column, and that the column is greater
/// than or equal to the column value coming in. For rows not sorted, these
/// checks become no-ops.
///
/// Implementations signal a violated ordering invariant by panicking
/// (via `assert!`) rather than returning an error.
trait OrderingChecker: Clone + Send + Sync {
    /// Check any invariants locally, updating the state of the checker when
    /// doing so.
    fn check_local(&mut self, row: &[Value]);
    /// Combine the states of multiple checkers, returning a new checker with
    /// all information assimilated. This is the checker that is suitable for
    /// calling `update_offsets` with.
    fn check_global<'a>(checkers: impl Iterator<Item = &'a Self>) -> Self
    where
        Self: 'a;
    /// Update the sorted offset vector with the current state of the checker.
    /// `start` is the row id at which the newly written batch begins.
    fn update_offsets(&self, start: RowId, offsets: &mut Vec<(Value, RowId)>);
}
1335
// The no-op checker used for tables without a sort column: every check
// trivially succeeds and the offsets vector is never touched.
impl OrderingChecker for () {
    fn check_local(&mut self, _: &[Value]) {}
    fn check_global<'a>(_: impl Iterator<Item = &'a ()>) {}
    fn update_offsets(&self, _: RowId, _: &mut Vec<(Value, RowId)>) {}
}
1341
// The ordering checker for tables sorted by a column.
#[derive(Copy, Clone)]
struct SortChecker {
    // The column the table is sorted by.
    col: ColumnId,
    // Lower bound that newly inserted rows must not sort below — presumably
    // the table's current maximum sort value; `None` when there is none.
    baseline: Option<Value>,
    // The (single) sort value observed in rows checked so far this batch,
    // if any rows have been seen.
    current: Option<Value>,
}
1348
1349impl OrderingChecker for SortChecker {
1350    fn check_local(&mut self, row: &[Value]) {
1351        let val = row[self.col.index()];
1352        if let Some(cur) = self.current {
1353            assert_eq!(
1354                cur, val,
1355                "concurrently inserting rows with different sort keys"
1356            );
1357        } else {
1358            self.current = Some(val);
1359            if let Some(baseline) = self.baseline {
1360                assert!(val >= baseline, "inserted row violates sort order");
1361            }
1362        }
1363    }
1364
1365    fn check_global<'a>(mut checkers: impl Iterator<Item = &'a Self>) -> Self {
1366        let Some(start) = checkers.next() else {
1367            return SortChecker {
1368                col: ColumnId::new(!0),
1369                baseline: None,
1370                current: None,
1371            };
1372        };
1373        let mut expected = start.current;
1374        for checker in checkers {
1375            assert_eq!(checker.baseline, start.baseline);
1376            match (&mut expected, checker.current) {
1377                (None, None) => {}
1378                (cur @ None, Some(x)) => {
1379                    *cur = Some(x);
1380                }
1381                (Some(_), None) => {}
1382                (Some(x), Some(y)) => {
1383                    assert_eq!(
1384                        *x, y,
1385                        "concurrently inserting rows with different sort keys"
1386                    );
1387                }
1388            }
1389        }
1390        SortChecker {
1391            col: start.col,
1392            baseline: start.baseline,
1393            current: expected,
1394        }
1395    }
1396
1397    fn update_offsets(&self, start: RowId, offsets: &mut Vec<(Value, RowId)>) {
1398        if let Some(cur) = self.current {
1399            if let Some((max, _)) = offsets.last() {
1400                if cur > *max {
1401                    offsets.push((cur, start));
1402                }
1403            } else {
1404                offsets.push((cur, start));
1405            }
1406        }
1407    }
1408}
1409
/// A type similar to a SortedWritesTable used to buffer outputs. The main thing
/// that StagedOutputs handles is running the merge function for a table on
/// multiple updates to the same key that show up in the same round of
/// insertions.
struct StagedOutputs {
    // Trivial single-shard sharding data; only used to call `hash_code`.
    shard_data: ShardData,
    // Number of leading key columns in each row.
    n_keys: usize,
    // Maps key hashes to the staged row currently holding that key.
    hash: Pooled<HashTable<TableEntry>>,
    // The staged rows themselves, including ones later marked stale.
    rows: RowBuffer,
    // How many rows in `rows` have been marked stale by merges.
    n_stale: usize,
    // Reusable buffer that merge functions write their output into.
    scratch: Pooled<Vec<Value>>,
}
1422
1423impl StagedOutputs {
1424    fn rows(&self) -> impl Iterator<Item = &[Value]> {
1425        self.rows.iter()
1426    }
1427    fn new(n_keys: usize, n_cols: usize, capacity: usize) -> Self {
1428        let mut res = with_pool_set(|ps| StagedOutputs {
1429            shard_data: ShardData::new(1),
1430            n_keys,
1431            n_stale: 0,
1432            hash: ps.get(),
1433            rows: RowBuffer::new(n_cols),
1434            scratch: ps.get(),
1435        });
1436        res.hash.reserve(capacity, TableEntry::hashcode);
1437        res.rows.reserve(capacity);
1438        res
1439    }
1440    fn clear(&mut self) {
1441        self.hash.clear();
1442        self.rows.clear();
1443        self.n_stale = 0;
1444    }
1445    fn len(&self) -> usize {
1446        self.rows.len() - self.n_stale
1447    }
1448
1449    fn insert(
1450        &mut self,
1451        row: &[Value],
1452        mut merge_fn: impl FnMut(&[Value], &[Value], &mut Vec<Value>) -> bool,
1453    ) {
1454        if row[0].is_stale() {
1455            return;
1456        }
1457        use hashbrown::hash_table::Entry;
1458        let (_, hc) = hash_code(self.shard_data, row, self.n_keys);
1459        let entry = self.hash.entry(
1460            hc,
1461            |te| {
1462                te.hashcode() == hc
1463                    && self.rows.get_row(te.row)[0..self.n_keys] == row[0..self.n_keys]
1464            },
1465            TableEntry::hashcode,
1466        );
1467        match entry {
1468            Entry::Occupied(mut occupied_entry) => {
1469                let cur = self.rows.get_row(occupied_entry.get().row);
1470                if merge_fn(cur, row, &mut self.scratch) {
1471                    let new = self.rows.add_row(&self.scratch);
1472                    self.rows.set_stale(occupied_entry.get().row);
1473                    self.n_stale += 1;
1474                    occupied_entry.get_mut().row = new;
1475                }
1476                self.scratch.clear();
1477            }
1478            Entry::Vacant(vacant_entry) => {
1479                let next = self.rows.add_row(row);
1480                vacant_entry.insert(TableEntry {
1481                    hashcode: hc as _,
1482                    row: next,
1483                });
1484            }
1485        }
1486    }
1487
1488    /// Write the contents of the staged outputs to the given writer, returning the initial RowId
1489    /// of the new output. Returns the number of stale values in the buffer that was appended.
1490    fn write_output(&self, output: &ParallelRowBufWriter) -> (RowId, usize) {
1491        (output.append_contents(&self.rows), self.n_stale)
1492    }
1493}