use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;
use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
/// Declares a configuration namespace struct and implements `ConfigField`
/// and `Default` for it.
///
/// Each field is written as
/// `name: Type, [warn = expr,] [transform = expr,] default = expr`.
/// Field doc comments (`///`) are captured and reported as the field's
/// description by `visit`.
///
/// The generated code:
/// - derives `Debug`, `Clone` and `PartialEq` on the struct;
/// - implements `ConfigField::set`, which splits the key at the first `.`,
///   selects the field named by the first segment, optionally applies
///   `transform` to the value, logs the `warn` message (if any) on every set,
///   and forwards the remainder of the key to the field's own `set`;
/// - implements `ConfigField::visit`, reporting each field under
///   `"{key_prefix}.{field_name}"` with its trimmed doc text as description;
/// - implements `Default` from the `default` expressions.
#[macro_export]
macro_rules! config_namespace {
(
$(#[doc = $struct_d:tt])*
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, $(warn = $warn: expr,)? $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
$(#[doc = $struct_d])*
#[derive(Debug, Clone, PartialEq)]
$vis struct $struct_name{
$(
$(#[doc = $d])*
$field_vis $field_name : $field_type,
)*
}
impl ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// `key` is the path relative to this namespace; the first segment names a field.
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => {
// Optional per-field normalisation (e.g. lower-casing) before parsing.
$(let value = $transform(value);)?
$(log::warn!($warn);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => return _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
)
}
}
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
$(
let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
// The field's doc comment lines, concatenated, become its description.
let desc = concat!($($d),*).trim();
self.$field_name.visit(v, key.as_str(), desc);
)*
}
}
impl Default for $struct_name {
fn default() -> Self {
Self {
$($field_name: $default),*
}
}
}
}
}
// Catalog-related options; `ConfigOptions` routes the `catalog` namespace here
// and reports these fields under the `datafusion.catalog` prefix.
config_namespace! {
pub struct CatalogOptions {
pub create_default_catalog_and_schema: bool, default = true
pub default_catalog: String, default = "datafusion".to_string()
pub default_schema: String, default = "public".to_string()
pub information_schema: bool, default = false
pub location: Option<String>, default = None
pub format: Option<String>, default = None
pub has_header: bool, default = true
pub newlines_in_values: bool, default = false
}
}
// SQL parser options, reported under the `datafusion.sql_parser` prefix.
// Setting `enable_options_value_normalization` logs a deprecation warning
// each time it is set.
config_namespace! {
pub struct SqlParserOptions {
pub parse_float_as_decimal: bool, default = false
pub enable_ident_normalization: bool, default = true
pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false
pub dialect: String, default = "generic".to_string()
pub support_varchar_with_length: bool, default = true
}
}
// Execution options, reported under the `datafusion.execution` prefix.
// `parquet` is a nested namespace, so its keys look like
// `datafusion.execution.parquet.<field>`.
// `target_partitions` and `planning_concurrency` default to
// `get_available_parallelism()`, so their defaults vary by host.
config_namespace! {
pub struct ExecutionOptions {
pub batch_size: usize, default = 8192
pub coalesce_batches: bool, default = true
pub collect_statistics: bool, default = false
pub target_partitions: usize, default = get_available_parallelism()
pub time_zone: Option<String>, default = Some("+00:00".into())
pub parquet: ParquetOptions, default = Default::default()
pub planning_concurrency: usize, default = get_available_parallelism()
pub skip_physical_aggregate_schema_check: bool, default = false
pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
pub meta_fetch_concurrency: usize, default = 32
pub minimum_parallel_output_files: usize, default = 4
pub soft_max_rows_per_output_file: usize, default = 50000000
pub max_buffered_batches_per_output_file: usize, default = 2
pub listing_table_ignore_subdirectory: bool, default = true
pub enable_recursive_ctes: bool, default = true
pub split_file_groups_by_statistics: bool, default = false
pub keep_partition_by_columns: bool, default = false
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
pub enforce_batch_size_in_joins: bool, default = false
}
}
// Parquet options shared by the session (`execution.parquet`) and by
// `TableParquetOptions::global`.
// `compression`, `statistics_enabled` and `encoding` are lower-cased before
// parsing, so their values are case-insensitive on input.
// `created_by` embeds this crate's version (`CARGO_PKG_VERSION`) at compile time.
config_namespace! {
pub struct ParquetOptions {
pub enable_page_index: bool, default = true
pub pruning: bool, default = true
pub skip_metadata: bool, default = true
pub metadata_size_hint: Option<usize>, default = None
pub pushdown_filters: bool, default = false
pub reorder_filters: bool, default = false
pub schema_force_view_types: bool, default = true
pub binary_as_string: bool, default = false
pub data_pagesize_limit: usize, default = 1024 * 1024
pub write_batch_size: usize, default = 1024
pub writer_version: String, default = "1.0".to_string()
pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
pub dictionary_enabled: Option<bool>, default = Some(true)
pub dictionary_page_size_limit: usize, default = 1024 * 1024
pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
pub max_statistics_size: Option<usize>, default = Some(4096)
pub max_row_group_size: usize, default = 1024 * 1024
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
pub column_index_truncate_length: Option<usize>, default = Some(64)
pub data_page_row_count_limit: usize, default = 20_000
pub encoding: Option<String>, transform = str::to_lowercase, default = None
pub bloom_filter_on_read: bool, default = true
pub bloom_filter_on_write: bool, default = false
pub bloom_filter_fpp: Option<f64>, default = None
pub bloom_filter_ndv: Option<u64>, default = None
pub allow_single_file_parallelism: bool, default = true
pub maximum_parallel_row_group_writers: usize, default = 1
pub maximum_buffered_record_batches_per_stream: usize, default = 2
}
}
// Optimizer options, reported under the `datafusion.optimizer` prefix.
// `default_filter_selectivity` is a `u8`, so it is set through the `u8`
// `ConfigField` implementation in this file.
config_namespace! {
pub struct OptimizerOptions {
pub enable_distinct_aggregation_soft_limit: bool, default = true
pub enable_round_robin_repartition: bool, default = true
pub enable_topk_aggregation: bool, default = true
pub filter_null_join_keys: bool, default = false
pub repartition_aggregations: bool, default = true
pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
pub repartition_joins: bool, default = true
pub allow_symmetric_joins_without_pruning: bool, default = true
pub repartition_file_scans: bool, default = true
pub repartition_windows: bool, default = true
pub repartition_sorts: bool, default = true
pub prefer_existing_sort: bool, default = false
pub skip_failed_rules: bool, default = false
pub max_passes: usize, default = 3
pub top_down_join_key_reordering: bool, default = true
pub prefer_hash_join: bool, default = true
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
pub default_filter_selectivity: u8, default = 20
pub prefer_existing_union: bool, default = false
pub expand_views_at_output: bool, default = false
}
}
// Options controlling EXPLAIN output, reported under `datafusion.explain`.
config_namespace! {
pub struct ExplainOptions {
pub logical_plan_only: bool, default = false
pub physical_plan_only: bool, default = false
pub show_statistics: bool, default = false
pub show_sizes: bool, default = true
pub show_schema: bool, default = false
}
}
/// A single configuration option as reported by `entries()`.
#[derive(Debug)]
pub struct ConfigEntry {
/// Fully-qualified key, e.g. `datafusion.execution.batch_size`.
pub key: String,
/// Rendered value, or `None` when the option has no value to report
/// (e.g. an unset `Option` field).
pub value: Option<String>,
/// Human-readable description taken from the option's doc comment.
pub description: &'static str,
}
/// Top-level session configuration: the built-in `datafusion.*` namespaces
/// plus any registered extension namespaces.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
/// `datafusion.catalog.*`
pub catalog: CatalogOptions,
/// `datafusion.execution.*`
pub execution: ExecutionOptions,
/// `datafusion.optimizer.*`
pub optimizer: OptimizerOptions,
/// `datafusion.sql_parser.*`
pub sql_parser: SqlParserOptions,
/// `datafusion.explain.*`
pub explain: ExplainOptions,
/// Extension namespaces, keyed by their prefix.
pub extensions: Extensions,
}
impl ConfigField for ConfigOptions {
    /// Routes `key` (already stripped of the `datafusion.` prefix) to the
    /// namespace named by its first segment.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let (key, rem) = match key.split_once('.') {
            Some(parts) => parts,
            None => (key, ""),
        };
        match key {
            "sql_parser" => self.sql_parser.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "catalog" => self.catalog.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    /// Visits every built-in namespace under its fixed `datafusion.<ns>`
    /// prefix. The supplied prefix and description are ignored.
    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        // The order here determines the order of `ConfigOptions::entries`.
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
    }
}
impl ConfigOptions {
    /// Creates a new `ConfigOptions` with every option at its default value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the registered configuration `Extensions` and returns `self`.
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Sets a single option from its fully-qualified `key`.
    ///
    /// The text before the first `.` selects the namespace:
    /// - `datafusion` routes the remainder (e.g. `execution.batch_size`) to
    ///   the built-in options;
    /// - any other prefix is looked up among the registered extensions, and
    ///   the remainder of the key (without the prefix) is passed to it.
    ///
    /// # Errors
    /// Returns a configuration error if `key` contains no `.`, if the prefix
    /// names no known namespace, or if the target option rejects `value`.
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, key)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };
        if prefix == "datafusion" {
            return ConfigField::set(self, key, value);
        }
        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }

    /// Builds options from environment variables.
    ///
    /// Each built-in key is mapped to a variable name by upper-casing it and
    /// replacing `.` with `_`. For example, `datafusion.execution.batch_size`
    /// maps to `DATAFUSION_EXECUTION_BATCH_SIZE`. Variables that are present
    /// override the defaults. Non-UTF-8 values are converted lossily.
    ///
    /// # Errors
    /// Returns an error if any present variable holds a value the
    /// corresponding option rejects.
    pub fn from_env() -> Result<Self> {
        let mut ret = Self::default();
        for key in ret.builtin_keys() {
            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(env) {
                ret.set(&key, var.to_string_lossy().as_ref())?;
            }
        }
        Ok(ret)
    }

    /// Builds options from a map of fully-qualified keys to values.
    ///
    /// Only built-in `datafusion.*` keys are consulted. Map entries with any
    /// other key are ignored.
    ///
    /// # Errors
    /// Returns an error if a consulted value is rejected by its option.
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        let mut ret = Self::default();
        for key in ret.builtin_keys() {
            if let Some(var) = settings.get(&key) {
                ret.set(&key, var)?;
            }
        }
        Ok(ret)
    }

    /// Returns every built-in option, followed by the entries reported by
    /// each registered extension.
    pub fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);
        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }
            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }
        let mut v = Visitor(vec![]);
        self.visit(&mut v, "datafusion", "");
        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }

    /// Renders the default configuration as a Markdown table, sorted by key.
    ///
    /// `target_partitions` and `planning_concurrency` are overridden to `0`
    /// because their defaults come from `get_available_parallelism()`.
    pub fn generate_config_markdown() -> String {
        use std::fmt::Write as _;
        let mut s = Self::default();
        s.execution.target_partitions = 0;
        s.execution.planning_concurrency = 0;
        let mut docs = "| key | default | description |\n".to_string();
        docs += "|-----|---------|-------------|\n";
        let mut entries = s.entries();
        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
        // Iterate the *sorted* entries. Calling `s.entries()` again here
        // would discard the sort.
        for entry in &entries {
            let _ = writeln!(
                &mut docs,
                "| {} | {} | {} |",
                entry.key,
                entry.value.as_deref().unwrap_or("NULL"),
                entry.description
            );
        }
        docs
    }

    /// Returns the fully-qualified key of every built-in option, in visit order.
    fn builtin_keys(&self) -> Vec<String> {
        struct Visitor(Vec<String>);
        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }
            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }
        let mut keys = Visitor(vec![]);
        self.visit(&mut keys, "datafusion", "");
        keys.0
    }
}
/// An `ExtensionOptions` type that can be registered in `Extensions`.
pub trait ConfigExtension: ExtensionOptions {
/// Namespace prefix under which the extension is stored and addressed
/// (e.g. `myext` for keys `myext.*`). `Extensions::insert` panics if this is
/// `"datafusion"`.
const PREFIX: &'static str;
}
/// Object-safe interface for user-defined configuration namespaces.
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
/// Returns `self` as `&dyn Any` so callers can downcast to the concrete type.
fn as_any(&self) -> &dyn Any;
/// Returns `self` as `&mut dyn Any` so callers can downcast mutably.
fn as_any_mut(&mut self) -> &mut dyn Any;
/// Clones `self` into a new boxed trait object.
fn cloned(&self) -> Box<dyn ExtensionOptions>;
/// Sets the option identified by `key` from its string `value`.
fn set(&mut self, key: &str, value: &str) -> Result<()>;
/// Lists the options this extension exposes.
fn entries(&self) -> Vec<ConfigEntry>;
}
/// Registry of configuration extensions, keyed by `ConfigExtension::PREFIX`.
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);

impl Extensions {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Registers `extension` under `T::PREFIX`, replacing any previous
    /// extension with the same prefix.
    ///
    /// # Panics
    /// Panics if `T::PREFIX` is `"datafusion"`, which is reserved.
    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
        assert_ne!(T::PREFIX, "datafusion");
        self.0.insert(T::PREFIX, ExtensionBox(Box::new(extension)));
    }

    /// Returns the extension registered under `T::PREFIX`, if it is a `T`.
    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
        let boxed = self.0.get(T::PREFIX)?;
        boxed.0.as_any().downcast_ref::<T>()
    }

    /// Mutable counterpart of `get`.
    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
        self.0
            .get_mut(T::PREFIX)?
            .0
            .as_any_mut()
            .downcast_mut::<T>()
    }
}
/// Type-erased holder for a registered extension that can still be cloned.
#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);

impl Clone for ExtensionBox {
    /// Deep-copies the boxed extension via `ExtensionOptions::cloned`.
    fn clone(&self) -> Self {
        let ExtensionBox(inner) = self;
        ExtensionBox(inner.cloned())
    }
}
/// A value (scalar or nested namespace) that can be reported to a `Visit`or
/// and set from a string.
pub trait ConfigField {
/// Reports this field's value(s) to `v` under `key` with `description`.
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
/// Sets the field from `value`. `key` is the remaining key path below this
/// field (empty for scalar fields).
fn set(&mut self, key: &str, value: &str) -> Result<()>;
}
/// Optional config values: `None` is reported through `Visit::none`.
impl<F: ConfigField + Default> ConfigField for Option<F> {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        if let Some(inner) = self {
            inner.visit(v, key, description);
        } else {
            v.none(key, description);
        }
    }

    /// Sets the inner value, first inserting `F::default()` when `None`.
    /// Note that the default is inserted even if the inner `set` then fails.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let inner = self.get_or_insert_with(F::default);
        inner.set(key, value)
    }
}
/// Parses `input` via `FromStr`.
///
/// On failure the parse error is wrapped as
/// `Context("Error parsing '<input>' as <type name>", External(err))`.
fn default_transform<T>(input: &str) -> Result<T>
where
    T: FromStr,
    <T as FromStr>::Err: Sync + Send + Error + 'static,
{
    match input.parse::<T>() {
        Ok(parsed) => Ok(parsed),
        Err(source) => {
            let context = format!(
                "Error parsing '{}' as {}",
                input,
                std::any::type_name::<T>()
            );
            Err(DataFusionError::Context(
                context,
                Box::new(DataFusionError::External(Box::new(source))),
            ))
        }
    }
}
/// Implements `ConfigField` for a scalar type.
///
/// `visit` reports the value via `Visit::some`. `set` ignores its key
/// argument and replaces the value with the result of `$transform`, which
/// may use `?`. The one-argument form uses `default_transform(value)?`,
/// i.e. `FromStr`.
///
/// NOTE(review): the macro refers to `config_field!` and `default_transform`
/// without `$crate::` qualification, so despite `#[macro_export]` it
/// presumably only works where those names are in scope — confirm before
/// relying on it from other crates.
#[macro_export]
macro_rules! config_field {
($t:ty) => {
config_field!($t, value => default_transform(value)?);
};
($t:ty, $arg:ident => $transform:expr) => {
impl ConfigField for $t {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}
fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
*self = $transform;
Ok(())
}
}
};
}
// Scalar `ConfigField` implementations parsed via `FromStr`.
config_field!(String);
// Booleans are lower-cased first, so e.g. `TRUE` and `True` are accepted.
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
/// `u8` options accept either a decimal number (`"59"`) or a single
/// one-byte character (`";"`), which is stored as its byte value.
impl ConfigField for u8 {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        if value.is_empty() {
            return Err(DataFusionError::Configuration(format!(
                "Input string for {} key is empty",
                key
            )));
        }
        *self = match value.parse::<u8>() {
            Ok(num) => num,
            // Not a decimal number: accept exactly one byte. A one-byte
            // `&str` is always ASCII, so checking the length is sufficient.
            Err(_) => match value.as_bytes() {
                [byte] => *byte,
                _ => {
                    return Err(DataFusionError::Configuration(format!(
                        "Error parsing {} as u8. Non-ASCII string provided",
                        value
                    )))
                }
            },
        };
        Ok(())
    }
}
/// Compression settings are parsed with `CompressionTypeVariant`'s `FromStr`.
impl ConfigField for CompressionTypeVariant {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        // `?` converts the parser's error type into the crate error.
        let parsed: CompressionTypeVariant = value.parse()?;
        *self = parsed;
        Ok(())
    }
}
/// Callback interface used by `ConfigField::visit` to enumerate options.
pub trait Visit {
/// Called for an option that has a value.
fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
/// Called for an option with no value (e.g. an unset `Option` field).
fn none(&mut self, key: &str, description: &'static str);
}
/// Defines a configuration extension struct and implements
/// `ExtensionOptions` and `Default` for it.
///
/// Each field is written as `name: Type, default = expr`. Field doc comments
/// (`///`) become the descriptions reported by `entries`.
///
/// Field types must implement:
/// - `FromStr` (used by `set`);
/// - `Display` (used by `entries`);
/// - `PartialEq` against the default expression;
/// - `Clone` and `Debug` (derived on the struct).
///
/// `entries` reports `value: None` for fields that still equal their default.
#[macro_export]
macro_rules! extensions_options {
    (
        $(#[doc = $struct_d:tt])*
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])*
                $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
                $(#[doc = $d])*
                $field_vis $field_name : $field_type,
            )*
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl $crate::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            fn set(&mut self, key: &str, value: &str) -> $crate::Result<()> {
                match key {
                    $(
                        stringify!($field_name) => {
                            self.$field_name = value.parse().map_err(|e| {
                                $crate::DataFusionError::Context(
                                    // Name the field's declared type in the
                                    // message. `$t` was an unbound metavariable
                                    // here and rendered literally as "$t".
                                    format!(concat!("Error parsing {} as ", stringify!($field_type),), value),
                                    Box::new($crate::DataFusionError::External(Box::new(e))),
                                )
                            })?;
                            Ok(())
                        }
                    )*
                    _ => Err($crate::DataFusionError::Configuration(
                        format!(concat!("Config value \"{}\" not found on ", stringify!($struct_name)), key)
                    ))
                }
            }

            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
                vec![
                    $(
                        $crate::config::ConfigEntry {
                            key: stringify!($field_name).to_owned(),
                            // Only non-default values are rendered.
                            value: (self.$field_name != $default).then(|| self.$field_name.to_string()),
                            description: concat!($($d),*).trim(),
                        },
                    )*
                ]
            }
        }
    }
}
/// File format whose options `TableOptions` reads and writes through the
/// `format.*` keys.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
CSV,
/// Only available with the `parquet` feature.
#[cfg(feature = "parquet")]
PARQUET,
JSON,
}
/// Per-table format options plus table-level configuration extensions.
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
/// CSV format options.
pub csv: CsvOptions,
/// Parquet format options (global and per-column).
pub parquet: TableParquetOptions,
/// JSON format options.
pub json: JsonOptions,
/// Format that `format.*` keys apply to. `None` until one is selected.
pub current_format: Option<ConfigFileType>,
/// Extension namespaces, keyed by their prefix.
pub extensions: Extensions,
}
impl ConfigField for TableOptions {
    /// Visits the options of the selected format under the `format` prefix.
    /// When no format is selected, visits all formats under `csv`, `parquet`
    /// and `json`. The supplied prefix and description are ignored.
    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        match &self.current_format {
            #[cfg(feature = "parquet")]
            Some(ConfigFileType::PARQUET) => self.parquet.visit(v, "format", ""),
            Some(ConfigFileType::CSV) => self.csv.visit(v, "format", ""),
            Some(ConfigFileType::JSON) => self.json.visit(v, "format", ""),
            None => {
                self.csv.visit(v, "csv", "");
                self.parquet.visit(v, "parquet", "");
                self.json.visit(v, "json", "");
            }
        }
    }

    /// Accepts only `format.<option>` keys and applies them to the currently
    /// selected format.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        if key != "format" {
            return _config_err!("Config value \"{key}\" not found on TableOptions");
        }
        match &self.current_format {
            #[cfg(feature = "parquet")]
            Some(ConfigFileType::PARQUET) => self.parquet.set(rem, value),
            Some(ConfigFileType::CSV) => self.csv.set(rem, value),
            Some(ConfigFileType::JSON) => self.json.set(rem, value),
            None => _config_err!("Specify a format for TableOptions"),
        }
    }
}
impl TableOptions {
    /// Creates `TableOptions` with default format options, no selected
    /// format and no extensions.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates default table options whose global Parquet settings are
    /// copied from the session's `execution.parquet` options.
    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
        // `combine_with_session_config` returns a new value rather than
        // mutating in place. Previously its result was discarded, so the
        // session's Parquet options were silently ignored.
        Self::default().combine_with_session_config(config)
    }

    /// Returns a copy of `self` whose global Parquet options are replaced by
    /// `config.execution.parquet`. `self` is not modified.
    #[must_use]
    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
        let mut clone = self.clone();
        clone.parquet.global = config.execution.parquet.clone();
        clone
    }

    /// Selects the format that subsequent `format.*` keys apply to.
    pub fn set_config_format(&mut self, format: ConfigFileType) {
        self.current_format = Some(format);
    }

    /// Replaces the registered configuration `Extensions` and returns `self`.
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Sets a single option from its fully-qualified `key`.
    ///
    /// The text before the first `.` selects the namespace:
    /// - `format` applies the option to the currently selected format;
    /// - `execution` keys are accepted and ignored;
    /// - any other prefix selects a registered extension. The *full* key,
    ///   including the prefix, is forwarded to it.
    ///
    /// # Errors
    /// Returns a configuration error if `key` contains no `.`, if no format is
    /// selected for a `format.*` key, if the prefix names no known extension,
    /// or if the target option rejects `value`.
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, _)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };
        if prefix == "format" {
            return ConfigField::set(self, key, value);
        }
        if prefix == "execution" {
            return Ok(());
        }
        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }

    /// Builds default options and applies every entry of `settings`.
    ///
    /// No format is selected on the fresh options, so `format.*` keys fail.
    /// Entries are applied in `HashMap` iteration order.
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        let mut ret = Self::default();
        for (k, v) in settings {
            ret.set(k, v)?;
        }
        Ok(ret)
    }

    /// Applies every entry of `settings` to `self`, in `HashMap` iteration
    /// order.
    ///
    /// Stops at the first error; entries applied before it remain set.
    pub fn alter_with_string_hash_map(
        &mut self,
        settings: &HashMap<String, String>,
    ) -> Result<()> {
        for (k, v) in settings {
            self.set(k, v)?;
        }
        Ok(())
    }

    /// Returns the visible format options, followed by the entries reported
    /// by each registered extension.
    ///
    /// Keys use the `format.` prefix when a format is selected. Otherwise
    /// they use `csv.`, `parquet.` and `json.` prefixes.
    pub fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);
        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }
            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }
        let mut v = Visitor(vec![]);
        self.visit(&mut v, "format", "");
        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }
}
/// Parquet options for a table: global settings, per-column overrides and
/// key/value metadata.
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Options applying to the whole file.
pub global: ParquetOptions,
/// Per-column overrides, keyed by column name (set via `option::column` keys).
pub column_specific_options: HashMap<String, ParquetColumnOptions>,
/// Key/value metadata, set via `metadata::<key>` keys.
pub key_value_metadata: HashMap<String, Option<String>>,
}
impl TableParquetOptions {
    /// Creates options with default global settings, no column overrides and
    /// no metadata.
    pub fn new() -> Self {
        Default::default()
    }
}
impl ConfigField for TableParquetOptions {
    /// Visits the global options, then the per-column overrides.
    /// `key_value_metadata` is not visited.
    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
        self.global.visit(v, key_prefix, description);
        self.column_specific_options.visit(v, key_prefix, description);
    }

    /// Dispatches on the shape of `key`:
    /// - `metadata::<key>` stores `value` as metadata;
    /// - any other key containing `::` is a per-column option
    ///   (`option::column`);
    /// - everything else is a global option.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        if let Some(meta_key) = key.strip_prefix("metadata::") {
            if meta_key.is_empty() {
                return _config_err!(
                    "Invalid metadata key provided, missing key in metadata::<key>"
                );
            }
            if meta_key.contains("::") {
                return _config_err!(
                    "Invalid metadata key provided, found too many '::' in \"{key}\""
                );
            }
            self.key_value_metadata
                .insert(meta_key.to_owned(), Some(value.into()));
            return Ok(());
        }
        if key.contains("::") {
            self.column_specific_options.set(key, value)
        } else {
            self.global.set(key, value)
        }
    }
}
// Like `config_namespace!` (without `warn`), but additionally implements
// `ConfigField` for `HashMap<String, $struct_name>`, so the struct can hold
// per-key (e.g. per-column) overrides.
// - `set` on the map expects keys of the form `<option>::<map key>`: the part
//   before the first `::` is the option name, the rest is the map key. A
//   default entry is inserted for an unknown map key before the option is set.
// - `visit` on the map reports `"{prefix}.{option}::{map key}"` for every
//   entry. Iteration follows `HashMap` order, so entry order is unspecified.
macro_rules! config_namespace_with_hashmap {
(
$(#[doc = $struct_d:tt])*
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
$(#[doc = $struct_d])*
#[derive(Debug, Clone, PartialEq)]
$vis struct $struct_name{
$(
$(#[doc = $d])*
$field_vis $field_name : $field_type,
)*
}
impl ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => {
$(let value = $transform(value);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
)
}
}
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
$(
let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
let desc = concat!($($d),*).trim();
self.$field_name.visit(v, key.as_str(), desc);
)*
}
}
impl Default for $struct_name {
fn default() -> Self {
Self {
$($field_name: $default),*
}
}
}
impl ConfigField for HashMap<String,$struct_name> {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Split "<option>::<map key>" at the first "::" only.
let parts: Vec<&str> = key.splitn(2, "::").collect();
match parts.as_slice() {
[inner_key, hashmap_key] => {
let inner_value = self
.entry((*hashmap_key).to_owned())
.or_insert_with($struct_name::default);
inner_value.set(inner_key, value)
}
_ => _config_err!("Unrecognized key '{key}'."),
}
}
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
for (column_name, col_options) in self {
$(
let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
let desc = concat!($($d),*).trim();
col_options.$field_name.visit(v, key.as_str(), desc);
)*
}
}
}
}
}
// Per-column Parquet overrides stored in
// `TableParquetOptions::column_specific_options`. Every field defaults to
// `None`. `compression` is lower-cased before parsing.
config_namespace_with_hashmap! {
pub struct ParquetColumnOptions {
pub bloom_filter_enabled: Option<bool>, default = None
pub encoding: Option<String>, default = None
pub dictionary_enabled: Option<bool>, default = None
pub compression: Option<String>, transform = str::to_lowercase, default = None
pub statistics_enabled: Option<String>, default = None
pub bloom_filter_fpp: Option<f64>, default = None
pub bloom_filter_ndv: Option<u64>, default = None
pub max_statistics_size: Option<usize>, default = None
}
}
// CSV format options. The `u8` fields (`delimiter`, `quote`, `terminator`,
// `escape`, `comment`) accept either a decimal byte value or a single
// one-byte character via the `u8` `ConfigField` implementation.
config_namespace! {
pub struct CsvOptions {
pub has_header: Option<bool>, default = None
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
pub terminator: Option<u8>, default = None
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: Option<usize>, default = None
pub date_format: Option<String>, default = None
pub datetime_format: Option<String>, default = None
pub timestamp_format: Option<String>, default = None
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
pub null_value: Option<String>, default = None
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
}
}
/// Builder-style setters and plain getters for `CsvOptions`.
impl CsvOptions {
    /// Sets the file compression type.
    pub fn with_compression(
        self,
        compression_type_variant: CompressionTypeVariant,
    ) -> Self {
        Self {
            compression: compression_type_variant,
            ..self
        }
    }

    /// Limits how many records are read during schema inference.
    pub fn with_schema_infer_max_rec(self, max_rec: usize) -> Self {
        Self {
            schema_infer_max_rec: Some(max_rec),
            ..self
        }
    }

    /// Sets whether the first line is a header.
    pub fn with_has_header(self, has_header: bool) -> Self {
        Self {
            has_header: Some(has_header),
            ..self
        }
    }

    /// Returns the header setting, if one was configured.
    pub fn has_header(&self) -> Option<bool> {
        self.has_header
    }

    /// Sets the field delimiter byte.
    pub fn with_delimiter(self, delimiter: u8) -> Self {
        Self { delimiter, ..self }
    }

    /// Sets the quote byte.
    pub fn with_quote(self, quote: u8) -> Self {
        Self { quote, ..self }
    }

    /// Sets (or clears) the line terminator byte.
    pub fn with_terminator(self, terminator: Option<u8>) -> Self {
        Self { terminator, ..self }
    }

    /// Sets (or clears) the escape byte.
    pub fn with_escape(self, escape: Option<u8>) -> Self {
        Self { escape, ..self }
    }

    /// Sets whether doubled quotes are treated as escaped quotes.
    pub fn with_double_quote(self, double_quote: bool) -> Self {
        Self {
            double_quote: Some(double_quote),
            ..self
        }
    }

    /// Sets whether values may contain newlines.
    pub fn with_newlines_in_values(self, newlines_in_values: bool) -> Self {
        Self {
            newlines_in_values: Some(newlines_in_values),
            ..self
        }
    }

    /// Sets the file compression type (same effect as `with_compression`).
    pub fn with_file_compression_type(
        self,
        compression: CompressionTypeVariant,
    ) -> Self {
        Self {
            compression,
            ..self
        }
    }

    /// Returns the field delimiter byte.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// Returns the quote byte.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// Returns the line terminator byte, if configured.
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// Returns the escape byte, if configured.
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }
}
// JSON format options.
config_namespace! {
pub struct JsonOptions {
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: Option<usize>, default = None
}
}
/// Marker trait for displayable format options. It has no methods, and no
/// implementations are visible in this file (NOTE(review): confirm its users).
pub trait FormatOptionsExt: Display {}
/// A file format together with its options (where the format has any).
#[derive(Debug, Clone, PartialEq)]
// Presumably allowed because the `PARQUET` variant is much larger than the
// others — TODO confirm.
#[allow(clippy::large_enum_variant)]
pub enum FormatOptions {
CSV(CsvOptions),
JSON(JsonOptions),
#[cfg(feature = "parquet")]
PARQUET(TableParquetOptions),
AVRO,
ARROW,
}
/// Displays the lower-case format name (`csv`, `json`, `parquet`, `avro`,
/// `arrow`).
impl Display for FormatOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            FormatOptions::CSV(_) => "csv",
            FormatOptions::JSON(_) => "json",
            #[cfg(feature = "parquet")]
            FormatOptions::PARQUET(_) => "parquet",
            FormatOptions::AVRO => "avro",
            FormatOptions::ARROW => "arrow",
        })
    }
}
#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashMap;
use crate::config::{
ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
TableOptions,
};
// Minimal extension that stores arbitrary `test.<rest>` keys as properties.
// `TableOptions::set` forwards the full key (including the `test` prefix),
// which `set` below asserts on.
#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
pub properties: HashMap<String, String>,
}
impl ExtensionOptions for TestExtensionConfig {
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn cloned(&self) -> Box<dyn ExtensionOptions> {
Box::new(self.clone())
}
fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
assert_eq!(key, "test");
self.properties.insert(rem.to_owned(), value.to_owned());
Ok(())
}
fn entries(&self) -> Vec<ConfigEntry> {
self.properties
.iter()
.map(|(k, v)| ConfigEntry {
key: k.into(),
value: Some(v.into()),
description: "",
})
.collect()
}
}
impl ConfigExtension for TestExtensionConfig {
const PREFIX: &'static str = "test";
}
// A registered extension can be retrieved by type.
#[test]
fn create_table_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let table_config = TableOptions::new().with_extensions(extension);
let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
assert!(kafka_config.is_some())
}
// `format.*` keys reach the selected format and other prefixes reach the extension.
#[test]
fn alter_test_extension_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let mut table_config = TableOptions::new().with_extensions(extension);
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter, b';');
table_config.set("test.bootstrap.servers", "asd").unwrap();
let kafka_config = table_config
.extensions
.get::<TestExtensionConfig>()
.unwrap();
assert_eq!(
kafka_config.properties.get("bootstrap.servers").unwrap(),
"asd"
);
}
// Single-character values set `u8` CSV options to that character's byte.
#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("format.escape", "\"").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '"');
table_config.set("format.escape", "\'").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
}
// `option::column` keys populate per-column Parquet options.
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
assert_eq!(
table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
Some(true)
);
}
// Per-column options show up in `entries()` under `format.<option>::<column>`.
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_entry() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
let entries = table_config.entries();
assert!(entries
.iter()
.any(|item| item.key == "format.bloom_filter_enabled::col1"))
}
// `metadata::<key>` values are stored verbatim; setting a key again overwrites it.
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_metadata_entry() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.set("format.metadata::key1", "").unwrap();
table_config.set("format.metadata::key2", "value2").unwrap();
table_config
.set("format.metadata::key3", "value with spaces ")
.unwrap();
table_config
.set("format.metadata::key4", "value with special chars :: :")
.unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata.clone();
assert_eq!(parsed_metadata.get("should not exist1"), None);
assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
assert_eq!(
parsed_metadata.get("key3"),
Some(&Some("value with spaces ".into()))
);
assert_eq!(
parsed_metadata.get("key4"),
Some(&Some("value with special chars :: :".into()))
);
table_config.set("format.metadata::key_dupe", "A").unwrap();
table_config.set("format.metadata::key_dupe", "B").unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}
}