1use std::{
3 cmp,
4 hash::{Hash, Hasher},
5 mem,
6 sync::{Arc, Mutex},
7};
8
9use crate::{
10 common::IndexMap,
11 numeric_id::{IdVec, NumericId, define_id},
12};
13use egglog_concurrency::{Notification, ReadOptimizedLock};
14use hashbrown::HashTable;
15use indexmap::map::Entry;
16use once_cell::sync::Lazy;
17use rayon::iter::ParallelIterator;
18use rustc_hash::FxHasher;
19
20use crate::{
21 OffsetRange, Subset,
22 common::{ShardData, ShardId, Value},
23 offsets::{RowId, SortedOffsetSlice, SubsetRef},
24 parallel_heuristics::parallelize_index_construction,
25 pool::{Pooled, with_pool_set},
26 row_buffer::{RowBuffer, TaggedRowBuffer},
27 table_spec::{ColumnId, Generation, Offset, TableVersion, WrappedTableRef},
28};
29
30#[cfg(test)]
31mod tests;
32
/// An entry of a hash table whose keys are stored out of line.
///
/// The entry caches the key's hash and refers to the key by the id of the row
/// that holds it in a separate key buffer; `vals` is the payload for that key.
#[derive(Clone)]
pub(crate) struct TableEntry<T> {
    /// Cached hash of the key, compared before the key itself on lookups.
    hash: u64,
    /// Id of the row (in the owning table's key buffer) that stores the key.
    key: RowId,
    /// Payload associated with the key.
    vals: T,
}
40
/// An index over the columns `key` of some table, stored in a `TI`, together
/// with the table version it was last brought up to date with.
#[derive(Clone)]
pub(crate) struct Index<TI> {
    /// Columns of the underlying table that are projected to form index keys.
    key: Vec<ColumnId>,
    /// Version of the underlying table reflected by `table`.
    updated_to: TableVersion,
    /// The key-to-subset mapping itself.
    table: TI,
}
47
48impl<TI: IndexBase> Index<TI> {
49 pub(crate) fn new(key: Vec<ColumnId>, table: TI) -> Self {
50 Index {
51 key,
52 updated_to: TableVersion {
53 major: Generation::new(0),
54 minor: Offset::new(0),
55 },
56 table,
57 }
58 }
59
60 pub(crate) fn get_subset<'a>(&'a self, key: &'a TI::Key) -> Option<SubsetRef<'a>> {
63 self.table.get_subset(key)
64 }
65
66 pub(crate) fn needs_refresh(&self, table: WrappedTableRef) -> bool {
67 table.version() != self.updated_to
68 }
69
70 pub(crate) fn refresh(&mut self, table: WrappedTableRef) {
71 let cur_version = table.version();
72 if cur_version == self.updated_to {
73 return;
74 }
75 let is_full = cur_version.major != self.updated_to.major;
76 let subset = if is_full {
77 self.table.clear();
78 table.all()
79 } else {
80 table.updates_since(self.updated_to.minor)
81 };
82 if parallelize_index_construction(subset.size()) {
83 self.table.merge_parallel(&self.key, table, subset.as_ref());
84 } else if is_full {
85 self.table.rebuild_full(&self.key, table, subset.as_ref());
86 } else {
87 self.refresh_serial(table, subset);
88 }
89
90 self.updated_to = cur_version;
91 }
92
93 pub(crate) fn refresh_serial(&mut self, table: WrappedTableRef, subset: Subset) {
98 let mut buf = TaggedRowBuffer::new(self.key.len());
99 let mut cur = Offset::new(0);
100 loop {
101 buf.clear();
102 if let Some(next) =
103 table.scan_project(subset.as_ref(), &self.key, cur, 1024, &[], &mut buf)
104 {
105 cur = next;
106 self.table.merge_rows(&buf);
107 } else {
108 self.table.merge_rows(&buf);
109 break;
110 }
111 }
112 }
113
114 pub(crate) fn for_each(&self, f: impl FnMut(&TI::Key, SubsetRef)) {
115 self.table.for_each(f);
116 }
117
118 pub(crate) fn len(&self) -> usize {
119 self.table.len()
120 }
121}
122
/// A hash table from multi-column keys to buffered subsets.
///
/// Keys live in `keys`; the entries of `hash` refer to them by row id.
pub(crate) struct SubsetTable {
    /// Storage for the key rows referenced by the entries of `hash`.
    keys: RowBuffer,
    /// Hash table of entries; each entry points at its key in `keys`.
    hash: Pooled<HashTable<TableEntry<BufferedSubset>>>,
}
127
128impl Clone for SubsetTable {
129 fn clone(&self) -> Self {
130 SubsetTable {
131 keys: self.keys.clone(),
132 hash: Pooled::cloned(&self.hash),
133 }
134 }
135}
136
137impl SubsetTable {
138 fn new(key_arity: usize) -> SubsetTable {
139 SubsetTable {
140 keys: RowBuffer::new(key_arity),
141 hash: with_pool_set(|ps| ps.get()),
142 }
143 }
144}
145
146pub(crate) trait IndexBase {
147 type Key: ?Sized;
151
152 type WriteKey: ?Sized;
156
157 fn clear(&mut self);
159 fn get_subset(&self, key: &Self::Key) -> Option<SubsetRef<'_>>;
161 fn add_row(&mut self, key: &Self::WriteKey, row: RowId);
163 fn merge_rows(&mut self, buf: &TaggedRowBuffer);
165 fn for_each(&self, f: impl FnMut(&Self::Key, SubsetRef));
167 fn len(&self) -> usize;
169
170 fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef);
171
172 fn rebuild_full(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
176 let mut buf = TaggedRowBuffer::new(cols.len());
177 let mut cur = Offset::new(0);
178 loop {
179 buf.clear();
180 if let Some(next) = table.scan_project(subset, cols, cur, 1024, &[], &mut buf) {
181 cur = next;
182 self.merge_rows(&buf);
183 } else {
184 self.merge_rows(&buf);
185 break;
186 }
187 }
188 }
189}
190
/// One shard of a [`ColumnIndex`]: a map from values to subsets, plus the
/// buffer that stores the rows of the sparse subsets.
struct ColumnIndexShard {
    /// Mapping from a value to the subset of rows containing it.
    table: Pooled<IndexMap<Value, BufferedSubset>>,
    /// Backing storage for the `BufferedSubset::Sparse` entries in `table`.
    subsets: SubsetBuffer,
}
195
196impl Clone for ColumnIndexShard {
197 fn clone(&self) -> Self {
198 ColumnIndexShard {
199 table: Pooled::cloned(&self.table),
200 subsets: self.subsets.clone(),
201 }
202 }
203}
204
/// A sharded index from a single `Value` to the subset of rows that contain
/// it in one of the indexed columns.
#[derive(Clone)]
pub struct ColumnIndex {
    /// Decides which shard a given value belongs to.
    shard_data: ShardData,
    /// The shards, indexed by shard id.
    shards: IdVec<ShardId, ColumnIndexShard>,
}
211
212impl IndexBase for ColumnIndex {
213 type Key = Value;
214 type WriteKey = [Value];
215 fn clear(&mut self) {
216 for (_, shard) in self.shards.iter_mut() {
217 for (_, subset) in shard.table.drain(..) {
218 match subset {
219 BufferedSubset::Dense(_) => {}
220 BufferedSubset::Sparse(buffered_vec) => {
221 shard.subsets.return_vec(buffered_vec);
222 }
223 }
224 }
225 }
226 }
227
228 fn get_subset<'a>(&'a self, key: &Value) -> Option<SubsetRef<'a>> {
229 let shard = self.shard_data.get_shard(key, &self.shards);
230 shard.table.get(key).map(|x| x.as_ref(&shard.subsets))
231 }
232 fn add_row(&mut self, vals: &[Value], row: RowId) {
233 for key in vals {
235 let shard = self.shard_data.get_shard_mut(key, &mut self.shards);
236 unsafe {
237 shard
238 .table
239 .entry(*key)
240 .or_insert_with(BufferedSubset::empty)
241 .add_row_sorted(row, &mut shard.subsets);
242 }
243 }
244 }
245 fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
246 for (src_id, key) in buf.iter() {
247 self.add_row(key, src_id);
248 }
249 }
250
251 fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
252 for (subsets, (k, v)) in self
253 .shards
254 .iter()
255 .flat_map(|(_, shard)| shard.table.iter().map(|x| (&shard.subsets, x)))
256 {
257 f(k, v.as_ref(subsets));
258 }
259 }
260
261 fn len(&self) -> usize {
262 self.shards.iter().map(|(_, shard)| shard.table.len()).sum()
263 }
264
265 fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
266 const BATCH_SIZE: usize = 1024;
267 let shard_data = self.shard_data;
268 let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
269 shard_data.n_shards(),
270 );
271 queues.resize_with(shard_data.n_shards(), || {
272 Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
273 });
274 let split_buf = |buf: TaggedRowBuffer| {
275 let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
276 split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(1));
277 for (row_id, keys) in buf.iter() {
278 for key in keys {
279 shard_data
280 .get_shard_mut(*key, &mut split)
281 .add_row(row_id, &[*key]);
282 }
283 }
284 for (shard_id, buf) in split.drain() {
285 if buf.is_empty() {
286 continue;
287 }
288 let first = buf.get_row(RowId::new(0)).0;
289 queues[shard_id].lock().unwrap().push((first, buf));
290 }
291 };
292
293 run_in_thread_pool_and_block(&THREAD_POOL, || {
294 rayon::in_place_scope(|inner| {
295 let mut cur = Offset::new(0);
296 loop {
297 let mut buf = TaggedRowBuffer::new(cols.len());
298 if let Some(next) =
299 table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
300 {
301 cur = next;
302 inner.spawn(move |_| split_buf(buf));
303 } else {
304 inner.spawn(move |_| split_buf(buf));
305 break;
306 }
307 }
308 });
309
310 self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
311 let mut vec = queues[shard_id].lock().unwrap();
313 vec.sort_by_key(|(start, _)| *start);
314 for (_, buf) in vec.drain(..) {
315 for (row_id, key) in buf.iter() {
316 debug_assert_eq!(key.len(), 1);
317 match shard.table.entry(key[0]) {
318 Entry::Occupied(mut occ) => {
319 unsafe {
321 occ.get_mut().add_row_sorted(row_id, &mut shard.subsets);
322 }
323 }
324 Entry::Vacant(v) => {
325 v.insert(BufferedSubset::singleton(row_id));
326 }
327 }
328 }
329 }
330 });
331 });
332 }
333
334 fn rebuild_full(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
342 let n = table.all().size() * cols.len();
343
344 let mut pairs: Vec<(Value, RowId)> = Vec::with_capacity(n);
345 for &col in cols {
346 table.for_each_col(subset, col, &mut |row_id, val| {
347 pairs.push((val, row_id));
348 });
349 }
350
351 radix_sort_pairs_by_value(&mut pairs);
352 if cols.len() > 1 {
353 pairs.dedup();
355 }
356
357 let mut i = 0;
358 while i < pairs.len() {
359 let key = pairs[i].0;
360 let start = i;
361 let mut first = pairs[i].1;
362 let mut last = pairs[i].1;
363 while i < pairs.len() && pairs[i].0 == key {
364 last = cmp::max(last, pairs[i].1);
365 first = cmp::min(first, pairs[i].1);
366 i += 1;
367 }
368 let shard = self.shard_data.get_shard_mut(key, &mut self.shards);
369 let count = i - start;
370 let buffered = if last.rep() - first.rep() == (count - 1) as u32 {
371 BufferedSubset::Dense(OffsetRange::new(first, last.inc()))
374 } else {
375 let bv = shard
376 .subsets
377 .new_vec(pairs[start..i].iter().map(|&(_, r)| r));
378 BufferedSubset::Sparse(bv)
379 };
380 shard.table.insert(key, buffered);
381 }
382 }
383}
384
/// Run `f` on `pool` and block the calling thread until it has finished.
///
/// `f` only needs to live for `'a`, so it may borrow from the caller's stack,
/// even though `ThreadPool::spawn` requires a `'static` closure. The lifetime
/// is erased with a transmute; see the safety note below.
///
/// NOTE(review): if `f` panics, `inner.notify()` is never reached. Whether
/// `n.wait()` then blocks forever depends on the pool's panic handling and on
/// `Notification`; confirm.
fn run_in_thread_pool_and_block<'a>(pool: &rayon::ThreadPool, f: impl FnMut() + Send + 'a) {
    // Local trait that names "a sendable FnMut closure living for 'a", so the
    // boxed closure can be given an explicit lifetime that the transmute
    // below replaces.
    trait LifetimeWork<'a>: FnMut() + Send + 'a {}

    impl<'a, F: FnMut() + Send + 'a> LifetimeWork<'a> for F {}
    let as_lifetime: Box<dyn LifetimeWork<'a>> = Box::new(f);
    let mut casted_away = unsafe {
        // SAFETY: only the lifetime bound of the trait object changes. The
        // closure is called and dropped inside the spawned task before
        // `notify`, and this function does not return until `wait` does, so,
        // assuming `wait` blocks until `notify` has been called, the closure
        // never outlives `'a`.
        mem::transmute::<Box<dyn LifetimeWork<'a>>, Box<dyn LifetimeWork<'static>>>(as_lifetime)
    };
    let n = Arc::new(Notification::new());
    let inner = n.clone();
    pool.spawn(move || {
        casted_away();
        // Drop the closure (and its borrows) before signalling completion.
        mem::drop(casted_away);
        inner.notify();
    });
    n.wait()
}
421
422fn radix_sort_pairs_by_value(pairs: &mut Vec<(Value, RowId)>) {
431 let n = pairs.len();
432 if n < 64 {
433 pairs.sort_unstable();
434 return;
435 }
436
437 let max_val = pairs.iter().map(|&(v, _)| v.rep()).max().unwrap_or(0);
438 let n_passes: u32 = if max_val < 256 {
439 1
440 } else if max_val < 65_536 {
441 2
442 } else if max_val < (1 << 24) {
443 3
444 } else {
445 4
446 };
447
448 let null_pair = (Value::new_const(0), RowId::new_const(0));
449 let mut buf: Vec<(Value, RowId)> = vec![null_pair; n];
450
451 let mut src: &mut Vec<(Value, RowId)> = pairs;
452 let mut dst: &mut Vec<(Value, RowId)> = &mut buf;
453
454 for pass in 0..n_passes {
455 let shift = pass * 8;
456 let mut count = [0u32; 256];
457
458 for pair in src.iter() {
460 let bucket = (pair.0.rep() >> shift) & 0xFF;
461 count[bucket as usize] += 1;
462 }
463
464 let mut prefix = 0u32;
466 for c in &mut count {
467 let prev = *c;
468 *c = prefix;
469 prefix += prev;
470 }
471
472 for &pair in src.iter() {
474 let bucket = ((pair.0.rep() >> shift) & 0xFF) as usize;
475 dst[count[bucket] as usize] = pair;
476 count[bucket] += 1;
477 }
478
479 core::mem::swap(&mut src, &mut dst);
480 }
481
482 if n_passes % 2 == 1 {
486 dst.copy_from_slice(src);
487 }
488}
489
490impl ColumnIndex {
491 pub(crate) fn new() -> ColumnIndex {
492 with_pool_set(|ps| {
493 let shard_data = ShardData::new(num_shards());
494 let mut shards = IdVec::with_capacity(shard_data.n_shards());
495 shards.resize_with(shard_data.n_shards(), || ColumnIndexShard {
496 table: ps.get(),
497 subsets: SubsetBuffer::default(),
498 });
499 ColumnIndex { shard_data, shards }
500 })
501 }
502
503 pub(crate) fn reserve_for_n_rows(&mut self, n: usize) {
506 let n_shards = self.shards.len();
507 let per_shard = n / n_shards + 2;
508 for (_, shard) in self.shards.iter_mut() {
509 shard.table.reserve(per_shard);
510 }
511 }
512
513 pub(crate) fn build_for_subset(
517 table: WrappedTableRef,
518 subset: SubsetRef,
519 col: ColumnId,
520 ) -> ColumnIndex {
521 const SORT_BULK_THRESHOLD: usize = 512;
522 let mut res = ColumnIndex::new();
523 if subset.size() >= SORT_BULK_THRESHOLD {
524 res.rebuild_full(&[col], table, subset);
525 } else {
526 res.reserve_for_n_rows(subset.size());
527 table.for_each_col(subset, col, &mut |row_id, val| {
528 res.add_row(&[val], row_id);
529 });
530 }
531 res
532 }
533}
534
/// One shard of a [`TupleIndex`]: a table from key tuples to subsets, plus
/// the buffer that stores the rows of the sparse subsets.
#[derive(Clone)]
struct TupleIndexShard {
    /// Mapping from a key tuple to the subset of rows with that key.
    table: SubsetTable,
    /// Backing storage for the `BufferedSubset::Sparse` entries in `table`.
    subsets: SubsetBuffer,
}
540
/// A sharded index from a tuple of values (`[Value]`) to the subset of rows
/// whose indexed columns equal that tuple.
#[derive(Clone)]
pub struct TupleIndex {
    /// Decides which shard a given key hash belongs to.
    shard_data: ShardData,
    /// The shards, indexed by shard id.
    shards: IdVec<ShardId, TupleIndexShard>,
}
549
550impl TupleIndex {
551 pub(crate) fn new(key_arity: usize) -> TupleIndex {
552 let shard_data = ShardData::new(num_shards());
553 let mut shards = IdVec::with_capacity(shard_data.n_shards());
554 shards.resize_with(shard_data.n_shards(), || TupleIndexShard {
555 table: SubsetTable::new(key_arity),
556 subsets: SubsetBuffer::default(),
557 });
558 TupleIndex { shard_data, shards }
559 }
560}
561
562impl IndexBase for TupleIndex {
563 type Key = [Value];
564 type WriteKey = Self::Key;
565
566 fn clear(&mut self) {
567 for (_, shard) in self.shards.iter_mut() {
568 shard.table.keys.clear();
569 for entry in shard.table.hash.drain() {
570 match entry.vals {
571 BufferedSubset::Dense(_) => {}
572 BufferedSubset::Sparse(v) => {
573 shard.subsets.return_vec(v);
574 }
575 }
576 }
577 }
578 }
579
580 fn get_subset<'a>(&'a self, key: &[Value]) -> Option<SubsetRef<'a>> {
581 let hash = hash_key(key);
582 let shard = &self.shards[self.shard_data.shard_id(hash)];
583 let entry = shard.table.hash.find(hash, |entry| {
584 entry.hash == hash && unsafe { shard.table.keys.get_row_unchecked(entry.key) } == key
586 })?;
587 Some(entry.vals.as_ref(&shard.subsets))
588 }
589
590 fn add_row(&mut self, key: &[Value], row: RowId) {
591 use hashbrown::hash_table::Entry;
592 let hash = hash_key(key);
593 let shard = &mut self.shards[self.shard_data.shard_id(hash)];
594 let table_entry = shard.table.hash.entry(
595 hash,
596 |entry| {
598 entry.hash == hash
599 && unsafe { shard.table.keys.get_row_unchecked(entry.key) } == key
600 },
601 |ent| ent.hash,
602 );
603 match table_entry {
604 Entry::Occupied(mut occ) => {
605 unsafe {
607 occ.get_mut().vals.add_row_sorted(row, &mut shard.subsets);
608 }
609 }
610 Entry::Vacant(v) => {
611 let key_id = shard.table.keys.add_row(key);
612 let subset = BufferedSubset::singleton(row);
613 v.insert(TableEntry {
614 hash,
615 key: key_id,
616 vals: subset,
617 });
618 }
619 }
620 }
621
622 fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
623 for (src_id, key) in buf.iter() {
624 self.add_row(key, src_id);
625 }
626 }
627 fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
628 for (_, shard) in self.shards.iter() {
629 for entry in shard.table.hash.iter() {
630 let key = unsafe { shard.table.keys.get_row_unchecked(entry.key) };
632 f(key, entry.vals.as_ref(&shard.subsets));
633 }
634 }
635 }
636
637 fn len(&self) -> usize {
638 self.shards
639 .iter()
640 .map(|(_, shard)| shard.table.hash.len())
641 .sum()
642 }
643
644 fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
645 const BATCH_SIZE: usize = 1024;
649 let shard_data = self.shard_data;
650 let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
651 shard_data.n_shards(),
652 );
653 queues.resize_with(shard_data.n_shards(), || {
654 Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
655 });
656 let split_buf = |buf: TaggedRowBuffer| {
657 let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
658 split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(cols.len()));
659 for (row_id, key) in buf.iter() {
660 shard_data
661 .get_shard_mut(key, &mut split)
662 .add_row(row_id, key);
663 }
664 for (shard_id, buf) in split.drain() {
665 if buf.is_empty() {
666 continue;
667 }
668 let first = buf.get_row(RowId::new(0)).0;
669 queues[shard_id].lock().unwrap().push((first, buf));
670 }
671 };
672 run_in_thread_pool_and_block(&THREAD_POOL, || {
673 rayon::scope(|scope| {
674 let mut cur = Offset::new(0);
675 loop {
676 let mut buf = TaggedRowBuffer::new(cols.len());
677 if let Some(next) =
678 table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
679 {
680 cur = next;
681 scope.spawn(move |_| split_buf(buf));
682 } else {
683 scope.spawn(move |_| split_buf(buf));
684 break;
685 }
686 }
687 });
688 self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
689 use hashbrown::hash_table::Entry;
690 let mut vec = queues[shard_id].lock().unwrap();
692 vec.sort_by_key(|(start, _)| *start);
693 for (_, buf) in vec.drain(..) {
694 for (row_id, key) in buf.iter() {
695 let hash = hash_key(key);
696 let table_entry = shard.table.hash.entry(
697 hash,
698 |entry| {
700 entry.hash == hash
701 && unsafe { shard.table.keys.get_row_unchecked(entry.key) }
702 == key
703 },
704 |ent| ent.hash,
705 );
706 match table_entry {
707 Entry::Occupied(mut occ) => {
708 unsafe {
710 occ.get_mut()
711 .vals
712 .add_row_sorted(row_id, &mut shard.subsets);
713 }
714 }
715 Entry::Vacant(v) => {
716 let key_id = shard.table.keys.add_row(key);
717 let subset = BufferedSubset::singleton(row_id);
718 v.insert(TableEntry {
719 hash,
720 key: key_id,
721 vals: subset,
722 });
723 }
724 }
725 }
726 }
727 });
728 });
729 }
730}
731
732fn hash_key(key: &[Value]) -> u64 {
733 let mut hasher = FxHasher::default();
734 key.hash(&mut hasher);
735 hasher.finish()
736}
737
/// A collection of indexes of type `I`, each identified by a key of type `K`.
///
/// The (key, index) pairs are kept in a vector behind a `ReadOptimizedLock`.
#[derive(Default)]
pub struct IndexCatalog<K: Clone + std::hash::Hash + Eq, I: Clone> {
    /// The catalogued (key, index) pairs.
    data: ReadOptimizedLock<Vec<(K, I)>>,
}
748
749impl<K, I: Clone> IndexCatalog<K, I>
750where
751 K: Clone + std::hash::Hash + Eq,
752{
753 pub fn new() -> Self {
754 IndexCatalog {
755 data: ReadOptimizedLock::new(Vec::new()),
756 }
757 }
758
759 pub fn map(&self, f: impl Fn(&(K, I)) -> (K, I)) -> Self {
760 let vec = self.data.read().iter().map(f).collect();
761 IndexCatalog {
762 data: ReadOptimizedLock::new(vec),
763 }
764 }
765
766 pub fn update(&mut self, f: impl Fn(&K, &mut I)) {
767 for (k, i) in self.data.as_mut_ref() {
768 f(k, i)
769 }
770 }
771
772 pub fn get_or_insert(&self, k: K, init: impl FnOnce() -> I) -> I {
773 let data = self.data.read();
774 let entry = data.iter().find(|(k1, _)| k1 == &k);
775 if let Some(entry) = entry {
776 entry.1.clone()
777 } else {
778 drop(data);
779 let mut data = self.data.lock();
780 if let Some(entry) = data.iter().find(|(k1, _)| k1 == &k) {
781 entry.1.clone()
782 } else {
783 let index = init();
784 data.push((k, index.clone()));
785 index
786 }
787 }
788 }
789}
790
791define_id!(BufferIndex, u32, "an index into a subset buffer");
792
/// Shared storage for the row ids of sparse subsets.
///
/// Each [`BufferedVec`] is a range of `buf`. Regions are allocated with a
/// power-of-two size and recycled through `free_list`; unused slots of a fresh
/// region hold `RowId::new(u32::MAX)`.
struct SubsetBuffer {
    /// Flat storage holding the contents of every vector.
    buf: Pooled<Vec<RowId>>,
    /// Start offsets of regions available for reuse, grouped by size class.
    free_list: FreeList,
}
806
807impl Clone for SubsetBuffer {
808 fn clone(&self) -> Self {
809 SubsetBuffer {
810 buf: Pooled::cloned(&self.buf),
811 free_list: self.free_list.clone(),
812 }
813 }
814}
815
816impl Default for SubsetBuffer {
817 fn default() -> SubsetBuffer {
818 with_pool_set(|ps| SubsetBuffer {
819 buf: ps.get(),
820 free_list: Default::default(),
821 })
822 }
823}
824
825impl SubsetBuffer {
826 fn new_vec(&mut self, rows: impl ExactSizeIterator<Item = RowId>) -> BufferedVec {
827 let len = rows.len();
828 if let Some(v) = self.free_list.get_size_class(len).pop() {
829 return self.fill_at(v, rows);
830 }
831 let start = BufferIndex::from_usize(self.buf.len());
832 self.buf.resize(
833 start.index() + len.next_power_of_two(),
834 RowId::new(u32::MAX),
835 );
836 self.fill_at(start, rows)
837 }
838
839 fn fill_at(
840 &mut self,
841 start: BufferIndex,
842 rows: impl ExactSizeIterator<Item = RowId>,
843 ) -> BufferedVec {
844 let mut cur = start;
845 for i in rows {
846 self.buf[cur.index()] = i;
847 cur = cur.inc();
848 }
849 BufferedVec(start, cur)
850 }
851
852 fn return_vec(&mut self, vec: BufferedVec) {
853 self.free_list.get_size_class(vec.len()).push(vec.0);
854 }
855
856 fn push_vec(&mut self, vec: BufferedVec, row: RowId) -> BufferedVec {
857 debug_assert!(
858 vec.is_empty() || self.buf[vec.1.index() - 1] <= row,
859 "vec={vec:?}, row={row:?}, last_elt={:?}",
860 self.buf[vec.1.index() - 1]
861 );
862 if !vec.len().is_power_of_two() {
863 self.buf[vec.1.index()] = row;
864 return BufferedVec(vec.0, vec.1.inc());
865 }
866
867 let res = if let Some(v) = self.free_list.get_size_class(vec.len() + 1).pop() {
868 self.buf
869 .copy_within(vec.0.index()..vec.1.index(), v.index());
870 self.buf[v.index() + vec.len()] = row;
871 BufferedVec(v, BufferIndex::from_usize(v.index() + vec.len() + 1))
872 } else {
873 let start = self.buf.len();
874 self.buf.resize(
875 start + (vec.len() + 1).next_power_of_two(),
876 RowId::new(u32::MAX),
877 );
878 self.buf.copy_within(vec.0.index()..vec.1.index(), start);
879 self.buf[start + vec.len()] = row;
880 let end = start + vec.len() + 1;
881 BufferedVec(BufferIndex::from_usize(start), BufferIndex::from_usize(end))
882 };
883 self.return_vec(vec);
884 res
885 }
886
887 fn make_ref<'a>(&'a self, vec: &BufferedVec) -> SubsetRef<'a> {
888 let res = SubsetRef::Sparse(unsafe {
893 SortedOffsetSlice::new_unchecked(&self.buf[vec.0.index()..vec.1.index()])
894 });
895 #[cfg(debug_assertions)]
896 {
897 use crate::offsets::Offsets;
898 res.offsets(|x| assert_ne!(x.rep(), u32::MAX))
899 }
900 res
901 }
902}
903
/// A vector of row ids stored inside a [`SubsetBuffer`], represented as the
/// half-open range `[start, end)` of indices into the buffer.
#[derive(Debug, Clone)]
pub(crate) struct BufferedVec(BufferIndex, BufferIndex);
915
916impl Default for BufferedVec {
917 fn default() -> Self {
918 BufferedVec(BufferIndex::new(0), BufferIndex::new(0))
919 }
920}
921
922impl BufferedVec {
923 fn is_empty(&self) -> bool {
924 self.0 == self.1
925 }
926 fn len(&self) -> usize {
927 self.1.index() - self.0.index()
928 }
929}
930
/// A subset of row ids stored by an index.
#[derive(Clone)]
pub(crate) enum BufferedSubset {
    /// A contiguous range of row ids; needs no storage in a `SubsetBuffer`.
    Dense(OffsetRange),
    /// An explicit list of row ids stored in a `SubsetBuffer`.
    Sparse(BufferedVec),
}
936
937impl BufferedSubset {
938 unsafe fn add_row_sorted(&mut self, row: RowId, buf: &mut SubsetBuffer) {
940 match self {
941 BufferedSubset::Dense(range) => {
942 if range.end == range.start {
943 range.start = row;
944 range.end = row.inc();
945 return;
946 }
947 if range.end == row {
948 range.end = row.inc();
949 return;
950 }
951 let mut v = buf.new_vec((range.start.rep()..range.end.rep()).map(RowId::new));
952 v = buf.push_vec(v, row);
953 *self = BufferedSubset::Sparse(v);
954 }
955 BufferedSubset::Sparse(vec) => *vec = buf.push_vec(mem::take(vec), row),
956 }
957 }
958
959 fn empty() -> Self {
960 BufferedSubset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)))
961 }
962
963 fn singleton(row: RowId) -> Self {
964 BufferedSubset::Dense(OffsetRange::new(row, row.inc()))
965 }
966
967 fn as_ref<'a>(&self, buf: &'a SubsetBuffer) -> SubsetRef<'a> {
968 match self {
969 BufferedSubset::Dense(range) => SubsetRef::Dense(*range),
970 BufferedSubset::Sparse(vec) => buf.make_ref(vec),
971 }
972 }
973}
974
975fn num_shards() -> usize {
976 let n_threads = rayon::current_num_threads();
977 if n_threads == 1 { 1 } else { n_threads * 2 }
978}
979
/// Thread pool used for parallel index construction (see the `merge_parallel`
/// implementations). It is built lazily on first use with as many threads as
/// `rayon::current_num_threads()` reports at that time.
///
/// Initialization panics if the pool cannot be built.
static THREAD_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
    rayon::ThreadPoolBuilder::new()
        .num_threads(rayon::current_num_threads())
        .build()
        .unwrap()
});
993
/// Free regions of a `SubsetBuffer`, grouped by size class.
///
/// `data[i]` holds the start offsets of free regions whose size class is
/// `2^i` (see `get_size_class`).
#[derive(Clone, Default)]
pub(super) struct FreeList {
    /// One list of free region starts per power-of-two size class.
    data: [Vec<BufferIndex>; 32],
}
1006
1007impl FreeList {
1008 fn get_size_class(&mut self, size: usize) -> &mut Vec<BufferIndex> {
1009 let size_class = size.next_power_of_two();
1010 let idx = size_class.trailing_zeros() as usize;
1011 &mut self.data[idx]
1012 }
1013}