use crate::{
disk_manager::{DiskManager, DiskManagerConfig},
memory_pool::{
GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
},
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
use datafusion_common::{DataFusionError, Result};
use object_store::ObjectStore;
use std::path::PathBuf;
use std::sync::Arc;
use std::{
fmt::{Debug, Formatter},
num::NonZeroUsize,
};
use url::Url;
/// Bundle of shared execution resources: a memory pool, a disk manager, a
/// cache manager and an object store registry.
///
/// Every field is held behind an [`Arc`], so the derived `Clone` copies the
/// handles rather than the underlying resources.
#[derive(Clone)]
pub struct RuntimeEnv {
/// Shared handle to the [`MemoryPool`] implementation in use.
pub memory_pool: Arc<dyn MemoryPool>,
/// Shared handle to the [`DiskManager`]
/// (presumably responsible for temporary/spill files — confirm against `DiskManager`).
pub disk_manager: Arc<DiskManager>,
/// Shared handle to the [`CacheManager`].
pub cache_manager: Arc<CacheManager>,
/// Registry consulted by `register_object_store` and `object_store` to
/// store and look up [`ObjectStore`] instances by URL.
pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}
impl Debug for RuntimeEnv {
    /// Writes only the type name; none of the fields are included in the
    /// debug output.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        f.write_str("RuntimeEnv")
    }
}
impl RuntimeEnv {
    /// Creates a runtime environment from `config`.
    ///
    /// [`RuntimeConfig`] is an alias for [`RuntimeEnvBuilder`], so this
    /// delegates to [`RuntimeEnvBuilder::build`]. Keeping a single
    /// construction path guarantees that both entry points apply the same
    /// defaults and cannot drift apart.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`RuntimeEnvBuilder::build`].
    pub fn new(config: RuntimeConfig) -> Result<Self> {
        config.build()
    }

    /// Registers `object_store` for `url` in this environment's object store
    /// registry.
    ///
    /// The return value is passed through unchanged from
    /// `ObjectStoreRegistry::register_store` (presumably the store that was
    /// previously registered for `url`, if any — confirm against the
    /// registry trait).
    pub fn register_object_store(
        &self,
        url: &Url,
        object_store: Arc<dyn ObjectStore>,
    ) -> Option<Arc<dyn ObjectStore>> {
        self.object_store_registry.register_store(url, object_store)
    }

    /// Looks up the object store for `url` in this environment's registry.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `ObjectStoreRegistry::get_store`,
    /// converted into a [`DataFusionError`].
    pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
        self.object_store_registry
            .get_store(url.as_ref())
            .map_err(DataFusionError::from)
    }
}
impl Default for RuntimeEnv {
    /// Builds a runtime environment from an unmodified [`RuntimeEnvBuilder`].
    ///
    /// # Panics
    ///
    /// Panics if [`RuntimeEnvBuilder::build`] returns an error.
    fn default() -> Self {
        let builder = RuntimeEnvBuilder::new();
        builder.build().unwrap()
    }
}
/// Alias for [`RuntimeEnvBuilder`]; this is the parameter type accepted by
/// [`RuntimeEnv::new`].
pub type RuntimeConfig = RuntimeEnvBuilder;
/// Builder that collects the settings needed to construct a [`RuntimeEnv`].
///
/// Each `with_*` method consumes the builder and returns it with one setting
/// replaced; `build` turns the collected settings into a [`RuntimeEnv`].
#[derive(Clone)]
pub struct RuntimeEnvBuilder {
/// Configuration handed to `DiskManager::try_new` when building.
pub disk_manager: DiskManagerConfig,
/// Memory pool to use; when `None`, `build` falls back to an
/// `UnboundedMemoryPool`.
pub memory_pool: Option<Arc<dyn MemoryPool>>,
/// Configuration handed to `CacheManager::try_new` when building.
pub cache_manager: CacheManagerConfig,
/// Object store registry that the built [`RuntimeEnv`] will use.
pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}
impl Default for RuntimeEnvBuilder {
fn default() -> Self {
Self::new()
}
}
impl RuntimeEnvBuilder {
pub fn new() -> Self {
Self {
disk_manager: Default::default(),
memory_pool: Default::default(),
cache_manager: Default::default(),
object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
}
}
pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
self.disk_manager = disk_manager;
self
}
pub fn with_memory_pool(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self {
self.memory_pool = Some(memory_pool);
self
}
pub fn with_cache_manager(mut self, cache_manager: CacheManagerConfig) -> Self {
self.cache_manager = cache_manager;
self
}
pub fn with_object_store_registry(
mut self,
object_store_registry: Arc<dyn ObjectStoreRegistry>,
) -> Self {
self.object_store_registry = object_store_registry;
self
}
pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self {
let pool_size = (max_memory as f64 * memory_fraction) as usize;
self.with_memory_pool(Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(pool_size),
NonZeroUsize::new(5).unwrap(),
)))
}
pub fn with_temp_file_path(self, path: impl Into<PathBuf>) -> Self {
self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]))
}
pub fn build(self) -> Result<RuntimeEnv> {
let memory_pool = self
.memory_pool
.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
Ok(RuntimeEnv {
memory_pool,
disk_manager: DiskManager::try_new(self.disk_manager)?,
cache_manager: CacheManager::try_new(&self.cache_manager)?,
object_store_registry: self.object_store_registry,
})
}
pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
self.build().map(Arc::new)
}
}