egglog_core_relations/offsets/
mod.rs

1use std::{cmp, fmt, mem};
2
3use crate::numeric_id::{NumericId, define_id};
4
5use crate::{
6    Pool,
7    pool::{Clear, Pooled, with_pool_set},
8};
9
10define_id!(pub RowId, u32, "a numeric offset into a table");
11
12#[cfg(test)]
13mod tests;
14
15/// A trait for types that represent a sequence of sorted offsets into a table.
16///
17/// NB: this trait may have outlived its usefulness. We may want to just get rid
18/// of it.
19pub(crate) trait Offsets {
20    // A half-open range enclosing the offsets in this sequence.
21    fn bounds(&self) -> Option<(RowId, RowId)>;
22    fn is_empty(&self) -> bool {
23        self.bounds().is_none_or(|(lo, hi)| lo == hi)
24    }
25    fn offsets(&self, f: impl FnMut(RowId));
26}
27
28#[derive(PartialEq, Eq, Debug, Clone, Copy, Hash)]
29pub struct OffsetRange {
30    pub(crate) start: RowId,
31    pub(crate) end: RowId,
32}
33
34impl Offsets for OffsetRange {
35    fn bounds(&self) -> Option<(RowId, RowId)> {
36        Some((self.start, self.end))
37    }
38
39    fn offsets(&self, f: impl FnMut(RowId)) {
40        RowId::range(self.start, self.end).for_each(f)
41    }
42}
43
44impl OffsetRange {
45    pub fn new(start: RowId, end: RowId) -> OffsetRange {
46        debug_assert!(
47            start <= end,
48            "attempting to create malformed range {start:?}..{end:?}"
49        );
50        OffsetRange { start, end }
51    }
52    pub(crate) fn size(&self) -> usize {
53        self.end.index() - self.start.index()
54    }
55}
56
57#[derive(Default, Clone, PartialEq, Eq, Debug, Hash)]
58pub struct SortedOffsetVector(Vec<RowId>);
59
60impl SortedOffsetVector {
61    pub(crate) fn slice(&self) -> &SortedOffsetSlice {
62        // SAFETY: self.0 is sorted.
63        unsafe { SortedOffsetSlice::new_unchecked(&self.0) }
64    }
65
66    pub(crate) fn push(&mut self, offset: RowId) {
67        assert!(self.0.last().is_none_or(|last| last <= &offset));
68        unsafe { self.push_unchecked(offset) }
69    }
70
71    pub(crate) unsafe fn push_unchecked(&mut self, offset: RowId) {
72        self.0.push(offset)
73    }
74
75    pub(crate) fn retain(&mut self, mut f: impl FnMut(RowId) -> bool) {
76        self.0.retain(|off| f(*off))
77    }
78
79    pub(crate) fn extend_nonoverlapping(&mut self, other: &SortedOffsetSlice) {
80        if other.inner().is_empty() {
81            return;
82        }
83        if self.0.is_empty() {
84            self.0.extend(other.iter());
85            return;
86        }
87        if self.0.last().unwrap() <= other.inner().first().unwrap() {
88            self.0.extend(other.iter());
89            return;
90        }
91        panic!("attempting to extend with overlapping offsets")
92    }
93
94    /// Overwrite the contents of the current vector with those of the offset range.
95    pub(crate) fn fill_from_dense(&mut self, range: &OffsetRange) {
96        self.0.clear();
97        self.0
98            .extend((range.start.index()..range.end.index()).map(RowId::from_usize));
99    }
100}
101
102impl Clear for SortedOffsetVector {
103    fn clear(&mut self) {
104        self.0.clear()
105    }
106    fn reuse(&self) -> bool {
107        self.0.capacity() > 0
108    }
109    fn bytes(&self) -> usize {
110        self.0.capacity() * mem::size_of::<RowId>()
111    }
112}
113
114impl Offsets for SortedOffsetVector {
115    fn bounds(&self) -> Option<(RowId, RowId)> {
116        self.slice().bounds()
117    }
118
119    fn offsets(&self, f: impl FnMut(RowId)) {
120        self.slice().offsets(f)
121    }
122}
123
124#[derive(PartialEq, Eq)]
125#[repr(transparent)]
126pub struct SortedOffsetSlice([RowId]);
127
128impl fmt::Debug for SortedOffsetSlice {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.debug_list().entries(self.0.iter()).finish()
131    }
132}
133
134impl SortedOffsetSlice {
135    pub(crate) unsafe fn new_unchecked(slice: &[RowId]) -> &SortedOffsetSlice {
136        debug_assert!(
137            slice.windows(2).all(|w| w[0] <= w[1]),
138            "slice is not sorted: {slice:?}"
139        );
140        // SAFETY: SortedOffsetSlice is repr(transparent), so the two layouts are compatible.
141        unsafe { mem::transmute::<&[RowId], &SortedOffsetSlice>(slice) }
142    }
143
144    pub(crate) fn iter(&self) -> impl Iterator<Item = RowId> + '_ {
145        self.0.iter().copied()
146    }
147
148    pub(crate) fn inner(&self) -> &[RowId] {
149        &self.0
150    }
151
152    pub(crate) fn subslice(&self, lo: usize, hi: usize) -> &SortedOffsetSlice {
153        // Safety: any subslice of a sorted slice is sorted.
154        unsafe { SortedOffsetSlice::new_unchecked(&self.inner()[lo..hi]) }
155    }
156
157    /// Return the index of the first offset in the slice that is greater than or equal to `target`.
158    pub(crate) fn binary_search_by_id(&self, target: RowId) -> usize {
159        self.binary_search_from(0, target)
160    }
161    fn binary_search_from(&self, start: usize, target: RowId) -> usize {
162        match self.inner()[start..].binary_search(&target) {
163            Ok(mut found) => {
164                found += start;
165                // This is O(n), but offset slices probably won't have duplicates at all.
166                while found > 0 && self.inner()[found - 1] == target {
167                    found -= 1;
168                }
169                found
170            }
171            Err(x) => start + x,
172        }
173    }
174
175    #[inline]
176    fn scan_for_offset(&self, start: usize, target: RowId) -> Result<usize, usize> {
177        let slice = self.inner();
178        let len = slice.len();
179
180        if start >= len {
181            return Err(len);
182        }
183        if slice[start] == target {
184            return Ok(start);
185        }
186        if slice[start] > target {
187            return Err(start);
188        }
189
190        // Galloping search: slice[start] < target, so probe at start+1, start+2, start+4, ...
191        // until we overshoot or reach the end, then binary search the narrowed range.
192        let mut lo = start;
193        let mut step = 1usize;
194        let hi = loop {
195            let probe = lo + step;
196            if probe >= len {
197                break len;
198            }
199            match slice[probe].cmp(&target) {
200                cmp::Ordering::Less => {
201                    lo = probe;
202                    step = step.saturating_mul(2);
203                }
204                cmp::Ordering::Equal => {
205                    let mut found = probe;
206                    while found > start && slice[found - 1] == target {
207                        found -= 1;
208                    }
209                    return Ok(found);
210                }
211                cmp::Ordering::Greater => break probe,
212            }
213        };
214
215        // Invariant: slice[lo] < target, and either hi == len or slice[hi] > target.
216        match slice[lo + 1..hi].binary_search(&target) {
217            Ok(mut found) => {
218                found += lo + 1;
219                while found > start && slice[found - 1] == target {
220                    found -= 1;
221                }
222                Ok(found)
223            }
224            Err(x) => Err(lo + 1 + x),
225        }
226    }
227}
228
229impl Offsets for SortedOffsetSlice {
230    fn bounds(&self) -> Option<(RowId, RowId)> {
231        Some((
232            *self.0.first()?,
233            RowId::from_usize(self.0.last()?.index() + 1),
234        ))
235    }
236
237    fn offsets(&self, f: impl FnMut(RowId)) {
238        self.0.iter().copied().for_each(f)
239    }
240}
241
242impl Offsets for &'_ SortedOffsetSlice {
243    fn bounds(&self) -> Option<(RowId, RowId)> {
244        Some((
245            *self.0.first()?,
246            RowId::from_usize(self.0.last()?.index() + 1),
247        ))
248    }
249
250    fn offsets(&self, f: impl FnMut(RowId)) {
251        self.0.iter().copied().for_each(f)
252    }
253}
254
255#[derive(Copy, Clone)]
256pub enum SubsetRef<'a> {
257    Dense(OffsetRange),
258    Sparse(&'a SortedOffsetSlice),
259}
260
261impl Offsets for SubsetRef<'_> {
262    fn bounds(&self) -> Option<(RowId, RowId)> {
263        match self {
264            SubsetRef::Dense(r) => r.bounds(),
265            SubsetRef::Sparse(s) => s.bounds(),
266        }
267    }
268    fn offsets(&self, f: impl FnMut(RowId)) {
269        match self {
270            SubsetRef::Dense(r) => r.offsets(f),
271            SubsetRef::Sparse(s) => s.offsets(f),
272        }
273    }
274}
275
276impl SubsetRef<'_> {
277    pub(crate) fn size(&self) -> usize {
278        match self {
279            SubsetRef::Dense(range) => range.size(),
280            SubsetRef::Sparse(vec) => vec.0.len(),
281        }
282    }
283
284    pub(crate) fn to_owned(self, pool: &Pool<SortedOffsetVector>) -> Subset {
285        match self {
286            SubsetRef::Dense(r) => Subset::Dense(r),
287            SubsetRef::Sparse(s) => {
288                let mut vec = pool.get();
289                vec.extend_nonoverlapping(s);
290                Subset::Sparse(vec)
291            }
292        }
293    }
294
295    /// Get the underlying slice of a sparse subset. Used for debugging.
296    pub(crate) fn _slice(&self) -> &[RowId] {
297        match self {
298            SubsetRef::Dense(_) => panic!("getting slice from dense subset"),
299            SubsetRef::Sparse(slc) => slc.inner(),
300        }
301    }
302    pub(crate) fn iter_bounded(
303        self,
304        start: usize,
305        end: usize,
306        mut f: impl FnMut(RowId),
307    ) -> Option<usize> {
308        match self {
309            SubsetRef::Dense(r) => {
310                let mut cur = start;
311                for row in (r.start.index() + start.index())
312                    ..cmp::min(r.start.index().saturating_add(end), r.end.index())
313                {
314                    f(RowId::new(row as _));
315                    cur += 1;
316                }
317                if cur + r.start.index() < r.end.index() {
318                    Some(cur)
319                } else {
320                    None
321                }
322            }
323            SubsetRef::Sparse(vec) => {
324                let end = cmp::min(vec.0.len(), end);
325                let next = if end == vec.0.len() { None } else { Some(end) };
326                vec.0[start..end].iter().copied().for_each(f);
327                next
328            }
329        }
330    }
331}
332
333/// Either or an offset range or a sorted offset vector.
334#[derive(Debug, Hash, PartialEq, Eq)]
335pub enum Subset {
336    Dense(OffsetRange),
337    Sparse(Pooled<SortedOffsetVector>),
338}
339
340impl Offsets for Subset {
341    fn bounds(&self) -> Option<(RowId, RowId)> {
342        match self {
343            Subset::Dense(r) => r.bounds(),
344            Subset::Sparse(s) => s.slice().bounds(),
345        }
346    }
347    fn offsets(&self, f: impl FnMut(RowId)) {
348        match self {
349            Subset::Dense(r) => r.offsets(f),
350            Subset::Sparse(s) => s.slice().offsets(f),
351        }
352    }
353}
354
355impl Clone for Subset {
356    fn clone(&self) -> Self {
357        match self {
358            Subset::Dense(r) => Subset::Dense(*r),
359            Subset::Sparse(s) => Subset::Sparse(Pooled::cloned(s)),
360        }
361    }
362}
363
364// TODO: consider making Subset::Sparse an Rc, so copies are shallow?
365
366impl Subset {
367    /// The size of the subset.
368    pub fn size(&self) -> usize {
369        match self {
370            Subset::Dense(range) => range.size(),
371            Subset::Sparse(vec) => vec.0.len(),
372        }
373    }
374
375    pub(crate) fn is_dense(&self) -> bool {
376        matches!(self, Subset::Dense(_))
377    }
378
379    pub fn as_ref(&self) -> SubsetRef<'_> {
380        match self {
381            Subset::Dense(r) => SubsetRef::Dense(*r),
382            Subset::Sparse(s) => SubsetRef::Sparse(s.slice()),
383        }
384    }
385
386    pub(crate) fn retain(&mut self, mut filter: impl FnMut(RowId) -> bool) {
387        match self {
388            Subset::Dense(offs) => {
389                let mut res = Subset::empty();
390                offs.offsets(|row| {
391                    if filter(row) {
392                        res.add_row_sorted(row);
393                    }
394                });
395                *self = res;
396            }
397            Subset::Sparse(offs) => offs.retain(filter),
398        }
399    }
400    /// Remove any elements of the current subset not present in `other`.
401    #[inline]
402    pub(crate) fn intersect(&mut self, other: SubsetRef, pool: &Pool<SortedOffsetVector>) {
403        match (self, other) {
404            (Subset::Dense(cur), SubsetRef::Dense(other)) => {
405                let resl = cmp::max(cur.start, other.start);
406                let resr = cmp::min(cur.end, other.end);
407                if resl >= resr {
408                    *cur = OffsetRange::new(resl, resl);
409                } else {
410                    *cur = OffsetRange::new(resl, resr);
411                }
412            }
413            (x @ Subset::Dense(_), SubsetRef::Sparse(sparse)) => {
414                let (low, hi) = x.bounds().unwrap();
415                if sparse.bounds().is_some() {
416                    let l = sparse.binary_search_by_id(low);
417                    let r = sparse.binary_search_by_id(hi);
418                    // Check emptiness before allocating from the pool.
419                    if l >= r {
420                        *x = Subset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)));
421                    } else {
422                        let subslice = sparse.subslice(l, r);
423                        let mut res = pool.get();
424                        res.extend_nonoverlapping(subslice);
425                        *x = Subset::Sparse(res);
426                    }
427                } else {
428                    // empty range
429                    *x = Subset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)));
430                }
431            }
432            (Subset::Sparse(sparse), SubsetRef::Dense(dense)) => {
433                // Binary search for both bounds; avoid copy_within if no prefix to remove.
434                let l = sparse.slice().binary_search_by_id(dense.start);
435                let r = sparse.slice().binary_search_from(l, dense.end);
436                if l == 0 {
437                    sparse.0.truncate(r);
438                } else {
439                    sparse.0.copy_within(l..r, 0);
440                    sparse.0.truncate(r - l);
441                }
442            }
443            (Subset::Sparse(cur), SubsetRef::Sparse(other)) => {
444                let cur_len = cur.0.len();
445                let other_inner = other.inner();
446                let other_len = other_inner.len();
447                if (cur_len <= 16 && other_len <= 16)
448                    || (cur_len / 4 <= other_len && other_len <= cur_len * 4)
449                {
450                    // Similar sizes: two-pointer intersection in O(cur_len + other_len).
451                    let mut write = 0usize;
452                    let mut oi = 0usize;
453                    for ci in 0..cur_len {
454                        let target = cur.0[ci];
455                        while oi < other_len && other_inner[oi] < target {
456                            oi += 1;
457                        }
458                        if oi == other_len {
459                            break;
460                        }
461                        if other_inner[oi] == target {
462                            cur.0[write] = target;
463                            write += 1;
464                            oi += 1;
465                        }
466                    }
467                    cur.0.truncate(write);
468                } else if cur_len > other_len {
469                    // other is much smaller: iterate other and gallop in cur.
470                    // O(other_len * log(cur_len / other_len)) vs O(cur_len) for retain.
471                    let mut write = 0usize;
472                    let mut ci = 0usize;
473                    #[allow(clippy::needless_range_loop)]
474                    for oi in 0..other_len {
475                        if ci >= cur_len {
476                            break;
477                        }
478                        let target = other_inner[oi];
479                        let result = cur.slice().scan_for_offset(ci, target);
480                        match result {
481                            Ok(found) => {
482                                cur.0[write] = target;
483                                write += 1;
484                                ci = found + 1;
485                            }
486                            Err(next_ci) => {
487                                ci = next_ci;
488                            }
489                        }
490                    }
491                    cur.0.truncate(write);
492                } else {
493                    // cur is much smaller than other: iterate cur and gallop in other.
494                    // O(cur_len * log(other_len / cur_len)).
495                    let mut other_off = 0;
496                    cur.retain(|rowid| match other.scan_for_offset(other_off, rowid) {
497                        Ok(found) => {
498                            other_off = found + 1;
499                            true
500                        }
501                        Err(next_off) => {
502                            other_off = next_off;
503                            false
504                        }
505                    });
506                }
507            }
508        }
509    }
510
511    /// Append the given row id to the Subset.
512    ///
513    /// # Panics
514    /// The row id in question must be greater than or equal to the upper bound
515    /// of the subset. This method will panic if it is not.
516    pub(crate) fn add_row_sorted(&mut self, row: RowId) {
517        match self {
518            Subset::Dense(range) => {
519                if range.end == range.start {
520                    range.start = row;
521                    range.end = row.inc();
522                    return;
523                }
524                if range.end == row {
525                    range.end = row.inc();
526                    return;
527                }
528                let mut vec = with_pool_set(|pool_set| pool_set.get::<SortedOffsetVector>());
529                vec.fill_from_dense(range);
530                vec.push(row);
531                *self = Subset::Sparse(vec);
532            }
533            Subset::Sparse(s) => {
534                s.push(row);
535            }
536        }
537    }
538
539    pub(crate) fn empty() -> Subset {
540        Subset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)))
541    }
542}