use datafusion_common::{
config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
};
use log::debug;
use parking_lot::Mutex;
use rand::{rng, Rng};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tempfile::{Builder, NamedTempFile, TempDir};
use crate::memory_pool::human_readable_size;
/// Default value for `max_temp_directory_size`: 100 GiB, expressed in bytes.
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;
/// Builder holding the settings from which a [`DiskManager`] is constructed.
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
/// Where temporary directories come from, or whether temporary files are disabled.
mode: DiskManagerMode,
/// Value copied into the built manager's `max_temp_directory_size` field (bytes).
max_temp_directory_size: u64,
}
impl Default for DiskManagerBuilder {
    /// Starts from [`DiskManagerMode::OsTmpDirectory`] with the default size
    /// limit of [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`].
    fn default() -> Self {
        let mode = DiskManagerMode::OsTmpDirectory;
        let max_temp_directory_size = DEFAULT_MAX_TEMP_DIRECTORY_SIZE;
        Self {
            mode,
            max_temp_directory_size,
        }
    }
}
impl DiskManagerBuilder {
    /// Replaces the configured mode in place.
    pub fn set_mode(&mut self, mode: DiskManagerMode) {
        self.mode = mode;
    }

    /// Chaining variant of [`Self::set_mode`].
    pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
        self.set_mode(mode);
        self
    }

    /// Replaces the configured maximum temp directory size (bytes) in place.
    pub fn set_max_temp_directory_size(&mut self, value: u64) {
        self.max_temp_directory_size = value;
    }

    /// Chaining variant of [`Self::set_max_temp_directory_size`].
    pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
        self.set_max_temp_directory_size(value);
        self
    }

    /// Consumes the builder and creates the [`DiskManager`].
    ///
    /// # Errors
    ///
    /// Returns an error when the mode is [`DiskManagerMode::Directories`] and
    /// `create_local_dirs` fails for one of the configured roots.
    pub fn build(self) -> Result<DiskManager> {
        let Self {
            mode,
            max_temp_directory_size,
        } = self;

        // The three modes differ only in the initial directory list:
        // `None` disables temp files, an empty list defers directory creation.
        let initial_dirs = match mode {
            DiskManagerMode::Disabled => None,
            DiskManagerMode::OsTmpDirectory => Some(Vec::new()),
            DiskManagerMode::Directories(conf_dirs) => {
                let local_dirs = create_local_dirs(conf_dirs)?;
                debug!(
                    "Created local dirs {local_dirs:?} as DataFusion working directory"
                );
                Some(local_dirs)
            }
        };

        Ok(DiskManager {
            local_dirs: Mutex::new(initial_dirs),
            max_temp_directory_size,
            used_disk_space: Arc::new(AtomicU64::new(0)),
        })
    }
}
/// Selects how a [`DiskManager`] built by [`DiskManagerBuilder`] obtains its
/// temporary directories.
#[derive(Clone, Debug)]
pub enum DiskManagerMode {
/// Start with no directories; one is created via `tempfile::tempdir()` the
/// first time a temporary file is requested.
OsTmpDirectory,
/// Create a `datafusion-` prefixed temporary directory inside each of the
/// given paths when the manager is built.
Directories(Vec<PathBuf>),
/// No temporary files: requests for one return an error.
Disabled,
}
impl Default for DiskManagerMode {
fn default() -> Self {
Self::OsTmpDirectory
}
}
/// Legacy configuration consumed by `DiskManager::try_new`.
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone)]
pub enum DiskManagerConfig {
/// Reuse the given manager; `try_new` returns it unchanged.
Existing(Arc<DiskManager>),
/// New manager that starts with no directories and creates one on first use.
NewOs,
/// New manager with a temporary directory created inside each given path.
NewSpecified(Vec<PathBuf>),
/// New manager with temporary files disabled.
Disabled,
}
#[allow(deprecated)]
impl Default for DiskManagerConfig {
fn default() -> Self {
Self::NewOs
}
}
#[allow(deprecated)]
impl DiskManagerConfig {
pub fn new() -> Self {
Self::default()
}
pub fn new_existing(existing: Arc<DiskManager>) -> Self {
Self::Existing(existing)
}
pub fn new_specified(paths: Vec<PathBuf>) -> Self {
Self::NewSpecified(paths)
}
}
/// Hands out temporary files and tracks the total bytes they occupy.
#[derive(Debug)]
pub struct DiskManager {
/// Directories in which temporary files are created.
///
/// `None` means temporary files are disabled; `Some` of an empty vector
/// means a directory is created the first time a file is requested.
local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
/// Limit (bytes) that `used_disk_space` is compared against when a file
/// reports its size.
max_temp_directory_size: u64,
/// Sum of the sizes last reported by the live temporary files.
used_disk_space: Arc<AtomicU64>,
}
impl DiskManager {
pub fn builder() -> DiskManagerBuilder {
DiskManagerBuilder::default()
}
#[allow(deprecated)]
#[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
debug!(
"Created local dirs {local_dirs:?} as DataFusion working directory"
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
})),
}
}
pub fn set_max_temp_directory_size(
&mut self,
max_temp_directory_size: u64,
) -> Result<()> {
if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
return config_err!(
"Cannot set max temp directory size for a disk manager that spilling is disabled"
);
}
self.max_temp_directory_size = max_temp_directory_size;
Ok(())
}
pub fn set_arc_max_temp_directory_size(
this: &mut Arc<Self>,
max_temp_directory_size: u64,
) -> Result<()> {
if let Some(inner) = Arc::get_mut(this) {
inner.set_max_temp_directory_size(max_temp_directory_size)?;
Ok(())
} else {
config_err!("DiskManager should be a single instance")
}
}
pub fn with_max_temp_directory_size(
mut self,
max_temp_directory_size: u64,
) -> Result<Self> {
self.set_max_temp_directory_size(max_temp_directory_size)?;
Ok(self)
}
pub fn used_disk_space(&self) -> u64 {
self.used_disk_space.load(Ordering::Relaxed)
}
pub fn tmp_files_enabled(&self) -> bool {
self.local_dirs.lock().is_some()
}
pub fn create_tmp_file(
self: &Arc<Self>,
request_description: &str,
) -> Result<RefCountedTempFile> {
let mut guard = self.local_dirs.lock();
let local_dirs = guard.as_mut().ok_or_else(|| {
resources_datafusion_err!(
"Memory Exhausted while {request_description} (DiskManager is disabled)"
)
})?;
if local_dirs.is_empty() {
let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
debug!(
"Created directory '{:?}' as DataFusion tempfile directory for {}",
tempdir.path().to_string_lossy(),
request_description,
);
local_dirs.push(Arc::new(tempdir));
}
let dir_index = rng().random_range(0..local_dirs.len());
Ok(RefCountedTempFile {
_parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
tempfile: Builder::new()
.tempfile_in(local_dirs[dir_index].as_ref())
.map_err(DataFusionError::IoError)?,
current_file_disk_usage: 0,
disk_manager: Arc::clone(self),
})
}
}
/// A temporary file handed out by a [`DiskManager`], together with the
/// bookkeeping needed to account for its size.
#[derive(Debug)]
pub struct RefCountedTempFile {
/// Shared handle to the directory containing the file; never read, held
/// only so the `TempDir` is not dropped while this file is alive.
_parent_temp_dir: Arc<TempDir>,
/// The underlying named temporary file.
tempfile: NamedTempFile,
/// Size (bytes) this file currently contributes to the manager's
/// `used_disk_space` counter.
current_file_disk_usage: u64,
/// Manager whose counter and size limit this file reports against.
disk_manager: Arc<DiskManager>,
}
impl RefCountedTempFile {
    /// Returns the filesystem path of the temporary file.
    pub fn path(&self) -> &Path {
        self.tempfile.path()
    }

    /// Returns a reference to the wrapped [`NamedTempFile`].
    pub fn inner(&self) -> &NamedTempFile {
        &self.tempfile
    }

    /// Re-reads the file size from its metadata and updates the manager's
    /// global disk usage counter accordingly.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the metadata cannot be read, or a resources
    /// error when the global usage exceeds the manager's
    /// `max_temp_directory_size`. In the latter case the counter still
    /// reflects the new file size; it is released when this file is dropped.
    pub fn update_disk_usage(&mut self) -> Result<()> {
        let new_disk_usage = self.tempfile.as_file().metadata()?.len();

        // Record the new size *before* the limit check. The global counter is
        // about to include it, so the per-file figure must match even on the
        // error path; otherwise `Drop` (or a later update) subtracts a stale
        // value and the difference stays in the counter forever.
        let old_disk_usage =
            std::mem::replace(&mut self.current_file_disk_usage, new_disk_usage);

        let used_disk_space = &self.disk_manager.used_disk_space;
        used_disk_space.fetch_sub(old_disk_usage, Ordering::Relaxed);
        used_disk_space.fetch_add(new_disk_usage, Ordering::Relaxed);

        let limit = self.disk_manager.max_temp_directory_size;
        let global_disk_usage = used_disk_space.load(Ordering::Relaxed);
        if global_disk_usage > limit {
            return resources_err!(
                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
                // Saturate rather than truncate where `usize` is narrower than `u64`.
                human_readable_size(usize::try_from(limit).unwrap_or(usize::MAX))
            );
        }

        Ok(())
    }
}
impl Drop for RefCountedTempFile {
    /// Removes this file's recorded size from the manager's global counter.
    fn drop(&mut self) {
        let released = self.current_file_disk_usage;
        let counter = &self.disk_manager.used_disk_space;
        counter.fetch_sub(released, Ordering::Relaxed);
    }
}
/// Creates one `datafusion-` prefixed temporary directory inside each of the
/// given root paths.
///
/// Missing roots are created first, including any missing parent
/// directories. `create_dir_all` succeeds when the directory already exists,
/// so no separate existence check is needed and there is no window in which
/// a concurrently created root would cause a failure.
///
/// # Errors
///
/// Returns the first I/O error hit while creating a root or its temporary
/// directory.
fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
    local_dirs
        .iter()
        .map(|root| {
            std::fs::create_dir_all(root)?;
            Builder::new()
                .prefix("datafusion-")
                .tempdir_in(root)
                .map(Arc::new)
                .map_err(DataFusionError::IoError)
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// In the default mode no directory exists until the first file is requested.
    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        assert_eq!(0, local_dir_snapshot(&dm).len());

        let actual = dm.create_tmp_file("Testing")?;
        assert_eq!(1, local_dir_snapshot(&dm).len());

        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));
        Ok(())
    }

    /// Returns the paths of the directories the manager currently holds.
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    /// Files are created underneath one of the configured directories.
    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        assert!(dm.tmp_files_enabled());

        let actual = dm.create_tmp_file("Testing")?;
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());
        Ok(())
    }

    /// A disabled manager refuses to create temporary files.
    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    /// A configured root that does not exist yet is created by `build`.
    #[test]
    fn test_disk_manager_create_spill_folder() -> Result<()> {
        let dir = TempDir::new()?;
        // Use a root that does not exist so the folder-creation path really
        // runs; it lives inside `dir` and is cleaned up together with it.
        let spill_root = dir.path().join("spill");
        assert!(!spill_root.exists());

        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(vec![spill_root.clone()]))
                .build()?,
        );
        assert!(spill_root.is_dir());

        let actual = dm.create_tmp_file("Testing")?;
        assert_path_in_dirs(actual.path(), std::iter::once(spill_root.as_path()));
        Ok(())
    }

    /// Asserts that `file_path` lives underneath at least one of `dirs`.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();
        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });
        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// A temporary file outlives the manager that created it and is removed
    /// only when the file itself is dropped.
    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Case 1: default mode, directory created on first use.
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Case 2: explicitly configured directories.
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());
        Ok(())
    }
}