egglog_core_relations/pool/
mod.rs1use std::{
4 cell::{Cell, RefCell},
5 fmt,
6 hash::{Hash, Hasher},
7 mem::{self, ManuallyDrop},
8 ops::{Deref, DerefMut},
9 ptr,
10 rc::Rc,
11};
12
13use crate::{
14 AtomId,
15 free_join::execute::TrieNode,
16 numeric_id::{DenseIdMap, IdVec},
17};
18use fixedbitset::FixedBitSet;
19use hashbrown::HashTable;
20
21use crate::{
22 ColumnId, RowId,
23 action::{Instr, PredictedVals},
24 common::{HashMap, HashSet, IndexMap, IndexSet, ShardId, Value},
25 hash_index::{BufferedSubset, ColumnIndex, TableEntry},
26 offsets::SortedOffsetVector,
27 table::TableEntry as SwTableEntry,
28 table_spec::Constraint,
29};
30
31#[cfg(test)]
32mod tests;
33
34pub trait Clear: Default {
36 fn clear(&mut self);
40 fn reuse(&self) -> bool {
42 true
43 }
44 fn bytes(&self) -> usize;
46}
47
48impl<T> Clear for Vec<T> {
49 fn clear(&mut self) {
50 self.clear()
51 }
52 fn reuse(&self) -> bool {
53 self.capacity() > 256
54 }
55 fn bytes(&self) -> usize {
56 self.capacity() * mem::size_of::<T>()
57 }
58}
59
60impl<T: Clear> Clear for Rc<T> {
61 fn clear(&mut self) {
62 Rc::get_mut(self).unwrap().clear()
63 }
64 fn reuse(&self) -> bool {
65 Rc::strong_count(self) == 1 && Rc::weak_count(self) == 0
66 }
67 fn bytes(&self) -> usize {
68 mem::size_of::<T>()
69 }
70}
71
72impl<T: Clear> Clone for Pooled<Rc<T>>
73where
74 Rc<T>: InPoolSet<PoolSet>,
75{
76 fn clone(&self) -> Self {
77 Pooled {
78 data: self.data.clone(),
79 }
80 }
81}
82
83impl<T> Clear for HashSet<T> {
84 fn clear(&mut self) {
85 self.clear()
86 }
87 fn reuse(&self) -> bool {
88 self.capacity() > 0
89 }
90 fn bytes(&self) -> usize {
91 self.capacity() * mem::size_of::<T>()
92 }
93}
94
95impl<T> Clear for HashTable<T> {
96 fn clear(&mut self) {
97 self.clear()
98 }
99 fn reuse(&self) -> bool {
100 self.capacity() > 0
101 }
102 fn bytes(&self) -> usize {
103 self.capacity() * mem::size_of::<T>()
104 }
105}
106
107impl<K, V> Clear for HashMap<K, V> {
108 fn clear(&mut self) {
109 self.clear()
110 }
111 fn reuse(&self) -> bool {
112 self.capacity() > 0
113 }
114 fn bytes(&self) -> usize {
115 self.capacity() * mem::size_of::<(K, V)>()
116 }
117}
118
119impl<K, V> Clear for IndexMap<K, V> {
120 fn clear(&mut self) {
121 self.clear()
122 }
123 fn reuse(&self) -> bool {
124 self.capacity() > 0
125 }
126 fn bytes(&self) -> usize {
127 self.capacity() * (mem::size_of::<u64>() + mem::size_of::<(K, V)>())
128 }
129}
130
131impl<T> Clear for IndexSet<T> {
132 fn clear(&mut self) {
133 self.clear()
134 }
135 fn reuse(&self) -> bool {
136 self.capacity() > 0
137 }
138 fn bytes(&self) -> usize {
139 self.capacity() * (mem::size_of::<u64>() + mem::size_of::<T>())
140 }
141}
142
143impl Clear for FixedBitSet {
144 fn clear(&mut self) {
145 self.clone_from(&Default::default());
146 }
147 fn reuse(&self) -> bool {
148 !self.is_empty()
149 }
150 fn bytes(&self) -> usize {
151 self.len() / 8
152 }
153}
154
155impl<K, V> Clear for IdVec<K, V> {
156 fn clear(&mut self) {
157 self.clear()
158 }
159 fn reuse(&self) -> bool {
160 self.capacity() > 0
161 }
162 fn bytes(&self) -> usize {
163 self.capacity() * mem::size_of::<V>()
164 }
165}
166
167struct PoolState<T> {
168 data: Vec<T>,
169 bytes: usize,
170 limit: usize,
171}
172
173impl<T: Clear> PoolState<T> {
174 fn new(limit: usize) -> Self {
175 PoolState {
176 data: Vec::new(),
177 bytes: 0,
178 limit,
179 }
180 }
181
182 fn push(&mut self, mut item: T) {
183 if !item.reuse() {
184 return;
185 }
186 if self.bytes + item.bytes() > self.limit {
187 return;
188 }
189 item.clear();
190 self.bytes += item.bytes();
191 self.data.push(item);
192 }
193
194 fn pop(&mut self) -> T {
195 if let Some(got) = self.data.pop() {
196 self.bytes -= got.bytes();
197 got
198 } else {
199 Default::default()
200 }
201 }
202
203 fn clear_and_shrink(&mut self) {
204 self.data.clear();
205 self.bytes = 0;
206 self.data.shrink_to_fit();
207 }
208}
209
210pub struct Pool<T> {
212 data: Rc<RefCell<PoolState<T>>>,
213}
214
215impl<T> Clone for Pool<T> {
216 fn clone(&self) -> Self {
217 Pool {
218 data: self.data.clone(),
219 }
220 }
221}
222
223impl<T: Clear> Default for Pool<T> {
224 fn default() -> Self {
225 Pool {
226 data: Rc::new(RefCell::new(PoolState::new(usize::MAX))),
227 }
228 }
229}
230
231impl<T: Clear + InPoolSet<PoolSet>> Pool<T> {
232 pub(crate) fn new(limit: usize) -> Pool<T> {
233 Pool {
234 data: Rc::new(RefCell::new(PoolState::new(limit))),
235 }
236 }
237 pub(crate) fn get(&self) -> Pooled<T> {
239 let empty = self.data.borrow_mut().pop();
240
241 Pooled {
242 data: ManuallyDrop::new(empty),
243 }
244 }
245
246 pub(crate) fn clear(&self) {
248 let mut data_mut = self.data.borrow_mut();
249 data_mut.clear_and_shrink();
250 }
251}
252
253pub struct Pooled<T: Clear + InPoolSet<PoolSet>> {
256 data: ManuallyDrop<T>,
257}
258
259impl<T: Clear + InPoolSet<PoolSet>> Default for Pooled<T> {
260 fn default() -> Self {
261 with_pool_set(|ps| ps.get::<T>())
262 }
263}
264
265impl<T: Clear + fmt::Debug + InPoolSet<PoolSet>> fmt::Debug for Pooled<T> {
266 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267 let data: &T = &self.data;
268 data.fmt(f)
269 }
270}
271impl<T: Clear + PartialEq + InPoolSet<PoolSet>> PartialEq for Pooled<T> {
272 fn eq(&self, other: &Self) -> bool {
273 <T as PartialEq>::eq(&self.data, &other.data)
275 }
276}
277
278impl<T: Clear + InPoolSet<PoolSet> + Eq> Eq for Pooled<T> {}
279
280impl<T: Clear + Hash + InPoolSet<PoolSet>> Hash for Pooled<T> {
281 fn hash<H: Hasher>(&self, state: &mut H) {
282 self.data.hash(state)
283 }
284}
285
286impl<T: Clear + InPoolSet<PoolSet> + 'static> Pooled<T> {
287 pub(crate) fn refresh(this: &mut Pooled<T>) {
296 this.data.clear();
297 if this.data.reuse() {
298 return;
299 }
300 let pool = with_pool_set(|ps| ps.get_pool::<T>());
301 let mut other = pool.data.borrow_mut().pop();
302 if !other.reuse() {
303 return;
304 }
305 let slot: &mut T = &mut this.data;
306 mem::swap(slot, &mut other);
307 }
308
309 pub(crate) fn into_inner(this: Pooled<T>) -> T {
310 let inner = unsafe { ptr::read(&this.data) };
313 mem::forget(this);
314 ManuallyDrop::into_inner(inner)
315 }
316
317 pub(crate) fn new(data: T) -> Pooled<T> {
318 Pooled {
319 data: ManuallyDrop::new(data),
320 }
321 }
322}
323
324impl<T: Clear + Clone + InPoolSet<PoolSet>> Pooled<T> {
325 pub(crate) fn cloned(this: &Pooled<T>) -> Pooled<T> {
326 let mut res = with_pool_set(|ps| ps.get::<T>());
327 res.clone_from(this);
328 res
329 }
330}
331
332impl<T: Clear + InPoolSet<PoolSet>> Drop for Pooled<T> {
333 fn drop(&mut self) {
334 let reuse = self.data.reuse();
335 if !reuse {
336 unsafe { ManuallyDrop::drop(&mut self.data) };
339 return;
340 }
341 self.data.clear();
342 let t: &T = &self.data;
343 with_pool_set(|ps| {
345 T::with_pool(ps, |pool| {
346 pool.data.borrow_mut().push(unsafe { ptr::read(t) })
347 })
348 });
349 }
350}
351
352impl<T: Clear + InPoolSet<PoolSet>> Deref for Pooled<T> {
353 type Target = T;
354
355 fn deref(&self) -> &T {
356 &self.data
357 }
358}
359
360impl<T: Clear + InPoolSet<PoolSet>> DerefMut for Pooled<T> {
361 fn deref_mut(&mut self) -> &mut T {
362 &mut self.data
363 }
364}
365
366pub trait InPoolSet<PoolSet>
369where
370 Self: Sized + Clear,
371{
372 fn with_pool<R>(pool_set: &PoolSet, f: impl FnOnce(&Pool<Self>) -> R) -> R;
373}
374
375macro_rules! pool_set {
376 ($vis:vis $name:ident { $($ident:ident : $ty:ty [ $bytes:expr ],)* }) => {
377 $vis struct $name {
378 $(
379 $ident: Pool<$ty>,
380 )*
381 }
382
383 impl Default for $name {
384 fn default() -> Self {
385 $name {
386 $(
387 $ident: Pool::new($bytes),
388 )*
389 }
390 }
391 }
392
393 impl $name {
394 $vis fn get_pool<T: InPoolSet<Self>>(&self) -> Pool<T> {
395 T::with_pool(self, Pool::clone)
396 }
397
398 $vis fn get<T: InPoolSet<Self> + Default>(&self) -> Pooled<T> {
399 T::with_pool(self, |pool| pool.get())
400 }
401 $vis fn clear(&self) {
402 $( self.$ident.clear(); )*
403 }
404 }
405
406 $(
407 impl InPoolSet<$name> for $ty {
408 fn with_pool<R>(pool_set: &$name, f: impl FnOnce(&Pool<Self>) -> R) -> R {
409 f(&pool_set.$ident)
410 }
411 }
412 )*
413 }
414}
415
416pool_set! {
425 pub PoolSet {
426 vec_vals: Vec<Value> [ 1 << 25 ],
427 vec_cell_vals: Vec<Cell<Value>> [ 1 << 25 ],
428 rows: Vec<RowId> [ 1 << 25 ],
431 offset_vec: SortedOffsetVector [ 1 << 20 ],
432 column_index: IndexMap<Value, BufferedSubset> [ 1 << 20 ],
433 constraints: Vec<Constraint> [ 1 << 20 ],
434 bitsets: FixedBitSet [ 1 << 20 ],
435 instrs: Vec<Instr> [ 1 << 20 ],
436 tuple_indexes: HashTable<TableEntry<BufferedSubset>> [ 1 << 20 ],
437 staged_outputs: HashTable<SwTableEntry> [ 1 << 25 ],
438 predicted_vals: PredictedVals [ 1 << 20 ],
439 shard_hist: DenseIdMap<ShardId, usize> [ 1 << 20 ],
440 instr_indexes: Vec<u32> [ 1 << 20 ],
441 cached_subsets: IdVec<ColumnId, std::sync::OnceLock<std::sync::Arc<ColumnIndex>>> [ 4 << 20 ],
442 intersected_on: DenseIdMap<AtomId, i64> [ 1 << 20 ],
443
444 cached_child: IdVec<ColumnId, std::sync::RwLock<HashMap<Value, std::sync::Arc<TrieNode>>>> [ 1 << 20 ],
445 }
446}
447
448pub(crate) fn with_pool_set<R>(f: impl FnOnce(&PoolSet) -> R) -> R {
450 POOL_SET.with(|pool_set| f(pool_set))
451}
452
453thread_local! {
454 static POOL_SET: ManuallyDrop<PoolSet> = Default::default();
463}