use crate::memory_pool::{
human_readable_size, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
};
use datafusion_common::HashMap;
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use log::debug;
use parking_lot::Mutex;
use std::{
num::NonZeroUsize,
sync::atomic::{AtomicUsize, Ordering},
};
/// A [`MemoryPool`] with no upper bound.
///
/// Every request succeeds; the pool only keeps a running total of reserved bytes so
/// that [`MemoryPool::reserved`] can report it.
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    /// Bytes currently accounted to this pool.
    used: AtomicUsize,
}

impl MemoryPool for UnboundedMemoryPool {
    /// Adds `additional` bytes to the running total.
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }

    /// Removes `shrink` bytes from the running total.
    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    /// Records the request exactly like `grow` and never fails.
    fn try_grow(&self, _reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.used.fetch_add(additional, Ordering::Relaxed);
        Ok(())
    }

    /// Current running total of reserved bytes.
    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }

    /// This pool has no limit.
    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Infinite
    }
}
/// A [`MemoryPool`] that grants `try_grow` requests first-come, first-served until
/// `pool_size` bytes are in use.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Upper bound, in bytes, enforced by `try_grow` (not by `grow`).
    pool_size: usize,
    /// Bytes currently accounted to this pool.
    used: AtomicUsize,
}

impl GreedyMemoryPool {
    /// Creates a pool whose `try_grow` refuses to exceed `pool_size` bytes.
    pub fn new(pool_size: usize) -> Self {
        debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
        Self {
            pool_size,
            used: AtomicUsize::new(0),
        }
    }
}

impl MemoryPool for GreedyMemoryPool {
    /// Unconditionally adds `additional` bytes; the limit is not checked here.
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }

    /// Removes `shrink` bytes from the pool's usage.
    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    /// Atomically adds `additional` bytes if the new total stays within `pool_size`.
    ///
    /// # Errors
    /// Returns the error built by `insufficient_capacity_err` when the request does not
    /// fit, including when `used + additional` would overflow `usize`.
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                // `checked_add` rejects overflowing requests: a plain `+` wraps in release
                // builds, which could make an enormous request appear to fit.
                used.checked_add(additional)
                    .filter(|&new_used| new_used <= self.pool_size)
            })
            .map_err(|used| {
                insufficient_capacity_err(
                    reservation,
                    additional,
                    self.pool_size.saturating_sub(used),
                )
            })?;
        Ok(())
    }

    /// Current number of bytes accounted to this pool.
    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }

    /// The configured `pool_size`.
    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Finite(self.pool_size)
    }
}
/// A [`MemoryPool`] that treats spillable and unspillable consumers differently.
///
/// * An unspillable reservation may take whatever is left of `pool_size` after all
///   current (spillable and unspillable) usage.
/// * A spillable reservation may hold at most an equal share of the memory not used by
///   unspillable consumers: `(pool_size - unspillable) / num_spill`.
#[derive(Debug)]
pub struct FairSpillPool {
    /// Total capacity in bytes.
    pool_size: usize,
    /// Counters shared by all operations, guarded by a single mutex.
    state: Mutex<FairSpillPoolState>,
}

#[derive(Debug)]
struct FairSpillPoolState {
    /// Number of registered consumers with `can_spill == true`.
    num_spill: usize,
    /// Bytes reserved by spillable consumers.
    spillable: usize,
    /// Bytes reserved by unspillable consumers.
    unspillable: usize,
}

impl FairSpillPool {
    /// Creates an empty pool with `pool_size` bytes of capacity.
    pub fn new(pool_size: usize) -> Self {
        debug!("Created new FairSpillPool(pool_size={pool_size})");
        Self {
            pool_size,
            state: Mutex::new(FairSpillPoolState {
                num_spill: 0,
                spillable: 0,
                unspillable: 0,
            }),
        }
    }
}

impl MemoryPool for FairSpillPool {
    /// Counts spillable consumers so their fair share can be computed.
    fn register(&self, consumer: &MemoryConsumer) {
        if consumer.can_spill {
            self.state.lock().num_spill += 1;
        }
    }

    /// Undoes `register` for spillable consumers.
    ///
    /// # Panics
    /// Panics if more spillable consumers are unregistered than were registered.
    fn unregister(&self, consumer: &MemoryConsumer) {
        if consumer.can_spill {
            let mut state = self.state.lock();
            state.num_spill = state
                .num_spill
                .checked_sub(1)
                .expect("FairSpillPool: more spillable consumers unregistered than registered");
        }
    }

    /// Unconditionally adds `additional` bytes to the matching counter.
    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        let mut state = self.state.lock();
        if reservation.registration.consumer.can_spill {
            state.spillable += additional;
        } else {
            state.unspillable += additional;
        }
    }

    /// Removes `shrink` bytes from the matching counter.
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        let mut state = self.state.lock();
        if reservation.registration.consumer.can_spill {
            state.spillable -= shrink;
        } else {
            state.unspillable -= shrink;
        }
    }

    /// Grows the reservation only if it stays within the limit for its consumer kind.
    ///
    /// # Errors
    /// Returns the error built by `insufficient_capacity_err` when the request does not
    /// fit. Requests whose arithmetic would overflow `usize` are rejected rather than
    /// wrapped.
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        let mut state = self.state.lock();
        if reservation.registration.consumer.can_spill {
            let spill_available = self.pool_size.saturating_sub(state.unspillable);
            // Equal share per spillable consumer; `checked_div` yields `None` when
            // `num_spill == 0`, in which case the whole spill budget is offered.
            let available = spill_available
                .checked_div(state.num_spill)
                .unwrap_or(spill_available);
            // The limit applies to the reservation's total size, not just the increment.
            let fits = matches!(
                reservation.size.checked_add(additional),
                Some(total) if total <= available
            );
            if !fits {
                return Err(insufficient_capacity_err(
                    reservation,
                    additional,
                    available,
                ));
            }
            state.spillable += additional;
        } else {
            // `grow` is unchecked, so the counters may be arbitrarily large; saturate so
            // that an overflowing sum means "nothing available" instead of wrapping.
            let available = self
                .pool_size
                .saturating_sub(state.unspillable.saturating_add(state.spillable));
            if available < additional {
                return Err(insufficient_capacity_err(
                    reservation,
                    additional,
                    available,
                ));
            }
            state.unspillable += additional;
        }
        Ok(())
    }

    /// Total bytes reserved by spillable and unspillable consumers.
    fn reserved(&self) -> usize {
        let state = self.state.lock();
        state.spillable + state.unspillable
    }

    /// The configured `pool_size`.
    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Finite(self.pool_size)
    }
}
/// Builds the "resources exhausted" error returned when a reservation cannot grow.
///
/// * `reservation` - the reservation that was refused; its consumer name and current
///   size are included in the message.
/// * `additional` - the number of bytes that was requested.
/// * `available` - the number of bytes the calling pool reported as available.
// Only reached on the failure path: mark it cold rather than forcing inlining, so the
// formatting code stays out of the callers' success path.
#[cold]
fn insufficient_capacity_err(
    reservation: &MemoryReservation,
    additional: usize,
    available: usize,
) -> DataFusionError {
    resources_datafusion_err!(
        "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
        human_readable_size(additional),
        reservation.registration.consumer.name,
        human_readable_size(reservation.size),
        human_readable_size(available)
    )
}
/// Per-consumer bookkeeping held by `TrackConsumersPool`.
#[derive(Debug)]
struct TrackedConsumer {
    /// Display name of the consumer.
    name: String,
    /// Whether the consumer was registered as spillable.
    can_spill: bool,
    /// Bytes currently attributed to this consumer.
    reserved: AtomicUsize,
}

impl TrackedConsumer {
    /// Bytes currently attributed to this consumer.
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Adds `additional` bytes to the running total; the previous value is discarded.
    fn grow(&self, additional: usize) {
        let _previous = self.reserved.fetch_add(additional, Ordering::Relaxed);
    }

    /// Subtracts `shrink` bytes from the running total; the previous value is discarded.
    fn shrink(&self, shrink: usize) {
        let _previous = self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}
/// A [`MemoryPool`] wrapper that records per-consumer usage.
///
/// When the inner pool refuses an allocation, the error is extended with a report of the
/// largest tracked consumers.
#[derive(Debug)]
pub struct TrackConsumersPool<I> {
    /// The pool that actually enforces limits.
    inner: I,
    /// How many consumers to list in an allocation-failure error.
    top: NonZeroUsize,
    /// Tracked usage keyed by consumer id.
    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
}

impl<I: MemoryPool> TrackConsumersPool<I> {
    /// Wraps `inner`; allocation-failure errors will list at most `top` consumers.
    pub fn new(inner: I, top: NonZeroUsize) -> Self {
        let tracked_consumers = Default::default();
        Self {
            inner,
            top,
            tracked_consumers,
        }
    }

    /// Describes up to `top` tracked consumers, largest reservation first.
    ///
    /// Entries are joined with `",\n"` and the result always ends with `"."`, so an
    /// empty report is just `"."`.
    pub fn report_top(&self, top: usize) -> String {
        // Copy the data out; the lock guard is a temporary released at the end of this
        // statement, before sorting and formatting.
        let mut snapshot: Vec<(usize, String, bool, usize)> = self
            .tracked_consumers
            .lock()
            .iter()
            .map(|(&id, consumer)| {
                (
                    id,
                    consumer.name.clone(),
                    consumer.can_spill,
                    consumer.reserved(),
                )
            })
            .collect();

        // Stable sort, descending by reserved bytes.
        snapshot.sort_by_key(|&(_, _, _, size)| std::cmp::Reverse(size));

        let lines: Vec<String> = snapshot
            .iter()
            .take(top)
            .map(|(id, name, can_spill, size)| {
                format!(
                    " {name}#{id}(can spill: {can_spill}) consumed {}",
                    human_readable_size(*size)
                )
            })
            .collect();

        lines.join(",\n") + "."
    }
}
impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
    /// Registers with the inner pool, then starts tracking the consumer at 0 bytes.
    fn register(&self, consumer: &MemoryConsumer) {
        self.inner.register(consumer);
        let tracked = TrackedConsumer {
            name: consumer.name().to_string(),
            can_spill: consumer.can_spill(),
            reserved: Default::default(),
        };
        let previous = self.tracked_consumers.lock().insert(consumer.id(), tracked);
        debug_assert!(
            previous.is_none(),
            "Registered was called twice on the same consumer"
        );
    }

    /// Unregisters from the inner pool and stops tracking the consumer.
    fn unregister(&self, consumer: &MemoryConsumer) {
        self.inner.unregister(consumer);
        self.tracked_consumers.lock().remove(&consumer.id());
    }

    /// Grows the inner pool, then adds to the consumer's tracked total if it is tracked.
    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        self.inner.grow(reservation, additional);
        if let Some(tracked) = self
            .tracked_consumers
            .lock()
            .get(&reservation.consumer().id())
        {
            tracked.grow(additional);
        }
    }

    /// Shrinks the inner pool, then subtracts from the consumer's tracked total if it is
    /// tracked.
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        self.inner.shrink(reservation, shrink);
        if let Some(tracked) = self
            .tracked_consumers
            .lock()
            .get(&reservation.consumer().id())
        {
            tracked.shrink(shrink);
        }
    }

    /// Delegates to the inner pool and records the growth only on success.
    ///
    /// # Errors
    /// A `ResourcesExhausted` error from the inner pool gets a report of the top `top`
    /// consumers prepended to its message. Any other error is returned unchanged.
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        if let Err(error) = self.inner.try_grow(reservation, additional) {
            return Err(match error {
                DataFusionError::ResourcesExhausted(message) => {
                    let report = self.report_top(self.top.get());
                    DataFusionError::ResourcesExhausted(
                        provide_top_memory_consumers_to_error_msg(message, report),
                    )
                }
                other => other,
            });
        }

        if let Some(tracked) = self
            .tracked_consumers
            .lock()
            .get(&reservation.consumer().id())
        {
            tracked.grow(additional);
        }
        Ok(())
    }

    /// Reserved bytes as reported by the inner pool.
    fn reserved(&self) -> usize {
        self.inner.reserved()
    }

    /// Memory limit as reported by the inner pool.
    fn memory_limit(&self) -> MemoryLimit {
        self.inner.memory_limit()
    }
}
/// Combines a top-consumers report with the original allocation-failure message.
///
/// The report comes first, on its own line(s), followed by `Error: <error_msg>`.
fn provide_top_memory_consumers_to_error_msg(
    error_msg: String,
    top_consumers: String,
) -> String {
    format!(
        "Additional allocation failed with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}"
    )
}
#[cfg(test)]
mod tests {
// Unit tests for the pools above; error messages are pinned with `insta` inline snapshots.
use super::*;
use insta::{allow_duplicates, assert_snapshot, Settings};
use std::sync::Arc;
/// Snapshot settings that replace the numeric consumer id in
/// `name#<id>(can spill: ...)` with `[ID]`, so snapshots do not depend on id values.
fn make_settings() -> Settings {
let mut settings = Settings::clone_current();
settings.add_filter(
r"([^\s]+)\#\d+\(can spill: (true|false)\)",
"$1#[ID](can spill: $2)",
);
settings
}
/// Exercises FairSpillPool's spillable / unspillable accounting on a 100-byte pool.
#[test]
fn test_fair() {
let pool = Arc::new(FairSpillPool::new(100)) as _;
// `grow` is unchecked, so usage may exceed the 100-byte pool.
let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
r1.grow(2000);
assert_eq!(pool.reserved(), 2000);
let mut r2 = MemoryConsumer::new("r2")
.with_can_spill(true)
.register(&pool);
r2.grow(2000);
assert_eq!(pool.reserved(), 4000);
// Unspillable usage already exceeds the pool, so a spillable consumer has 0 B available.
let err = r2.try_grow(1).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");
let err = r2.try_grow(1).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");
r1.shrink(1990);
r2.shrink(2000);
assert_eq!(pool.reserved(), 10);
r1.try_grow(10).unwrap();
assert_eq!(pool.reserved(), 20);
// r2 is the only spillable consumer, so it may use all of 100 - 20 = 80 bytes.
r2.try_grow(80).unwrap();
assert_eq!(pool.reserved(), 100);
r2.shrink(70);
assert_eq!(r1.size(), 20);
assert_eq!(r2.size(), 10);
assert_eq!(pool.reserved(), 30);
// With two spillable consumers, each share is (100 - 20) / 2 = 40 bytes.
let mut r3 = MemoryConsumer::new("r3")
.with_can_spill(true)
.register(&pool);
let err = r3.try_grow(70).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");
// Freeing r2's bytes does not change the share while r2 still exists.
r2.free();
let err = r3.try_grow(70).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");
// After dropping r2, r3 is the only spillable consumer and can take all 80 bytes.
drop(r2);
assert_eq!(pool.reserved(), 20);
r3.try_grow(80).unwrap();
assert_eq!(pool.reserved(), 100);
r1.free();
assert_eq!(pool.reserved(), 80);
// Unspillable requests are limited by what is left of the whole pool: 100 - 80 = 20.
let mut r4 = MemoryConsumer::new("s4").register(&pool);
let err = r4.try_grow(30).unwrap_err().strip_backtrace();
assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
}
/// The failure report lists the top 3 consumers by reserved bytes (r4's 10 B is omitted).
#[test]
fn test_tracked_consumers_pool() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
let mut r1 = MemoryConsumer::new("r1").register(&pool);
r1.grow(70);
r1.shrink(20);
let mut r2 = MemoryConsumer::new("r2").register(&pool);
r2.try_grow(15)
.expect("should succeed in memory allotment for r2");
let mut r3 = MemoryConsumer::new("r3").register(&pool);
r3.try_resize(25)
.expect("should succeed in memory allotment for r3");
r3.try_resize(20)
.expect("should succeed in memory allotment for r3");
let mut r4 = MemoryConsumer::new("r4").register(&pool);
r4.grow(10);
let mut r5 = MemoryConsumer::new("r5").register(&pool);
let res = r5.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r1#[ID](can spill: false) consumed 50.0 B,
r3#[ID](can spill: false) consumed 20.0 B,
r2#[ID](can spill: false) consumed 15.0 B.
Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
");
}
/// Consumers that share a name are still tracked and reported as separate entries.
#[test]
fn test_tracked_consumers_pool_register() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
let same_name = "foo";
let mut r0 = MemoryConsumer::new(same_name).register(&pool);
let res = r0.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
");
// Give r0 some usage, then create a second consumer with the same name.
r0.grow(10); let new_consumer_same_name = MemoryConsumer::new(same_name);
let mut r1 = new_consumer_same_name.register(&pool);
let res = r1.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 10.0 B,
foo#[ID](can spill: false) consumed 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
");
r1.grow(20);
let res = r1.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 20.0 B,
foo#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
");
// Same name, but spillable: reported as a third, separate entry.
let consumer_with_same_name_but_different_hash =
MemoryConsumer::new(same_name).with_can_spill(true);
let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
let res = r2.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
foo#[ID](can spill: false) consumed 20.0 B,
foo#[ID](can spill: false) consumed 10.0 B,
foo#[ID](can spill: true) consumed 0.0 B.
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
");
}
/// Dropping a reservation removes its consumer from the report; checked for both
/// wrapped pool types, which must produce identical snapshots.
#[test]
fn test_tracked_consumers_pool_deregister() {
fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let mut r0 = MemoryConsumer::new("r0").register(&pool);
r0.grow(10);
let r1_consumer = MemoryConsumer::new("r1");
let mut r1 = r1_consumer.register(&pool);
r1.grow(20);
let res = r0.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r1#[ID](can spill: false) consumed 20.0 B,
r0#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
"));
// After dropping r1 only r0 is reported, and its 20 bytes are available again.
drop(r1);
let res = r0.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));
// Repeated failures leave the tracked state unchanged.
let res = r0.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));
let res = r0.try_grow(150);
assert!(res.is_err());
let error = res.unwrap_err().strip_backtrace();
allow_duplicates!(assert_snapshot!(error, @r"
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
r0#[ID](can spill: false) consumed 10.0 B.
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
"));
}
let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
FairSpillPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
test_per_pool_type(tracked_spill_pool);
let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
test_per_pool_type(tracked_greedy_pool);
}
/// `report_top` can be called directly after downcasting the pool from `Any`.
#[test]
fn test_tracked_consumers_pool_use_beyond_errors() {
let setting = make_settings();
let _bound = setting.bind_to_scope();
let upcasted: Arc<dyn std::any::Any + Send + Sync> =
Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
.downcast::<TrackConsumersPool<GreedyMemoryPool>>()
.unwrap();
let mut r1 = MemoryConsumer::new("r1").register(&pool);
r1.grow(20);
let mut r2 = MemoryConsumer::new("r2").register(&pool);
r2.grow(15);
let mut r3 = MemoryConsumer::new("r3").register(&pool);
r3.grow(45);
let downcasted = upcasted
.downcast::<TrackConsumersPool<GreedyMemoryPool>>()
.unwrap();
// Only the two largest consumers are listed.
let res = downcasted.report_top(2);
assert_snapshot!(res, @r"
r3#[ID](can spill: false) consumed 45.0 B,
r1#[ID](can spill: false) consumed 20.0 B.
");
}
}