use std::{
    hash::{Hash, Hasher},
    mem,
    sync::{Arc, Mutex},
};

use crate::numeric_id::{IdVec, NumericId, define_id};
use egglog_concurrency::{Notification, ReadOptimizedLock};
use hashbrown::HashTable;
use once_cell::sync::Lazy;
use rayon::iter::ParallelIterator;
use rustc_hash::FxHasher;

use crate::{
    OffsetRange, Subset,
    common::{HashMap, IndexMap, ShardData, ShardId, Value},
    offsets::{RowId, SortedOffsetSlice, SubsetRef},
    parallel_heuristics::parallelize_index_construction,
    pool::{Pooled, with_pool_set},
    row_buffer::{RowBuffer, TaggedRowBuffer},
    table_spec::{ColumnId, Generation, Offset, TableVersion, WrappedTableRef},
};

#[cfg(test)]
mod tests;

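/// A single entry in a tuple-keyed index table: the cached hash of the key,
/// the id of the key's row in the owning `RowBuffer`, and the payload `T`
/// (a `BufferedSubset` of matching rows in `SubsetTable`).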
#[derive(Clone)]
pub(crate) struct TableEntry<T> {
    hash: u64,
    key: RowId,
    vals: T,
}

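/// A cached index over a table, keyed by the columns in `key`, along with the
/// `TableVersion` it was last synchronized against.
///
/// A rough usage sketch (the variables `table_ref: WrappedTableRef` and
/// `key: &[Value]` are illustrative and assumed to be in scope):
///
/// ```ignore
/// let mut index = Index::new(vec![ColumnId::new(0)], TupleIndex::new(1));
/// if index.needs_refresh(table_ref) {
///     index.refresh(table_ref);
/// }
/// let subset = index.get_subset(key);
/// ```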
#[derive(Clone)]
pub(crate) struct Index<TI> {
    key: Vec<ColumnId>,
    updated_to: TableVersion,
    table: TI,
}

impl<TI: IndexBase> Index<TI> {
    pub(crate) fn new(key: Vec<ColumnId>, table: TI) -> Self {
        Index {
            key,
            updated_to: TableVersion {
                major: Generation::new(0),
                minor: Offset::new(0),
            },
            table,
        }
    }

    pub(crate) fn get_subset<'a>(&'a self, key: &'a TI::Key) -> Option<SubsetRef<'a>> {
        self.table.get_subset(key)
    }

    pub(crate) fn needs_refresh(&self, table: WrappedTableRef) -> bool {
        table.version() != self.updated_to
    }

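    /// Bring the index up to date with `table`. If the table's major
    /// generation has changed, the index is rebuilt from scratch; otherwise
    /// only the rows added since the last refresh are merged. Sufficiently
    /// large updates are merged in parallel.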
    pub(crate) fn refresh(&mut self, table: WrappedTableRef) {
        let cur_version = table.version();
        if cur_version == self.updated_to {
            return;
        }
        let subset = if cur_version.major != self.updated_to.major {
            self.table.clear();
            table.all()
        } else {
            table.updates_since(self.updated_to.minor)
        };
        if parallelize_index_construction(subset.size()) {
            self.table.merge_parallel(&self.key, table, subset.as_ref());
        } else {
            self.refresh_serial(table, subset);
        }

        self.updated_to = cur_version;
    }

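    /// Merge `subset` into the index on the current thread, scanning the
    /// projected key columns in batches of 1024 rows.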
    pub(crate) fn refresh_serial(&mut self, table: WrappedTableRef, subset: Subset) {
        let mut buf = TaggedRowBuffer::new(self.key.len());
        let mut cur = Offset::new(0);
        loop {
            buf.clear();
            if let Some(next) =
                table.scan_project(subset.as_ref(), &self.key, cur, 1024, &[], &mut buf)
            {
                cur = next;
                self.table.merge_rows(&buf);
            } else {
                self.table.merge_rows(&buf);
                break;
            }
        }
    }

    pub(crate) fn for_each(&self, f: impl FnMut(&TI::Key, SubsetRef)) {
        self.table.for_each(f);
    }

    pub(crate) fn len(&self) -> usize {
        self.table.len()
    }
}

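/// A hash table mapping projected key tuples (stored in `keys`) to the
/// `BufferedSubset` of rows that share that key.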
pub(crate) struct SubsetTable {
    keys: RowBuffer,
    hash: Pooled<HashTable<TableEntry<BufferedSubset>>>,
}

impl Clone for SubsetTable {
    fn clone(&self) -> Self {
        SubsetTable {
            keys: self.keys.clone(),
            hash: Pooled::cloned(&self.hash),
        }
    }
}

impl SubsetTable {
    fn new(key_arity: usize) -> SubsetTable {
        SubsetTable {
            keys: RowBuffer::new(key_arity),
            hash: with_pool_set(|ps| ps.get()),
        }
    }
}

/// The common interface implemented by the index data structures in this
/// module (`ColumnIndex` and `TupleIndex`).
pub(crate) trait IndexBase {
    /// The key type used for lookups.
    type Key: ?Sized;

    /// The key type used when inserting rows.
    type WriteKey: ?Sized;

    /// Remove all entries from the index.
    fn clear(&mut self);
    /// Get the subset of rows associated with `key`, if any.
    fn get_subset(&self, key: &Self::Key) -> Option<SubsetRef<'_>>;
    /// Add `row` to the subset associated with `key`.
    fn add_row(&mut self, key: &Self::WriteKey, row: RowId);
    /// Add each row in `buf` to the index.
    fn merge_rows(&mut self, buf: &TaggedRowBuffer);
    /// Call `f` on each key and its associated subset.
    fn for_each(&self, f: impl FnMut(&Self::Key, SubsetRef));
    /// The number of keys in the index.
    fn len(&self) -> usize;

    /// Merge the given `subset` of `table`, projected onto `cols`, into the
    /// index using multiple threads.
    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef);
}

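/// One shard of a `ColumnIndex`: a map from column value to the subset of rows
/// containing that value, plus the `SubsetBuffer` backing its sparse subsets.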
struct ColumnIndexShard {
    table: Pooled<IndexMap<Value, BufferedSubset>>,
    subsets: SubsetBuffer,
}

impl Clone for ColumnIndexShard {
    fn clone(&self) -> Self {
        ColumnIndexShard {
            table: Pooled::cloned(&self.table),
            subsets: self.subsets.clone(),
        }
    }
}

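/// An index from individual column values to the subsets of rows containing
/// them. Entries are spread across shards by the hash of the value so that
/// construction can proceed in parallel.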
#[derive(Clone)]
pub struct ColumnIndex {
    shard_data: ShardData,
    shards: IdVec<ShardId, ColumnIndexShard>,
}

impl IndexBase for ColumnIndex {
    type Key = Value;
    type WriteKey = [Value];
    fn clear(&mut self) {
        for (_, shard) in self.shards.iter_mut() {
            for (_, subset) in shard.table.drain(..) {
                match subset {
                    BufferedSubset::Dense(_) => {}
                    BufferedSubset::Sparse(buffered_vec) => {
                        shard.subsets.return_vec(buffered_vec);
                    }
                }
            }
        }
    }

    fn get_subset<'a>(&'a self, key: &Value) -> Option<SubsetRef<'a>> {
        let shard = self.shard_data.get_shard(key, &self.shards);
        shard.table.get(key).map(|x| x.as_ref(&shard.subsets))
    }
    fn add_row(&mut self, vals: &[Value], row: RowId) {
        for key in vals {
            let shard = self.shard_data.get_shard_mut(key, &mut self.shards);
            // Rows are added in increasing order, so appending to the sorted
            // subset for this value is valid.
            unsafe {
                shard
                    .table
                    .entry(*key)
                    .or_insert_with(BufferedSubset::empty)
                    .add_row_sorted(row, &mut shard.subsets);
            }
        }
    }
    fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
        for (src_id, key) in buf.iter() {
            self.add_row(key, src_id);
        }
    }
    fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
        for (subsets, (k, v)) in self
            .shards
            .iter()
            .flat_map(|(_, shard)| shard.table.iter().map(|x| (&shard.subsets, x)))
        {
            f(k, v.as_ref(subsets));
        }
    }
    fn len(&self) -> usize {
        self.shards.iter().map(|(_, shard)| shard.table.len()).sum()
    }

    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
        const BATCH_SIZE: usize = 1024;
        let shard_data = self.shard_data;
        // Per-shard queues of row buffers, tagged with the first row id in the
        // buffer so each shard can replay them in row order.
        let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
            shard_data.n_shards(),
        );
        queues.resize_with(shard_data.n_shards(), || {
            Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
        });
        // Split a scanned batch into one single-column buffer per shard and
        // push the non-empty ones onto the corresponding queues.
        let split_buf = |buf: TaggedRowBuffer| {
            let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
            split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(1));
            for (row_id, keys) in buf.non_stale() {
                for key in keys {
                    shard_data
                        .get_shard_mut(*key, &mut split)
                        .add_row(row_id, &[*key]);
                }
            }
            for (shard_id, buf) in split.drain() {
                if buf.is_empty() {
                    continue;
                }
                let first = buf.get_row(RowId::new(0)).0;
                queues[shard_id].lock().unwrap().push((first, buf));
            }
        };

        run_in_thread_pool_and_block(&THREAD_POOL, || {
            // Phase 1: scan the subset in batches, splitting each batch into
            // per-shard queues in parallel.
            rayon::in_place_scope(|inner| {
                let mut cur = Offset::new(0);
                loop {
                    let mut buf = TaggedRowBuffer::new(cols.len());
                    if let Some(next) =
                        table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
                    {
                        cur = next;
                        inner.spawn(move |_| split_buf(buf));
                    } else {
                        inner.spawn(move |_| split_buf(buf));
                        break;
                    }
                }
            });

            // Phase 2: each shard drains its queue in row order and merges the
            // buffered rows into its map.
            self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
                use indexmap::map::Entry;
                let mut vec = queues[shard_id].lock().unwrap();
                // Sort batches by their first row id so rows are merged in
                // increasing order.
                vec.sort_by_key(|(start, _)| *start);
                for (_, buf) in vec.drain(..) {
                    for (row_id, key) in buf.non_stale() {
                        debug_assert_eq!(key.len(), 1);
                        match shard.table.entry(key[0]) {
                            Entry::Occupied(mut occ) => {
                                // Rows arrive in increasing order, so appending
                                // to the sorted subset is valid.
                                unsafe {
                                    occ.get_mut().add_row_sorted(row_id, &mut shard.subsets);
                                }
                            }
                            Entry::Vacant(v) => {
                                v.insert(BufferedSubset::singleton(row_id));
                            }
                        }
                    }
                }
            });
        });
    }
}

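/// Run `f` on `pool` and block the calling thread until it has finished.
///
/// The closure is only required to live for `'a`, but `rayon::ThreadPool::spawn`
/// demands `'static`; the lifetime is transmuted away below. This is sound
/// because the `Notification` keeps this function (and therefore everything
/// `f` borrows) alive until the spawned closure has run and been dropped.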
fn run_in_thread_pool_and_block<'a>(pool: &rayon::ThreadPool, f: impl FnMut() + Send + 'a) {
    trait LifetimeWork<'a>: FnMut() + Send + 'a {}

    impl<'a, F: FnMut() + Send + 'a> LifetimeWork<'a> for F {}
    let as_lifetime: Box<dyn LifetimeWork<'a>> = Box::new(f);
    // SAFETY: the closure is dropped before `notify()` is called, and we do
    // not return until `wait()` observes that notification, so no borrow in
    // `f` outlives this call despite the 'static lifetime below.
    let mut casted_away = unsafe {
        mem::transmute::<Box<dyn LifetimeWork<'a>>, Box<dyn LifetimeWork<'static>>>(as_lifetime)
    };
    let n = Arc::new(Notification::new());
    let inner = n.clone();
    pool.spawn(move || {
        casted_away();
        mem::drop(casted_away);
        inner.notify();
    });
    n.wait()
}

impl ColumnIndex {
    pub(crate) fn new() -> ColumnIndex {
        with_pool_set(|ps| {
            let shard_data = ShardData::new(num_shards());
            let mut shards = IdVec::with_capacity(shard_data.n_shards());
            shards.resize_with(shard_data.n_shards(), || ColumnIndexShard {
                table: ps.get(),
                subsets: SubsetBuffer::default(),
            });
            ColumnIndex { shard_data, shards }
        })
    }
}

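/// One shard of a `TupleIndex`: a `SubsetTable` keyed by full tuples, plus the
/// `SubsetBuffer` backing its sparse subsets.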
#[derive(Clone)]
struct TupleIndexShard {
    table: SubsetTable,
    subsets: SubsetBuffer,
}

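/// An index keyed by tuples of column values. Entries are spread across shards
/// by the hash of the full key tuple.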
#[derive(Clone)]
pub struct TupleIndex {
    shard_data: ShardData,
    shards: IdVec<ShardId, TupleIndexShard>,
}

impl TupleIndex {
    pub(crate) fn new(key_arity: usize) -> TupleIndex {
        let shard_data = ShardData::new(num_shards());
        let mut shards = IdVec::with_capacity(shard_data.n_shards());
        shards.resize_with(shard_data.n_shards(), || TupleIndexShard {
            table: SubsetTable::new(key_arity),
            subsets: SubsetBuffer::default(),
        });
        TupleIndex { shard_data, shards }
    }
}

impl IndexBase for TupleIndex {
    type Key = [Value];
    type WriteKey = Self::Key;

    fn clear(&mut self) {
        for (_, shard) in self.shards.iter_mut() {
            shard.table.keys.clear();
            for entry in shard.table.hash.drain() {
                match entry.vals {
                    BufferedSubset::Dense(_) => {}
                    BufferedSubset::Sparse(v) => {
                        shard.subsets.return_vec(v);
                    }
                }
            }
        }
    }

    fn get_subset<'a>(&'a self, key: &[Value]) -> Option<SubsetRef<'a>> {
        let hash = hash_key(key);
        let shard = &self.shards[self.shard_data.shard_id(hash)];
        let entry = shard.table.hash.find(hash, |entry| {
            entry.hash == hash && shard.table.keys.get_row(entry.key) == key
        })?;
        Some(entry.vals.as_ref(&shard.subsets))
    }

    fn add_row(&mut self, key: &[Value], row: RowId) {
        use hashbrown::hash_table::Entry;
        let hash = hash_key(key);
        let shard = &mut self.shards[self.shard_data.shard_id(hash)];
        let table_entry = shard.table.hash.entry(
            hash,
            |entry| entry.hash == hash && shard.table.keys.get_row(entry.key) == key,
            |ent| ent.hash,
        );
        match table_entry {
            Entry::Occupied(mut occ) => {
                // Rows are added in increasing order, so appending to the
                // sorted subset is valid.
                unsafe {
                    occ.get_mut().vals.add_row_sorted(row, &mut shard.subsets);
                }
            }
            Entry::Vacant(v) => {
                let key_id = shard.table.keys.add_row(key);
                let subset = BufferedSubset::singleton(row);
                v.insert(TableEntry {
                    hash,
                    key: key_id,
                    vals: subset,
                });
            }
        }
    }

    fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
        for (src_id, key) in buf.iter() {
            self.add_row(key, src_id);
        }
    }
    fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
        for (_, shard) in self.shards.iter() {
            for entry in shard.table.hash.iter() {
                let key = shard.table.keys.get_row(entry.key);
                f(key, entry.vals.as_ref(&shard.subsets));
            }
        }
    }

    fn len(&self) -> usize {
        self.shards
            .iter()
            .map(|(_, shard)| shard.table.hash.len())
            .sum()
    }

    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
        const BATCH_SIZE: usize = 1024;
        let shard_data = self.shard_data;
        // Per-shard queues of row buffers, tagged with the first row id in the
        // buffer so each shard can replay them in row order.
        let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
            shard_data.n_shards(),
        );
        queues.resize_with(shard_data.n_shards(), || {
            Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
        });
        // Split a scanned batch into one buffer per shard and push the
        // non-empty ones onto the corresponding queues.
        let split_buf = |buf: TaggedRowBuffer| {
            let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
            split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(cols.len()));
            for (row_id, key) in buf.non_stale() {
                shard_data
                    .get_shard_mut(key, &mut split)
                    .add_row(row_id, key);
            }
            for (shard_id, buf) in split.drain() {
                if buf.is_empty() {
                    continue;
                }
                let first = buf.get_row(RowId::new(0)).0;
                queues[shard_id].lock().unwrap().push((first, buf));
            }
        };
        run_in_thread_pool_and_block(&THREAD_POOL, || {
            // Phase 1: scan the subset in batches, splitting each batch into
            // per-shard queues in parallel.
            rayon::scope(|scope| {
                let mut cur = Offset::new(0);
                loop {
                    let mut buf = TaggedRowBuffer::new(cols.len());
                    if let Some(next) =
                        table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
                    {
                        cur = next;
                        scope.spawn(move |_| split_buf(buf));
                    } else {
                        scope.spawn(move |_| split_buf(buf));
                        break;
                    }
                }
            });
            // Phase 2: each shard drains its queue in row order and merges the
            // buffered rows into its table.
            self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
                use hashbrown::hash_table::Entry;
                let mut vec = queues[shard_id].lock().unwrap();
                // Sort batches by their first row id so rows are merged in
                // increasing order.
                vec.sort_by_key(|(start, _)| *start);
                for (_, buf) in vec.drain(..) {
                    for (row_id, key) in buf.non_stale() {
                        let hash = hash_key(key);
                        let table_entry = shard.table.hash.entry(
                            hash,
                            |entry| {
                                entry.hash == hash && shard.table.keys.get_row(entry.key) == key
                            },
                            |ent| ent.hash,
                        );
                        match table_entry {
                            Entry::Occupied(mut occ) => {
                                // Rows arrive in increasing order, so appending
                                // to the sorted subset is valid.
                                unsafe {
                                    occ.get_mut()
                                        .vals
                                        .add_row_sorted(row_id, &mut shard.subsets);
                                }
                            }
                            Entry::Vacant(v) => {
                                let key_id = shard.table.keys.add_row(key);
                                let subset = BufferedSubset::singleton(row_id);
                                v.insert(TableEntry {
                                    hash,
                                    key: key_id,
                                    vals: subset,
                                });
                            }
                        }
                    }
                }
            });
        });
    }
}

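/// Hash a key tuple with `FxHasher`. The hash is used both to pick a shard and
/// to probe the per-shard hash tables.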
fn hash_key(key: &[Value]) -> u64 {
    let mut hasher = FxHasher::default();
    key.hash(&mut hasher);
    hasher.finish()
}

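/// A small concurrent cache mapping keys to (cheaply cloneable) indexes.
///
/// Reads go through the read side of a `ReadOptimizedLock`; `get_or_insert`
/// only takes the write lock when the key is missing and re-checks for the key
/// under that lock, so concurrent callers agree on a single entry.
///
/// A rough usage sketch (the `String` key and `Arc`-wrapped index are
/// illustrative, not the only valid instantiation):
///
/// ```ignore
/// let catalog = IndexCatalog::new();
/// let index = catalog.get_or_insert("col0".to_string(), || Arc::new(ColumnIndex::new()));
/// ```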
#[derive(Default)]
pub struct IndexCatalog<K: Clone + std::hash::Hash + Eq, I: Clone> {
    data: ReadOptimizedLock<Vec<(K, I)>>,
}

impl<K, I: Clone> IndexCatalog<K, I>
where
    K: Clone + std::hash::Hash + Eq,
{
    pub fn new() -> Self {
        IndexCatalog {
            data: ReadOptimizedLock::new(Vec::new()),
        }
    }

    pub fn map(&self, f: impl Fn(&(K, I)) -> (K, I)) -> Self {
        let vec = self.data.read().iter().map(f).collect();
        IndexCatalog {
            data: ReadOptimizedLock::new(vec),
        }
    }

    pub fn update(&mut self, f: impl Fn(&K, &mut I)) {
        for (k, i) in self.data.as_mut_ref() {
            f(k, i)
        }
    }

    pub fn get_or_insert(&self, k: K, init: impl FnOnce() -> I) -> I {
        let data = self.data.read();
        let entry = data.iter().find(|(k1, _)| k1 == &k);
        if let Some(entry) = entry {
            entry.1.clone()
        } else {
            drop(data);
            let mut data = self.data.lock();
            if let Some(entry) = data.iter().find(|(k1, _)| k1 == &k) {
                entry.1.clone()
            } else {
                let index = init();
                data.push((k, index.clone()));
                index
            }
        }
    }
}

define_id!(BufferIndex, u32, "an index into a subset buffer");

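/// Backing storage for the sparse subsets owned by one index shard.
///
/// All sparse subsets live in a single `Vec<RowId>`; a `BufferedVec` is just a
/// (start, end) window into that vector. Windows are handed out in
/// power-of-two size classes, and windows that are outgrown or cleared are
/// recycled through the `FreeList`.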
struct SubsetBuffer {
    buf: Pooled<Vec<RowId>>,
    free_list: FreeList,
}

impl Clone for SubsetBuffer {
    fn clone(&self) -> Self {
        SubsetBuffer {
            buf: Pooled::cloned(&self.buf),
            free_list: self.free_list.clone(),
        }
    }
}

impl Default for SubsetBuffer {
    fn default() -> SubsetBuffer {
        with_pool_set(|ps| SubsetBuffer {
            buf: ps.get(),
            free_list: Default::default(),
        })
    }
}

impl SubsetBuffer {
    /// Allocate a window holding exactly the given rows, reusing a free window
    /// of the right size class if one is available.
    fn new_vec(&mut self, rows: impl ExactSizeIterator<Item = RowId>) -> BufferedVec {
        let len = rows.len();
        if let Some(v) = self.free_list.get_size_class(len).pop() {
            return self.fill_at(v, rows);
        }
        let start = BufferIndex::from_usize(self.buf.len());
        self.buf.resize(
            start.index() + len.next_power_of_two(),
            RowId::new(u32::MAX),
        );
        self.fill_at(start, rows)
    }

    /// Write `rows` into the buffer starting at `start` and return the
    /// resulting window.
    fn fill_at(
        &mut self,
        start: BufferIndex,
        rows: impl ExactSizeIterator<Item = RowId>,
    ) -> BufferedVec {
        let mut cur = start;
        for i in rows {
            self.buf[cur.index()] = i;
            cur = cur.inc();
        }
        BufferedVec(start, cur)
    }

    /// Return a window to the free list for reuse.
    fn return_vec(&mut self, vec: BufferedVec) {
        self.free_list.get_size_class(vec.len()).push(vec.0);
    }

    /// Append `row` (which must be >= the window's last element) to `vec`,
    /// relocating the window to a larger size class when it is full, i.e. when
    /// its length has reached its power-of-two capacity.
    fn push_vec(&mut self, vec: BufferedVec, row: RowId) -> BufferedVec {
        assert!(
            vec.is_empty() || self.buf[vec.1.index() - 1] <= row,
            "vec={vec:?}, row={row:?}, last_elt={:?}",
            self.buf[vec.1.index() - 1]
        );
        if !vec.len().is_power_of_two() {
            self.buf[vec.1.index()] = row;
            return BufferedVec(vec.0, vec.1.inc());
        }

        let res = if let Some(v) = self.free_list.get_size_class(vec.len() + 1).pop() {
            self.buf
                .copy_within(vec.0.index()..vec.1.index(), v.index());
            self.buf[v.index() + vec.len()] = row;
            BufferedVec(v, BufferIndex::from_usize(v.index() + vec.len() + 1))
        } else {
            let start = self.buf.len();
            self.buf.resize(
                start + (vec.len() + 1).next_power_of_two(),
                RowId::new(u32::MAX),
            );
            self.buf.copy_within(vec.0.index()..vec.1.index(), start);
            self.buf[start + vec.len()] = row;
            let end = start + vec.len() + 1;
            BufferedVec(BufferIndex::from_usize(start), BufferIndex::from_usize(end))
        };
        self.return_vec(vec);
        res
    }

    /// View a window as a sparse `SubsetRef`. In debug builds, check that no
    /// uninitialized (`u32::MAX`) slots are exposed.
    fn make_ref<'a>(&'a self, vec: &BufferedVec) -> SubsetRef<'a> {
        let res = SubsetRef::Sparse(unsafe {
            SortedOffsetSlice::new_unchecked(&self.buf[vec.0.index()..vec.1.index()])
        });
        #[cfg(debug_assertions)]
        {
            use crate::offsets::Offsets;
            res.offsets(|x| assert_ne!(x.rep(), u32::MAX))
        }
        res
    }
}

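/// A window (start, end) into a `SubsetBuffer` holding a sorted run of row
/// ids. It is only meaningful together with the buffer it points into.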
#[derive(Debug, Clone)]
pub(crate) struct BufferedVec(BufferIndex, BufferIndex);

impl Default for BufferedVec {
    fn default() -> Self {
        BufferedVec(BufferIndex::new(0), BufferIndex::new(0))
    }
}

impl BufferedVec {
    fn is_empty(&self) -> bool {
        self.0 == self.1
    }
    fn len(&self) -> usize {
        self.1.index() - self.0.index()
    }
}

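/// A subset of rows: either a dense, contiguous range of row ids or a sparse
/// sorted run stored in a `SubsetBuffer`.
///
/// Subsets start out dense and are promoted to sparse the first time a
/// non-contiguous row is added. For example, adding rows 3, 4, 5 keeps the
/// subset dense (`3..6`); adding row 9 afterwards copies the run into the
/// buffer and appends 9, yielding the sparse run `[3, 4, 5, 9]`.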
#[derive(Clone)]
pub(crate) enum BufferedSubset {
    Dense(OffsetRange),
    Sparse(BufferedVec),
}

impl BufferedSubset {
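    /// Add `row` to the subset.
    ///
    /// # Safety
    /// `row` must be greater than or equal to every row already in the subset
    /// (rows are added in sorted order), and `buf` must be the `SubsetBuffer`
    /// that owns this subset's sparse storage.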
    unsafe fn add_row_sorted(&mut self, row: RowId, buf: &mut SubsetBuffer) {
        match self {
            BufferedSubset::Dense(range) => {
                if range.end == range.start {
                    range.start = row;
                    range.end = row.inc();
                    return;
                }
                if range.end == row {
                    range.end = row.inc();
                    return;
                }
                let mut v = buf.new_vec((range.start.rep()..range.end.rep()).map(RowId::new));
                v = buf.push_vec(v, row);
                *self = BufferedSubset::Sparse(v);
            }
            BufferedSubset::Sparse(vec) => *vec = buf.push_vec(mem::take(vec), row),
        }
    }

    fn empty() -> Self {
        BufferedSubset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)))
    }

    fn singleton(row: RowId) -> Self {
        BufferedSubset::Dense(OffsetRange::new(row, row.inc()))
    }

    fn as_ref<'a>(&self, buf: &'a SubsetBuffer) -> SubsetRef<'a> {
        match self {
            BufferedSubset::Dense(range) => SubsetRef::Dense(*range),
            BufferedSubset::Sparse(vec) => buf.make_ref(vec),
        }
    }
}

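/// Heuristic shard count for the indexes in this module: a single rayon thread
/// gets one shard; otherwise use twice the number of threads to reduce
/// contention.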
fn num_shards() -> usize {
    let n_threads = rayon::current_num_threads();
    if n_threads == 1 { 1 } else { n_threads * 2 }
}

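/// A dedicated thread pool for parallel index merges. Merges are spawned onto
/// this pool while the calling thread blocks on a `Notification` (see
/// `run_in_thread_pool_and_block`), so keeping a separate pool means the
/// blocked caller never occupies a worker of the pool that has to run the
/// work.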
static THREAD_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
    rayon::ThreadPoolBuilder::new()
        .num_threads(rayon::current_num_threads())
        .build()
        .unwrap()
});

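/// Free lists of recycled `SubsetBuffer` windows, grouped by power-of-two size
/// class.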
#[derive(Default, Clone)]
pub(super) struct FreeList {
    data: HashMap<usize, Vec<BufferIndex>>,
}
impl FreeList {
    fn get_size_class(&mut self, size: usize) -> &mut Vec<BufferIndex> {
        let size_class = size.next_power_of_two();
        self.data.entry(size_class).or_default()
    }
}