egglog_core_relations/pool/
mod.rs

1//! Utilities for pooling object allocations.
2
3use std::{
4    cell::{Cell, RefCell},
5    fmt,
6    hash::{Hash, Hasher},
7    mem::{self, ManuallyDrop},
8    ops::{Deref, DerefMut},
9    ptr,
10    rc::Rc,
11};
12
13use crate::{
14    AtomId,
15    free_join::execute::TrieNode,
16    numeric_id::{DenseIdMap, IdVec},
17};
18use fixedbitset::FixedBitSet;
19use hashbrown::HashTable;
20
21use crate::{
22    ColumnId, RowId,
23    action::{Instr, PredictedVals},
24    common::{HashMap, HashSet, IndexMap, IndexSet, ShardId, Value},
25    hash_index::{BufferedSubset, ColumnIndex, TableEntry},
26    offsets::SortedOffsetVector,
27    table::TableEntry as SwTableEntry,
28    table_spec::Constraint,
29};
30
31#[cfg(test)]
32mod tests;
33
34/// A trait for types whose allocations can be reused.
35pub trait Clear: Default {
36    /// Clear the object.
37    ///
38    /// The end result must be equivalent to `Self::default()`.
39    fn clear(&mut self);
40    /// Indicate whether or not this object should be reused.
41    fn reuse(&self) -> bool {
42        true
43    }
44    /// A rough approximation for the in-memory overhead of this object.
45    fn bytes(&self) -> usize;
46}
47
48impl<T> Clear for Vec<T> {
49    fn clear(&mut self) {
50        self.clear()
51    }
52    fn reuse(&self) -> bool {
53        self.capacity() > 256
54    }
55    fn bytes(&self) -> usize {
56        self.capacity() * mem::size_of::<T>()
57    }
58}
59
60impl<T: Clear> Clear for Rc<T> {
61    fn clear(&mut self) {
62        Rc::get_mut(self).unwrap().clear()
63    }
64    fn reuse(&self) -> bool {
65        Rc::strong_count(self) == 1 && Rc::weak_count(self) == 0
66    }
67    fn bytes(&self) -> usize {
68        mem::size_of::<T>()
69    }
70}
71
72impl<T: Clear> Clone for Pooled<Rc<T>>
73where
74    Rc<T>: InPoolSet<PoolSet>,
75{
76    fn clone(&self) -> Self {
77        Pooled {
78            data: self.data.clone(),
79        }
80    }
81}
82
83impl<T> Clear for HashSet<T> {
84    fn clear(&mut self) {
85        self.clear()
86    }
87    fn reuse(&self) -> bool {
88        self.capacity() > 0
89    }
90    fn bytes(&self) -> usize {
91        self.capacity() * mem::size_of::<T>()
92    }
93}
94
95impl<T> Clear for HashTable<T> {
96    fn clear(&mut self) {
97        self.clear()
98    }
99    fn reuse(&self) -> bool {
100        self.capacity() > 0
101    }
102    fn bytes(&self) -> usize {
103        self.capacity() * mem::size_of::<T>()
104    }
105}
106
107impl<K, V> Clear for HashMap<K, V> {
108    fn clear(&mut self) {
109        self.clear()
110    }
111    fn reuse(&self) -> bool {
112        self.capacity() > 0
113    }
114    fn bytes(&self) -> usize {
115        self.capacity() * mem::size_of::<(K, V)>()
116    }
117}
118
119impl<K, V> Clear for IndexMap<K, V> {
120    fn clear(&mut self) {
121        self.clear()
122    }
123    fn reuse(&self) -> bool {
124        self.capacity() > 0
125    }
126    fn bytes(&self) -> usize {
127        self.capacity() * (mem::size_of::<u64>() + mem::size_of::<(K, V)>())
128    }
129}
130
131impl<T> Clear for IndexSet<T> {
132    fn clear(&mut self) {
133        self.clear()
134    }
135    fn reuse(&self) -> bool {
136        self.capacity() > 0
137    }
138    fn bytes(&self) -> usize {
139        self.capacity() * (mem::size_of::<u64>() + mem::size_of::<T>())
140    }
141}
142
143impl Clear for FixedBitSet {
144    fn clear(&mut self) {
145        self.clone_from(&Default::default());
146    }
147    fn reuse(&self) -> bool {
148        !self.is_empty()
149    }
150    fn bytes(&self) -> usize {
151        self.len() / 8
152    }
153}
154
155impl<K, V> Clear for IdVec<K, V> {
156    fn clear(&mut self) {
157        self.clear()
158    }
159    fn reuse(&self) -> bool {
160        self.capacity() > 0
161    }
162    fn bytes(&self) -> usize {
163        self.capacity() * mem::size_of::<V>()
164    }
165}
166
167struct PoolState<T> {
168    data: Vec<T>,
169    bytes: usize,
170    limit: usize,
171}
172
173impl<T: Clear> PoolState<T> {
174    fn new(limit: usize) -> Self {
175        PoolState {
176            data: Vec::new(),
177            bytes: 0,
178            limit,
179        }
180    }
181
182    fn push(&mut self, mut item: T) {
183        if !item.reuse() {
184            return;
185        }
186        if self.bytes + item.bytes() > self.limit {
187            return;
188        }
189        item.clear();
190        self.bytes += item.bytes();
191        self.data.push(item);
192    }
193
194    fn pop(&mut self) -> T {
195        if let Some(got) = self.data.pop() {
196            self.bytes -= got.bytes();
197            got
198        } else {
199            Default::default()
200        }
201    }
202
203    fn clear_and_shrink(&mut self) {
204        self.data.clear();
205        self.bytes = 0;
206        self.data.shrink_to_fit();
207    }
208}
209
210/// A shared pool of objects.
211pub struct Pool<T> {
212    data: Rc<RefCell<PoolState<T>>>,
213}
214
215impl<T> Clone for Pool<T> {
216    fn clone(&self) -> Self {
217        Pool {
218            data: self.data.clone(),
219        }
220    }
221}
222
223impl<T: Clear> Default for Pool<T> {
224    fn default() -> Self {
225        Pool {
226            data: Rc::new(RefCell::new(PoolState::new(usize::MAX))),
227        }
228    }
229}
230
231impl<T: Clear + InPoolSet<PoolSet>> Pool<T> {
232    pub(crate) fn new(limit: usize) -> Pool<T> {
233        Pool {
234            data: Rc::new(RefCell::new(PoolState::new(limit))),
235        }
236    }
237    /// Get an empty value of type `T`, potentially reused from the pool.
238    pub(crate) fn get(&self) -> Pooled<T> {
239        let empty = self.data.borrow_mut().pop();
240
241        Pooled {
242            data: ManuallyDrop::new(empty),
243        }
244    }
245
246    /// Clear the contents of the pool and release any memory associated with it.
247    pub(crate) fn clear(&self) {
248        let mut data_mut = self.data.borrow_mut();
249        data_mut.clear_and_shrink();
250    }
251}
252
253/// An owned value of type `T` that can be returned to a memory pool when it is
254/// no longer used.
255pub struct Pooled<T: Clear + InPoolSet<PoolSet>> {
256    data: ManuallyDrop<T>,
257}
258
259impl<T: Clear + InPoolSet<PoolSet>> Default for Pooled<T> {
260    fn default() -> Self {
261        with_pool_set(|ps| ps.get::<T>())
262    }
263}
264
265impl<T: Clear + fmt::Debug + InPoolSet<PoolSet>> fmt::Debug for Pooled<T> {
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        let data: &T = &self.data;
268        data.fmt(f)
269    }
270}
271impl<T: Clear + PartialEq + InPoolSet<PoolSet>> PartialEq for Pooled<T> {
272    fn eq(&self, other: &Self) -> bool {
273        // This form rid of a spuriou clippy warning about unconditional recursion.
274        <T as PartialEq>::eq(&self.data, &other.data)
275    }
276}
277
278impl<T: Clear + InPoolSet<PoolSet> + Eq> Eq for Pooled<T> {}
279
280impl<T: Clear + Hash + InPoolSet<PoolSet>> Hash for Pooled<T> {
281    fn hash<H: Hasher>(&self, state: &mut H) {
282        self.data.hash(state)
283    }
284}
285
286impl<T: Clear + InPoolSet<PoolSet> + 'static> Pooled<T> {
287    /// Clear the contents the wrapped object. If the object cannot be reused,
288    /// attempt to fetch another value from the pool.
289    ///
290    /// This method can be used in concert with `relinquish` to provide a
291    /// `clear` operation that hands data back to the pool, and then grabs it
292    /// back again if it needs to be reused.
293    ///
294    /// This pattern is likely only suitable for "temporary" buffers.
295    pub(crate) fn refresh(this: &mut Pooled<T>) {
296        this.data.clear();
297        if this.data.reuse() {
298            return;
299        }
300        let pool = with_pool_set(|ps| ps.get_pool::<T>());
301        let mut other = pool.data.borrow_mut().pop();
302        if !other.reuse() {
303            return;
304        }
305        let slot: &mut T = &mut this.data;
306        mem::swap(slot, &mut other);
307    }
308
309    pub(crate) fn into_inner(this: Pooled<T>) -> T {
310        // SAFETY: ownership of `this.data` is transferred to the caller. We
311        // will not drop `this` or use it again.
312        let inner = unsafe { ptr::read(&this.data) };
313        mem::forget(this);
314        ManuallyDrop::into_inner(inner)
315    }
316
317    pub(crate) fn new(data: T) -> Pooled<T> {
318        Pooled {
319            data: ManuallyDrop::new(data),
320        }
321    }
322}
323
324impl<T: Clear + Clone + InPoolSet<PoolSet>> Pooled<T> {
325    pub(crate) fn cloned(this: &Pooled<T>) -> Pooled<T> {
326        let mut res = with_pool_set(|ps| ps.get::<T>());
327        res.clone_from(this);
328        res
329    }
330}
331
332impl<T: Clear + InPoolSet<PoolSet>> Drop for Pooled<T> {
333    fn drop(&mut self) {
334        let reuse = self.data.reuse();
335        if !reuse {
336            // SAFETY: we own `self.data` and being in the drop method means no
337            // one else will access it.
338            unsafe { ManuallyDrop::drop(&mut self.data) };
339            return;
340        }
341        self.data.clear();
342        let t: &T = &self.data;
343        // SAFETY: ownership of `self.data` is transferred to the pool
344        with_pool_set(|ps| {
345            T::with_pool(ps, |pool| {
346                pool.data.borrow_mut().push(unsafe { ptr::read(t) })
347            })
348        });
349    }
350}
351
352impl<T: Clear + InPoolSet<PoolSet>> Deref for Pooled<T> {
353    type Target = T;
354
355    fn deref(&self) -> &T {
356        &self.data
357    }
358}
359
360impl<T: Clear + InPoolSet<PoolSet>> DerefMut for Pooled<T> {
361    fn deref_mut(&mut self) -> &mut T {
362        &mut self.data
363    }
364}
365
366/// Helper trait for allowing the trait resolution system to infer the correct
367/// pool type during allocation.
368pub trait InPoolSet<PoolSet>
369where
370    Self: Sized + Clear,
371{
372    fn with_pool<R>(pool_set: &PoolSet, f: impl FnOnce(&Pool<Self>) -> R) -> R;
373}
374
375macro_rules! pool_set {
376    ($vis:vis $name:ident { $($ident:ident : $ty:ty [ $bytes:expr ],)* }) => {
377        $vis struct $name {
378            $(
379                $ident: Pool<$ty>,
380            )*
381        }
382
383        impl Default for $name {
384            fn default() -> Self {
385                $name {
386                $(
387                    $ident: Pool::new($bytes),
388                )*
389                }
390            }
391        }
392
393        impl $name {
394            $vis fn get_pool<T: InPoolSet<Self>>(&self) -> Pool<T> {
395                T::with_pool(self, Pool::clone)
396            }
397
398            $vis fn get<T: InPoolSet<Self> + Default>(&self) -> Pooled<T> {
399                T::with_pool(self, |pool| pool.get())
400            }
401            $vis fn clear(&self) {
402                $( self.$ident.clear(); )*
403            }
404        }
405
406        $(
407            impl InPoolSet<$name> for $ty {
408                fn with_pool<R>(pool_set: &$name, f: impl FnOnce(&Pool<Self>) -> R) -> R {
409                    f(&pool_set.$ident)
410                }
411            }
412        )*
413    }
414}
415
416// The main thread-local memory pool used for reusing allocations. The syntax is:
417//
418// <name> : <type> [ <bytes> ],
419//
420// Where `name` is not used for anything, `type` feeds into the `InPoolSet` machinery and allows
421// anything of that type to be allocated using `with_pool_set`, and `bytes` is a per-type limit on
422// the total bytes that can be buffered in a single (per-thread) memory pool.
423
424pool_set! {
425    pub PoolSet {
426        vec_vals: Vec<Value> [ 1 << 25 ],
427        vec_cell_vals: Vec<Cell<Value>> [ 1 << 25 ],
428        // TODO: work on scaffolding/DI/etc. so that we can share allocations
429        // between vec_vals and shared_vals.
430        rows: Vec<RowId> [ 1 << 25 ],
431        offset_vec: SortedOffsetVector [ 1 << 20 ],
432        column_index: IndexMap<Value, BufferedSubset> [ 1 << 20 ],
433        constraints: Vec<Constraint> [ 1 << 20 ],
434        bitsets: FixedBitSet [ 1 << 20 ],
435        instrs: Vec<Instr> [ 1 << 20 ],
436        tuple_indexes: HashTable<TableEntry<BufferedSubset>> [ 1 << 20 ],
437        staged_outputs: HashTable<SwTableEntry> [ 1 << 25 ],
438        predicted_vals: PredictedVals [ 1 << 20 ],
439        shard_hist: DenseIdMap<ShardId, usize> [ 1 << 20 ],
440        instr_indexes: Vec<u32> [ 1 << 20 ],
441        cached_subsets: IdVec<ColumnId, std::sync::OnceLock<std::sync::Arc<ColumnIndex>>> [ 4 << 20 ],
442        intersected_on: DenseIdMap<AtomId, i64> [ 1 << 20 ],
443
444        cached_child: IdVec<ColumnId, std::sync::RwLock<HashMap<Value, std::sync::Arc<TrieNode>>>> [ 1 << 20 ],
445    }
446}
447
448/// Run `f` on the thread-local [`PoolSet`].
449pub(crate) fn with_pool_set<R>(f: impl FnOnce(&PoolSet) -> R) -> R {
450    POOL_SET.with(|pool_set| f(pool_set))
451}
452
453thread_local! {
454    /// A thread-local pool set. All pooled allocations land back in the local thread.
455    ///
456    /// We don't drop this PoolSet because it does not contain any resources
457    /// that need to be released, other than memory (which will be reclaimed
458    /// when the process exits, right after drop runs).
459    ///
460    /// For large egraphs, this be a big runtime win. The main egglog binary
461    /// avoids dropping the egraph for the same reason.
462    static POOL_SET: ManuallyDrop<PoolSet> = Default::default();
463}