egglog_core_relations/hash_index/
mod.rs

1//! Hash-based secondary indexes.
2use std::{
3    cmp,
4    hash::{Hash, Hasher},
5    mem,
6    sync::{Arc, Mutex},
7};
8
9use crate::{
10    common::IndexMap,
11    numeric_id::{IdVec, NumericId, define_id},
12};
13use egglog_concurrency::{Notification, ReadOptimizedLock};
14use hashbrown::HashTable;
15use indexmap::map::Entry;
16use once_cell::sync::Lazy;
17use rayon::iter::ParallelIterator;
18use rustc_hash::FxHasher;
19
20use crate::{
21    OffsetRange, Subset,
22    common::{ShardData, ShardId, Value},
23    offsets::{RowId, SortedOffsetSlice, SubsetRef},
24    parallel_heuristics::parallelize_index_construction,
25    pool::{Pooled, with_pool_set},
26    row_buffer::{RowBuffer, TaggedRowBuffer},
27    table_spec::{ColumnId, Generation, Offset, TableVersion, WrappedTableRef},
28};
29
30#[cfg(test)]
31mod tests;
32
/// A single slot in a [`SubsetTable`]'s hash table.
///
/// The key is not stored inline: `key` names a row in the owning table's
/// `keys` buffer, and `hash` caches the key's hash so that entries can be
/// compared and rehashed without recomputing it.
#[derive(Clone)]
pub(crate) struct TableEntry<T> {
    /// Cached hash of the key this entry stands for.
    hash: u64,
    /// Points into `keys`
    key: RowId,
    /// The payload stored for the key.
    vals: T,
}
40
/// A secondary index over a table, together with the bookkeeping needed to
/// keep it in sync with that table.
#[derive(Clone)]
pub(crate) struct Index<TI> {
    /// The columns projected out of each row to form the index key.
    key: Vec<ColumnId>,
    /// The table version that `table` was last refreshed to.
    updated_to: TableVersion,
    /// The underlying index structure.
    table: TI,
}
47
48impl<TI: IndexBase> Index<TI> {
49    pub(crate) fn new(key: Vec<ColumnId>, table: TI) -> Self {
50        Index {
51            key,
52            updated_to: TableVersion {
53                major: Generation::new(0),
54                minor: Offset::new(0),
55            },
56            table,
57        }
58    }
59
60    /// Get the nonempty subset of rows associated with this key, if there is
61    /// one.
62    pub(crate) fn get_subset<'a>(&'a self, key: &'a TI::Key) -> Option<SubsetRef<'a>> {
63        self.table.get_subset(key)
64    }
65
66    pub(crate) fn needs_refresh(&self, table: WrappedTableRef) -> bool {
67        table.version() != self.updated_to
68    }
69
70    pub(crate) fn refresh(&mut self, table: WrappedTableRef) {
71        let cur_version = table.version();
72        if cur_version == self.updated_to {
73            return;
74        }
75        let is_full = cur_version.major != self.updated_to.major;
76        let subset = if is_full {
77            self.table.clear();
78            table.all()
79        } else {
80            table.updates_since(self.updated_to.minor)
81        };
82        if parallelize_index_construction(subset.size()) {
83            self.table.merge_parallel(&self.key, table, subset.as_ref());
84        } else if is_full {
85            self.table.rebuild_full(&self.key, table, subset.as_ref());
86        } else {
87            self.refresh_serial(table, subset);
88        }
89
90        self.updated_to = cur_version;
91    }
92
93    /// Update the contents of the index to the current version of the table.
94    ///
95    /// The index is guaranteed to be up to date until `merge` is called on the
96    /// table again.
97    pub(crate) fn refresh_serial(&mut self, table: WrappedTableRef, subset: Subset) {
98        let mut buf = TaggedRowBuffer::new(self.key.len());
99        let mut cur = Offset::new(0);
100        loop {
101            buf.clear();
102            if let Some(next) =
103                table.scan_project(subset.as_ref(), &self.key, cur, 1024, &[], &mut buf)
104            {
105                cur = next;
106                self.table.merge_rows(&buf);
107            } else {
108                self.table.merge_rows(&buf);
109                break;
110            }
111        }
112    }
113
114    pub(crate) fn for_each(&self, f: impl FnMut(&TI::Key, SubsetRef)) {
115        self.table.for_each(f);
116    }
117
118    pub(crate) fn len(&self) -> usize {
119        self.table.len()
120    }
121}
122
/// A hash table from multi-column keys to buffered subsets.
///
/// Keys live in `keys`; each [`TableEntry`] in `hash` refers to its key by the
/// id of the corresponding row in `keys`.
pub(crate) struct SubsetTable {
    /// Backing storage for the keys referenced by the entries in `hash`.
    keys: RowBuffer,
    /// The hash table proper; the container comes from a memory pool.
    hash: Pooled<HashTable<TableEntry<BufferedSubset>>>,
}
127
128impl Clone for SubsetTable {
129    fn clone(&self) -> Self {
130        SubsetTable {
131            keys: self.keys.clone(),
132            hash: Pooled::cloned(&self.hash),
133        }
134    }
135}
136
137impl SubsetTable {
138    fn new(key_arity: usize) -> SubsetTable {
139        SubsetTable {
140            keys: RowBuffer::new(key_arity),
141            hash: with_pool_set(|ps| ps.get()),
142        }
143    }
144}
145
146pub(crate) trait IndexBase {
147    /// The type of keys for this index.  Keys can have validity constraints
148    /// (e.g. the arity of a slice for `Key = [Value]`). If keys are invalid,
149    /// these methods can panic.
150    type Key: ?Sized;
151
152    /// The write-side keys for an index. This is generally the same as `Key`, but Column-level
153    /// indexes allow for multiple values (e.g. a subset of a row) to be provided, allowing the
154    /// index to effectively cover multiple columns. This is useful for rebuilding.
155    type WriteKey: ?Sized;
156
157    /// Remove any existing entries in the index.
158    fn clear(&mut self);
159    /// Get the subset corresponding to this key, if there is one.
160    fn get_subset(&self, key: &Self::Key) -> Option<SubsetRef<'_>>;
161    /// Add the given key and row id to the table.
162    fn add_row(&mut self, key: &Self::WriteKey, row: RowId);
163    /// Merge the contents of the [`TaggedRowBuffer`] into the table.
164    fn merge_rows(&mut self, buf: &TaggedRowBuffer);
165    /// Call `f` over the elements of the index.
166    fn for_each(&self, f: impl FnMut(&Self::Key, SubsetRef));
167    /// The number of keys in the index.
168    fn len(&self) -> usize;
169
170    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef);
171
172    /// Bulk-rebuild this index from scratch (called on major version change after clear()).
173    /// The default implementation batches via `scan_project`+`merge_rows`. Implementations
174    /// can override this for more efficient bulk construction.
175    fn rebuild_full(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
176        let mut buf = TaggedRowBuffer::new(cols.len());
177        let mut cur = Offset::new(0);
178        loop {
179            buf.clear();
180            if let Some(next) = table.scan_project(subset, cols, cur, 1024, &[], &mut buf) {
181                cur = next;
182                self.merge_rows(&buf);
183            } else {
184                self.merge_rows(&buf);
185                break;
186            }
187        }
188    }
189}
190
/// One shard of a [`ColumnIndex`]: a map from values to subsets, plus the
/// buffer that backs the sparse subsets stored in that map.
struct ColumnIndexShard {
    /// Maps each value to the rows containing it.
    table: Pooled<IndexMap<Value, BufferedSubset>>,
    /// Backing storage for the sparse subsets in `table`.
    subsets: SubsetBuffer,
}
195
196impl Clone for ColumnIndexShard {
197    fn clone(&self) -> Self {
198        ColumnIndexShard {
199            table: Pooled::cloned(&self.table),
200            subsets: self.subsets.clone(),
201        }
202    }
203}
204
/// A specialized index used when indexing on individual values rather than on
/// tuples. Entries are spread across `shards`, with `shard_data` choosing the
/// shard for a given value.
#[derive(Clone)]
pub struct ColumnIndex {
    // A specialized index used when we are indexing on a single column.
    shard_data: ShardData,
    shards: IdVec<ShardId, ColumnIndexShard>,
}
211
212impl IndexBase for ColumnIndex {
213    type Key = Value;
214    type WriteKey = [Value];
215    fn clear(&mut self) {
216        for (_, shard) in self.shards.iter_mut() {
217            for (_, subset) in shard.table.drain(..) {
218                match subset {
219                    BufferedSubset::Dense(_) => {}
220                    BufferedSubset::Sparse(buffered_vec) => {
221                        shard.subsets.return_vec(buffered_vec);
222                    }
223                }
224            }
225        }
226    }
227
228    fn get_subset<'a>(&'a self, key: &Value) -> Option<SubsetRef<'a>> {
229        let shard = self.shard_data.get_shard(key, &self.shards);
230        shard.table.get(key).map(|x| x.as_ref(&shard.subsets))
231    }
232    fn add_row(&mut self, vals: &[Value], row: RowId) {
233        // SAFETY: everything in `table` comes from `subsets`.
234        for key in vals {
235            let shard = self.shard_data.get_shard_mut(key, &mut self.shards);
236            unsafe {
237                shard
238                    .table
239                    .entry(*key)
240                    .or_insert_with(BufferedSubset::empty)
241                    .add_row_sorted(row, &mut shard.subsets);
242            }
243        }
244    }
245    fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
246        for (src_id, key) in buf.iter() {
247            self.add_row(key, src_id);
248        }
249    }
250
251    fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
252        for (subsets, (k, v)) in self
253            .shards
254            .iter()
255            .flat_map(|(_, shard)| shard.table.iter().map(|x| (&shard.subsets, x)))
256        {
257            f(k, v.as_ref(subsets));
258        }
259    }
260
261    fn len(&self) -> usize {
262        self.shards.iter().map(|(_, shard)| shard.table.len()).sum()
263    }
264
    /// Index the rows of `subset` (projected onto `cols`) using the module's
    /// dedicated thread pool.
    ///
    /// Work proceeds in two phases:
    /// 1. The table is scanned in batches of `BATCH_SIZE` rows; each batch is
    ///    handed to a spawned task that splits its values by shard and pushes
    ///    the per-shard buffers onto that shard's queue.
    /// 2. Once all batches are split, every shard drains its own queue in
    ///    parallel and inserts the rows into its map.
    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
        const BATCH_SIZE: usize = 1024;
        let shard_data = self.shard_data;
        // One queue per shard. Each element pairs a buffer with the id of its
        // first row so the buffers can be put back in order later.
        let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
            shard_data.n_shards(),
        );
        queues.resize_with(shard_data.n_shards(), || {
            Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
        });
        // Split one scanned batch into single-value buffers, one per shard.
        let split_buf = |buf: TaggedRowBuffer| {
            let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
            split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(1));
            for (row_id, keys) in buf.iter() {
                // Each projected column value is indexed separately.
                for key in keys {
                    shard_data
                        .get_shard_mut(*key, &mut split)
                        .add_row(row_id, &[*key]);
                }
            }
            for (shard_id, buf) in split.drain() {
                if buf.is_empty() {
                    continue;
                }
                let first = buf.get_row(RowId::new(0)).0;
                queues[shard_id].lock().unwrap().push((first, buf));
            }
        };

        run_in_thread_pool_and_block(&THREAD_POOL, || {
            // Phase 1: scan on this thread, split batches in spawned tasks.
            rayon::in_place_scope(|inner| {
                let mut cur = Offset::new(0);
                loop {
                    let mut buf = TaggedRowBuffer::new(cols.len());
                    if let Some(next) =
                        table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
                    {
                        cur = next;
                        inner.spawn(move |_| split_buf(buf));
                    } else {
                        // The final scan may still have filled `buf`.
                        inner.spawn(move |_| split_buf(buf));
                        break;
                    }
                }
            });

            // Phase 2: each shard consumes its own queue.
            self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
                // Sort the vector by start row id to ensure we populate subsets in sorted order.
                let mut vec = queues[shard_id].lock().unwrap();
                vec.sort_by_key(|(start, _)| *start);
                for (_, buf) in vec.drain(..) {
                    for (row_id, key) in buf.iter() {
                        debug_assert_eq!(key.len(), 1);
                        match shard.table.entry(key[0]) {
                            Entry::Occupied(mut occ) => {
                                // SAFETY: all of the buffered vectors in this map come from `subsets`.
                                unsafe {
                                    occ.get_mut().add_row_sorted(row_id, &mut shard.subsets);
                                }
                            }
                            Entry::Vacant(v) => {
                                v.insert(BufferedSubset::singleton(row_id));
                            }
                        }
                    }
                }
            });
        });
    }
333
334    /// Sort-based full rebuild: collect all (value, row_id) pairs, sort by (value, row_id),
335    /// then build each key's subset with a single pre-sized allocation. Compared to `merge_rows`,
336    /// this eliminates the doubling memmoves from `push_vec` that occur in the row-at-a-time `add_row` path.
337    ///
338    /// Supports multiple columns (e.g. rebuild_index covering all value columns): pairs from
339    /// all columns are merged into one sorted list, so each value maps to the union of rows
340    /// containing it in any of the covered columns.
341    fn rebuild_full(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
342        let n = table.all().size() * cols.len();
343
344        let mut pairs: Vec<(Value, RowId)> = Vec::with_capacity(n);
345        for &col in cols {
346            table.for_each_col(subset, col, &mut |row_id, val| {
347                pairs.push((val, row_id));
348            });
349        }
350
351        radix_sort_pairs_by_value(&mut pairs);
352        if cols.len() > 1 {
353            // Remove duplicates (same value in multiple columns of the same row).
354            pairs.dedup();
355        }
356
357        let mut i = 0;
358        while i < pairs.len() {
359            let key = pairs[i].0;
360            let start = i;
361            let mut first = pairs[i].1;
362            let mut last = pairs[i].1;
363            while i < pairs.len() && pairs[i].0 == key {
364                last = cmp::max(last, pairs[i].1);
365                first = cmp::min(first, pairs[i].1);
366                i += 1;
367            }
368            let shard = self.shard_data.get_shard_mut(key, &mut self.shards);
369            let count = i - start;
370            let buffered = if last.rep() - first.rep() == (count - 1) as u32 {
371                // If the row ids are contiguous, we can represent the subset as a dense range
372                // to avoid allocations
373                BufferedSubset::Dense(OffsetRange::new(first, last.inc()))
374            } else {
375                let bv = shard
376                    .subsets
377                    .new_vec(pairs[start..i].iter().map(|&(_, r)| r));
378                BufferedSubset::Sparse(bv)
379            };
380            shard.table.insert(key, buffered);
381        }
382    }
383}
384
/// This function is an alternative for [`rayon::ThreadPool::install`] that doesn't steal work from
/// the caller's current thread pool while waiting for `f` to finish.
///
/// We do this to avoid deadlocks. The whole purpose of using a separate threadpool in this module
/// is to allow for sufficient parallelism while holding a lock on the main threadpool. That means
/// we are not worried about an outer lock tying up a thread in the main pool.
///
/// On the other hand, it _is_ a bad idea to steal work on a rayon thread pool with some locks
/// held. In particular, if another task on the thread pool _itself_ attempts to acquire the same
/// lock, this can cause a deadlock. We saw this in the tests for this crate. The relevant locks
/// are those around individual indexes stored in the database-level index cache.
///
/// `f` is run once on `pool`; the calling thread blocks until it has finished and been dropped.
fn run_in_thread_pool_and_block<'a>(pool: &rayon::ThreadPool, f: impl FnMut() + Send + 'a) {
    // NB: We don't need the heap allocations here. But we are only calling this function if
    // we are about to do a bunch of work, so clarity is probably going to be better than (even
    // more) unsafe code.

    // Alright, here we go: pretend `f` has `'static` lifetime because we are passing it to
    // `spawn`.
    trait LifetimeWork<'a>: FnMut() + Send + 'a {}

    impl<'a, F: FnMut() + Send + 'a> LifetimeWork<'a> for F {}
    let as_lifetime: Box<dyn LifetimeWork<'a>> = Box::new(f);
    let mut casted_away = unsafe {
        // SAFETY: `casted_away` will be dropped at the end of this method. The notification used
        // below will ensure it does not escape.
        mem::transmute::<Box<dyn LifetimeWork<'a>>, Box<dyn LifetimeWork<'static>>>(as_lifetime)
    };
    let n = Arc::new(Notification::new());
    let inner = n.clone();
    pool.spawn(move || {
        casted_away();
        // Drop the closure (and everything it borrows) before waking the caller.
        mem::drop(casted_away);
        // NOTE(review): if `casted_away()` panics, this `notify` is never reached; what the
        // waiting caller then observes depends on how the pool handles panics -- confirm.
        inner.notify();
    });
    n.wait()
}
421
422/// Adaptive LSB radix sort for (Value, RowId) pairs, sorting by the Value field.
423///
424/// Chooses 1–4 passes of 8-bit radix sort based on the observed maximum Value, so that
425/// only the actually-used bit range is processed.  Within each Value group the original
426/// order (scan order = RowId-ascending) is preserved by the LSB stability guarantee,
427/// satisfying the add_row_sorted invariant without an explicit RowId sort.
428///
429/// Falls back to `sort_unstable()` for n < 64 where radix setup overhead dominates.
430fn radix_sort_pairs_by_value(pairs: &mut Vec<(Value, RowId)>) {
431    let n = pairs.len();
432    if n < 64 {
433        pairs.sort_unstable();
434        return;
435    }
436
437    let max_val = pairs.iter().map(|&(v, _)| v.rep()).max().unwrap_or(0);
438    let n_passes: u32 = if max_val < 256 {
439        1
440    } else if max_val < 65_536 {
441        2
442    } else if max_val < (1 << 24) {
443        3
444    } else {
445        4
446    };
447
448    let null_pair = (Value::new_const(0), RowId::new_const(0));
449    let mut buf: Vec<(Value, RowId)> = vec![null_pair; n];
450
451    let mut src: &mut Vec<(Value, RowId)> = pairs;
452    let mut dst: &mut Vec<(Value, RowId)> = &mut buf;
453
454    for pass in 0..n_passes {
455        let shift = pass * 8;
456        let mut count = [0u32; 256];
457
458        // Count occurrences of the relevant byte of each Value.
459        for pair in src.iter() {
460            let bucket = (pair.0.rep() >> shift) & 0xFF;
461            count[bucket as usize] += 1;
462        }
463
464        // Convert counts to exclusive prefix sums (start positions per bucket).
465        let mut prefix = 0u32;
466        for c in &mut count {
467            let prev = *c;
468            *c = prefix;
469            prefix += prev;
470        }
471
472        // Stable scatter: write each element to its bucket's next position.
473        for &pair in src.iter() {
474            let bucket = ((pair.0.rep() >> shift) & 0xFF) as usize;
475            dst[count[bucket] as usize] = pair;
476            count[bucket] += 1;
477        }
478
479        core::mem::swap(&mut src, &mut dst);
480    }
481
482    // After `n_passes` swaps, `src` points to the sorted data.
483    // Odd passes: src == buf.as_mut_ptr(); copy back to pairs.
484    // Even passes: src == pairs.as_mut_ptr(); already in place.
485    if n_passes % 2 == 1 {
486        dst.copy_from_slice(src);
487    }
488}
489
490impl ColumnIndex {
491    pub(crate) fn new() -> ColumnIndex {
492        with_pool_set(|ps| {
493            let shard_data = ShardData::new(num_shards());
494            let mut shards = IdVec::with_capacity(shard_data.n_shards());
495            shards.resize_with(shard_data.n_shards(), || ColumnIndexShard {
496                table: ps.get(),
497                subsets: SubsetBuffer::default(),
498            });
499            ColumnIndex { shard_data, shards }
500        })
501    }
502
503    /// Pre-reserve capacity in each shard's HashMap for `n` rows total.
504    /// Eliminates hashbrown rehashing during add_row for the small-subset path.
505    pub(crate) fn reserve_for_n_rows(&mut self, n: usize) {
506        let n_shards = self.shards.len();
507        let per_shard = n / n_shards + 2;
508        for (_, shard) in self.shards.iter_mut() {
509            shard.table.reserve(per_shard);
510        }
511    }
512
513    /// Build a single-column index for `subset` of `table`. Picks between a
514    /// sort-based bulk path and a per-row scan based on subset size: large
515    /// subsets amortize the sort overhead, small ones avoid the buffer copy.
516    pub(crate) fn build_for_subset(
517        table: WrappedTableRef,
518        subset: SubsetRef,
519        col: ColumnId,
520    ) -> ColumnIndex {
521        const SORT_BULK_THRESHOLD: usize = 512;
522        let mut res = ColumnIndex::new();
523        if subset.size() >= SORT_BULK_THRESHOLD {
524            res.rebuild_full(&[col], table, subset);
525        } else {
526            res.reserve_for_n_rows(subset.size());
527            table.for_each_col(subset, col, &mut |row_id, val| {
528                res.add_row(&[val], row_id);
529            });
530        }
531        res
532    }
533}
534
/// One shard of a [`TupleIndex`]: a key table plus the buffer backing the
/// sparse subsets stored in it.
#[derive(Clone)]
struct TupleIndexShard {
    /// Maps keys to buffered subsets.
    table: SubsetTable,
    /// Backing storage for the sparse subsets in `table`.
    subsets: SubsetBuffer,
}
540
/// A mapping from keys to subsets of rows.
///
/// Entries are spread across `shards`; the shard for a key is chosen from the
/// key's hash via `shard_data`.
#[derive(Clone)]
pub struct TupleIndex {
    // NB: we could store RowBuffers inline and then have indexes reference
    // (u32, RowId) instead of RowId. Trades copying off for indirections.
    shard_data: ShardData,
    shards: IdVec<ShardId, TupleIndexShard>,
}
549
550impl TupleIndex {
551    pub(crate) fn new(key_arity: usize) -> TupleIndex {
552        let shard_data = ShardData::new(num_shards());
553        let mut shards = IdVec::with_capacity(shard_data.n_shards());
554        shards.resize_with(shard_data.n_shards(), || TupleIndexShard {
555            table: SubsetTable::new(key_arity),
556            subsets: SubsetBuffer::default(),
557        });
558        TupleIndex { shard_data, shards }
559    }
560}
561
562impl IndexBase for TupleIndex {
563    type Key = [Value];
564    type WriteKey = Self::Key;
565
566    fn clear(&mut self) {
567        for (_, shard) in self.shards.iter_mut() {
568            shard.table.keys.clear();
569            for entry in shard.table.hash.drain() {
570                match entry.vals {
571                    BufferedSubset::Dense(_) => {}
572                    BufferedSubset::Sparse(v) => {
573                        shard.subsets.return_vec(v);
574                    }
575                }
576            }
577        }
578    }
579
580    fn get_subset<'a>(&'a self, key: &[Value]) -> Option<SubsetRef<'a>> {
581        let hash = hash_key(key);
582        let shard = &self.shards[self.shard_data.shard_id(hash)];
583        let entry = shard.table.hash.find(hash, |entry| {
584            // SAFETY: entry.key was stored by add_row, which returns a valid RowId.
585            entry.hash == hash && unsafe { shard.table.keys.get_row_unchecked(entry.key) } == key
586        })?;
587        Some(entry.vals.as_ref(&shard.subsets))
588    }
589
    /// Record `row` under `key`.
    ///
    /// If the key is already present, `row` is appended to its subset;
    /// otherwise the key is copied into the shard's key buffer and a new
    /// singleton subset is inserted for it.
    fn add_row(&mut self, key: &[Value], row: RowId) {
        use hashbrown::hash_table::Entry;
        let hash = hash_key(key);
        // The shard is chosen from the key's hash.
        let shard = &mut self.shards[self.shard_data.shard_id(hash)];
        let table_entry = shard.table.hash.entry(
            hash,
            // SAFETY: entry.key was stored by add_row, which returns a valid RowId.
            |entry| {
                // Compare cached hashes first; only then read the stored key.
                entry.hash == hash
                    && unsafe { shard.table.keys.get_row_unchecked(entry.key) } == key
            },
            // Rehashing uses the cached hash rather than re-reading the key.
            |ent| ent.hash,
        );
        match table_entry {
            Entry::Occupied(mut occ) => {
                // SAFETY: everything in `table_entry` comes from `vals`.
                // NOTE(review): presumably this means the subsets stored in this shard's
                // table are backed by `shard.subsets` -- confirm against `add_row_sorted`.
                unsafe {
                    occ.get_mut().vals.add_row_sorted(row, &mut shard.subsets);
                }
            }
            Entry::Vacant(v) => {
                let key_id = shard.table.keys.add_row(key);
                let subset = BufferedSubset::singleton(row);
                v.insert(TableEntry {
                    hash,
                    key: key_id,
                    vals: subset,
                });
            }
        }
    }
621
622    fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
623        for (src_id, key) in buf.iter() {
624            self.add_row(key, src_id);
625        }
626    }
627    fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
628        for (_, shard) in self.shards.iter() {
629            for entry in shard.table.hash.iter() {
630                // SAFETY: entry.key was stored by add_row, so it is always in-bounds.
631                let key = unsafe { shard.table.keys.get_row_unchecked(entry.key) };
632                f(key, entry.vals.as_ref(&shard.subsets));
633            }
634        }
635    }
636
637    fn len(&self) -> usize {
638        self.shards
639            .iter()
640            .map(|(_, shard)| shard.table.hash.len())
641            .sum()
642    }
643
    /// Index the rows of `subset` (projected onto `cols`) using the module's
    /// dedicated thread pool.
    ///
    /// Batches of `BATCH_SIZE` rows are scanned and split by shard in spawned
    /// tasks; afterwards every shard drains its own queue in parallel and
    /// inserts the rows into its table.
    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
        // The structure here is similar to the implementation for ColumnIndex, with
        // slightly more bookkeeping needed to handle arbitrary-arity keys.

        const BATCH_SIZE: usize = 1024;
        let shard_data = self.shard_data;
        // One queue per shard. Each element pairs a buffer with the id of its
        // first row so the buffers can be put back in order later.
        let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
            shard_data.n_shards(),
        );
        queues.resize_with(shard_data.n_shards(), || {
            Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
        });
        // Split one scanned batch into per-shard buffers of whole keys.
        let split_buf = |buf: TaggedRowBuffer| {
            let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
            split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(cols.len()));
            for (row_id, key) in buf.iter() {
                // NOTE(review): lookups pick the shard via `shard_id(hash_key(key))`;
                // presumably `get_shard_mut` agrees with that choice -- confirm.
                shard_data
                    .get_shard_mut(key, &mut split)
                    .add_row(row_id, key);
            }
            for (shard_id, buf) in split.drain() {
                if buf.is_empty() {
                    continue;
                }
                let first = buf.get_row(RowId::new(0)).0;
                queues[shard_id].lock().unwrap().push((first, buf));
            }
        };
        run_in_thread_pool_and_block(&THREAD_POOL, || {
            // Phase 1: scan batches and split each one in a spawned task.
            rayon::scope(|scope| {
                let mut cur = Offset::new(0);
                loop {
                    let mut buf = TaggedRowBuffer::new(cols.len());
                    if let Some(next) =
                        table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
                    {
                        cur = next;
                        scope.spawn(move |_| split_buf(buf));
                    } else {
                        // The final scan may still have filled `buf`.
                        scope.spawn(move |_| split_buf(buf));
                        break;
                    }
                }
            });
            // Phase 2: each shard consumes its own queue.
            self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
                use hashbrown::hash_table::Entry;
                // Sort the vector by start row id to ensure we populate subsets in sorted order.
                let mut vec = queues[shard_id].lock().unwrap();
                vec.sort_by_key(|(start, _)| *start);
                for (_, buf) in vec.drain(..) {
                    for (row_id, key) in buf.iter() {
                        // Same insert-or-append logic as `add_row`, applied to this shard.
                        let hash = hash_key(key);
                        let table_entry = shard.table.hash.entry(
                            hash,
                            // SAFETY: entry.key was stored by add_row, which returns a valid RowId.
                            |entry| {
                                entry.hash == hash
                                    && unsafe { shard.table.keys.get_row_unchecked(entry.key) }
                                        == key
                            },
                            |ent| ent.hash,
                        );
                        match table_entry {
                            Entry::Occupied(mut occ) => {
                                // SAFETY: everything in `table_entry` comes from `vals`.
                                unsafe {
                                    occ.get_mut()
                                        .vals
                                        .add_row_sorted(row_id, &mut shard.subsets);
                                }
                            }
                            Entry::Vacant(v) => {
                                let key_id = shard.table.keys.add_row(key);
                                let subset = BufferedSubset::singleton(row_id);
                                v.insert(TableEntry {
                                    hash,
                                    key: key_id,
                                    vals: subset,
                                });
                            }
                        }
                    }
                }
            });
        });
    }
730}
731
732fn hash_key(key: &[Value]) -> u64 {
733    let mut hasher = FxHasher::default();
734    key.hash(&mut hasher);
735    hasher.finish()
736}
737
/// A map from access patterns to indices.
///
/// Implemented as a read-optimized array of key-value pairs, which should be
/// faster than a concurrent hashmap as long as the number of indices is small
/// (say, fewer than 64).
///
/// For simplicity we assume the index can be cloned cheaply, e.g., it's behind an [`Arc`].
#[derive(Default)]
pub struct IndexCatalog<K: Clone + std::hash::Hash + Eq, I: Clone> {
    /// The (key, index) pairs; lookups scan this vector linearly.
    data: ReadOptimizedLock<Vec<(K, I)>>,
}
748
749impl<K, I: Clone> IndexCatalog<K, I>
750where
751    K: Clone + std::hash::Hash + Eq,
752{
753    pub fn new() -> Self {
754        IndexCatalog {
755            data: ReadOptimizedLock::new(Vec::new()),
756        }
757    }
758
759    pub fn map(&self, f: impl Fn(&(K, I)) -> (K, I)) -> Self {
760        let vec = self.data.read().iter().map(f).collect();
761        IndexCatalog {
762            data: ReadOptimizedLock::new(vec),
763        }
764    }
765
766    pub fn update(&mut self, f: impl Fn(&K, &mut I)) {
767        for (k, i) in self.data.as_mut_ref() {
768            f(k, i)
769        }
770    }
771
772    pub fn get_or_insert(&self, k: K, init: impl FnOnce() -> I) -> I {
773        let data = self.data.read();
774        let entry = data.iter().find(|(k1, _)| k1 == &k);
775        if let Some(entry) = entry {
776            entry.1.clone()
777        } else {
778            drop(data);
779            let mut data = self.data.lock();
780            if let Some(entry) = data.iter().find(|(k1, _)| k1 == &k) {
781                entry.1.clone()
782            } else {
783                let index = init();
784                data.push((k, index.clone()));
785                index
786            }
787        }
788    }
789}
790
// Declares `BufferIndex`, the id type used to address positions in a `SubsetBuffer`'s
// backing vector (presumably a thin wrapper around `u32`, per the macro arguments).
define_id!(BufferIndex, u32, "an index into a subset buffer");
792
/// A shared pool of row ids used to store sorted offset vectors with a common
/// lifetime.
///
/// This is used as the backing store for subsets stored in indexes. While it
/// definitely saves some allocations, the primary use for SubsetBuffer is to
/// make deallocation faster: with a standard [`crate::offsets::Subset`]
/// structure stored in the index, dropping requires an O(n) traversal of the
/// index. SubsetBuffer allows deallocation to happen in constant time (given
/// our use of memory pools).
struct SubsetBuffer {
    /// The row ids of every vector handed out by this buffer.
    buf: Pooled<Vec<RowId>>,
    /// Start positions of returned vectors, grouped by size class, for reuse.
    free_list: FreeList,
}
806
807impl Clone for SubsetBuffer {
808    fn clone(&self) -> Self {
809        SubsetBuffer {
810            buf: Pooled::cloned(&self.buf),
811            free_list: self.free_list.clone(),
812        }
813    }
814}
815
816impl Default for SubsetBuffer {
817    fn default() -> SubsetBuffer {
818        with_pool_set(|ps| SubsetBuffer {
819            buf: ps.get(),
820            free_list: Default::default(),
821        })
822    }
823}
824
825impl SubsetBuffer {
826    fn new_vec(&mut self, rows: impl ExactSizeIterator<Item = RowId>) -> BufferedVec {
827        let len = rows.len();
828        if let Some(v) = self.free_list.get_size_class(len).pop() {
829            return self.fill_at(v, rows);
830        }
831        let start = BufferIndex::from_usize(self.buf.len());
832        self.buf.resize(
833            start.index() + len.next_power_of_two(),
834            RowId::new(u32::MAX),
835        );
836        self.fill_at(start, rows)
837    }
838
839    fn fill_at(
840        &mut self,
841        start: BufferIndex,
842        rows: impl ExactSizeIterator<Item = RowId>,
843    ) -> BufferedVec {
844        let mut cur = start;
845        for i in rows {
846            self.buf[cur.index()] = i;
847            cur = cur.inc();
848        }
849        BufferedVec(start, cur)
850    }
851
852    fn return_vec(&mut self, vec: BufferedVec) {
853        self.free_list.get_size_class(vec.len()).push(vec.0);
854    }
855
856    fn push_vec(&mut self, vec: BufferedVec, row: RowId) -> BufferedVec {
857        debug_assert!(
858            vec.is_empty() || self.buf[vec.1.index() - 1] <= row,
859            "vec={vec:?}, row={row:?}, last_elt={:?}",
860            self.buf[vec.1.index() - 1]
861        );
862        if !vec.len().is_power_of_two() {
863            self.buf[vec.1.index()] = row;
864            return BufferedVec(vec.0, vec.1.inc());
865        }
866
867        let res = if let Some(v) = self.free_list.get_size_class(vec.len() + 1).pop() {
868            self.buf
869                .copy_within(vec.0.index()..vec.1.index(), v.index());
870            self.buf[v.index() + vec.len()] = row;
871            BufferedVec(v, BufferIndex::from_usize(v.index() + vec.len() + 1))
872        } else {
873            let start = self.buf.len();
874            self.buf.resize(
875                start + (vec.len() + 1).next_power_of_two(),
876                RowId::new(u32::MAX),
877            );
878            self.buf.copy_within(vec.0.index()..vec.1.index(), start);
879            self.buf[start + vec.len()] = row;
880            let end = start + vec.len() + 1;
881            BufferedVec(BufferIndex::from_usize(start), BufferIndex::from_usize(end))
882        };
883        self.return_vec(vec);
884        res
885    }
886
887    fn make_ref<'a>(&'a self, vec: &BufferedVec) -> SubsetRef<'a> {
888        // SAFETY: if `vec` is a valid index into self.buf, it will be sorted.
889        //
890        // NB: we do not guarantee this in the type signature of BufferedVec,
891        // etc. But this is indeed safe given the usage within this module.
892        let res = SubsetRef::Sparse(unsafe {
893            SortedOffsetSlice::new_unchecked(&self.buf[vec.0.index()..vec.1.index()])
894        });
895        #[cfg(debug_assertions)]
896        {
897            use crate::offsets::Offsets;
898            res.offsets(|x| assert_ne!(x.rep(), u32::MAX))
899        }
900        res
901    }
902}
903
/// A sorted vector of offsets stored in a [`SubsetBuffer`].
///
/// The two fields are the start (inclusive) and end (exclusive) positions of
/// the vector's rows within the buffer's backing storage.
///
/// Note: this implements `Clone` to facilitate cloning entire indexes, but this is a _shallow_
/// clone, making the clone operation work akin to slices in Golang. In particular: code that
/// pushes to a clone of a `BufferedVec` can affect the original, and vice versa.
///
/// Business logic in this module probably shouldn't call clone explicitly. The implicit uses of
/// clone (by other generated `Clone` implementations) are fine because they clone the
/// `SubsetBuffer` that the `BufferedVec` points to at the same time that the vector is cloned.
#[derive(Debug, Clone)]
pub(crate) struct BufferedVec(BufferIndex, BufferIndex);
915
916impl Default for BufferedVec {
917    fn default() -> Self {
918        BufferedVec(BufferIndex::new(0), BufferIndex::new(0))
919    }
920}
921
922impl BufferedVec {
923    fn is_empty(&self) -> bool {
924        self.0 == self.1
925    }
926    fn len(&self) -> usize {
927        self.1.index() - self.0.index()
928    }
929}
930
/// A set of rows that is either stored inline as a contiguous range or kept in
/// a [`SubsetBuffer`].
#[derive(Clone)]
pub(crate) enum BufferedSubset {
    /// A contiguous range of row ids, held directly in the enum.
    Dense(OffsetRange),
    /// A handle to rows stored in a [`SubsetBuffer`]; it is only meaningful
    /// together with the buffer it was created from.
    Sparse(BufferedVec),
}
936
937impl BufferedSubset {
938    /// *Safety:*  callers must ensure that `self` is either dense, or comes from `buf`.
939    unsafe fn add_row_sorted(&mut self, row: RowId, buf: &mut SubsetBuffer) {
940        match self {
941            BufferedSubset::Dense(range) => {
942                if range.end == range.start {
943                    range.start = row;
944                    range.end = row.inc();
945                    return;
946                }
947                if range.end == row {
948                    range.end = row.inc();
949                    return;
950                }
951                let mut v = buf.new_vec((range.start.rep()..range.end.rep()).map(RowId::new));
952                v = buf.push_vec(v, row);
953                *self = BufferedSubset::Sparse(v);
954            }
955            BufferedSubset::Sparse(vec) => *vec = buf.push_vec(mem::take(vec), row),
956        }
957    }
958
959    fn empty() -> Self {
960        BufferedSubset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)))
961    }
962
963    fn singleton(row: RowId) -> Self {
964        BufferedSubset::Dense(OffsetRange::new(row, row.inc()))
965    }
966
967    fn as_ref<'a>(&self, buf: &'a SubsetBuffer) -> SubsetRef<'a> {
968        match self {
969            BufferedSubset::Dense(range) => SubsetRef::Dense(*range),
970            BufferedSubset::Sparse(vec) => buf.make_ref(vec),
971        }
972    }
973}
974
975fn num_shards() -> usize {
976    let n_threads = rayon::current_num_threads();
977    if n_threads == 1 { 1 } else { n_threads * 2 }
978}
979
980/// A thread pool specifically for parallel hash index construction.
981///
982/// We use a separate thread pool here because callers can construct an index under a lock,
983/// and we do not want to take a long-running lock in the global thread pool without another
984/// way to get parallelism.
985///
986/// Earlier solutions using rayon::yield_now() were unreliable.
987static THREAD_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
988    rayon::ThreadPoolBuilder::new()
989        .num_threads(rayon::current_num_threads())
990        .build()
991        .unwrap()
992});
993
/// A simple free list used to reuse slots in a [`SubsetBuffer`].
///
/// This free list works as a map from power-of-two size classes to a vector of offsets that point
/// to the beginning of an unused vector.
///
/// Size classes are indexed by their log2 value (i.e., size_class = 2^idx), so a 32-entry
/// array covers all power-of-two sizes from 1 (idx=0) up to 2^31. This replaces the
/// previous HashMap with an O(1) array index + trailing_zeros().
#[derive(Clone, Default)]
pub(super) struct FreeList {
    /// `data[idx]` holds the start offsets of the unused vectors whose size
    /// class is `2^idx`.
    data: [Vec<BufferIndex>; 32],
}
1006
1007impl FreeList {
1008    fn get_size_class(&mut self, size: usize) -> &mut Vec<BufferIndex> {
1009        let size_class = size.next_power_of_two();
1010        let idx = size_class.trailing_zeros() as usize;
1011        &mut self.data[idx]
1012    }
1013}