use crate::{DataFusionError, Result};
use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Display;
/// Generates a configuration "namespace": an options struct together with
/// implementations of `ConfigField` and `Default` for it.
///
/// Each field is declared as `vis name: Type, default = expr`; fields are not
/// comma-separated. The `///` doc comments on a field are captured and used at
/// runtime as that field's description when the struct is visited (they end up
/// in `ConfigEntry::description` via `ConfigOptions::entries`).
macro_rules! config_namespace {
    (
     $(#[doc = $struct_d:tt])*
     $vis:vis struct $struct_name:ident {
        $(
        $(#[doc = $d:tt])*
        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
        )*$(,)*
    }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
            $(#[doc = $d])*
            $field_vis $field_name : $field_type,
            )*
        }
        impl ConfigField for $struct_name {
            // Dispatches on the first dot-separated segment of `key` (a field
            // name) and forwards the remainder of the key to that field's own
            // `set`. Leaf fields (see `config_field!`) ignore the remainder.
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                    stringify!($field_name) => self.$field_name.set(rem, value),
                    )*
                    _ => Err(DataFusionError::Internal(
                        format!(concat!("Config value \"{}\" not found on ", stringify!($struct_name)), key)
                    ))
                }
            }
            // Visits each field in declaration order under `<key_prefix>.<field_name>`.
            // The field's doc-comment strings are concatenated (no separator) and
            // trimmed to form its description. This namespace's own
            // `_description` is not used.
            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                let desc = concat!($($d),*).trim();
                self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            // Each field takes the `default = ...` expression from the invocation.
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}
config_namespace! {
    /// Catalog-related options, addressed as `datafusion.catalog.<field>`
    /// (environment variable `DATAFUSION_CATALOG_<FIELD>` in `ConfigOptions::from_env`).
    pub struct CatalogOptions {
        // NOTE: `///` docs on these fields would become their descriptions in
        // `ConfigOptions::entries`; none are provided yet.
        pub create_default_catalog_and_schema: bool, default = true
        pub default_catalog: String, default = String::from("datafusion")
        pub default_schema: String, default = String::from("public")
        pub information_schema: bool, default = false
        // Unset by default: reported with no value (`None`) by `ConfigOptions::entries`.
        pub location: Option<String>, default = None
        pub format: Option<String>, default = None
        pub has_header: bool, default = false
    }
}
config_namespace! {
    /// SQL parser options, addressed as `datafusion.sql_parser.<field>`.
    pub struct SqlParserOptions {
        // NOTE: `///` docs on these fields would become their descriptions in
        // `ConfigOptions::entries`; none are provided yet.
        pub parse_float_as_decimal: bool, default = false
        pub enable_ident_normalization: bool, default = true
    }
}
config_namespace! {
    /// Execution-related options, addressed as `datafusion.execution.<field>`.
    pub struct ExecutionOptions {
        // NOTE: `///` docs on these fields would become their descriptions in
        // `ConfigOptions::entries`; none are provided yet.
        pub batch_size: usize, default = 8192
        pub coalesce_batches: bool, default = true
        pub collect_statistics: bool, default = false
        // Defaults to the CPU count reported by `num_cpus::get()`, so it is
        // host-dependent; `ConfigOptions::generate_config_markdown` overrides it to 0.
        pub target_partitions: usize, default = num_cpus::get()
        pub time_zone: Option<String>, default = Some(String::from("+00:00"))
        // Nested namespace: its keys are `datafusion.execution.parquet.<field>`.
        pub parquet: ParquetOptions, default = ParquetOptions::default()
    }
}
config_namespace! {
    /// Parquet options, nested under `ExecutionOptions::parquet` and addressed
    /// as `datafusion.execution.parquet.<field>`.
    pub struct ParquetOptions {
        // NOTE: `///` docs on these fields would become their descriptions in
        // `ConfigOptions::entries`; none are provided yet.
        pub enable_page_index: bool, default = false
        pub pruning: bool, default = true
        pub skip_metadata: bool, default = true
        // Unset by default: reported with no value (`None`) by `ConfigOptions::entries`.
        // Unit is not visible here (presumably bytes) — TODO confirm against the reader.
        pub metadata_size_hint: Option<usize>, default = None
        pub pushdown_filters: bool, default = false
        pub reorder_filters: bool, default = false
    }
}
config_namespace! {
    /// Optimizer options, addressed as `datafusion.optimizer.<field>`.
    pub struct OptimizerOptions {
        // NOTE: `///` docs on these fields would become their descriptions in
        // `ConfigOptions::entries`; none are provided yet.
        pub enable_round_robin_repartition: bool, default = true
        pub filter_null_join_keys: bool, default = false
        pub repartition_aggregations: bool, default = true
        // 10 * 1024 * 1024 = 10 MiB if the unit is bytes — TODO confirm with the consumer.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
        pub repartition_joins: bool, default = true
        pub repartition_file_scans: bool, default = true
        pub repartition_windows: bool, default = true
        pub repartition_sorts: bool, default = true
        pub skip_failed_rules: bool, default = true
        pub max_passes: usize, default = 3
        pub top_down_join_key_reordering: bool, default = true
        pub prefer_hash_join: bool, default = true
        // 1024 * 1024; unit (bytes vs rows) is not visible here — TODO confirm.
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
    }
}
config_namespace! {
    /// EXPLAIN options, addressed as `datafusion.explain.<field>`.
    pub struct ExplainOptions {
        // NOTE: `///` docs on these fields would become their descriptions in
        // `ConfigOptions::entries`; none are provided yet.
        pub logical_plan_only: bool, default = false
        pub physical_plan_only: bool, default = false
    }
}
/// A single configuration setting, as reported by `ConfigOptions::entries`.
///
/// This is plain data, so it is `Clone` and comparable: callers can keep
/// snapshots of the configuration and diff them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfigEntry {
    /// Fully qualified key, e.g. `datafusion.execution.batch_size`.
    pub key: String,
    /// Current value as a string, or `None` when the setting is unset.
    pub value: Option<String>,
    /// Human-readable description. For built-in options this comes from the
    /// field's doc comment and is empty when none is written.
    pub description: &'static str,
}
/// Root configuration object grouping every option namespace.
///
/// Built-in options are addressed as `datafusion.<namespace>.<field>`.
/// Extension options are addressed as `<extension prefix>.<key>`
/// (see [`ConfigOptions::set`]).
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options (`datafusion.catalog.*`).
    pub catalog: CatalogOptions,
    /// Execution options (`datafusion.execution.*`).
    pub execution: ExecutionOptions,
    /// Optimizer options (`datafusion.optimizer.*`).
    pub optimizer: OptimizerOptions,
    /// SQL parser options (`datafusion.sql_parser.*`).
    pub sql_parser: SqlParserOptions,
    /// EXPLAIN options (`datafusion.explain.*`).
    pub explain: ExplainOptions,
    /// Registered extension namespaces, keyed by their prefix.
    pub extensions: Extensions,
}
impl ConfigField for ConfigOptions {
    /// Routes `key` (already stripped of the leading `datafusion.`) to the
    /// built-in namespace named by its first segment. Extensions are not
    /// reachable through this path.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let (key, rest) = match key.split_once('.') {
            Some(parts) => parts,
            None => (key, ""),
        };
        match key {
            "catalog" => self.catalog.set(rest, value),
            "execution" => self.execution.set(rest, value),
            "optimizer" => self.optimizer.set(rest, value),
            "explain" => self.explain.set(rest, value),
            "sql_parser" => self.sql_parser.set(rest, value),
            _ => Err(DataFusionError::Internal(format!(
                "Config value \"{key}\" not found on ConfigOptions"
            ))),
        }
    }

    /// Visits every built-in namespace under the fixed `datafusion.` root.
    /// The supplied prefix and description are ignored.
    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        const NO_DESCRIPTION: &str = "";
        self.catalog.visit(v, "datafusion.catalog", NO_DESCRIPTION);
        self.execution.visit(v, "datafusion.execution", NO_DESCRIPTION);
        self.optimizer.visit(v, "datafusion.optimizer", NO_DESCRIPTION);
        self.explain.visit(v, "datafusion.explain", NO_DESCRIPTION);
        self.sql_parser.visit(v, "datafusion.sql_parser", NO_DESCRIPTION);
    }
}
impl ConfigOptions {
    /// Creates a new [`ConfigOptions`] with every option at its default value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the option identified by the fully qualified `key` to `value`.
    ///
    /// Keys starting with `datafusion.` address built-in options. Any other
    /// leading segment is looked up as a registered extension prefix, and the
    /// rest of the key is forwarded to that extension.
    ///
    /// # Errors
    ///
    /// Returns an error if `key` contains no `.`, if no extension is
    /// registered under its prefix, or if the target option rejects the key
    /// or fails to parse `value`.
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let (prefix, key) = key.split_once('.').ok_or_else(|| {
            DataFusionError::External(
                format!("could not find config namespace for key \"{key}\"").into(),
            )
        })?;

        if prefix == "datafusion" {
            return ConfigField::set(self, key, value);
        }

        let e = self.extensions.0.get_mut(prefix);
        let e = e.ok_or_else(|| {
            DataFusionError::External(
                format!("Could not find config namespace \"{prefix}\"").into(),
            )
        })?;
        e.0.set(key, value)
    }

    /// Creates default options, then overrides every built-in option whose
    /// environment variable is set.
    ///
    /// The variable name is the option key upper-cased with `.` replaced by
    /// `_`, e.g. `DATAFUSION_EXECUTION_BATCH_SIZE`. Non-UTF-8 values are
    /// converted lossily. Extension options are not read from the environment.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by [`ConfigOptions::set`], for example
    /// when a value fails to parse.
    pub fn from_env() -> Result<Self> {
        // Collects every built-in key, whether set or unset.
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(env) {
                ret.set(&key, var.to_string_lossy().as_ref())?;
            }
        }

        Ok(ret)
    }

    /// Returns one [`ConfigEntry`] per option.
    ///
    /// Built-in options come first, in namespace and field declaration order.
    /// They are followed by the entries of each registered extension, in
    /// prefix order.
    pub fn entries(&self) -> Vec<ConfigEntry> {
        // Converts each visited option into a `ConfigEntry`.
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.visit(&mut v, "datafusion", "");

        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }

    /// Renders a Markdown table (`key | default | description`) of every
    /// built-in option at its default value, sorted by key.
    ///
    /// Unset options are shown as `NULL`.
    pub fn generate_config_markdown() -> String {
        use std::fmt::Write as _;

        let mut s = Self::default();
        // `target_partitions` defaults to the host CPU count. Pin it so the
        // rendered default is not host-specific.
        s.execution.target_partitions = 0;

        let mut docs = "| key | default | description |\n".to_string();
        docs += "|-----|---------|-------------|\n";

        let mut entries = s.entries();
        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));

        // Iterate the sorted vector so the table is ordered by key.
        for entry in entries {
            let _ = writeln!(
                &mut docs,
                "| {} | {} | {} |",
                entry.key,
                entry.value.as_deref().unwrap_or("NULL"),
                entry.description
            );
        }
        docs
    }
}
/// A configuration extension that can be registered in [`Extensions`].
///
/// Its options are set through [`ConfigOptions::set`] with keys of the form
/// `<PREFIX>.<key>`.
pub trait ConfigExtension: ExtensionOptions {
    /// Namespace prefix the extension is registered under. It must not be
    /// `"datafusion"`, which [`Extensions::insert`] rejects with a panic.
    const PREFIX: &'static str;
}
/// Object-safe interface to an extension's options.
///
/// [`Extensions`] stores values of this trait type-erased, as `Box<dyn ExtensionOptions>`.
pub trait ExtensionOptions: Send + Sync + std::fmt::Debug + 'static {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete type
    /// (used by [`Extensions::get`]).
    fn as_any(&self) -> &dyn Any;
    /// Mutable counterpart of [`ExtensionOptions::as_any`] (used by [`Extensions::get_mut`]).
    fn as_any_mut(&mut self) -> &mut dyn Any;
    /// Returns a boxed clone of these options. The type-erased container uses
    /// it to implement `Clone`.
    fn cloned(&self) -> Box<dyn ExtensionOptions>;
    /// Sets the option named `key` to `value`. The extension prefix has
    /// already been stripped from `key` by [`ConfigOptions::set`].
    fn set(&mut self, key: &str, value: &str) -> Result<()>;
    /// Returns this extension's options. They are appended to the output of
    /// [`ConfigOptions::entries`].
    fn entries(&self) -> Vec<ConfigEntry>;
}
/// Registered configuration extensions, keyed by [`ConfigExtension::PREFIX`].
///
/// The map is a `BTreeMap`, so iteration (e.g. in [`ConfigOptions::entries`])
/// is in prefix order.
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
impl Extensions {
    /// Registers `extension` under `T::PREFIX`. Any extension previously
    /// registered under the same prefix is replaced.
    ///
    /// # Panics
    ///
    /// Panics if `T::PREFIX` is `"datafusion"`, which is reserved for the
    /// built-in options.
    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
        assert_ne!(T::PREFIX, "datafusion");
        self.0.insert(T::PREFIX, ExtensionBox(Box::new(extension)));
    }

    /// Returns the extension registered under `T::PREFIX`, if one exists and
    /// its concrete type is `T`.
    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
        self.0
            .get(T::PREFIX)
            .and_then(|boxed| boxed.0.as_any().downcast_ref::<T>())
    }

    /// Mutable counterpart of [`Extensions::get`].
    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
        self.0
            .get_mut(T::PREFIX)
            .and_then(|boxed| boxed.0.as_any_mut().downcast_mut::<T>())
    }
}
/// Type-erased holder for a registered extension's options.
///
/// `Clone` is implemented manually via `ExtensionOptions::cloned`.
#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);
impl Clone for ExtensionBox {
fn clone(&self) -> Self {
Self(self.0.cloned())
}
}
/// A configuration value that can be set from a string and reported through a
/// [`Visit`] implementation.
///
/// It is implemented for:
/// - namespace structs (by `config_namespace!`);
/// - leaf value types (by `config_field!`);
/// - `Option<F>`;
/// - `ConfigOptions`.
trait ConfigField {
    /// Reports this value to `v` under `key`. A namespace reports each of its
    /// fields under `key` followed by `.<field>`.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
    /// Parses `value` into this field. `key` holds any dot-separated path
    /// remaining below this field; leaf implementations ignore it.
    fn set(&mut self, key: &str, value: &str) -> Result<()>;
}
impl<F: ConfigField + Default> ConfigField for Option<F> {
    /// Delegates to the inner value when present. Otherwise it reports the
    /// key as unset via `Visit::none`.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        if let Some(inner) = self {
            inner.visit(v, key, description)
        } else {
            v.none(key, description)
        }
    }

    /// Fills a `None` with `F::default()`, then delegates to the inner value.
    /// The default stays in place even if the inner `set` fails.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let inner = self.get_or_insert_with(F::default);
        inner.set(key, value)
    }
}
/// Implements `ConfigField` for a leaf type `$t` that is `Display` and `FromStr`.
///
/// - `visit` reports the current value via `Visit::some`.
/// - `set` ignores the key remainder and replaces the value with
///   `value.parse::<$t>()`.
/// - Parse failures are wrapped in a `DataFusionError::Context` naming the
///   input and the target type.
macro_rules! config_field {
    ($t:ty) => {
        impl ConfigField for $t {
            fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
                v.some(key, self, description)
            }

            fn set(&mut self, _: &str, value: &str) -> Result<()> {
                let parsed: $t = value.parse().map_err(|e| {
                    DataFusionError::Context(
                        format!(concat!("Error parsing {} as ", stringify!($t),), value),
                        Box::new(DataFusionError::External(Box::new(e))),
                    )
                })?;
                *self = parsed;
                Ok(())
            }
        }
    };
}

// Leaf value types usable as option fields in `config_namespace!`.
config_field!(String);
config_field!(bool);
config_field!(usize);
/// Receives configuration values during a `ConfigField::visit` traversal.
trait Visit {
    /// Called for a key that currently holds a value.
    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
    /// Called for a key whose value is unset (an `Option` field holding `None`).
    fn none(&mut self, key: &str, description: &'static str);
}