1use std::{
8 any::Any,
9 cmp,
10 hash::Hasher,
11 mem,
12 sync::{
13 Arc, Weak,
14 atomic::{AtomicUsize, Ordering},
15 },
16};
17
18use crate::numeric_id::{DenseIdMap, NumericId};
19use crossbeam_queue::SegQueue;
20use hashbrown::HashTable;
21use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
22use rustc_hash::FxHasher;
23use sharded_hash_table::ShardedHashTable;
24
25use crate::{
26 Pooled, TableChange, TableId,
27 action::ExecutionState,
28 common::{HashMap, ShardData, ShardId, SubsetTracker, Value},
29 hash_index::{ColumnIndex, Index},
30 offsets::{OffsetRange, Offsets, RowId, Subset, SubsetRef},
31 parallel_heuristics::parallelize_table_op,
32 pool::with_pool_set,
33 row_buffer::{ParallelRowBufWriter, RowBuffer},
34 table_spec::{
35 ColumnId, Constraint, Generation, MutationBuffer, Offset, Row, Table, TableSpec,
36 TableVersion,
37 },
38};
39
40mod rebuild;
41mod sharded_hash_table;
42#[cfg(test)]
43mod tests;
44
/// Hash codes cached in [`TableEntry`]; kept behind an alias so the stored
/// width can change (the `unnecessary_cast` allows elsewhere widen it to u64).
type HashCode = u64;
51
/// An entry in the sharded hash table: a cached hash of a row's key columns
/// plus the id of the full row in the table's row buffer.
#[derive(Clone, Debug)]
pub(crate) struct TableEntry {
    // Cached key hash; compared before touching row data to reject mismatches cheaply.
    hashcode: HashCode,
    // Location of the full row in `Rows::data`.
    row: RowId,
}
58
59impl TableEntry {
60 fn hashcode(&self) -> u64 {
61 #[allow(clippy::unnecessary_cast)]
63 {
64 self.hashcode as u64
65 }
66 }
67}
68
/// Row storage for the table: the live buffer, a scratch buffer reused during
/// compaction, and a running count of rows marked stale.
#[derive(Clone)]
struct Rows {
    // Primary storage holding both live and stale rows.
    data: RowBuffer,
    // Spare buffer; `parallel_rehash` compacts into this and then swaps it in.
    scratch: RowBuffer,
    // Number of rows in `data` currently marked stale.
    stale_rows: usize,
}
79
80impl Rows {
81 fn new(data: RowBuffer) -> Rows {
82 let arity = data.arity();
83 Rows {
84 data,
85 scratch: RowBuffer::new(arity),
86 stale_rows: 0,
87 }
88 }
89 fn clear(&mut self) {
90 self.data.clear();
91 self.stale_rows = 0;
92 }
93 fn next_row(&self) -> RowId {
94 RowId::from_usize(self.data.len())
95 }
96 fn set_stale(&mut self, row: RowId) {
97 if !self.data.set_stale(row) {
98 self.stale_rows += 1;
99 }
100 }
101
102 fn get_row(&self, row: RowId) -> Option<&[Value]> {
103 let row = self.data.get_row(row);
104 if row[0].is_stale() { None } else { Some(row) }
105 }
106
107 unsafe fn get_row_unchecked(&self, row: RowId) -> Option<&[Value]> {
109 let row = unsafe { self.data.get_row_unchecked(row) };
110 if row[0].is_stale() { None } else { Some(row) }
111 }
112
113 fn add_row(&mut self, row: &[Value]) -> RowId {
114 if row[0].is_stale() {
115 self.stale_rows += 1;
116 }
117 self.data.add_row(row)
118 }
119
120 fn remove_stale(&mut self, remap: impl FnMut(&[Value], RowId, RowId)) {
121 self.data.remove_stale(remap);
122 self.stale_rows = 0;
123 }
124}
125
/// Conflict-resolution callback invoked when an insert hits an existing key:
/// given the execution state, the current row, and the incoming row, it writes
/// the merged row into the output vector and returns whether the table should
/// replace the current row with it (see `serial_insert`).
pub type MergeFn =
    dyn Fn(&mut ExecutionState, &[Value], &[Value], &mut Vec<Value>) -> bool + Send + Sync;
135
/// A hash-indexed table whose insertions must arrive in non-decreasing order
/// of the `sort_by` column (when present), allowing offset-based "updates
/// since" queries.
pub struct SortedWritesTable {
    // Major version; bumped on clear and on rehash/compaction.
    generation: Generation,
    // Row storage plus stale-row bookkeeping.
    data: Rows,
    // Maps key hashes to row ids; sharded for parallel mutation.
    hash: ShardedHashTable<TableEntry>,

    // Number of leading key columns.
    n_keys: usize,
    // Total number of columns per row.
    n_columns: usize,
    // Column whose values must be non-decreasing across insertions, if any.
    sort_by: Option<ColumnId>,
    // For each distinct sort value (ascending), the first row id holding it.
    offsets: Vec<(Value, RowId)>,

    // Staged insertions/removals shared with outstanding mutation buffers.
    pending_state: Arc<PendingState>,
    // Conflict-resolution function for key collisions.
    merge: Arc<MergeFn>,
    // Columns subject to rebuilding (see the `rebuild` module).
    to_rebuild: Vec<ColumnId>,
    // Cached index over `to_rebuild` columns used during rebuilds.
    rebuild_index: Index<ColumnIndex>,
    subset_tracker: SubsetTracker,
}
153
impl Clone for SortedWritesTable {
    // Manual impl: `pending_state` must be deep-copied rather than sharing the
    // Arc, and the rebuild index / subset tracker are started fresh.
    fn clone(&self) -> SortedWritesTable {
        SortedWritesTable {
            generation: self.generation,
            data: self.data.clone(),
            hash: self.hash.clone(),
            n_keys: self.n_keys,
            n_columns: self.n_columns,
            sort_by: self.sort_by,
            offsets: self.offsets.clone(),
            // Copy the staged mutations rather than aliasing them.
            pending_state: Arc::new(self.pending_state.deep_copy()),
            merge: self.merge.clone(),
            to_rebuild: self.to_rebuild.clone(),
            // NOTE(review): starts empty rather than cloning the populated
            // index — presumably it is refilled lazily by the rebuild code;
            // confirm against the `rebuild` module.
            rebuild_index: Index::new(self.to_rebuild.clone(), ColumnIndex::new()),
            subset_tracker: Default::default(),
        }
    }
}
172
/// A row buffer that also supports zero-arity rows, which `RowBuffer`
/// apparently does not (see `new`): with no columns we only need to count how
/// many rows were staged.
#[derive(Clone)]
enum ArbitraryRowBuffer {
    NonEmpty(RowBuffer),
    Empty { rows: usize },
}
183
184impl ArbitraryRowBuffer {
185 fn new(arity: usize) -> ArbitraryRowBuffer {
186 if arity == 0 {
187 ArbitraryRowBuffer::Empty { rows: 0 }
188 } else {
189 ArbitraryRowBuffer::NonEmpty(RowBuffer::new(arity))
190 }
191 }
192
193 fn add_row(&mut self, row: &[Value]) {
194 match self {
195 ArbitraryRowBuffer::NonEmpty(buf) => {
196 buf.add_row(row);
197 }
198 ArbitraryRowBuffer::Empty { rows } => {
199 *rows += 1;
200 }
201 }
202 }
203
204 fn len(&self) -> usize {
205 match self {
206 ArbitraryRowBuffer::NonEmpty(buf) => buf.len(),
207 ArbitraryRowBuffer::Empty { rows } => *rows,
208 }
209 }
210
211 fn for_each(&self, mut f: impl FnMut(&[Value])) {
212 match self {
213 ArbitraryRowBuffer::NonEmpty(buf) => {
214 for row in buf.iter() {
215 f(row);
216 }
217 }
218 ArbitraryRowBuffer::Empty { rows } => {
219 for _ in 0..*rows {
220 f(&[]);
221 }
222 }
223 }
224 }
225}
226
/// Per-handle staging area implementing [`MutationBuffer`]: rows and removals
/// are bucketed by destination shard locally, then flushed into the shared
/// [`PendingState`] when the buffer is dropped.
struct Buffer {
    // Staged insertions, keyed by destination shard.
    pending_rows: DenseIdMap<ShardId, RowBuffer>,
    // Staged removals (keys only), keyed by destination shard.
    pending_removals: DenseIdMap<ShardId, ArbitraryRowBuffer>,
    // Weak link back to the owning table's pending state; if the table is
    // gone by drop time, staged data is silently discarded.
    state: Weak<PendingState>,
    n_cols: u32,
    n_keys: u32,
    // Shard-routing parameters mirrored from the table's hash table.
    shard_data: ShardData,
}
235
236impl MutationBuffer for Buffer {
237 fn stage_insert(&mut self, row: &[Value]) {
238 let (shard, _) = hash_code(self.shard_data, row, self.n_keys as _);
239 self.pending_rows
240 .get_or_insert(shard, || RowBuffer::new(self.n_cols as _))
241 .add_row(row);
242 }
243 fn stage_remove(&mut self, key: &[Value]) {
244 let (shard, _) = hash_code(self.shard_data, key, self.n_keys as _);
245 self.pending_removals
246 .get_or_insert(shard, || ArbitraryRowBuffer::new(self.n_keys as _))
247 .add_row(key);
248 }
249 fn fresh_handle(&self) -> Box<dyn MutationBuffer> {
250 Box::new(Buffer {
251 pending_rows: Default::default(),
252 pending_removals: Default::default(),
253 state: self.state.clone(),
254 n_cols: self.n_cols,
255 n_keys: self.n_keys,
256 shard_data: self.shard_data,
257 })
258 }
259}
260
261impl Drop for Buffer {
262 fn drop(&mut self) {
263 if let Some(state) = self.state.upgrade() {
264 let mut rows = 0;
265 for shard_id in 0..self.pending_rows.n_ids() {
266 let shard = ShardId::from_usize(shard_id);
267 let Some(buf) = self.pending_rows.take(shard) else {
268 continue;
269 };
270 rows += buf.len();
271 state.pending_rows[shard].push(buf);
272 }
273 state.total_rows.fetch_add(rows, Ordering::Relaxed);
274
275 let mut rows = 0;
276 for shard_id in 0..self.pending_removals.n_ids() {
277 let shard = ShardId::from_usize(shard_id);
278 let Some(buf) = self.pending_removals.take(shard) else {
279 continue;
280 };
281 rows += buf.len();
282 state.pending_removals[shard].push(buf);
283 }
284 state.total_removals.fetch_add(rows, Ordering::Relaxed);
285 }
286 }
287}
288
impl Table for SortedWritesTable {
    fn dyn_clone(&self) -> Box<dyn Table> {
        Box::new(self.clone())
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    /// Drops all rows and staged mutations, bumping the major generation so
    /// readers can tell the table's contents were invalidated.
    fn clear(&mut self) {
        self.pending_state.clear();
        if self.data.data.len() == 0 {
            // Nothing was ever inserted; keep the current generation.
            return;
        }
        self.offsets.clear();
        self.data.clear();
        self.hash.clear();
        self.generation = Generation::from_usize(self.version().major.index() + 1);
    }

    /// Static shape of the table: key columns first, then values. Deletion is
    /// always supported.
    fn spec(&self) -> TableSpec {
        TableSpec {
            n_keys: self.n_keys,
            n_vals: self.n_columns - self.n_keys,
            uncacheable_columns: Default::default(),
            allows_delete: true,
        }
    }

    /// Delegates to the `rebuild` module; returns whether anything changed.
    fn apply_rebuild(
        &mut self,
        table_id: TableId,
        table: &crate::WrappedTable,
        next_ts: Value,
        exec_state: &mut ExecutionState,
    ) -> bool {
        self.do_rebuild(table_id, table, next_ts, exec_state)
    }

    /// Version = (major generation, rows appended so far). The minor
    /// component only moves forward within a generation.
    fn version(&self) -> TableVersion {
        TableVersion {
            major: self.generation,
            minor: Offset::from_usize(self.data.next_row().index()),
        }
    }

    /// Row ids appended at or after `offset` within the current generation.
    fn updates_since(&self, offset: Offset) -> Subset {
        Subset::Dense(OffsetRange::new(
            RowId::from_usize(offset.index()),
            self.data.next_row(),
        ))
    }

    /// All row ids, including ids whose rows have since gone stale (scans
    /// filter those out).
    fn all(&self) -> Subset {
        Subset::Dense(OffsetRange::new(RowId::new(0), self.data.next_row()))
    }

    /// Number of live (non-stale) rows.
    fn len(&self) -> usize {
        self.data.data.len() - self.data.stale_rows
    }

    /// Calls `f` on every live row in `subset`; stale rows are skipped.
    fn scan_generic(&self, subset: SubsetRef, mut f: impl FnMut(RowId, &[Value]))
    where
        Self: Sized,
    {
        let Some((_low, hi)) = subset.bounds() else {
            // Empty subset: nothing to scan.
            return;
        };
        assert!(
            hi.index() <= self.data.data.len(),
            "{} vs. {}",
            hi.index(),
            self.data.data.len()
        );
        // SAFETY: the assert above bounds the subset's largest offset by the
        // buffer length, so unchecked access is in bounds for every `row`.
        subset.offsets(|row| unsafe {
            if let Some(vals) = self.data.get_row_unchecked(row) {
                f(row, vals)
            }
        })
    }

    /// Scans at most `n` offsets of `subset` starting at `start`, applying
    /// `cs` as a filter; returns the next offset to resume from, if any.
    fn scan_generic_bounded(
        &self,
        subset: SubsetRef,
        start: Offset,
        n: usize,
        cs: &[Constraint],
        mut f: impl FnMut(RowId, &[Value]),
    ) -> Option<Offset>
    where
        Self: Sized,
    {
        if cs.is_empty() {
            subset
                .iter_bounded(start.index(), start.index() + n, |row| {
                    let Some(entry) = self.data.get_row(row) else {
                        // Stale row: skip.
                        return;
                    };
                    f(row, entry);
                })
                .map(Offset::from_usize)
        } else {
            subset
                .iter_bounded(start.index(), start.index() + n, |row| {
                    let Some(entry) = self.get_if(cs, row) else {
                        // Stale or filtered out by `cs`: skip.
                        return;
                    };
                    f(row, entry);
                })
                .map(Offset::from_usize)
        }
    }

    /// Answers comparison constraints against the sort column in O(log n) via
    /// the `offsets` index; returns `None` when the constraint is not against
    /// the sort column (or there is no sort column).
    fn fast_subset(&self, constraint: &Constraint) -> Option<Subset> {
        let sort_by = self.sort_by?;
        match constraint {
            Constraint::Eq { .. } => None,
            Constraint::EqConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        // Exact hit: the half-open range of rows with this value.
                        Ok((found, bound)) => Some(Subset::Dense(OffsetRange::new(found, bound))),
                        Err(_) => Some(Subset::empty()),
                    }
                } else {
                    None
                }
            }
            Constraint::LtConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((found, _)) => {
                            Some(Subset::Dense(OffsetRange::new(RowId::new(0), found)))
                        }
                        // `next` is the first row with a larger value; rows
                        // before it are all strictly smaller than `val`.
                        Err(next) => Some(Subset::Dense(OffsetRange::new(RowId::new(0), next))),
                    }
                } else {
                    None
                }
            }
            Constraint::GtConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((_, bound)) => {
                            Some(Subset::Dense(OffsetRange::new(bound, self.data.next_row())))
                        }
                        Err(next) => {
                            Some(Subset::Dense(OffsetRange::new(next, self.data.next_row())))
                        }
                    }
                } else {
                    None
                }
            }
            Constraint::LeConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((_, bound)) => {
                            Some(Subset::Dense(OffsetRange::new(RowId::new(0), bound)))
                        }
                        Err(next) => Some(Subset::Dense(OffsetRange::new(RowId::new(0), next))),
                    }
                } else {
                    None
                }
            }
            Constraint::GeConst { col, val } => {
                if col == &sort_by {
                    match self.binary_search_sort_val(*val) {
                        Ok((found, _)) => {
                            Some(Subset::Dense(OffsetRange::new(found, self.data.next_row())))
                        }
                        Err(next) => {
                            Some(Subset::Dense(OffsetRange::new(next, self.data.next_row())))
                        }
                    }
                } else {
                    None
                }
            }
        }
    }

    /// Narrows `subset` to the rows satisfying `c` by evaluating it row by row.
    fn refine_one(&self, mut subset: Subset, c: &Constraint) -> Subset {
        subset.retain(|row| self.eval(std::slice::from_ref(c), row));
        subset
    }

    /// Creates a mutation buffer whose staged changes flow into this table's
    /// `pending_state` when the buffer is dropped.
    fn new_buffer(&self) -> Box<dyn MutationBuffer> {
        let n_shards = self.hash.shard_data().n_shards();
        Box::new(Buffer {
            pending_rows: DenseIdMap::with_capacity(n_shards),
            pending_removals: DenseIdMap::with_capacity(n_shards),
            state: Arc::downgrade(&self.pending_state),
            n_keys: u32::try_from(self.n_keys).expect("n_keys should fit in u32"),
            n_cols: u32::try_from(self.n_columns).expect("n_columns should fit in u32"),
            shard_data: self.hash.shard_data(),
        })
    }

    /// Applies all staged removals, then all staged insertions, then compacts
    /// if too many rows have gone stale.
    fn merge(&mut self, exec_state: &mut ExecutionState) -> TableChange {
        let removed = self.do_delete();
        let added = self.do_insert(exec_state);
        self.maybe_rehash();
        TableChange { removed, added }
    }

    /// Looks up the live row matching `key`, copying its values out.
    fn get_row(&self, key: &[Value]) -> Option<Row> {
        let id = get_entry(key, self.n_keys, &self.hash, |row| {
            &self.data.get_row(row).unwrap()[0..self.n_keys] == key
        })?;
        let mut vals = with_pool_set(|ps| ps.get::<Vec<Value>>());
        vals.extend_from_slice(self.data.get_row(id).unwrap());
        Some(Row { id, vals })
    }

    /// Looks up the live row matching `key` and returns only column `col`.
    fn get_row_column(&self, key: &[Value], col: ColumnId) -> Option<Value> {
        let id = get_entry(key, self.n_keys, &self.hash, |row| {
            &self.data.get_row(row).unwrap()[0..self.n_keys] == key
        })?;
        Some(self.data.get_row(id).unwrap()[col.index()])
    }
}
514
impl SortedWritesTable {
    /// Creates an empty table with `n_keys` key columns out of `n_columns`
    /// total, optionally sorted by `sort_by`, using `merge_fn` to resolve key
    /// collisions.
    pub fn new(
        n_keys: usize,
        n_columns: usize,
        sort_by: Option<ColumnId>,
        to_rebuild: Vec<ColumnId>,
        merge_fn: Box<MergeFn>,
    ) -> Self {
        let hash = ShardedHashTable::<TableEntry>::default();
        let shard_data = hash.shard_data();
        let rebuild_index = Index::new(to_rebuild.clone(), ColumnIndex::new());
        SortedWritesTable {
            generation: Generation::new(0),
            data: Rows::new(RowBuffer::new(n_columns)),
            hash,
            n_keys,
            n_columns,
            sort_by,
            offsets: Default::default(),
            pending_state: Arc::new(PendingState::new(shard_data)),
            merge: merge_fn.into(),
            to_rebuild,
            rebuild_index,
            subset_tracker: Default::default(),
        }
    }

    /// Applies staged removals with one rayon task per shard; each task only
    /// touches its own shard's queue and hash entries. Returns whether any
    /// row was removed.
    fn parallel_delete(&mut self) -> bool {
        let shard_data = self.hash.shard_data();
        let stale_delta: usize = self
            .hash
            .mut_shards()
            .par_iter_mut()
            .enumerate()
            .filter_map(|(shard_id, shard)| {
                let shard_id = ShardId::from_usize(shard_id);
                // Skip shards with no staged removals.
                if self.pending_state.pending_removals[shard_id].is_empty() {
                    return None;
                }
                Some((shard_id, shard))
            })
            .map(|(shard_id, shard)| {
                let queue = &self.pending_state.pending_removals[shard_id];
                let mut marked_stale = 0;
                while let Some(buf) = queue.pop() {
                    buf.for_each(|to_remove| {
                        let (actual_shard, hc) = hash_code(shard_data, to_remove, self.n_keys);
                        // Staging already bucketed keys by shard.
                        assert_eq!(actual_shard, shard_id);
                        if let Ok(entry) = shard.find_entry(hc, |entry| {
                            entry.hashcode == (hc as _)
                                && &self.data.get_row(entry.row).unwrap()[0..self.n_keys]
                                    == to_remove
                        }) {
                            let (ent, _) = entry.remove();
                            // `set_stale_shared` reports whether the row was
                            // already stale; only count fresh transitions.
                            marked_stale +=
                                unsafe { !self.data.data.set_stale_shared(ent.row) } as usize;
                        }
                    });
                }
                marked_stale
            })
            .sum();
        // `stale_rows` cannot be updated inside the parallel loop; apply the
        // aggregate afterwards.
        self.data.stale_rows += stale_delta;
        stale_delta > 0
    }
    /// Single-threaded counterpart of `parallel_delete`.
    fn serial_delete(&mut self) -> bool {
        let shard_data = self.hash.shard_data();
        let mut changed = false;
        self.hash
            .mut_shards()
            .iter_mut()
            .enumerate()
            .for_each(|(shard_id, shard)| {
                let shard_id = ShardId::from_usize(shard_id);
                let queue = &self.pending_state.pending_removals[shard_id];
                while let Some(buf) = queue.pop() {
                    buf.for_each(|to_remove| {
                        let (actual_shard, hc) = hash_code(shard_data, to_remove, self.n_keys);
                        assert_eq!(actual_shard, shard_id);
                        if let Ok(entry) = shard.find_entry(hc, |entry| {
                            entry.hashcode == (hc as _)
                                && &self.data.get_row(entry.row).unwrap()[0..self.n_keys]
                                    == to_remove
                        }) {
                            let (ent, _) = entry.remove();
                            self.data.set_stale(ent.row);
                            changed = true;
                        }
                    })
                }
            });
        changed
    }

    /// Drains staged removals, choosing serial or parallel execution based on
    /// the staged total.
    fn do_delete(&mut self) -> bool {
        let total = self.pending_state.total_removals.swap(0, Ordering::Relaxed);

        if parallelize_table_op(total) {
            self.parallel_delete()
        } else {
            self.serial_delete()
        }
    }

    /// Drains staged insertions. The parallel path uses a `SortChecker` when
    /// a sort column exists (all rows in one parallel batch must share a
    /// single sort value) and the no-op checker otherwise.
    fn do_insert(&mut self, exec_state: &mut ExecutionState) -> bool {
        let total = self.pending_state.total_rows.swap(0, Ordering::Relaxed);
        self.data.data.reserve(total);
        if parallelize_table_op(total) {
            if let Some(col) = self.sort_by {
                self.parallel_insert(
                    exec_state,
                    SortChecker {
                        col,
                        current: None,
                        baseline: self.offsets.last().map(|(v, _)| *v),
                    },
                )
            } else {
                self.parallel_insert(exec_state, ())
            }
        } else {
            self.serial_insert(exec_state)
        }
    }

    /// Single-threaded insertion: for each staged row, either insert it fresh
    /// or run the merge function against the existing row for its key. When a
    /// sort column exists, also maintains the `offsets` index and asserts the
    /// sort invariant.
    fn serial_insert(&mut self, exec_state: &mut ExecutionState) -> bool {
        let mut changed = false;
        let n_keys = self.n_keys;
        let mut scratch = with_pool_set(|ps| ps.get::<Vec<Value>>());
        for (_outer_shard, queue) in self.pending_state.pending_rows.iter() {
            if let Some(sort_by) = self.sort_by {
                while let Some(buf) = queue.pop() {
                    for query in buf.non_stale() {
                        let key = &query[0..n_keys];
                        let entry = get_entry_mut(query, n_keys, &mut self.hash, |row| {
                            let Some(row) = self.data.get_row(row) else {
                                return false;
                            };
                            &row[0..n_keys] == key
                        });

                        if let Some(row) = entry {
                            // Key collision: run the merge function; only
                            // commit if it reports a change.
                            let cur = self
                                .data
                                .get_row(*row)
                                .expect("table should not point to stale entry");
                            if (self.merge)(exec_state, cur, query, &mut scratch) {
                                let sort_val = query[sort_by.index()];
                                let new = self.data.add_row(&scratch);
                                if let Some(largest) = self.offsets.last().map(|(v, _)| *v) {
                                    assert!(
                                        sort_val >= largest,
                                        "inserting row that violates sort order ({sort_val:?} vs. {largest:?})"
                                    );
                                    // Only record the first row of a new sort value.
                                    if sort_val > largest {
                                        self.offsets.push((sort_val, new));
                                    }
                                } else {
                                    self.offsets.push((sort_val, new));
                                }
                                // Retire the old row and point the hash entry
                                // at the merged one.
                                self.data.set_stale(*row);
                                *row = new;
                                changed = true;
                            }
                            scratch.clear();
                        } else {
                            // Fresh key: append the row and index it.
                            let sort_val = query[sort_by.index()];
                            let new = self.data.add_row(query);
                            if let Some(largest) = self.offsets.last().map(|(v, _)| *v) {
                                assert!(
                                    sort_val >= largest,
                                    "inserting row that violates sort order {sort_val:?} vs. {largest:?}"
                                );
                                if sort_val > largest {
                                    self.offsets.push((sort_val, new));
                                }
                            } else {
                                self.offsets.push((sort_val, new));
                            }
                            let (shard, hc) = hash_code(self.hash.shard_data(), query, self.n_keys);
                            debug_assert_eq!(shard, _outer_shard);
                            self.hash.mut_shards()[shard.index()].insert_unique(
                                hc as _,
                                TableEntry {
                                    hashcode: hc as _,
                                    row: new,
                                },
                                TableEntry::hashcode,
                            );
                            changed = true;
                        }
                    }
                }
            } else {
                // Same as above, minus all sort/offset bookkeeping.
                while let Some(buf) = queue.pop() {
                    for query in buf.non_stale() {
                        let key = &query[0..n_keys];
                        let entry = get_entry_mut(query, n_keys, &mut self.hash, |row| {
                            let Some(row) = self.data.get_row(row) else {
                                return false;
                            };
                            &row[0..n_keys] == key
                        });

                        if let Some(row) = entry {
                            let cur = self
                                .data
                                .get_row(*row)
                                .expect("table should not point to stale entry");
                            if (self.merge)(exec_state, cur, query, &mut scratch) {
                                let new = self.data.add_row(&scratch);
                                self.data.set_stale(*row);
                                *row = new;
                                changed = true;
                            }
                            scratch.clear();
                        } else {
                            let new = self.data.add_row(query);
                            let (shard, hc) = hash_code(self.hash.shard_data(), query, self.n_keys);
                            debug_assert_eq!(shard, _outer_shard);
                            self.hash.mut_shards()[shard.index()].insert_unique(
                                hc as _,
                                TableEntry {
                                    hashcode: hc as _,
                                    row: new,
                                },
                                TableEntry::hashcode,
                            );
                            changed = true;
                        }
                    }
                }
            };
        }
        changed
    }

    /// Parallel insertion: one rayon task per shard stages rows into
    /// `StagedOutputs` batches, flushes each batch into a shared parallel row
    /// writer, and updates its own shard's hash entries. `checker` enforces
    /// the sort invariant across workers. Returns whether anything changed.
    fn parallel_insert<C: OrderingChecker>(
        &mut self,
        exec_state: &ExecutionState,
        checker: C,
    ) -> bool {
        const BATCH_SIZE: usize = 1 << 18;
        let shard_data = self.hash.shard_data();
        let n_keys = self.n_keys;
        let n_cols = self.n_columns;
        // First row id this batch will occupy; used to update `offsets` later.
        let next_offset = RowId::from_usize(self.data.data.len());
        let row_writer = self.data.data.parallel_writer();
        let pending_adds = self
            .hash
            .mut_shards()
            .par_iter_mut()
            .enumerate()
            .map(|(shard_id, shard)| {
                let shard_id = ShardId::from_usize(shard_id);
                let mut checker = checker.clone();
                let mut exec_state = exec_state.clone();
                let mut scratch = with_pool_set(|ps| ps.get::<Vec<Value>>());
                let queue = &self.pending_state.pending_rows[shard_id];
                let mut marked_stale = 0usize;
                let mut staged = StagedOutputs::new(n_keys, n_cols, BATCH_SIZE);
                let mut changed = false;
                // Macro (not a closure) so it can borrow the surrounding
                // locals mutably without fighting the borrow checker.
                macro_rules! flush_staged_outputs {
                    () => {{
                        // Reserve a contiguous region in the shared writer and
                        // copy the staged rows into it.
                        let (start_row, stale) = staged.write_output(&row_writer);
                        marked_stale += stale;
                        let mut cur_row = start_row;
                        let read_handle = row_writer.read_handle();
                        for row in staged.rows() {
                            // Rows pre-merged inside `staged` may already be stale.
                            if row.first().map(Value::is_stale).unwrap_or(false) {
                                cur_row = cur_row.inc();
                                continue;
                            }
                            use hashbrown::hash_table::Entry;
                            checker.check_local(row);
                            changed = true;
                            let key = &row[0..n_keys];
                            let (_actual_shard, hc) = hash_code(shard_data, row, n_keys);
                            #[cfg(any(debug_assertions, test))]
                            {
                                unsafe {
                                    // Sanity check: the writer placed the row
                                    // exactly where we think it did.
                                    assert_eq!(read_handle.get_row_unchecked(cur_row), row);
                                }
                            }
                            debug_assert_eq!(_actual_shard, shard_id);
                            match shard.entry(
                                hc,
                                |ent| unsafe {
                                    ent.hashcode == hc as HashCode
                                        && &read_handle.get_row_unchecked(ent.row)[0..n_keys] == key
                                },
                                TableEntry::hashcode,
                            ) {
                                Entry::Occupied(mut occ) => {
                                    let cur = unsafe { read_handle.get_row_unchecked(occ.get().row) };

                                    if (self.merge)(&mut exec_state, cur, row, &mut scratch) {
                                        // Merge changed the row: retire the old
                                        // copy and repoint the entry at the new one.
                                        unsafe {
                                            let _was_stale = read_handle.set_stale_shared(occ.get().row);
                                            debug_assert!(!_was_stale);
                                        }
                                        occ.get_mut().row = cur_row;
                                        changed = true;
                                    } else {
                                        // Merge kept the old row: retire the
                                        // freshly-written duplicate instead.
                                        unsafe {
                                            let _was_stale = read_handle.set_stale_shared(cur_row);
                                            debug_assert!(!_was_stale);
                                        }
                                    }
                                    // Either branch leaves exactly one extra stale row.
                                    marked_stale += 1;
                                    scratch.clear();
                                }
                                Entry::Vacant(v) => {
                                    changed = true;
                                    v.insert(TableEntry {
                                        hashcode: hc as HashCode,
                                        row: cur_row,
                                    });
                                }
                            }

                            cur_row = cur_row.inc();
                        }
                        staged.clear();
                    }};
                }
                while let Some(buf) = queue.pop() {
                    for row in buf.non_stale() {
                        staged.insert(row, |cur, new, out| {
                            (self.merge)(&mut exec_state, cur, new, out)
                        });
                        if staged.len() >= BATCH_SIZE {
                            flush_staged_outputs!();
                        }
                    }
                }
                // Flush whatever remains below BATCH_SIZE.
                flush_staged_outputs!();
                (checker, marked_stale, changed)
            })
            .collect_vec_list();
        self.data.data = row_writer.finish();
        // Combine the per-shard checkers; panics if workers disagreed on the
        // sort value, otherwise records it in `offsets`.
        let checker = C::check_global(pending_adds.iter().flatten().map(|(checker, _, _)| checker));
        checker.update_offsets(next_offset, &mut self.offsets);

        self.data.stale_rows += pending_adds
            .iter()
            .flatten()
            .map(|(_, stale, _)| *stale)
            .sum::<usize>();

        pending_adds
            .iter()
            .flatten()
            .any(|(_, _, changed)| *changed)
    }

    /// Binary-searches the `offsets` index for `val`.
    /// Ok((start, end)) is the half-open row range holding `val`;
    /// Err(next) is the first row whose sort value exceeds `val`.
    fn binary_search_sort_val(&self, val: Value) -> Result<(RowId, RowId), RowId> {
        // Index invariants: row ids and sort values both strictly increase.
        debug_assert!(
            self.offsets.windows(2).all(|x| x[0].1 < x[1].1),
            "{:?}",
            self.offsets
        );

        debug_assert!(
            self.offsets.windows(2).all(|x| x[0].0 < x[1].0),
            "{:?}",
            self.offsets
        );
        match self.offsets.binary_search_by_key(&val, |(v, _)| *v) {
            Ok(got) => Ok((
                self.offsets[got].1,
                self.offsets
                    .get(got + 1)
                    .map(|(_, r)| *r)
                    .unwrap_or(self.data.next_row()),
            )),
            Err(next) => Err(self
                .offsets
                .get(next)
                .map(|(_, id)| *id)
                .unwrap_or(self.data.next_row())),
        }
    }
    /// True if the row is live and satisfies every constraint in `cs`.
    fn eval(&self, cs: &[Constraint], row: RowId) -> bool {
        self.get_if(cs, row).is_some()
    }

    /// Returns the row's values if it is live and satisfies all of `cs`.
    fn get_if(&self, cs: &[Constraint], row: RowId) -> Option<&[Value]> {
        let row = self.data.get_row(row)?;
        let mut res = true;
        for constraint in cs {
            match constraint {
                Constraint::Eq { l_col, r_col } => res &= row[l_col.index()] == row[r_col.index()],
                Constraint::EqConst { col, val } => res &= row[col.index()] == *val,
                Constraint::LtConst { col, val } => res &= row[col.index()] < *val,
                Constraint::GtConst { col, val } => res &= row[col.index()] > *val,
                Constraint::LeConst { col, val } => res &= row[col.index()] <= *val,
                Constraint::GeConst { col, val } => res &= row[col.index()] >= *val,
            }
        }
        if res { Some(row) } else { None }
    }

    /// Compacts the table once more than half of it (and at least 16 rows) is
    /// stale.
    fn maybe_rehash(&mut self) {
        if self.data.stale_rows <= cmp::max(16, self.data.data.len() / 2) {
            return;
        }

        if parallelize_table_op(self.data.data.len()) {
            self.parallel_rehash();
        } else {
            self.rehash();
        }
    }
    /// Parallel compaction. Pass 1 (parallel over sort-value ranges) counts
    /// live rows per (sort value, shard). Pass 2 turns the counts into a
    /// prefix sum assigning each shard a disjoint destination range. Pass 3
    /// (parallel over shards) copies live rows into `scratch` at their
    /// assigned slots and rewrites the hash entries, then `scratch` is
    /// swapped in.
    fn parallel_rehash(&mut self) {
        use rayon::prelude::*;
        let Some(sort_by) = self.sort_by else {
            // Without a sort column there are no ranges to parallelize over;
            // fall back to the serial path.
            self.rehash();
            return;
        };
        self.generation = self.generation.inc();
        assert!(!self.offsets.is_empty());
        // Per-sort-value tally: total live rows plus a per-shard histogram.
        struct TimestampStats {
            value: Value,
            count: usize,
            histogram: Pooled<DenseIdMap<ShardId, usize>>,
        }
        impl Default for TimestampStats {
            fn default() -> TimestampStats {
                TimestampStats {
                    value: Value::stale(),
                    count: 0,
                    histogram: with_pool_set(|ps| ps.get()),
                }
            }
        }
        let mut results = Vec::<TimestampStats>::with_capacity(self.offsets.len());
        results.resize_with(self.offsets.len() - 1, Default::default);
        // Counts live rows in [start_row, end_row) and buckets them by shard.
        macro_rules! compute_hist {
            ($start_val: expr, $start_row: expr, $end_row: expr) => {{
                let mut histogram: Pooled<DenseIdMap<ShardId, usize>> =
                    with_pool_set(|ps| ps.get());
                let mut cur_row = $start_row;
                let mut count = 0;
                while cur_row < $end_row {
                    if let Some(row) = self.data.get_row(cur_row) {
                        count += 1;
                        let (shard, _) = hash_code(self.hash.shard_data(), row, self.n_keys);
                        *histogram.get_or_default(shard) += 1;
                    }
                    cur_row = cur_row.inc();
                }
                TimestampStats {
                    value: $start_val,
                    count,
                    histogram,
                }
            }};
        }
        let mut last: TimestampStats = Default::default();
        // The final offsets entry has no successor, so its range (which ends
        // at `next_row`) is handled separately from the `windows(2)` pairs.
        rayon::join(
            || {
                self.offsets
                    .windows(2)
                    .zip(results.iter_mut())
                    .par_bridge()
                    .for_each(|(xs, res)| {
                        let [(start_val, start_row), (_, end_row)] = xs else {
                            unreachable!()
                        };
                        *res = compute_hist!(*start_val, *start_row, *end_row);
                    })
            },
            || {
                let (start_val, start_row) = self.offsets.last().unwrap();
                let end_row = self.data.next_row();
                last = compute_hist!(*start_val, *start_row, end_row);
            },
        );
        results.push(last);
        // Prefix-sum the histograms: each (sort value, shard) pair gets the
        // starting index of its destination range in the compacted buffer.
        let mut prev_count = 0;
        self.offsets.clear();
        for stats in results.iter_mut() {
            if stats.count == 0 {
                continue;
            }
            self.offsets
                .push((stats.value, RowId::from_usize(prev_count)));
            let mut inner = prev_count;
            for (_, count) in stats.histogram.iter_mut() {
                let tmp = *count;
                *count = inner;
                inner += tmp;
            }
            prev_count += stats.count;
            debug_assert_eq!(inner, prev_count)
        }

        self.data.scratch.clear();
        self.data.scratch.reserve(prev_count);
        self.hash
            .mut_shards()
            .par_iter_mut()
            .with_max_len(1)
            .enumerate()
            .for_each(|(shard_id, shard)| {
                let shard_id = ShardId::from_usize(shard_id);
                let scratch_ptr = self.data.scratch.raw_rows();
                // Next free destination slot for each sort value in this shard.
                let mut progress =
                    HashMap::<Value, RowId>::default();
                progress.reserve(results.len());
                for stats in &results {
                    let Some(start) = stats.histogram.get(shard_id) else {
                        continue;
                    };
                    progress.insert(stats.value, RowId::from_usize(*start));
                }
                for TableEntry { row: row_id, .. } in shard.iter_mut() {
                    let row = self
                        .data
                        .get_row(*row_id)
                        .expect("shard should not map to a stale value");
                    let val = row[sort_by.index()];
                    let next = progress[&val];
                    // SAFETY: the prefix sums give each shard disjoint
                    // destination slots, so concurrent writers never overlap,
                    // and `reserve(prev_count)` sized the scratch buffer.
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            row.as_ptr(),
                            scratch_ptr.add(next.index() * self.n_columns) as *mut Value,
                            self.n_columns,
                        )
                    }
                    *row_id = next;
                    progress.insert(val, next.inc());
                }
            });
        // SAFETY: exactly `prev_count` rows were written above.
        unsafe { self.data.scratch.set_len(prev_count) };
        mem::swap(&mut self.data.data, &mut self.data.scratch);
        self.data.stale_rows = 0;
    }
    /// Serial compaction core: removes stale rows, remapping each survivor's
    /// hash entry to its new id and (if sorted) rebuilding the offsets index.
    fn rehash_impl(
        sort_by: Option<ColumnId>,
        n_keys: usize,
        rows: &mut Rows,
        offsets: &mut Vec<(Value, RowId)>,
        hash: &mut ShardedHashTable<TableEntry>,
    ) {
        if let Some(sort_by) = sort_by {
            offsets.clear();
            rows.remove_stale(|row, old, new| {
                let stale_entry = get_entry_mut(row, n_keys, hash, |x| x == old)
                    .expect("non-stale entry not mapped in hash");
                *stale_entry = new;
                let sort_col = row[sort_by.index()];
                // Record only the first row of each distinct sort value.
                if let Some((max, _)) = offsets.last() {
                    if sort_col > *max {
                        offsets.push((sort_col, new));
                    }
                } else {
                    offsets.push((sort_col, new));
                }
            })
        } else {
            rows.remove_stale(|row, old, new| {
                let stale_entry = get_entry_mut(row, n_keys, hash, |x| x == old)
                    .expect("non-stale entry not mapped in hash");
                *stale_entry = new;
            })
        }
    }

    /// Serial compaction entry point; bumps the generation.
    fn rehash(&mut self) {
        self.generation = self.generation.inc();
        Self::rehash_impl(
            self.sort_by,
            self.n_keys,
            &mut self.data,
            &mut self.offsets,
            &mut self.hash,
        )
    }
}
1195
1196fn get_entry(
1197 row: &[Value],
1198 n_keys: usize,
1199 table: &ShardedHashTable<TableEntry>,
1200 test: impl Fn(RowId) -> bool,
1201) -> Option<RowId> {
1202 let (shard, hash) = hash_code(table.shard_data(), row, n_keys);
1203 table
1204 .get_shard(shard)
1205 .find(hash, |ent| {
1206 ent.hashcode == hash as HashCode && test(ent.row)
1207 })
1208 .map(|ent| ent.row)
1209}
1210
1211fn get_entry_mut<'a>(
1212 row: &[Value],
1213 n_keys: usize,
1214 table: &'a mut ShardedHashTable<TableEntry>,
1215 test: impl Fn(RowId) -> bool,
1216) -> Option<&'a mut RowId> {
1217 let (shard, hash) = hash_code(table.shard_data(), row, n_keys);
1218 table.mut_shards()[shard.index()]
1219 .find_mut(hash, |ent| {
1220 ent.hashcode == hash as HashCode && test(ent.row)
1221 })
1222 .map(|ent| &mut ent.row)
1223}
1224
1225fn hash_code(shard_data: ShardData, row: &[Value], n_keys: usize) -> (ShardId, u64) {
1226 let mut hasher = FxHasher::default();
1227 for val in &row[0..n_keys] {
1228 hasher.write_usize(val.index());
1229 }
1230 let full_code = hasher.finish();
1231 #[allow(clippy::unnecessary_cast)]
1233 (shard_data.shard_id(full_code), full_code as HashCode as u64)
1234}
1235
/// Mutations staged by [`Buffer`] handles, waiting to be applied at the next
/// `merge`. Queues are sharded so the merge can drain them in parallel
/// without locking.
struct PendingState {
    // Staged insertions, one lock-free queue per shard.
    pending_rows: DenseIdMap<ShardId, SegQueue<RowBuffer>>,
    // Staged removals (keys only), one queue per shard.
    pending_removals: DenseIdMap<ShardId, SegQueue<ArbitraryRowBuffer>>,
    // Total rows across all removal queues; used to size/route the next merge.
    total_removals: AtomicUsize,
    // Total rows across all insertion queues.
    total_rows: AtomicUsize,
}
1243
1244impl PendingState {
1245 fn new(shard_data: ShardData) -> PendingState {
1246 let n_shards = shard_data.n_shards();
1247 let mut pending_rows = DenseIdMap::with_capacity(n_shards);
1248 let mut pending_removals = DenseIdMap::with_capacity(n_shards);
1249 for i in 0..n_shards {
1250 pending_rows.insert(ShardId::from_usize(i), SegQueue::default());
1251 pending_removals.insert(ShardId::from_usize(i), SegQueue::default());
1252 }
1253
1254 PendingState {
1255 pending_rows,
1256 pending_removals,
1257 total_removals: AtomicUsize::new(0),
1258 total_rows: AtomicUsize::new(0),
1259 }
1260 }
1261 fn clear(&self) {
1262 for (_, queue) in self.pending_rows.iter() {
1263 while queue.pop().is_some() {}
1264 }
1265
1266 for (_, queue) in self.pending_removals.iter() {
1267 while queue.pop().is_some() {}
1268 }
1269 }
1270
1271 fn deep_copy(&self) -> PendingState {
1277 let mut pending_rows = DenseIdMap::new();
1278 let mut pending_removals = DenseIdMap::new();
1279 fn drain_queue<T>(queue: &SegQueue<T>) -> Vec<T> {
1280 let mut res = Vec::new();
1281 while let Some(x) = queue.pop() {
1282 res.push(x);
1283 }
1284 res
1285 }
1286 for (shard, queue) in self.pending_rows.iter() {
1287 let contents = drain_queue(queue);
1288 let new_queue = SegQueue::default();
1289 for x in contents {
1290 new_queue.push(x.clone());
1291 queue.push(x);
1292 }
1293 pending_rows.insert(shard, new_queue);
1294 }
1295
1296 for (shard, queue) in self.pending_removals.iter() {
1297 let contents = drain_queue(queue);
1298 let new_queue = SegQueue::default();
1299 for x in contents {
1300 new_queue.push(x.clone());
1301 queue.push(x);
1302 }
1303 pending_removals.insert(shard, new_queue);
1304 }
1305
1306 PendingState {
1307 pending_rows,
1308 pending_removals,
1309 total_removals: AtomicUsize::new(self.total_removals.load(Ordering::Acquire)),
1310 total_rows: AtomicUsize::new(self.total_rows.load(Ordering::Acquire)),
1311 }
1312 }
1313}
1314
/// Validation hook used by `parallel_insert`: verifies that concurrently
/// inserted rows respect the table's sort invariant and, afterwards, records
/// the batch's sort value in the offsets index.
trait OrderingChecker: Clone + Send + Sync {
    /// Observes a row inserted on this worker; panics if it breaks the order.
    fn check_local(&mut self, row: &[Value]);
    /// Combines the per-worker checkers into a single verdict.
    fn check_global<'a>(checkers: impl Iterator<Item = &'a Self>) -> Self
    where
        Self: 'a;
    /// Records the batch's sort value in `offsets`, starting at row `start`.
    fn update_offsets(&self, start: RowId, offsets: &mut Vec<(Value, RowId)>);
}
1335
/// The no-op checker: performs no validation and records no offsets.
/// Used when a table imposes no ordering invariant.
impl OrderingChecker for () {
    fn check_local(&mut self, _: &[Value]) {}
    fn check_global<'a>(_: impl Iterator<Item = &'a ()>) {}
    fn update_offsets(&self, _: RowId, _: &mut Vec<(Value, RowId)>) {}
}
1341
/// An [`OrderingChecker`] enforcing that rows arrive in nondecreasing order of
/// the value in column `col`, and that all rows inserted within one batch
/// share the same sort key.
#[derive(Copy, Clone)]
struct SortChecker {
    // Column holding the sort key.
    col: ColumnId,
    // Largest key seen before this batch, if any; new keys must be >= it.
    baseline: Option<Value>,
    // Key observed in the current batch, once one row has been seen.
    current: Option<Value>,
}
1348
1349impl OrderingChecker for SortChecker {
1350 fn check_local(&mut self, row: &[Value]) {
1351 let val = row[self.col.index()];
1352 if let Some(cur) = self.current {
1353 assert_eq!(
1354 cur, val,
1355 "concurrently inserting rows with different sort keys"
1356 );
1357 } else {
1358 self.current = Some(val);
1359 if let Some(baseline) = self.baseline {
1360 assert!(val >= baseline, "inserted row violates sort order");
1361 }
1362 }
1363 }
1364
1365 fn check_global<'a>(mut checkers: impl Iterator<Item = &'a Self>) -> Self {
1366 let Some(start) = checkers.next() else {
1367 return SortChecker {
1368 col: ColumnId::new(!0),
1369 baseline: None,
1370 current: None,
1371 };
1372 };
1373 let mut expected = start.current;
1374 for checker in checkers {
1375 assert_eq!(checker.baseline, start.baseline);
1376 match (&mut expected, checker.current) {
1377 (None, None) => {}
1378 (cur @ None, Some(x)) => {
1379 *cur = Some(x);
1380 }
1381 (Some(_), None) => {}
1382 (Some(x), Some(y)) => {
1383 assert_eq!(
1384 *x, y,
1385 "concurrently inserting rows with different sort keys"
1386 );
1387 }
1388 }
1389 }
1390 SortChecker {
1391 col: start.col,
1392 baseline: start.baseline,
1393 current: expected,
1394 }
1395 }
1396
1397 fn update_offsets(&self, start: RowId, offsets: &mut Vec<(Value, RowId)>) {
1398 if let Some(cur) = self.current {
1399 if let Some((max, _)) = offsets.last() {
1400 if cur > *max {
1401 offsets.push((cur, start));
1402 }
1403 } else {
1404 offsets.push((cur, start));
1405 }
1406 }
1407 }
1408}
1409
/// A staging area for rows about to be written into a table: deduplicates and
/// merges rows keyed on their first `n_keys` columns before they are appended
/// to the shared output buffer.
struct StagedOutputs {
    // Shard parameters used when hashing keys; constructed with a single
    // shard (see `StagedOutputs::new`).
    shard_data: ShardData,
    // Number of leading columns that form the primary key.
    n_keys: usize,
    // Maps key hash codes to the `RowId` of the live row in `rows`.
    hash: Pooled<HashTable<TableEntry>>,
    // Backing storage for staged rows, including rows later marked stale.
    rows: RowBuffer,
    // Count of rows in `rows` that were superseded by a merge.
    n_stale: usize,
    // Reusable buffer handed to merge functions when combining two rows.
    scratch: Pooled<Vec<Value>>,
}
1422
impl StagedOutputs {
    /// Iterate over the rows in the backing buffer.
    // NOTE(review): whether rows marked stale are skipped depends on
    // `RowBuffer::iter` — confirm before relying on this for exact counts.
    fn rows(&self) -> impl Iterator<Item = &[Value]> {
        self.rows.iter()
    }
    /// Create a staging area for rows of `n_cols` columns keyed on the first
    /// `n_keys` columns, with space for `capacity` rows reserved up front.
    fn new(n_keys: usize, n_cols: usize, capacity: usize) -> Self {
        let mut res = with_pool_set(|ps| StagedOutputs {
            // One logical shard: keys hash as if the table were unsharded.
            shard_data: ShardData::new(1),
            n_keys,
            n_stale: 0,
            // Hash table and scratch vector come from the pool for reuse.
            hash: ps.get(),
            rows: RowBuffer::new(n_cols),
            scratch: ps.get(),
        });
        res.hash.reserve(capacity, TableEntry::hashcode);
        res.rows.reserve(capacity);
        res
    }
    /// Reset to an empty state, retaining allocated capacity.
    fn clear(&mut self) {
        self.hash.clear();
        self.rows.clear();
        self.n_stale = 0;
    }
    /// Number of live (non-stale) staged rows.
    fn len(&self) -> usize {
        self.rows.len() - self.n_stale
    }

    /// Insert `row`, merging it with any staged row that shares its key.
    ///
    /// Rows whose first column is stale are ignored. When a row with the same
    /// first `n_keys` columns is already staged, `merge_fn(current, new,
    /// scratch)` is invoked; if it returns `true`, the merged row it wrote
    /// into `scratch` replaces the current one and the old row is marked
    /// stale. Otherwise the existing row is kept unchanged.
    fn insert(
        &mut self,
        row: &[Value],
        mut merge_fn: impl FnMut(&[Value], &[Value], &mut Vec<Value>) -> bool,
    ) {
        if row[0].is_stale() {
            return;
        }
        use hashbrown::hash_table::Entry;
        let (_, hc) = hash_code(self.shard_data, row, self.n_keys);
        let entry = self.hash.entry(
            hc,
            // Equality: cached hash code matches AND the key columns match.
            |te| {
                te.hashcode() == hc
                    && self.rows.get_row(te.row)[0..self.n_keys] == row[0..self.n_keys]
            },
            TableEntry::hashcode,
        );
        match entry {
            Entry::Occupied(mut occupied_entry) => {
                let cur = self.rows.get_row(occupied_entry.get().row);
                if merge_fn(cur, row, &mut self.scratch) {
                    // The merge produced a replacement: append it, retire the
                    // old row, and repoint the hash entry at the fresh copy.
                    let new = self.rows.add_row(&self.scratch);
                    self.rows.set_stale(occupied_entry.get().row);
                    self.n_stale += 1;
                    occupied_entry.get_mut().row = new;
                }
                // Scratch is reused across insertions; clear it either way.
                self.scratch.clear();
            }
            Entry::Vacant(vacant_entry) => {
                // New key: append the row and record its location.
                let next = self.rows.add_row(row);
                vacant_entry.insert(TableEntry {
                    hashcode: hc as _,
                    row: next,
                });
            }
        }
    }

    /// Append the entire staged buffer to `output`, returning the starting
    /// `RowId` of the appended region and the number of stale rows contained
    /// in it (so the caller can account for them).
    fn write_output(&self, output: &ParallelRowBufWriter) -> (RowId, usize) {
        (output.append_contents(&self.rows), self.n_stale)
    }
}