use std::{
any::{Any, TypeId},
collections::HashMap,
hash::{BuildHasherDefault, Hasher},
sync::Arc,
};
use datafusion_common::{
config::{ConfigExtension, ConfigOptions},
Result, ScalarValue,
};
/// Session-level configuration.
///
/// Bundles a [`ConfigOptions`] value with a map of type-keyed extension
/// objects. The options are reachable through `options()` / `options_mut()`
/// and the `with_*` / `set_*` builder methods; extensions are stored and
/// retrieved with `set_extension` / `with_extension` / `get_extension`.
#[derive(Clone, Debug)]
pub struct SessionConfig {
/// Option values backing every getter and builder method on this type.
options: ConfigOptions,
/// Opaque extension objects, keyed by the `TypeId` of their concrete type.
extensions: AnyMap,
}
impl Default for SessionConfig {
    /// Builds a configuration from `ConfigOptions::new()` with an empty
    /// extension map.
    fn default() -> Self {
        let options = ConfigOptions::new();
        // An empty map built from just the hasher; nothing is stored until
        // the first extension is registered.
        let extensions = HashMap::with_hasher(BuildHasherDefault::default());
        Self {
            options,
            extensions,
        }
    }
}
impl SessionConfig {
    /// Creates a configuration with default option values and no extensions.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a configuration from the options produced by
    /// `ConfigOptions::from_env()`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `ConfigOptions::from_env()`.
    pub fn from_env() -> Result<Self> {
        let options = ConfigOptions::from_env()?;
        Ok(Self::from(options))
    }

    /// Creates a configuration from the options produced by
    /// `ConfigOptions::from_string_hash_map(settings)`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by
    /// `ConfigOptions::from_string_hash_map`.
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        let options = ConfigOptions::from_string_hash_map(settings)?;
        Ok(Self::from(options))
    }

    /// Returns a shared reference to the underlying [`ConfigOptions`].
    pub fn options(&self) -> &ConfigOptions {
        &self.options
    }

    /// Returns a mutable reference to the underlying [`ConfigOptions`].
    pub fn options_mut(&mut self) -> &mut ConfigOptions {
        &mut self.options
    }

    /// Sets the option `key` to the string rendering of `value`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Self::set_str`].
    pub fn set(self, key: &str, value: &ScalarValue) -> Self {
        let rendered = value.to_string();
        self.set_str(key, &rendered)
    }

    /// Sets the option `key` to the string rendering of a `bool`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Self::set_str`].
    pub fn set_bool(self, key: &str, value: bool) -> Self {
        let rendered = value.to_string();
        self.set_str(key, &rendered)
    }

    /// Sets the option `key` to the string rendering of a `u64`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Self::set_str`].
    pub fn set_u64(self, key: &str, value: u64) -> Self {
        let rendered = value.to_string();
        self.set_str(key, &rendered)
    }

    /// Sets the option `key` to the string rendering of a `usize`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Self::set_str`].
    pub fn set_usize(self, key: &str, value: usize) -> Self {
        let rendered = value.to_string();
        self.set_str(key, &rendered)
    }

    /// Sets the option `key` to `value` by calling `ConfigOptions::set`.
    ///
    /// # Panics
    ///
    /// Panics if `ConfigOptions::set` returns an error for this key/value.
    pub fn set_str(mut self, key: &str, value: &str) -> Self {
        let outcome = self.options.set(key, value);
        outcome.unwrap();
        self
    }

    /// Sets `execution.batch_size` to `n`.
    ///
    /// # Panics
    ///
    /// Panics if `n` is zero.
    pub fn with_batch_size(mut self, n: usize) -> Self {
        assert!(n > 0);
        self.options_mut().execution.batch_size = n;
        self
    }

    /// Sets `execution.target_partitions` to `n`.
    ///
    /// # Panics
    ///
    /// Panics if `n` is zero.
    pub fn with_target_partitions(mut self, n: usize) -> Self {
        assert!(n > 0);
        self.options_mut().execution.target_partitions = n;
        self
    }

    /// Inserts `extension` into the `extensions` collection of the
    /// underlying [`ConfigOptions`].
    pub fn with_option_extension<T: ConfigExtension>(mut self, extension: T) -> Self {
        self.options.extensions.insert(extension);
        self
    }

    /// Returns the `execution.target_partitions` setting.
    pub fn target_partitions(&self) -> usize {
        self.options().execution.target_partitions
    }

    /// Returns the `catalog.information_schema` setting.
    pub fn information_schema(&self) -> bool {
        self.options().catalog.information_schema
    }

    /// Returns the `catalog.create_default_catalog_and_schema` setting.
    pub fn create_default_catalog_and_schema(&self) -> bool {
        self.options().catalog.create_default_catalog_and_schema
    }

    /// Returns the `optimizer.repartition_joins` setting.
    pub fn repartition_joins(&self) -> bool {
        self.options().optimizer.repartition_joins
    }

    /// Returns the `optimizer.repartition_aggregations` setting.
    pub fn repartition_aggregations(&self) -> bool {
        self.options().optimizer.repartition_aggregations
    }

    /// Returns the `optimizer.repartition_windows` setting.
    pub fn repartition_window_functions(&self) -> bool {
        self.options().optimizer.repartition_windows
    }

    /// Returns the `optimizer.repartition_sorts` setting.
    pub fn repartition_sorts(&self) -> bool {
        self.options().optimizer.repartition_sorts
    }

    /// Returns the `optimizer.prefer_existing_sort` setting.
    pub fn prefer_existing_sort(&self) -> bool {
        self.options().optimizer.prefer_existing_sort
    }

    /// Returns the `execution.collect_statistics` setting.
    pub fn collect_statistics(&self) -> bool {
        self.options().execution.collect_statistics
    }

    /// Sets `catalog.default_catalog` and `catalog.default_schema` from the
    /// given names.
    pub fn with_default_catalog_and_schema(
        mut self,
        catalog: impl Into<String>,
        schema: impl Into<String>,
    ) -> Self {
        let catalog_options = &mut self.options.catalog;
        catalog_options.default_catalog = catalog.into();
        catalog_options.default_schema = schema.into();
        self
    }

    /// Sets `catalog.create_default_catalog_and_schema`.
    pub fn with_create_default_catalog_and_schema(mut self, create: bool) -> Self {
        self.options_mut().catalog.create_default_catalog_and_schema = create;
        self
    }

    /// Sets `catalog.information_schema`.
    pub fn with_information_schema(mut self, enabled: bool) -> Self {
        self.options_mut().catalog.information_schema = enabled;
        self
    }

    /// Sets `optimizer.repartition_joins`.
    pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
        self.options_mut().optimizer.repartition_joins = enabled;
        self
    }

    /// Sets `optimizer.repartition_aggregations`.
    pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
        self.options_mut().optimizer.repartition_aggregations = enabled;
        self
    }

    /// Sets `optimizer.repartition_file_min_size`.
    pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
        self.options_mut().optimizer.repartition_file_min_size = size;
        self
    }

    /// Sets `optimizer.allow_symmetric_joins_without_pruning`.
    pub fn with_allow_symmetric_joins_without_pruning(mut self, enabled: bool) -> Self {
        self.options_mut()
            .optimizer
            .allow_symmetric_joins_without_pruning = enabled;
        self
    }

    /// Sets `optimizer.repartition_file_scans`.
    pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
        self.options_mut().optimizer.repartition_file_scans = enabled;
        self
    }

    /// Sets `optimizer.repartition_windows`.
    pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
        self.options_mut().optimizer.repartition_windows = enabled;
        self
    }

    /// Sets `optimizer.repartition_sorts`.
    pub fn with_repartition_sorts(mut self, enabled: bool) -> Self {
        self.options_mut().optimizer.repartition_sorts = enabled;
        self
    }

    /// Sets `optimizer.prefer_existing_sort`.
    pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
        self.options_mut().optimizer.prefer_existing_sort = enabled;
        self
    }

    /// Sets `optimizer.prefer_existing_union`.
    pub fn with_prefer_existing_union(mut self, enabled: bool) -> Self {
        self.options_mut().optimizer.prefer_existing_union = enabled;
        self
    }

    /// Sets `execution.parquet.pruning`.
    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
        self.options_mut().execution.parquet.pruning = enabled;
        self
    }

    /// Returns the `execution.parquet.pruning` setting.
    pub fn parquet_pruning(&self) -> bool {
        self.options().execution.parquet.pruning
    }

    /// Returns the `execution.parquet.bloom_filter_on_read` setting.
    pub fn parquet_bloom_filter_pruning(&self) -> bool {
        self.options().execution.parquet.bloom_filter_on_read
    }

    /// Sets `execution.parquet.bloom_filter_on_read`.
    pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
        self.options_mut().execution.parquet.bloom_filter_on_read = enabled;
        self
    }

    /// Returns the `execution.parquet.enable_page_index` setting.
    pub fn parquet_page_index_pruning(&self) -> bool {
        self.options().execution.parquet.enable_page_index
    }

    /// Sets `execution.parquet.enable_page_index`.
    pub fn with_parquet_page_index_pruning(mut self, enabled: bool) -> Self {
        self.options_mut().execution.parquet.enable_page_index = enabled;
        self
    }

    /// Sets `execution.collect_statistics`.
    pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
        self.options_mut().execution.collect_statistics = enabled;
        self
    }

    /// Returns the `execution.batch_size` setting.
    pub fn batch_size(&self) -> usize {
        self.options().execution.batch_size
    }

    /// Sets `execution.coalesce_batches`.
    pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
        self.options_mut().execution.coalesce_batches = enabled;
        self
    }

    /// Returns the `execution.coalesce_batches` setting.
    pub fn coalesce_batches(&self) -> bool {
        self.options().execution.coalesce_batches
    }

    /// Sets `optimizer.enable_round_robin_repartition`.
    pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
        self.options_mut().optimizer.enable_round_robin_repartition = enabled;
        self
    }

    /// Returns the `optimizer.enable_round_robin_repartition` setting.
    pub fn round_robin_repartition(&self) -> bool {
        self.options().optimizer.enable_round_robin_repartition
    }

    /// Sets `execution.sort_spill_reservation_bytes`.
    pub fn with_sort_spill_reservation_bytes(
        mut self,
        sort_spill_reservation_bytes: usize,
    ) -> Self {
        let execution = &mut self.options.execution;
        execution.sort_spill_reservation_bytes = sort_spill_reservation_bytes;
        self
    }

    /// Sets `execution.sort_in_place_threshold_bytes`.
    pub fn with_sort_in_place_threshold_bytes(
        mut self,
        sort_in_place_threshold_bytes: usize,
    ) -> Self {
        let execution = &mut self.options.execution;
        execution.sort_in_place_threshold_bytes = sort_in_place_threshold_bytes;
        self
    }

    /// Collects every entry reported by `ConfigOptions::entries()` into a
    /// key/value map.
    ///
    /// An entry without a value is stored as the empty string.
    pub fn to_props(&self) -> HashMap<String, String> {
        self.options
            .entries()
            .into_iter()
            .map(|entry| (entry.key, entry.value.unwrap_or_default()))
            .collect()
    }

    /// Builder form of [`Self::set_extension`]: registers `ext` and returns
    /// the updated configuration.
    pub fn with_extension<T>(mut self, ext: Arc<T>) -> Self
    where
        T: Send + Sync + 'static,
    {
        self.set_extension(ext);
        self
    }

    /// Registers `ext` under the `TypeId` of `T`.
    ///
    /// An extension previously registered for the same `T` is replaced.
    pub fn set_extension<T>(&mut self, ext: Arc<T>)
    where
        T: Send + Sync + 'static,
    {
        // Erase the concrete type so differently typed extensions can share
        // one map.
        let erased: Arc<dyn Any + Send + Sync + 'static> = ext;
        self.extensions.insert(TypeId::of::<T>(), erased);
    }

    /// Returns the extension registered for type `T`, or `None` if there is
    /// none.
    ///
    /// # Panics
    ///
    /// Panics if the value stored under `T`'s `TypeId` is not actually a `T`.
    pub fn get_extension<T>(&self) -> Option<Arc<T>>
    where
        T: Send + Sync + 'static,
    {
        let stored = self.extensions.get(&TypeId::of::<T>())?;
        let typed = Arc::downcast::<T>(Arc::clone(stored)).expect("TypeId unique");
        Some(typed)
    }
}
impl From<ConfigOptions> for SessionConfig {
    /// Wraps `options` in a configuration whose remaining fields take their
    /// `Default` values.
    fn from(options: ConfigOptions) -> Self {
        let mut config = Self::default();
        config.options = options;
        config
    }
}
/// Map from a type's `TypeId` to a shared, type-erased value, hashed with
/// [`IdHasher`].
type AnyMap = HashMap<TypeId, Arc<dyn Any + Send + Sync>, BuildHasherDefault<IdHasher>>;
/// Pass-through hasher: it remembers the most recent `u64` written to it and
/// reports that value unchanged as the hash.
///
/// Only `write_u64` is supported. The byte-slice `write` path panics, on the
/// stated assumption that `TypeId` hashes itself through `write_u64`.
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    /// Returns the last value passed to `write_u64` (zero if none was).
    #[inline]
    fn finish(&self) -> u64 {
        let Self(stored) = self;
        *stored
    }

    /// Unsupported input path; always panics.
    fn write(&mut self, _: &[u8]) {
        unreachable!("TypeId calls write_u64");
    }

    /// Replaces the stored value with `id`; earlier writes are discarded.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        *self = Self(id);
    }
}