use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use log::debug;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::{Builder, NamedTempFile, TempDir};
#[derive(Debug, Clone)]
pub enum DiskManagerConfig {
Existing(Arc<DiskManager>),
NewOs,
NewSpecified(Vec<PathBuf>),
Disabled,
}
impl Default for DiskManagerConfig {
fn default() -> Self {
Self::NewOs
}
}
impl DiskManagerConfig {
pub fn new() -> Self {
Self::default()
}
pub fn new_existing(existing: Arc<DiskManager>) -> Self {
Self::Existing(existing)
}
pub fn new_specified(paths: Vec<PathBuf>) -> Self {
Self::NewSpecified(paths)
}
}
#[derive(Debug)]
pub struct DiskManager {
local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
}
impl DiskManager {
pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
debug!(
"Created local dirs {:?} as DataFusion working directory",
local_dirs
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
})),
}
}
pub fn tmp_files_enabled(&self) -> bool {
self.local_dirs.lock().is_some()
}
pub fn create_tmp_file(
&self,
request_description: &str,
) -> Result<RefCountedTempFile> {
let mut guard = self.local_dirs.lock();
let local_dirs = guard.as_mut().ok_or_else(|| {
resources_datafusion_err!(
"Memory Exhausted while {request_description} (DiskManager is disabled)"
)
})?;
if local_dirs.is_empty() {
let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
debug!(
"Created directory '{:?}' as DataFusion tempfile directory for {}",
tempdir.path().to_string_lossy(),
request_description,
);
local_dirs.push(Arc::new(tempdir));
}
let dir_index = thread_rng().gen_range(0..local_dirs.len());
Ok(RefCountedTempFile {
_parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
tempfile: Builder::new()
.tempfile_in(local_dirs[dir_index].as_ref())
.map_err(DataFusionError::IoError)?,
})
}
}
#[derive(Debug)]
pub struct RefCountedTempFile {
_parent_temp_dir: Arc<TempDir>,
tempfile: NamedTempFile,
}
impl RefCountedTempFile {
pub fn path(&self) -> &Path {
self.tempfile.path()
}
pub fn inner(&self) -> &NamedTempFile {
&self.tempfile
}
}
fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
local_dirs
.iter()
.map(|root| {
if !Path::new(root).exists() {
std::fs::create_dir(root)?;
}
Builder::new()
.prefix("datafusion-")
.tempdir_in(root)
.map_err(DataFusionError::IoError)
})
.map(|result| result.map(Arc::new))
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn lazy_temp_dir_creation() -> Result<()> {
let config = DiskManagerConfig::new();
let dm = DiskManager::try_new(config)?;
assert_eq!(0, local_dir_snapshot(&dm).len());
let actual = dm.create_tmp_file("Testing")?;
assert_eq!(1, local_dir_snapshot(&dm).len());
let local_dirs = local_dir_snapshot(&dm);
assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));
Ok(())
}
fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
dm.local_dirs
.lock()
.iter()
.flatten()
.map(|p| p.path().into())
.collect()
}
#[test]
fn file_in_right_dir() -> Result<()> {
let local_dir1 = TempDir::new()?;
let local_dir2 = TempDir::new()?;
let local_dir3 = TempDir::new()?;
let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
let config = DiskManagerConfig::new_specified(
local_dirs.iter().map(|p| p.into()).collect(),
);
let dm = DiskManager::try_new(config)?;
assert!(dm.tmp_files_enabled());
let actual = dm.create_tmp_file("Testing")?;
assert_path_in_dirs(actual.path(), local_dirs.into_iter());
Ok(())
}
#[test]
fn test_disabled_disk_manager() {
let config = DiskManagerConfig::Disabled;
let manager = DiskManager::try_new(config).unwrap();
assert!(!manager.tmp_files_enabled());
assert_eq!(
manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(),
"Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
)
}
#[test]
fn test_disk_manager_create_spill_folder() {
let config = DiskManagerConfig::new_specified(vec!["DOESNT_EXIST".into()]);
DiskManager::try_new(config)
.unwrap()
.create_tmp_file("Testing")
.unwrap();
}
fn assert_path_in_dirs<'a>(
file_path: &'a Path,
dirs: impl Iterator<Item = &'a Path>,
) {
let dirs: Vec<&Path> = dirs.collect();
let found = dirs.iter().any(|dir_path| {
file_path
.ancestors()
.any(|candidate_path| *dir_path == candidate_path)
});
assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
}
#[test]
fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
let config = DiskManagerConfig::new();
let dm = DiskManager::try_new(config)?;
let temp_file = dm.create_tmp_file("Testing")?;
let temp_file_path = temp_file.path().to_owned();
assert!(temp_file_path.exists());
drop(dm);
assert!(temp_file_path.exists());
drop(temp_file);
assert!(!temp_file_path.exists());
let local_dir1 = TempDir::new()?;
let local_dir2 = TempDir::new()?;
let local_dir3 = TempDir::new()?;
let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
let config = DiskManagerConfig::new_specified(
local_dirs.iter().map(|p| p.into()).collect(),
);
let dm = DiskManager::try_new(config)?;
let temp_file = dm.create_tmp_file("Testing")?;
let temp_file_path = temp_file.path().to_owned();
assert!(temp_file_path.exists());
drop(dm);
assert!(temp_file_path.exists());
drop(temp_file);
assert!(!temp_file_path.exists());
Ok(())
}
}