use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
use crate::catalog::{
CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
MemoryCatalogProviderList,
};
use crate::datasource::cte_worktable::CteWorkTable;
use crate::datasource::file_format::arrow::ArrowFormatFactory;
use crate::datasource::file_format::avro::AvroFormatFactory;
use crate::datasource::file_format::csv::CsvFormatFactory;
use crate::datasource::file_format::json::JsonFormatFactory;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormatFactory;
use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
use crate::datasource::function::{TableFunction, TableFunctionImpl};
use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
#[cfg(feature = "array_expressions")]
use crate::functions_array;
use crate::physical_optimizer::optimizer::PhysicalOptimizer;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use crate::{functions, functions_aggregate};
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::tree_node::TreeNode;
use datafusion_common::{
config_err, not_impl_err, plan_datafusion_err, DFSchema, DataFusionError,
ResolvedTableReference, TableReference,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry};
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::var_provider::{is_system_variables, VarType};
use datafusion_expr::{
AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, TableSource,
WindowUDF,
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datafusion_optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_sql::parser::{DFParser, Statement};
use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel};
use sqlparser::ast::Expr as SQLExpr;
use sqlparser::dialect::dialect_from_str;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
use url::Url;
use uuid::Uuid;
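/// Stores the state for a session: registered catalogs and functions,
/// file formats, table factories, configuration, and the
/// analyzer/optimizer/planner pipeline used to turn SQL text into an
/// [`ExecutionPlan`].
///
/// A minimal construction sketch (a hedged example assuming only the crate
/// defaults; the variable name is illustrative):
///
/// ```ignore
/// use std::sync::Arc;
///
/// // Default config plus an in-memory catalog list and runtime.
/// let state = SessionState::new_with_config_rt(
///     SessionConfig::new(),
///     Arc::new(RuntimeEnv::default()),
/// );
/// assert!(!state.session_id().is_empty());
/// ```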
#[derive(Clone)]
pub struct SessionState {
session_id: String,
analyzer: Analyzer,
expr_planners: Vec<Arc<dyn ExprPlanner>>,
optimizer: Optimizer,
physical_optimizers: PhysicalOptimizer,
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
catalog_list: Arc<dyn CatalogProviderList>,
table_functions: HashMap<String, Arc<TableFunction>>,
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
window_functions: HashMap<String, Arc<WindowUDF>>,
serializer_registry: Arc<dyn SerializerRegistry>,
file_formats: HashMap<String, Arc<dyn FileFormatFactory>>,
config: SessionConfig,
table_options: TableOptions,
execution_props: ExecutionProps,
table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
runtime_env: Arc<RuntimeEnv>,
function_factory: Option<Arc<dyn FunctionFactory>>,
}
impl Debug for SessionState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionState")
.field("session_id", &self.session_id)
.field("analyzer", &"...")
.field("optimizer", &"...")
.field("physical_optimizers", &"...")
.field("query_planner", &"...")
.field("catalog_list", &"...")
.field("table_functions", &"...")
.field("scalar_functions", &self.scalar_functions)
.field("aggregate_functions", &self.aggregate_functions)
.field("window_functions", &self.window_functions)
.field("serializer_registry", &"...")
.field("config", &self.config)
.field("table_options", &self.table_options)
.field("execution_props", &self.execution_props)
.field("table_factories", &"...")
.field("runtime_env", &self.runtime_env)
.field("function_factory", &"...")
.finish_non_exhaustive()
}
}
impl SessionState {
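    /// Returns a new [`SessionState`] using the provided [`SessionConfig`]
    /// and [`RuntimeEnv`], backed by an in-memory
    /// [`MemoryCatalogProviderList`].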
pub fn new_with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
let catalog_list =
Arc::new(MemoryCatalogProviderList::new()) as Arc<dyn CatalogProviderList>;
Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list)
}
#[deprecated(since = "32.0.0", note = "Use SessionState::new_with_config_rt")]
pub fn with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
Self::new_with_config_rt(config, runtime)
}
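    /// Returns a new [`SessionState`] using the provided [`SessionConfig`],
    /// [`RuntimeEnv`], and [`CatalogProviderList`].
    ///
    /// Registers the default table factories, file formats, and built-in
    /// functions, and, if enabled in the config, creates the default catalog
    /// and schema.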
pub fn new_with_config_rt_and_catalog_list(
config: SessionConfig,
runtime: Arc<RuntimeEnv>,
catalog_list: Arc<dyn CatalogProviderList>,
) -> Self {
let session_id = Uuid::new_v4().to_string();
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
HashMap::new();
#[cfg(feature = "parquet")]
table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
if config.create_default_catalog_and_schema() {
let default_catalog = MemoryCatalogProvider::new();
default_catalog
.register_schema(
&config.options().catalog.default_schema,
Arc::new(MemorySchemaProvider::new()),
)
.expect("memory catalog provider can register schema");
Self::register_default_schema(
&config,
&table_factories,
&runtime,
&default_catalog,
);
catalog_list.register_catalog(
config.options().catalog.default_catalog.clone(),
Arc::new(default_catalog),
);
}
let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
#[cfg(feature = "array_expressions")]
Arc::new(functions_array::planner::ArrayFunctionPlanner),
#[cfg(feature = "array_expressions")]
Arc::new(functions_array::planner::FieldAccessPlanner),
#[cfg(any(
feature = "datetime_expressions",
feature = "unicode_expressions"
))]
Arc::new(functions::planner::UserDefinedFunctionPlanner),
];
let mut new_self = SessionState {
session_id,
analyzer: Analyzer::new(),
expr_planners,
optimizer: Optimizer::new(),
physical_optimizers: PhysicalOptimizer::new(),
query_planner: Arc::new(DefaultQueryPlanner {}),
catalog_list,
table_functions: HashMap::new(),
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
window_functions: HashMap::new(),
serializer_registry: Arc::new(EmptySerializerRegistry),
file_formats: HashMap::new(),
table_options: TableOptions::default_from_session_config(config.options()),
config,
execution_props: ExecutionProps::new(),
runtime_env: runtime,
table_factories,
function_factory: None,
};
#[cfg(feature = "parquet")]
if let Err(e) =
new_self.register_file_format(Arc::new(ParquetFormatFactory::new()), false)
{
log::info!("Unable to register default ParquetFormat: {e}")
};
if let Err(e) =
new_self.register_file_format(Arc::new(JsonFormatFactory::new()), false)
{
log::info!("Unable to register default JsonFormat: {e}")
};
if let Err(e) =
new_self.register_file_format(Arc::new(CsvFormatFactory::new()), false)
{
log::info!("Unable to register default CsvFormat: {e}")
};
if let Err(e) =
new_self.register_file_format(Arc::new(ArrowFormatFactory::new()), false)
{
log::info!("Unable to register default ArrowFormat: {e}")
};
if let Err(e) =
new_self.register_file_format(Arc::new(AvroFormatFactory::new()), false)
{
log::info!("Unable to register default AvroFormat: {e}")
};
        functions::register_all(&mut new_self)
            .expect("cannot register built-in functions");
        #[cfg(feature = "array_expressions")]
        functions_array::register_all(&mut new_self)
            .expect("cannot register array expressions");
        functions_aggregate::register_all(&mut new_self)
            .expect("cannot register aggregate functions");
new_self
}
#[deprecated(
since = "32.0.0",
note = "Use SessionState::new_with_config_rt_and_catalog_list"
)]
pub fn with_config_rt_and_catalog_list(
config: SessionConfig,
runtime: Arc<RuntimeEnv>,
catalog_list: Arc<dyn CatalogProviderList>,
) -> Self {
Self::new_with_config_rt_and_catalog_list(config, runtime, catalog_list)
}
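    /// Registers a `default` schema backed by a [`ListingSchemaProvider`]
    /// when the session config specifies both a catalog location and a
    /// format; returns silently if either is missing or if the object store
    /// or table factory cannot be resolved.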
fn register_default_schema(
config: &SessionConfig,
table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
runtime: &Arc<RuntimeEnv>,
default_catalog: &MemoryCatalogProvider,
) {
let url = config.options().catalog.location.as_ref();
let format = config.options().catalog.format.as_ref();
let (url, format) = match (url, format) {
(Some(url), Some(format)) => (url, format),
_ => return,
};
let url = url.to_string();
let format = format.to_string();
let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
let authority = match url.host_str() {
Some(host) => format!("{}://{}", url.scheme(), host),
None => format!("{}://", url.scheme()),
};
let path = &url.as_str()[authority.len()..];
let path = object_store::path::Path::parse(path).expect("Can't parse path");
let store = ObjectStoreUrl::parse(authority.as_str())
.expect("Invalid default catalog url");
let store = match runtime.object_store(store) {
Ok(store) => store,
_ => return,
};
let factory = match table_factories.get(format.as_str()) {
Some(factory) => factory,
_ => return,
};
let schema =
ListingSchemaProvider::new(authority, path, factory.clone(), store, format);
let _ = default_catalog
.register_schema("default", Arc::new(schema))
.expect("Failed to register default schema");
}
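    /// Resolves a [`TableReference`] against the session's default catalog
    /// and default schema.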
pub(crate) fn resolve_table_ref(
&self,
table_ref: impl Into<TableReference>,
) -> ResolvedTableReference {
let catalog = &self.config_options().catalog;
table_ref
.into()
.resolve(&catalog.default_catalog, &catalog.default_schema)
}
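    /// Returns the [`SchemaProvider`] for the given table reference,
    /// special-casing `information_schema` when it is enabled in the config.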
pub(crate) fn schema_for_ref(
&self,
table_ref: impl Into<TableReference>,
) -> datafusion_common::Result<Arc<dyn SchemaProvider>> {
let resolved_ref = self.resolve_table_ref(table_ref);
if self.config.information_schema() && *resolved_ref.schema == *INFORMATION_SCHEMA
{
return Ok(Arc::new(InformationSchemaProvider::new(
self.catalog_list.clone(),
)));
}
self.catalog_list
.catalog(&resolved_ref.catalog)
.ok_or_else(|| {
plan_datafusion_err!(
"failed to resolve catalog: {}",
resolved_ref.catalog
)
})?
.schema(&resolved_ref.schema)
.ok_or_else(|| {
plan_datafusion_err!("failed to resolve schema: {}", resolved_ref.schema)
})
}
pub fn with_session_id(mut self, session_id: String) -> Self {
self.session_id = session_id;
self
}
pub fn with_query_planner(
mut self,
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
) -> Self {
self.query_planner = query_planner;
self
}
pub fn with_analyzer_rules(
mut self,
rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
) -> Self {
self.analyzer = Analyzer::with_rules(rules);
self
}
pub fn with_optimizer_rules(
mut self,
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
) -> Self {
self.optimizer = Optimizer::with_rules(rules);
self
}
pub fn with_physical_optimizer_rules(
mut self,
physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
) -> Self {
self.physical_optimizers = PhysicalOptimizer::with_rules(physical_optimizers);
self
}
pub fn add_analyzer_rule(
&mut self,
analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
) -> &Self {
self.analyzer.rules.push(analyzer_rule);
self
}
pub fn add_optimizer_rule(
mut self,
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
) -> Self {
self.optimizer.rules.push(optimizer_rule);
self
}
pub(crate) fn append_optimizer_rule(
&mut self,
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
) {
self.optimizer.rules.push(optimizer_rule);
}
pub fn add_physical_optimizer_rule(
mut self,
physical_optimizer_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
) -> Self {
self.physical_optimizers.rules.push(physical_optimizer_rule);
self
}
pub fn add_table_options_extension<T: ConfigExtension>(
mut self,
extension: T,
) -> Self {
self.table_options.extensions.insert(extension);
self
}
pub fn with_function_factory(
mut self,
function_factory: Arc<dyn FunctionFactory>,
) -> Self {
self.function_factory = Some(function_factory);
self
}
pub fn set_function_factory(&mut self, function_factory: Arc<dyn FunctionFactory>) {
self.function_factory = Some(function_factory);
}
pub fn with_serializer_registry(
mut self,
registry: Arc<dyn SerializerRegistry>,
) -> Self {
self.serializer_registry = registry;
self
}
pub fn function_factory(&self) -> Option<&Arc<dyn FunctionFactory>> {
self.function_factory.as_ref()
}
pub fn table_factories(&self) -> &HashMap<String, Arc<dyn TableProviderFactory>> {
&self.table_factories
}
pub fn table_factories_mut(
&mut self,
) -> &mut HashMap<String, Arc<dyn TableProviderFactory>> {
&mut self.table_factories
}
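    /// Parses `sql` into a single AST [`Statement`] using the named SQL
    /// dialect; inputs containing more than one statement are rejected.
    ///
    /// A hedged usage sketch (assumes the `"generic"` dialect is acceptable
    /// for the query; `state` is a [`SessionState`]):
    ///
    /// ```ignore
    /// let statement = state.sql_to_statement("SELECT 1", "generic")?;
    /// ```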
pub fn sql_to_statement(
&self,
sql: &str,
dialect: &str,
) -> datafusion_common::Result<Statement> {
let dialect = dialect_from_str(dialect).ok_or_else(|| {
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
)
})?;
let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
if statements.len() > 1 {
return not_impl_err!(
"The context currently only supports a single SQL statement"
);
}
let statement = statements.pop_front().ok_or_else(|| {
DataFusionError::NotImplemented(
"The context requires a statement!".to_string(),
)
})?;
Ok(statement)
}
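    /// Parses `sql` into a single AST [`SQLExpr`] (an expression, not a full
    /// statement) using the named SQL dialect.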
pub fn sql_to_expr(
&self,
sql: &str,
dialect: &str,
) -> datafusion_common::Result<SQLExpr> {
let dialect = dialect_from_str(dialect).ok_or_else(|| {
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
)
})?;
let expr = DFParser::parse_sql_into_expr_with_dialect(sql, dialect.as_ref())?;
Ok(expr)
}
pub fn resolve_table_references(
&self,
statement: &datafusion_sql::parser::Statement,
) -> datafusion_common::Result<Vec<TableReference>> {
let enable_ident_normalization =
self.config.options().sql_parser.enable_ident_normalization;
let (table_refs, _) = crate::catalog::resolve_table_references(
statement,
enable_ident_normalization,
)?;
Ok(table_refs)
}
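    /// Converts an AST [`Statement`] into a [`LogicalPlan`]. This is async
    /// because resolving the referenced tables may require catalog I/O.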
pub async fn statement_to_plan(
&self,
statement: datafusion_sql::parser::Statement,
) -> datafusion_common::Result<LogicalPlan> {
let references = self.resolve_table_references(&statement)?;
let mut provider = SessionContextProvider {
state: self,
tables: HashMap::with_capacity(references.len()),
};
for reference in references {
let resolved = &self.resolve_table_ref(reference);
if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
if let Some(table) = schema.table(&resolved.table).await? {
v.insert(provider_as_source(table));
}
}
}
}
let query = self.build_sql_query_planner(&provider);
query.statement_to_plan(statement)
}
fn get_parser_options(&self) -> ParserOptions {
let sql_parser_options = &self.config.options().sql_parser;
ParserOptions {
parse_float_as_decimal: sql_parser_options.parse_float_as_decimal,
enable_ident_normalization: sql_parser_options.enable_ident_normalization,
support_varchar_with_length: sql_parser_options.support_varchar_with_length,
}
}
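    /// Parses and plans `sql` into a [`LogicalPlan`] in one step, using the
    /// dialect configured in the session's SQL parser options.
    ///
    /// A hedged usage sketch (`state` is a [`SessionState`]):
    ///
    /// ```ignore
    /// let plan = state.create_logical_plan("SELECT 1").await?;
    /// ```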
pub async fn create_logical_plan(
&self,
sql: &str,
) -> datafusion_common::Result<LogicalPlan> {
let dialect = self.config.options().sql_parser.dialect.as_str();
let statement = self.sql_to_statement(sql, dialect)?;
let plan = self.statement_to_plan(statement).await?;
Ok(plan)
}
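    /// Parses `sql` as a single expression and plans it against `df_schema`,
    /// producing a logical [`Expr`].
    ///
    /// A hedged usage sketch (assumes `df_schema` is a [`DFSchema`] with a
    /// numeric column named `a`):
    ///
    /// ```ignore
    /// let expr = state.create_logical_expr("a + 1", &df_schema)?;
    /// ```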
pub fn create_logical_expr(
&self,
sql: &str,
df_schema: &DFSchema,
) -> datafusion_common::Result<Expr> {
let dialect = self.config.options().sql_parser.dialect.as_str();
let sql_expr = self.sql_to_expr(sql, dialect)?;
let provider = SessionContextProvider {
state: self,
tables: HashMap::new(),
};
let query = self.build_sql_query_planner(&provider);
query.sql_to_expr(sql_expr, df_schema, &mut PlannerContext::new())
}
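    /// Analyzes and optimizes `plan`. For `EXPLAIN` plans, each analyzer and
    /// optimizer pass is captured as a stringified plan so that failures are
    /// reported inline instead of aborting the query.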
pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();
let analyzer_result = self.analyzer.execute_and_check(
e.plan.as_ref().clone(),
self.options(),
|analyzed_plan, analyzer| {
let analyzer_name = analyzer.name().to_string();
let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
stringified_plans.push(analyzed_plan.to_stringified(plan_type));
},
);
let analyzed_plan = match analyzer_result {
Ok(plan) => plan,
Err(DataFusionError::Context(analyzer_name, err)) => {
let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, err.to_string()));
return Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
plan: e.plan.clone(),
stringified_plans,
schema: e.schema.clone(),
logical_optimization_succeeded: false,
}));
}
Err(e) => return Err(e),
};
stringified_plans
.push(analyzed_plan.to_stringified(PlanType::FinalAnalyzedLogicalPlan));
let optimized_plan = self.optimizer.optimize(
analyzed_plan,
self,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
},
);
let (plan, logical_optimization_succeeded) = match optimized_plan {
Ok(plan) => (Arc::new(plan), true),
Err(DataFusionError::Context(optimizer_name, err)) => {
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, err.to_string()));
(e.plan.clone(), false)
}
Err(e) => return Err(e),
};
Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
plan,
stringified_plans,
schema: e.schema.clone(),
logical_optimization_succeeded,
}))
} else {
let analyzed_plan = self.analyzer.execute_and_check(
plan.clone(),
self.options(),
|_, _| {},
)?;
self.optimizer.optimize(analyzed_plan, self, |_, _| {})
}
}
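    /// Optimizes `logical_plan` and hands it to the session's
    /// [`QueryPlanner`] to produce an [`ExecutionPlan`].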
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let logical_plan = self.optimize(logical_plan)?;
self.query_planner
.create_physical_plan(&logical_plan, self)
.await
}
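    /// Converts a logical [`Expr`] into a [`PhysicalExpr`] against
    /// `df_schema`, applying type coercion and any registered function
    /// rewrites first.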
pub fn create_physical_expr(
&self,
expr: Expr,
df_schema: &DFSchema,
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
let simplifier =
ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
let mut expr = simplifier.coerce(expr, df_schema)?;
let config_options = self.config_options();
for rewrite in self.analyzer.function_rewrites() {
expr = expr
.transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
.data;
}
create_physical_expr(&expr, df_schema, self.execution_props())
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
&self.runtime_env
}
pub fn execution_props(&self) -> &ExecutionProps {
&self.execution_props
}
pub fn execution_props_mut(&mut self) -> &mut ExecutionProps {
&mut self.execution_props
}
pub fn config(&self) -> &SessionConfig {
&self.config
}
pub fn config_mut(&mut self) -> &mut SessionConfig {
&mut self.config
}
pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
&self.physical_optimizers.rules
}
pub fn config_options(&self) -> &ConfigOptions {
self.config.options()
}
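    /// Returns the session's [`TableOptions`] combined with the current
    /// session [`ConfigOptions`].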
pub fn default_table_options(&self) -> TableOptions {
self.table_options
.combine_with_session_config(self.config_options())
}
pub fn table_options(&self) -> &TableOptions {
&self.table_options
}
pub fn table_options_mut(&mut self) -> &mut TableOptions {
&mut self.table_options
}
pub fn register_table_options_extension<T: ConfigExtension>(&mut self, extension: T) {
self.table_options.extensions.insert(extension)
}
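    /// Registers a [`FileFormatFactory`] keyed by its lowercased file
    /// extension. Returns a configuration error if the extension is already
    /// registered and `overwrite` is `false`.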
pub fn register_file_format(
&mut self,
file_format: Arc<dyn FileFormatFactory>,
overwrite: bool,
) -> Result<(), DataFusionError> {
let ext = file_format.get_ext().to_lowercase();
        match (self.file_formats.entry(ext.clone()), overwrite) {
            (Entry::Vacant(e), _) => {
                e.insert(file_format);
            }
            (Entry::Occupied(mut e), true) => {
                e.insert(file_format);
            }
            (Entry::Occupied(_), false) => {
                return config_err!(
                    "File type already registered for extension {ext}. \
                     Set overwrite to true to replace this extension."
                )
            }
        };
Ok(())
}
pub fn get_file_format_factory(
&self,
ext: &str,
) -> Option<Arc<dyn FileFormatFactory>> {
self.file_formats.get(&ext.to_lowercase()).cloned()
}
pub fn task_ctx(&self) -> Arc<TaskContext> {
Arc::new(TaskContext::from(self))
}
pub fn catalog_list(&self) -> &Arc<dyn CatalogProviderList> {
&self.catalog_list
}
pub(crate) fn register_catalog_list(
&mut self,
catalog_list: Arc<dyn CatalogProviderList>,
) {
self.catalog_list = catalog_list;
}
pub fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
&self.scalar_functions
}
pub fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
&self.aggregate_functions
}
pub fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
&self.window_functions
}
pub fn table_functions(&self) -> &HashMap<String, Arc<TableFunction>> {
&self.table_functions
}
pub fn serializer_registry(&self) -> &Arc<dyn SerializerRegistry> {
&self.serializer_registry
}
pub fn version(&self) -> &str {
env!("CARGO_PKG_VERSION")
}
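    /// Registers a user-defined table function (UDTF) under `name`.
    ///
    /// A hedged usage sketch (`MyTableFunc` is a hypothetical
    /// [`TableFunctionImpl`], not a type defined in this crate):
    ///
    /// ```ignore
    /// state.register_udtf("my_func", Arc::new(MyTableFunc {}));
    /// ```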
pub fn register_udtf(&mut self, name: &str, fun: Arc<dyn TableFunctionImpl>) {
self.table_functions.insert(
name.to_owned(),
Arc::new(TableFunction::new(name.to_owned(), fun)),
);
}
pub fn deregister_udtf(
&mut self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableFunctionImpl>>> {
let udtf = self.table_functions.remove(name);
Ok(udtf.map(|x| x.function().clone()))
}
fn build_sql_query_planner<'a, S>(&self, provider: &'a S) -> SqlToRel<'a, S>
where
S: ContextProvider,
{
let mut query = SqlToRel::new_with_options(provider, self.get_parser_options());
for planner in self.expr_planners.iter() {
query = query.with_user_defined_planner(planner.clone());
}
query
}
}
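/// Adapter exposing a [`SessionState`] (plus the tables resolved for the
/// current statement) to the SQL planner as a [`ContextProvider`].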
struct SessionContextProvider<'a> {
state: &'a SessionState,
tables: HashMap<String, Arc<dyn TableSource>>,
}
impl<'a> ContextProvider for SessionContextProvider<'a> {
fn get_table_source(
&self,
name: TableReference,
) -> datafusion_common::Result<Arc<dyn TableSource>> {
let name = self.state.resolve_table_ref(name).to_string();
self.tables
.get(&name)
.cloned()
.ok_or_else(|| plan_datafusion_err!("table '{name}' not found"))
}
fn get_table_function_source(
&self,
name: &str,
args: Vec<Expr>,
) -> datafusion_common::Result<Arc<dyn TableSource>> {
let tbl_func = self
.state
.table_functions
.get(name)
.cloned()
.ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
let provider = tbl_func.create_table_provider(&args)?;
Ok(provider_as_source(provider))
}
fn create_cte_work_table(
&self,
name: &str,
schema: SchemaRef,
) -> datafusion_common::Result<Arc<dyn TableSource>> {
let table = Arc::new(CteWorkTable::new(name, schema));
Ok(provider_as_source(table))
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.state.scalar_functions().get(name).cloned()
}
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.state.aggregate_functions().get(name).cloned()
}
fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
self.state.window_functions().get(name).cloned()
}
fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
if variable_names.is_empty() {
return None;
}
let provider_type = if is_system_variables(variable_names) {
VarType::System
} else {
VarType::UserDefined
};
self.state
.execution_props
.var_providers
.as_ref()
.and_then(|provider| provider.get(&provider_type)?.get_type(variable_names))
}
fn options(&self) -> &ConfigOptions {
self.state.config_options()
}
fn udf_names(&self) -> Vec<String> {
self.state.scalar_functions().keys().cloned().collect()
}
fn udaf_names(&self) -> Vec<String> {
self.state.aggregate_functions().keys().cloned().collect()
}
fn udwf_names(&self) -> Vec<String> {
self.state.window_functions().keys().cloned().collect()
}
    fn get_file_type(&self, ext: &str) -> datafusion_common::Result<Arc<dyn FileType>> {
        self.state
            .file_formats
            .get(&ext.to_lowercase())
            .ok_or_else(|| {
                plan_datafusion_err!(
                    "There is no registered file format with extension {ext}"
                )
            })
            .map(|file_type| format_as_file_type(file_type.clone()))
    }
}
impl FunctionRegistry for SessionState {
fn udfs(&self) -> HashSet<String> {
self.scalar_functions.keys().cloned().collect()
}
fn udf(&self, name: &str) -> datafusion_common::Result<Arc<ScalarUDF>> {
let result = self.scalar_functions.get(name);
result.cloned().ok_or_else(|| {
plan_datafusion_err!("There is no UDF named \"{name}\" in the registry")
})
}
fn udaf(&self, name: &str) -> datafusion_common::Result<Arc<AggregateUDF>> {
let result = self.aggregate_functions.get(name);
result.cloned().ok_or_else(|| {
plan_datafusion_err!("There is no UDAF named \"{name}\" in the registry")
})
}
fn udwf(&self, name: &str) -> datafusion_common::Result<Arc<WindowUDF>> {
let result = self.window_functions.get(name);
result.cloned().ok_or_else(|| {
plan_datafusion_err!("There is no UDWF named \"{name}\" in the registry")
})
}
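    /// Registers `udf`, also indexing it under each of its aliases; returns
    /// any UDF previously registered under the primary name.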
fn register_udf(
&mut self,
udf: Arc<ScalarUDF>,
) -> datafusion_common::Result<Option<Arc<ScalarUDF>>> {
udf.aliases().iter().for_each(|alias| {
self.scalar_functions.insert(alias.clone(), udf.clone());
});
Ok(self.scalar_functions.insert(udf.name().into(), udf))
}
fn register_udaf(
&mut self,
udaf: Arc<AggregateUDF>,
) -> datafusion_common::Result<Option<Arc<AggregateUDF>>> {
udaf.aliases().iter().for_each(|alias| {
self.aggregate_functions.insert(alias.clone(), udaf.clone());
});
Ok(self.aggregate_functions.insert(udaf.name().into(), udaf))
}
fn register_udwf(
&mut self,
udwf: Arc<WindowUDF>,
) -> datafusion_common::Result<Option<Arc<WindowUDF>>> {
udwf.aliases().iter().for_each(|alias| {
self.window_functions.insert(alias.clone(), udwf.clone());
});
Ok(self.window_functions.insert(udwf.name().into(), udwf))
}
fn deregister_udf(
&mut self,
name: &str,
) -> datafusion_common::Result<Option<Arc<ScalarUDF>>> {
let udf = self.scalar_functions.remove(name);
if let Some(udf) = &udf {
for alias in udf.aliases() {
self.scalar_functions.remove(alias);
}
}
Ok(udf)
}
fn deregister_udaf(
&mut self,
name: &str,
) -> datafusion_common::Result<Option<Arc<AggregateUDF>>> {
let udaf = self.aggregate_functions.remove(name);
if let Some(udaf) = &udaf {
for alias in udaf.aliases() {
self.aggregate_functions.remove(alias);
}
}
Ok(udaf)
}
fn deregister_udwf(
&mut self,
name: &str,
) -> datafusion_common::Result<Option<Arc<WindowUDF>>> {
let udwf = self.window_functions.remove(name);
if let Some(udwf) = &udwf {
for alias in udwf.aliases() {
self.window_functions.remove(alias);
}
}
Ok(udwf)
}
fn register_function_rewrite(
&mut self,
rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
) -> datafusion_common::Result<()> {
self.analyzer.add_function_rewrite(rewrite);
Ok(())
}
fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
self.expr_planners.clone()
}
fn register_expr_planner(
&mut self,
expr_planner: Arc<dyn ExprPlanner>,
) -> datafusion_common::Result<()> {
self.expr_planners.push(expr_planner);
Ok(())
}
}
impl OptimizerConfig for SessionState {
fn query_execution_start_time(&self) -> DateTime<Utc> {
self.execution_props.query_execution_start_time
}
fn alias_generator(&self) -> Arc<AliasGenerator> {
self.execution_props.alias_generator.clone()
}
fn options(&self) -> &ConfigOptions {
self.config_options()
}
fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
Some(self)
}
}
impl From<&SessionState> for TaskContext {
fn from(state: &SessionState) -> Self {
let task_id = None;
TaskContext::new(
task_id,
state.session_id.clone(),
state.config.clone(),
state.scalar_functions.clone(),
state.aggregate_functions.clone(),
state.window_functions.clone(),
state.runtime_env.clone(),
)
}
}
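/// The default [`QueryPlanner`]: delegates to [`DefaultPhysicalPlanner`].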
struct DefaultQueryPlanner {}
#[async_trait]
impl QueryPlanner for DefaultQueryPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let planner = DefaultPhysicalPlanner::default();
planner
.create_physical_plan(logical_plan, session_state)
.await
}
}
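/// Adapter exposing a [`SessionState`] and schema to the expression
/// simplifier as a [`SimplifyInfo`].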
struct SessionSimplifyProvider<'a> {
state: &'a SessionState,
df_schema: &'a DFSchema,
}
impl<'a> SessionSimplifyProvider<'a> {
fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
Self { state, df_schema }
}
}
impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> {
fn is_boolean_type(&self, expr: &Expr) -> datafusion_common::Result<bool> {
Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
}
fn nullable(&self, expr: &Expr) -> datafusion_common::Result<bool> {
expr.nullable(self.df_schema)
}
fn execution_props(&self) -> &ExecutionProps {
self.state.execution_props()
}
fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result<DataType> {
expr.get_type(self.df_schema)
}
}