use arrow_ipc::CompressionType;
#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;
#[cfg(feature = "parquet_encryption")]
use hex;
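/// Declares a configuration option namespace: the struct itself plus generated
/// [`ConfigField`] and [`Default`] implementations, so every field can be read
/// and written through dotted string keys.
///
/// A minimal sketch of the intended usage (the struct and field names here are
/// illustrative, not part of DataFusion):
///
/// ```ignore
/// config_namespace! {
///     /// Options for a hypothetical example feature
///     pub struct ExampleOptions {
///         /// Whether the feature is enabled
///         pub enabled: bool, default = true
///     }
/// }
///
/// let mut options = ExampleOptions::default();
/// options.set("enabled", "false")?; // parsed via `ConfigField for bool`
/// ```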
#[macro_export]
macro_rules! config_namespace {
(
$(#[doc = $struct_d:tt])* // Struct-level documentation attributes
$(#[deprecated($($struct_depr:tt)*)])? $(#[allow($($struct_de:tt)*)])?
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])* // Field-level documentation attributes
$(#[deprecated($($field_depr:tt)*)])? $(#[allow($($field_de:tt)*)])?
$field_vis:vis $field_name:ident : $field_type:ty,
$(warn = $warn:expr,)?
$(transform = $transform:expr,)?
default = $default:expr
)*$(,)*
}
) => {
$(#[doc = $struct_d])* $(#[deprecated($($struct_depr)*)])? $(#[allow($($struct_de)*)])?
#[derive(Debug, Clone, PartialEq)]
$vis struct $struct_name {
$(
$(#[doc = $d])* $(#[deprecated($($field_depr)*)])? $(#[allow($($field_de)*)])?
$field_vis $field_name: $field_type,
)*
}
impl $crate::config::ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => {
{
$(let value = $transform(value);)? #[allow(deprecated)]
let ret = self.$field_name.set(rem, value.as_ref());
$(if !$warn.is_empty() {
let default: $field_type = $default;
#[allow(deprecated)]
if default != self.$field_name {
log::warn!($warn);
}
})? ret
}
},
)*
_ => return $crate::error::_config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
)
}
}
fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
$(
let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
let desc = concat!($($d),*).trim();
#[allow(deprecated)]
self.$field_name.visit(v, key.as_str(), desc);
)*
}
}
impl Default for $struct_name {
fn default() -> Self {
#[allow(deprecated)]
Self {
$($field_name: $default),*
}
}
}
}
}
config_namespace! {
pub struct CatalogOptions {
pub create_default_catalog_and_schema: bool, default = true
pub default_catalog: String, default = "datafusion".to_string()
pub default_schema: String, default = "public".to_string()
pub information_schema: bool, default = false
pub location: Option<String>, default = None
pub format: Option<String>, default = None
pub has_header: bool, default = true
pub newlines_in_values: bool, default = false
}
}
config_namespace! {
pub struct SqlParserOptions {
pub parse_float_as_decimal: bool, default = false
pub enable_ident_normalization: bool, default = true
pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false
pub dialect: String, default = "generic".to_string()
pub support_varchar_with_length: bool, default = true
pub map_string_types_to_utf8view: bool, default = true
pub collect_spans: bool, default = false
pub recursion_limit: usize, default = 50
}
}
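/// Compression codec applied to intermediate spill files. Values are parsed
/// case-insensitively from `zstd`, `lz4_frame`, or `uncompressed` (the
/// default; an empty string also maps to uncompressed).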
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
Zstd,
Lz4Frame,
#[default]
Uncompressed,
}
impl FromStr for SpillCompression {
type Err = DataFusionError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"zstd" => Ok(Self::Zstd),
"lz4_frame" => Ok(Self::Lz4Frame),
"uncompressed" | "" => Ok(Self::Uncompressed),
other => Err(DataFusionError::Configuration(format!(
"Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
))),
}
}
}
impl ConfigField for SpillCompression {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}
fn set(&mut self, _: &str, value: &str) -> Result<()> {
*self = SpillCompression::from_str(value)?;
Ok(())
}
}
impl Display for SpillCompression {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let str = match self {
Self::Zstd => "zstd",
Self::Lz4Frame => "lz4_frame",
Self::Uncompressed => "uncompressed",
};
write!(f, "{str}")
}
}
impl From<SpillCompression> for Option<CompressionType> {
fn from(c: SpillCompression) -> Self {
match c {
SpillCompression::Zstd => Some(CompressionType::ZSTD),
SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
SpillCompression::Uncompressed => None,
}
}
}
config_namespace! {
pub struct ExecutionOptions {
pub batch_size: usize, default = 8192
pub coalesce_batches: bool, default = true
pub collect_statistics: bool, default = true
pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
pub time_zone: String, default = "+00:00".into()
pub parquet: ParquetOptions, default = Default::default()
pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()
pub skip_physical_aggregate_schema_check: bool, default = false
pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed
pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
pub meta_fetch_concurrency: usize, default = 32
pub minimum_parallel_output_files: usize, default = 4
pub soft_max_rows_per_output_file: usize, default = 50000000
pub max_buffered_batches_per_output_file: usize, default = 2
pub listing_table_ignore_subdirectory: bool, default = true
pub enable_recursive_ctes: bool, default = true
pub split_file_groups_by_statistics: bool, default = false
pub keep_partition_by_columns: bool, default = false
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
pub enforce_batch_size_in_joins: bool, default = false
pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
}
}
config_namespace! {
pub struct ParquetOptions {
pub enable_page_index: bool, default = true
pub pruning: bool, default = true
pub skip_metadata: bool, default = true
pub metadata_size_hint: Option<usize>, default = None
pub pushdown_filters: bool, default = false
pub reorder_filters: bool, default = false
pub schema_force_view_types: bool, default = true
pub binary_as_string: bool, default = false
pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
pub bloom_filter_on_read: bool, default = true
pub data_pagesize_limit: usize, default = 1024 * 1024
pub write_batch_size: usize, default = 1024
pub writer_version: String, default = "1.0".to_string()
pub skip_arrow_metadata: bool, default = false
pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
pub dictionary_enabled: Option<bool>, default = Some(true)
pub dictionary_page_size_limit: usize, default = 1024 * 1024
pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
#[deprecated(since = "45.0.0", note = "Setting does not do anything")]
pub max_statistics_size: Option<usize>, default = Some(4096)
pub max_row_group_size: usize, default = 1024 * 1024
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
pub column_index_truncate_length: Option<usize>, default = Some(64)
pub statistics_truncate_length: Option<usize>, default = None
pub data_page_row_count_limit: usize, default = 20_000
pub encoding: Option<String>, transform = str::to_lowercase, default = None
pub bloom_filter_on_write: bool, default = false
pub bloom_filter_fpp: Option<f64>, default = None
pub bloom_filter_ndv: Option<u64>, default = None
pub allow_single_file_parallelism: bool, default = true
pub maximum_parallel_row_group_writers: usize, default = 1
pub maximum_buffered_record_batches_per_stream: usize, default = 2
}
}
config_namespace! {
pub struct ParquetEncryptionOptions {
pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None
pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None
}
}
config_namespace! {
pub struct OptimizerOptions {
pub enable_distinct_aggregation_soft_limit: bool, default = true
pub enable_round_robin_repartition: bool, default = true
pub enable_topk_aggregation: bool, default = true
pub enable_dynamic_filter_pushdown: bool, default = true
pub filter_null_join_keys: bool, default = false
pub repartition_aggregations: bool, default = true
pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
pub repartition_joins: bool, default = true
pub allow_symmetric_joins_without_pruning: bool, default = true
pub repartition_file_scans: bool, default = true
pub repartition_windows: bool, default = true
pub repartition_sorts: bool, default = true
pub prefer_existing_sort: bool, default = false
pub skip_failed_rules: bool, default = false
pub max_passes: usize, default = 3
pub top_down_join_key_reordering: bool, default = true
pub prefer_hash_join: bool, default = true
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
pub default_filter_selectivity: u8, default = 20
pub prefer_existing_union: bool, default = false
pub expand_views_at_output: bool, default = false
}
}
config_namespace! {
pub struct ExplainOptions {
pub logical_plan_only: bool, default = false
pub physical_plan_only: bool, default = false
pub show_statistics: bool, default = false
pub show_sizes: bool, default = true
pub show_schema: bool, default = false
pub format: String, default = "indent".to_string()
}
}
impl ExecutionOptions {
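/// Transform applied to `target_partitions` and `planning_concurrency`: a
/// value of "0" is replaced by the number of available CPU cores; any other
/// string is passed through unchanged and parsed as `usize`. For example,
/// `options.set("datafusion.execution.target_partitions", "0")` results in
/// `get_available_parallelism()` partitions.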
fn normalized_parallelism(value: &str) -> String {
if value.parse::<usize>() == Ok(0) {
get_available_parallelism().to_string()
} else {
value.to_owned()
}
}
}
config_namespace! {
pub struct FormatOptions {
pub safe: bool, default = true
pub null: String, default = "".into()
pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
pub types_info: bool, default = false
}
}
impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
type Error = DataFusionError;
fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
let duration_format = match self.duration_format.as_str() {
"pretty" => arrow::util::display::DurationFormat::Pretty,
"iso8601" => arrow::util::display::DurationFormat::ISO8601,
_ => {
return _config_err!(
"Invalid duration format: {}. Valid values are pretty or iso8601",
self.duration_format
)
}
};
Ok(arrow::util::display::FormatOptions::new()
.with_display_error(self.safe)
.with_null(&self.null)
.with_date_format(self.date_format.as_deref())
.with_datetime_format(self.datetime_format.as_deref())
.with_timestamp_format(self.timestamp_format.as_deref())
.with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
.with_time_format(self.time_format.as_deref())
.with_duration_format(duration_format)
.with_types_info(self.types_info))
}
}
#[derive(Debug)]
pub struct ConfigEntry {
pub key: String,
pub value: Option<String>,
pub description: &'static str,
}
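/// Top-level DataFusion configuration, grouped into namespaces such as
/// `catalog`, `execution`, `optimizer`, `sql_parser`, `explain`, and `format`,
/// plus user-registered [`Extensions`]. Values are addressed with string keys
/// prefixed by `datafusion.` (or by an extension's own prefix).
///
/// A minimal sketch of setting an option by key (error handling elided):
///
/// ```ignore
/// let mut options = ConfigOptions::new();
/// options.set("datafusion.execution.batch_size", "4096")?;
/// assert_eq!(options.execution.batch_size, 4096);
/// ```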
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
pub catalog: CatalogOptions,
pub execution: ExecutionOptions,
pub optimizer: OptimizerOptions,
pub sql_parser: SqlParserOptions,
pub explain: ExplainOptions,
pub extensions: Extensions,
pub format: FormatOptions,
}
impl ConfigField for ConfigOptions {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
"catalog" => self.catalog.set(rem, value),
"execution" => self.execution.set(rem, value),
"optimizer" => self.optimizer.set(rem, value),
"explain" => self.explain.set(rem, value),
"sql_parser" => self.sql_parser.set(rem, value),
"format" => self.format.set(rem, value),
_ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
}
}
fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
self.catalog.visit(v, "datafusion.catalog", "");
self.execution.visit(v, "datafusion.execution", "");
self.optimizer.visit(v, "datafusion.optimizer", "");
self.explain.visit(v, "datafusion.explain", "");
self.sql_parser.visit(v, "datafusion.sql_parser", "");
self.format.visit(v, "datafusion.format", "");
}
}
impl ConfigOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
self.extensions = extensions;
self
}
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
let Some((prefix, key)) = key.split_once('.') else {
return _config_err!("could not find config namespace for key \"{key}\"");
};
if prefix == "datafusion" {
return ConfigField::set(self, key, value);
}
let Some(e) = self.extensions.0.get_mut(prefix) else {
return _config_err!("Could not find config namespace \"{prefix}\"");
};
e.0.set(key, value)
}
pub fn from_env() -> Result<Self> {
struct Visitor(Vec<String>);
impl Visit for Visitor {
fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
self.0.push(key.to_string())
}
fn none(&mut self, key: &str, _: &'static str) {
self.0.push(key.to_string())
}
}
let mut keys = Visitor(vec![]);
let mut ret = Self::default();
ret.visit(&mut keys, "datafusion", "");
for key in keys.0 {
let env = key.to_uppercase().replace('.', "_");
if let Some(var) = std::env::var_os(env) {
let value = var.to_string_lossy();
log::info!("Set {key} to {value} from the environment variable");
ret.set(&key, value.as_ref())?;
}
}
Ok(ret)
}
pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
struct Visitor(Vec<String>);
impl Visit for Visitor {
fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
self.0.push(key.to_string())
}
fn none(&mut self, key: &str, _: &'static str) {
self.0.push(key.to_string())
}
}
let mut keys = Visitor(vec![]);
let mut ret = Self::default();
ret.visit(&mut keys, "datafusion", "");
for key in keys.0 {
if let Some(var) = settings.get(&key) {
ret.set(&key, var)?;
}
}
Ok(ret)
}
pub fn entries(&self) -> Vec<ConfigEntry> {
struct Visitor(Vec<ConfigEntry>);
impl Visit for Visitor {
fn some<V: Display>(
&mut self,
key: &str,
value: V,
description: &'static str,
) {
self.0.push(ConfigEntry {
key: key.to_string(),
value: Some(value.to_string()),
description,
})
}
fn none(&mut self, key: &str, description: &'static str) {
self.0.push(ConfigEntry {
key: key.to_string(),
value: None,
description,
})
}
}
let mut v = Visitor(vec![]);
self.visit(&mut v, "datafusion", "");
v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
v.0
}
pub fn generate_config_markdown() -> String {
use std::fmt::Write as _;
let mut s = Self::default();
s.execution.target_partitions = 0;
s.execution.planning_concurrency = 0;
let mut docs = "| key | default | description |\n".to_string();
docs += "|-----|---------|-------------|\n";
let mut entries = s.entries();
entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
for entry in entries {
let _ = writeln!(
&mut docs,
"| {} | {} | {} |",
entry.key,
entry.value.as_deref().unwrap_or("NULL"),
entry.description
);
}
docs
}
}
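/// A configuration extension registered under a unique namespace prefix, so
/// its options can be set as `"<PREFIX>.<key>"` alongside the built-in
/// `datafusion.*` options.
///
/// A hedged sketch using the `extensions_options!` macro defined later in this
/// module (struct and field names are illustrative):
///
/// ```ignore
/// extensions_options! {
///     /// Options for a hypothetical custom data source
///     pub struct MySourceOptions {
///         /// Maximum number of concurrent connections
///         pub max_connections: usize, default = 4
///     }
/// }
///
/// impl ConfigExtension for MySourceOptions {
///     const PREFIX: &'static str = "my_source";
/// }
/// ```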
pub trait ConfigExtension: ExtensionOptions {
const PREFIX: &'static str;
}
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
fn cloned(&self) -> Box<dyn ExtensionOptions>;
fn set(&mut self, key: &str, value: &str) -> Result<()>;
fn entries(&self) -> Vec<ConfigEntry>;
}
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
impl Extensions {
pub fn new() -> Self {
Self(BTreeMap::new())
}
pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
assert_ne!(T::PREFIX, "datafusion");
let e = ExtensionBox(Box::new(extension));
self.0.insert(T::PREFIX, e);
}
pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
}
pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
let e = self.0.get_mut(T::PREFIX)?;
e.0.as_any_mut().downcast_mut()
}
}
#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);
impl Clone for ExtensionBox {
fn clone(&self) -> Self {
Self(self.0.cloned())
}
}
pub trait ConfigField {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
fn set(&mut self, key: &str, value: &str) -> Result<()>;
}
impl<F: ConfigField + Default> ConfigField for Option<F> {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
match self {
Some(s) => s.visit(v, key, description),
None => v.none(key, description),
}
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
self.get_or_insert_with(Default::default).set(key, value)
}
}
pub fn default_config_transform<T>(input: &str) -> Result<T>
where
T: FromStr,
<T as FromStr>::Err: Sync + Send + Error + 'static,
{
input.parse().map_err(|e| {
DataFusionError::Context(
format!(
"Error parsing '{}' as {}",
input,
std::any::type_name::<T>()
),
Box::new(DataFusionError::External(Box::new(e))),
)
})
}
#[macro_export]
macro_rules! config_field {
($t:ty) => {
config_field!($t, value => $crate::config::default_config_transform(value)?);
};
($t:ty, $arg:ident => $transform:expr) => {
impl $crate::config::ConfigField for $t {
fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}
fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
*self = $transform;
Ok(())
}
}
};
}
config_field!(String);
config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
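// `u8` options (for example CSV delimiters and quotes) accept either a numeric
// value such as "59" or a single ASCII character such as ";"; both forms are
// stored as the same byte.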
impl ConfigField for u8 {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
if value.is_empty() {
return Err(DataFusionError::Configuration(format!(
"Input string for {key} key is empty"
)));
}
if let Ok(num) = value.parse::<u8>() {
*self = num;
} else {
let bytes = value.as_bytes();
if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
return Err(DataFusionError::Configuration(format!(
"Error parsing {value} as u8. Non-ASCII string provided"
)));
}
*self = bytes[0];
}
Ok(())
}
}
impl ConfigField for CompressionTypeVariant {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}
fn set(&mut self, _: &str, value: &str) -> Result<()> {
*self = CompressionTypeVariant::from_str(value)?;
Ok(())
}
}
pub trait Visit {
fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
fn none(&mut self, key: &str, description: &'static str);
}
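/// Declares a struct of extension configuration options and derives the
/// [`ConfigField`] and [`ExtensionOptions`] implementations needed to register
/// it in [`Extensions`]; see the sketch on [`ConfigExtension`] above for the
/// intended usage.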
#[macro_export]
macro_rules! extensions_options {
(
$(#[doc = $struct_d:tt])*
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
)*$(,)*
}
) => {
$(#[doc = $struct_d])*
#[derive(Debug, Clone)]
#[non_exhaustive]
$vis struct $struct_name{
$(
$(#[doc = $d])*
$field_vis $field_name : $field_type,
)*
}
impl Default for $struct_name {
fn default() -> Self {
Self {
$($field_name: $default),*
}
}
}
impl $crate::config::ExtensionOptions for $struct_name {
fn as_any(&self) -> &dyn ::std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
self
}
fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
Box::new(self.clone())
}
fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
$crate::config::ConfigField::set(self, key, value)
}
fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
struct Visitor(Vec<$crate::config::ConfigEntry>);
impl $crate::config::Visit for Visitor {
fn some<V: std::fmt::Display>(
&mut self,
key: &str,
value: V,
description: &'static str,
) {
self.0.push($crate::config::ConfigEntry {
key: key.to_string(),
value: Some(value.to_string()),
description,
})
}
fn none(&mut self, key: &str, description: &'static str) {
self.0.push($crate::config::ConfigEntry {
key: key.to_string(),
value: None,
description,
})
}
}
let mut v = Visitor(vec![]);
$crate::config::ConfigField::visit(self, &mut v, "", "");
v.0
}
}
impl $crate::config::ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => {
{
#[allow(deprecated)]
self.$field_name.set(rem, value.as_ref())
}
},
)*
_ => return $crate::error::_config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
)
}
}
fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
$(
let key = stringify!($field_name).to_string();
let desc = concat!($($d),*).trim();
#[allow(deprecated)]
self.$field_name.visit(v, key.as_str(), desc);
)*
}
}
}
}
#[derive(Debug, Clone)]
pub enum ConfigFileType {
CSV,
#[cfg(feature = "parquet")]
PARQUET,
JSON,
}
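/// Per-table format options (CSV, Parquet, JSON) plus registered extensions.
/// Keys under the `format.` prefix are routed to the options of whichever
/// format is selected via [`TableOptions::set_config_format`].
///
/// A minimal sketch mirroring the tests at the bottom of this module:
///
/// ```ignore
/// let mut options = TableOptions::new();
/// options.set_config_format(ConfigFileType::CSV);
/// options.set("format.delimiter", ";")?;
/// assert_eq!(options.csv.delimiter, b';');
/// ```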
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
pub csv: CsvOptions,
pub parquet: TableParquetOptions,
pub json: JsonOptions,
pub current_format: Option<ConfigFileType>,
pub extensions: Extensions,
}
impl ConfigField for TableOptions {
fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
if let Some(file_type) = &self.current_format {
match file_type {
#[cfg(feature = "parquet")]
ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
ConfigFileType::CSV => self.csv.visit(v, "format", ""),
ConfigFileType::JSON => self.json.visit(v, "format", ""),
}
} else {
self.csv.visit(v, "csv", "");
self.parquet.visit(v, "parquet", "");
self.json.visit(v, "json", "");
}
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
"format" => {
let Some(format) = &self.current_format else {
return _config_err!("Specify a format for TableOptions");
};
match format {
#[cfg(feature = "parquet")]
ConfigFileType::PARQUET => self.parquet.set(rem, value),
ConfigFileType::CSV => self.csv.set(rem, value),
ConfigFileType::JSON => self.json.set(rem, value),
}
}
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
}
}
}
impl TableOptions {
pub fn new() -> Self {
Self::default()
}
pub fn default_from_session_config(config: &ConfigOptions) -> Self {
let initial = TableOptions::default();
initial.combine_with_session_config(config)
}
#[must_use = "this method returns a new instance"]
pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
let mut clone = self.clone();
clone.parquet.global = config.execution.parquet.clone();
clone
}
pub fn set_config_format(&mut self, format: ConfigFileType) {
self.current_format = Some(format);
}
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
self.extensions = extensions;
self
}
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
let Some((prefix, _)) = key.split_once('.') else {
return _config_err!("could not find config namespace for key \"{key}\"");
};
if prefix == "format" {
return ConfigField::set(self, key, value);
}
if prefix == "execution" {
return Ok(());
}
let Some(e) = self.extensions.0.get_mut(prefix) else {
return _config_err!("Could not find config namespace \"{prefix}\"");
};
e.0.set(key, value)
}
pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
let mut ret = Self::default();
for (k, v) in settings {
ret.set(k, v)?;
}
Ok(ret)
}
pub fn alter_with_string_hash_map(
&mut self,
settings: &HashMap<String, String>,
) -> Result<()> {
for (k, v) in settings {
self.set(k, v)?;
}
Ok(())
}
pub fn entries(&self) -> Vec<ConfigEntry> {
struct Visitor(Vec<ConfigEntry>);
impl Visit for Visitor {
fn some<V: Display>(
&mut self,
key: &str,
value: V,
description: &'static str,
) {
self.0.push(ConfigEntry {
key: key.to_string(),
value: Some(value.to_string()),
description,
})
}
fn none(&mut self, key: &str, description: &'static str) {
self.0.push(ConfigEntry {
key: key.to_string(),
value: None,
description,
})
}
}
let mut v = Visitor(vec![]);
self.visit(&mut v, "format", "");
v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
v.0
}
}
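/// Parquet-specific table options: writer/reader settings in `global`,
/// per-column overrides addressed as `<option>::<column>`, arbitrary key/value
/// file metadata addressed as `metadata::<key>`, and encryption settings under
/// the `crypto.` prefix.
///
/// A minimal sketch of the key forms accepted by `set` (column name and values
/// are illustrative):
///
/// ```ignore
/// let mut options = TableParquetOptions::new();
/// options.set("bloom_filter_enabled::col1", "true")?; // per-column override
/// options.set("metadata::owner", "analytics")?;       // key/value metadata
/// options.set("compression", "zstd(3)")?;             // global writer option
/// ```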
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
pub global: ParquetOptions,
pub column_specific_options: HashMap<String, ParquetColumnOptions>,
pub key_value_metadata: HashMap<String, Option<String>>,
pub crypto: ParquetEncryptionOptions,
}
impl TableParquetOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
Self {
global: ParquetOptions {
skip_arrow_metadata: skip,
..self.global
},
..self
}
}
}
impl ConfigField for TableParquetOptions {
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
self.global.visit(v, key_prefix, description);
self.column_specific_options
.visit(v, key_prefix, description);
self.crypto
.visit(v, &format!("{key_prefix}.crypto"), description);
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
if key.starts_with("metadata::") {
let k = match key.split("::").collect::<Vec<_>>()[..] {
[_meta] | [_meta, ""] => {
return _config_err!(
"Invalid metadata key provided, missing key in metadata::<key>"
)
}
[_meta, k] => k.into(),
_ => {
return _config_err!(
"Invalid metadata key provided, found too many '::' in \"{key}\""
)
}
};
self.key_value_metadata.insert(k, Some(value.into()));
Ok(())
} else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
self.crypto.set(crypto_feature, value)
} else if key.contains("::") {
self.column_specific_options.set(key, value)
} else {
self.global.set(key, value)
}
}
}
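// Like `config_namespace!`, but additionally implements `ConfigField` for
// `HashMap<String, $struct_name>` so individual entries can be addressed with
// the `<field>::<map_key>` syntax (used for per-column Parquet options).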
macro_rules! config_namespace_with_hashmap {
(
$(#[doc = $struct_d:tt])*
$(#[deprecated($($struct_depr:tt)*)])? $vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$(#[deprecated($($field_depr:tt)*)])? $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
$(#[doc = $struct_d])*
$(#[deprecated($($struct_depr)*)])? #[derive(Debug, Clone, PartialEq)]
$vis struct $struct_name{
$(
$(#[doc = $d])*
$(#[deprecated($($field_depr)*)])? $field_vis $field_name : $field_type,
)*
}
impl ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => {
#[allow(deprecated)] $(let value = $transform(value);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
)
}
}
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
$(
let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
let desc = concat!($($d),*).trim();
#[allow(deprecated)]
self.$field_name.visit(v, key.as_str(), desc);
)*
}
}
impl Default for $struct_name {
fn default() -> Self {
#[allow(deprecated)]
Self {
$($field_name: $default),*
}
}
}
impl ConfigField for HashMap<String,$struct_name> {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let parts: Vec<&str> = key.splitn(2, "::").collect();
match parts.as_slice() {
[inner_key, hashmap_key] => {
let inner_value = self
.entry((*hashmap_key).to_owned())
.or_insert_with($struct_name::default);
inner_value.set(inner_key, value)
}
_ => _config_err!("Unrecognized key '{key}'."),
}
}
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
for (column_name, col_options) in self {
$(
let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
let desc = concat!($($d),*).trim();
#[allow(deprecated)]
col_options.$field_name.visit(v, key.as_str(), desc);
)*
}
}
}
}
}
config_namespace_with_hashmap! {
pub struct ParquetColumnOptions {
pub bloom_filter_enabled: Option<bool>, default = None
pub encoding: Option<String>, default = None
pub dictionary_enabled: Option<bool>, default = None
pub compression: Option<String>, transform = str::to_lowercase, default = None
pub statistics_enabled: Option<String>, default = None
pub bloom_filter_fpp: Option<f64>, default = None
pub bloom_filter_ndv: Option<u64>, default = None
#[deprecated(since = "45.0.0", note = "Setting does not do anything")]
pub max_statistics_size: Option<usize>, default = None
}
}
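/// String-typed mirror of Parquet file encryption properties. Binary values
/// (keys, key metadata, the AAD prefix) are carried as hex-encoded strings so
/// they can round-trip through the string-based config system; the `From`
/// conversions below translate to and from the Parquet crate's
/// `FileEncryptionProperties`.
///
/// A hedged sketch of configuring encryption through string keys, following
/// the encryption test below (keys and column name are illustrative):
///
/// ```ignore
/// let mut options = TableParquetOptions::new();
/// options.set("crypto.file_encryption.encrypt_footer", "true")?;
/// options.set(
///     "crypto.file_encryption.footer_key_as_hex",
///     &hex::encode(b"0123456789012345"),
/// )?;
/// options.set(
///     "crypto.file_encryption.column_key_as_hex::col1",
///     &hex::encode(b"1234567890123450"),
/// )?;
/// ```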
#[derive(Clone, Debug, PartialEq)]
pub struct ConfigFileEncryptionProperties {
pub encrypt_footer: bool,
pub footer_key_as_hex: String,
pub footer_key_metadata_as_hex: String,
pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
pub aad_prefix_as_hex: String,
pub store_aad_prefix: bool,
}
impl Default for ConfigFileEncryptionProperties {
fn default() -> Self {
ConfigFileEncryptionProperties {
encrypt_footer: true,
footer_key_as_hex: String::new(),
footer_key_metadata_as_hex: String::new(),
column_encryption_properties: Default::default(),
aad_prefix_as_hex: String::new(),
store_aad_prefix: false,
}
}
}
config_namespace_with_hashmap! {
pub struct ColumnEncryptionProperties {
pub column_key_as_hex: String, default = "".to_string()
pub column_metadata_as_hex: Option<String>, default = None
}
}
impl ConfigField for ConfigFileEncryptionProperties {
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
let key = format!("{key_prefix}.encrypt_footer");
let desc = "Encrypt the footer";
self.encrypt_footer.visit(v, key.as_str(), desc);
let key = format!("{key_prefix}.footer_key_as_hex");
let desc = "Key to use for the parquet footer";
self.footer_key_as_hex.visit(v, key.as_str(), desc);
let key = format!("{key_prefix}.footer_key_metadata_as_hex");
let desc = "Metadata to use for the parquet footer";
self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
let key = format!("{key_prefix}.aad_prefix_as_hex");
let desc = "AAD prefix to use";
self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
let key = format!("{key_prefix}.store_aad_prefix");
let desc = "If true, store the AAD prefix";
self.store_aad_prefix.visit(v, key.as_str(), desc);
self.column_encryption_properties.visit(v, key_prefix, desc);
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
if key.contains("::") {
return self.column_encryption_properties.set(key, value);
};
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
"encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
"footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
"footer_key_metadata_as_hex" => {
self.footer_key_metadata_as_hex.set(rem, value.as_ref())
}
"aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
"store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
_ => _config_err!(
"Config value \"{}\" not found on ConfigFileEncryptionProperties",
key
),
}
}
}
#[cfg(feature = "parquet_encryption")]
impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
fn from(val: ConfigFileEncryptionProperties) -> Self {
let mut fep = FileEncryptionProperties::builder(
hex::decode(val.footer_key_as_hex).unwrap(),
)
.with_plaintext_footer(!val.encrypt_footer)
.with_aad_prefix_storage(val.store_aad_prefix);
if !val.footer_key_metadata_as_hex.is_empty() {
fep = fep.with_footer_key_metadata(
hex::decode(&val.footer_key_metadata_as_hex)
.expect("Invalid footer key metadata"),
);
}
for (column_name, encryption_props) in val.column_encryption_properties.iter() {
let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
.expect("Invalid column encryption key");
let key_metadata = encryption_props
.column_metadata_as_hex
.as_ref()
.map(|x| hex::decode(x).expect("Invalid column metadata"));
match key_metadata {
Some(key_metadata) => {
fep = fep.with_column_key_and_metadata(
column_name,
encryption_key,
key_metadata,
);
}
None => {
fep = fep.with_column_key(column_name, encryption_key);
}
}
}
if !val.aad_prefix_as_hex.is_empty() {
let aad_prefix: Vec<u8> =
hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
fep = fep.with_aad_prefix(aad_prefix);
}
fep.build().unwrap()
}
}
#[cfg(feature = "parquet_encryption")]
impl From<&FileEncryptionProperties> for ConfigFileEncryptionProperties {
fn from(f: &FileEncryptionProperties) -> Self {
let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
let mut column_encryption_properties: HashMap<
String,
ColumnEncryptionProperties,
> = HashMap::new();
for (i, column_name) in column_names_vec.iter().enumerate() {
let column_key_as_hex = hex::encode(&column_keys_vec[i]);
let column_metadata_as_hex: Option<String> =
column_metas_vec.get(i).map(hex::encode);
column_encryption_properties.insert(
column_name.clone(),
ColumnEncryptionProperties {
column_key_as_hex,
column_metadata_as_hex,
},
);
}
let mut aad_prefix: Vec<u8> = Vec::new();
if let Some(prefix) = f.aad_prefix() {
aad_prefix = prefix.clone();
}
ConfigFileEncryptionProperties {
encrypt_footer: f.encrypt_footer(),
footer_key_as_hex: hex::encode(f.footer_key()),
footer_key_metadata_as_hex: f
.footer_key_metadata()
.map(hex::encode)
.unwrap_or_default(),
column_encryption_properties,
aad_prefix_as_hex: hex::encode(aad_prefix),
store_aad_prefix: f.store_aad_prefix(),
}
}
}
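/// String-typed mirror of Parquet file decryption properties; like
/// [`ConfigFileEncryptionProperties`], binary keys and the AAD prefix are
/// stored as hex-encoded strings, and the `From` conversions below translate
/// to and from the Parquet crate's `FileDecryptionProperties`.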
#[derive(Clone, Debug, PartialEq)]
pub struct ConfigFileDecryptionProperties {
pub footer_key_as_hex: String,
pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
pub aad_prefix_as_hex: String,
pub footer_signature_verification: bool,
}
config_namespace_with_hashmap! {
pub struct ColumnDecryptionProperties {
pub column_key_as_hex: String, default = "".to_string()
}
}
impl Default for ConfigFileDecryptionProperties {
fn default() -> Self {
ConfigFileDecryptionProperties {
footer_key_as_hex: String::new(),
column_decryption_properties: Default::default(),
aad_prefix_as_hex: String::new(),
footer_signature_verification: true,
}
}
}
impl ConfigField for ConfigFileDecryptionProperties {
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
let key = format!("{key_prefix}.footer_key_as_hex");
let desc = "Key to use for the parquet footer";
self.footer_key_as_hex.visit(v, key.as_str(), desc);
let key = format!("{key_prefix}.aad_prefix_as_hex");
let desc = "AAD prefix to use";
self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
let key = format!("{key_prefix}.footer_signature_verification");
let desc = "If true, verify the footer signature";
self.footer_signature_verification
.visit(v, key.as_str(), desc);
self.column_decryption_properties.visit(v, key_prefix, desc);
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
if key.contains("::") {
return self.column_decryption_properties.set(key, value);
};
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
"footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
"aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
"footer_signature_verification" => {
self.footer_signature_verification.set(rem, value.as_ref())
}
_ => _config_err!(
"Config value \"{}\" not found on ConfigFileEncryptionProperties",
key
),
}
}
}
#[cfg(feature = "parquet_encryption")]
impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
fn from(val: ConfigFileDecryptionProperties) -> Self {
let mut column_names: Vec<&str> = Vec::new();
let mut column_keys: Vec<Vec<u8>> = Vec::new();
for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
column_names.push(col_name.as_str());
column_keys.push(
hex::decode(&decryption_properties.column_key_as_hex)
.expect("Invalid column decryption key"),
);
}
let mut fep = FileDecryptionProperties::builder(
hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
)
.with_column_keys(column_names, column_keys)
.unwrap();
if !val.footer_signature_verification {
fep = fep.disable_footer_signature_verification();
}
if !val.aad_prefix_as_hex.is_empty() {
let aad_prefix =
hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
fep = fep.with_aad_prefix(aad_prefix);
}
fep.build().unwrap()
}
}
#[cfg(feature = "parquet_encryption")]
impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties {
fn from(f: &FileDecryptionProperties) -> Self {
let (column_names_vec, column_keys_vec) = f.column_keys();
let mut column_decryption_properties: HashMap<
String,
ColumnDecryptionProperties,
> = HashMap::new();
for (i, column_name) in column_names_vec.iter().enumerate() {
let props = ColumnDecryptionProperties {
column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
};
column_decryption_properties.insert(column_name.clone(), props);
}
let mut aad_prefix: Vec<u8> = Vec::new();
if let Some(prefix) = f.aad_prefix() {
aad_prefix = prefix.clone();
}
ConfigFileDecryptionProperties {
footer_key_as_hex: hex::encode(
f.footer_key(None).unwrap_or_default().as_ref(),
),
column_decryption_properties,
aad_prefix_as_hex: hex::encode(aad_prefix),
footer_signature_verification: f.check_plaintext_footer_integrity(),
}
}
}
config_namespace! {
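/// Options for reading and writing CSV files. These can be set through the
/// string-keyed config system or with the builder-style `with_*` methods
/// implemented below.
///
/// A minimal sketch of the builder style (values are illustrative):
///
/// ```ignore
/// let csv = CsvOptions::default()
///     .with_has_header(true)
///     .with_delimiter(b'|')
///     .with_compression(CompressionTypeVariant::UNCOMPRESSED);
/// ```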
pub struct CsvOptions {
pub has_header: Option<bool>, default = None
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
pub terminator: Option<u8>, default = None
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: Option<usize>, default = None
pub date_format: Option<String>, default = None
pub datetime_format: Option<String>, default = None
pub timestamp_format: Option<String>, default = None
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
pub null_value: Option<String>, default = None
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
}
}
impl CsvOptions {
pub fn with_compression(
mut self,
compression_type_variant: CompressionTypeVariant,
) -> Self {
self.compression = compression_type_variant;
self
}
pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
self.schema_infer_max_rec = Some(max_rec);
self
}
pub fn with_has_header(mut self, has_header: bool) -> Self {
self.has_header = Some(has_header);
self
}
pub fn has_header(&self) -> Option<bool> {
self.has_header
}
pub fn with_delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
self
}
pub fn with_quote(mut self, quote: u8) -> Self {
self.quote = quote;
self
}
pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
self.terminator = terminator;
self
}
pub fn with_escape(mut self, escape: Option<u8>) -> Self {
self.escape = escape;
self
}
pub fn with_double_quote(mut self, double_quote: bool) -> Self {
self.double_quote = Some(double_quote);
self
}
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.newlines_in_values = Some(newlines_in_values);
self
}
pub fn with_file_compression_type(
mut self,
compression: CompressionTypeVariant,
) -> Self {
self.compression = compression;
self
}
pub fn delimiter(&self) -> u8 {
self.delimiter
}
pub fn quote(&self) -> u8 {
self.quote
}
pub fn terminator(&self) -> Option<u8> {
self.terminator
}
pub fn escape(&self) -> Option<u8> {
self.escape
}
}
config_namespace! {
pub struct JsonOptions {
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: Option<usize>, default = None
}
}
pub trait OutputFormatExt: Display {}
#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum OutputFormat {
CSV(CsvOptions),
JSON(JsonOptions),
#[cfg(feature = "parquet")]
PARQUET(TableParquetOptions),
AVRO,
ARROW,
}
impl Display for OutputFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let out = match self {
OutputFormat::CSV(_) => "csv",
OutputFormat::JSON(_) => "json",
#[cfg(feature = "parquet")]
OutputFormat::PARQUET(_) => "parquet",
OutputFormat::AVRO => "avro",
OutputFormat::ARROW => "arrow",
};
write!(f, "{out}")
}
}
#[cfg(test)]
mod tests {
use crate::config::{
ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
Extensions, TableOptions,
};
use std::any::Any;
use std::collections::HashMap;
#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
pub properties: HashMap<String, String>,
}
impl ExtensionOptions for TestExtensionConfig {
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn cloned(&self) -> Box<dyn ExtensionOptions> {
Box::new(self.clone())
}
fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
assert_eq!(key, "test");
self.properties.insert(rem.to_owned(), value.to_owned());
Ok(())
}
fn entries(&self) -> Vec<ConfigEntry> {
self.properties
.iter()
.map(|(k, v)| ConfigEntry {
key: k.into(),
value: Some(v.into()),
description: "",
})
.collect()
}
}
impl ConfigExtension for TestExtensionConfig {
const PREFIX: &'static str = "test";
}
#[test]
fn create_table_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let table_config = TableOptions::new().with_extensions(extension);
let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
assert!(kafka_config.is_some())
}
#[test]
fn alter_test_extension_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let mut table_config = TableOptions::new().with_extensions(extension);
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter, b';');
table_config.set("test.bootstrap.servers", "asd").unwrap();
let kafka_config = table_config
.extensions
.get::<TestExtensionConfig>()
.unwrap();
assert_eq!(
kafka_config.properties.get("bootstrap.servers").unwrap(),
"asd"
);
}
#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("format.escape", "\"").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '"');
table_config.set("format.escape", "\'").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
}
#[test]
fn warning_only_not_default() {
use std::sync::atomic::AtomicUsize;
static COUNT: AtomicUsize = AtomicUsize::new(0);
use log::{Level, LevelFilter, Metadata, Record};
struct SimpleLogger;
impl log::Log for SimpleLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= Level::Info
}
fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
fn flush(&self) {}
}
log::set_logger(&SimpleLogger).unwrap();
log::set_max_level(LevelFilter::Info);
let mut sql_parser_options = crate::config::SqlParserOptions::default();
sql_parser_options
.set("enable_options_value_normalization", "false")
.unwrap();
assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
sql_parser_options
.set("enable_options_value_normalization", "true")
.unwrap();
assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
}
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
assert_eq!(
table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
Some(true)
);
}
#[cfg(feature = "parquet_encryption")]
#[test]
fn parquet_table_encryption() {
use crate::config::{
ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
};
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
let footer_key = b"0123456789012345".to_vec(); let column_names = vec!["double_field", "float_field"];
let column_keys =
vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
let file_encryption_properties =
FileEncryptionProperties::builder(footer_key.clone())
.with_column_keys(column_names.clone(), column_keys.clone())
.unwrap()
.build()
.unwrap();
let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
.with_column_keys(column_names.clone(), column_keys.clone())
.unwrap()
.build()
.unwrap();
let config_encrypt: ConfigFileEncryptionProperties =
(&file_encryption_properties).into();
let encryption_properties_built: FileEncryptionProperties =
config_encrypt.clone().into();
assert_eq!(file_encryption_properties, encryption_properties_built);
let config_decrypt: ConfigFileDecryptionProperties =
(&decryption_properties).into();
let decryption_properties_built: FileDecryptionProperties =
config_decrypt.clone().into();
assert_eq!(decryption_properties, decryption_properties_built);
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.parquet
.set(
"crypto.file_encryption.encrypt_footer",
config_encrypt.encrypt_footer.to_string().as_str(),
)
.unwrap();
table_config
.parquet
.set(
"crypto.file_encryption.footer_key_as_hex",
config_encrypt.footer_key_as_hex.as_str(),
)
.unwrap();
for (i, col_name) in column_names.iter().enumerate() {
let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
let value = hex::encode(column_keys[i].clone());
table_config
.parquet
.set(key.as_str(), value.as_str())
.unwrap();
}
assert_eq!(
table_config.parquet.crypto.file_encryption,
Some(config_encrypt)
);
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.parquet
.set(
"crypto.file_decryption.footer_key_as_hex",
config_decrypt.footer_key_as_hex.as_str(),
)
.unwrap();
for (i, col_name) in column_names.iter().enumerate() {
let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
let value = hex::encode(column_keys[i].clone());
table_config
.parquet
.set(key.as_str(), value.as_str())
.unwrap();
}
assert_eq!(
table_config.parquet.crypto.file_decryption,
Some(config_decrypt.clone())
);
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
assert_eq!(
table_config.parquet.crypto.file_decryption,
Some(config_decrypt.clone())
);
}
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_entry() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
let entries = table_config.entries();
assert!(entries
.iter()
.any(|item| item.key == "format.bloom_filter_enabled::col1"))
}
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_metadata_entry() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.set("format.metadata::key1", "").unwrap();
table_config.set("format.metadata::key2", "value2").unwrap();
table_config
.set("format.metadata::key3", "value with spaces ")
.unwrap();
table_config
.set("format.metadata::key4", "value with special chars :: :")
.unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata.clone();
assert_eq!(parsed_metadata.get("should not exist1"), None);
assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
assert_eq!(
parsed_metadata.get("key3"),
Some(&Some("value with spaces ".into()))
);
assert_eq!(
parsed_metadata.get("key4"),
Some(&Some("value with special chars :: :".into()))
);
table_config.set("format.metadata::key_dupe", "A").unwrap();
table_config.set("format.metadata::key_dupe", "B").unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}
}