use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use arrow_schema::DataType;
use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err,
DFSchema, Dependency, Result,
};
use datafusion_expr::expr::{ScalarFunction, Unnest};
use datafusion_expr::planner::PlannerResult;
use datafusion_expr::{
expr, qualified_wildcard, wildcard, Expr, ExprFunctionExt, ExprSchemable,
WindowFrame, WindowFunctionDefinition,
};
use sqlparser::ast::{
DuplicateTreatment, Expr as SQLExpr, Function as SQLFunction, FunctionArg,
FunctionArgExpr, FunctionArgumentClause, FunctionArgumentList, FunctionArguments,
NullTreatment, ObjectName, OrderByExpr, WindowType,
};
/// Suggest the registered function name closest to `input_function_name`.
///
/// Window contexts match against aggregate and window UDF names; scalar
/// contexts match against scalar and aggregate UDF names. Returns `None`
/// only when no functions are registered at all.
pub fn suggest_valid_function(
    input_function_name: &str,
    is_window_func: bool,
    ctx: &dyn ContextProvider,
) -> Option<String> {
    let candidates: Vec<String> = if is_window_func {
        // Aggregate functions may also be used as window functions.
        ctx.udaf_names()
            .into_iter()
            .chain(ctx.udwf_names())
            .collect()
    } else {
        ctx.udf_names()
            .into_iter()
            .chain(ctx.udaf_names())
            .collect()
    };
    find_closest_match(candidates, input_function_name)
}
/// Return the candidate with the smallest case-insensitive Levenshtein
/// distance to `target`; on ties the earliest candidate wins (matching
/// `Iterator::min_by_key` semantics). `None` when `candidates` is empty.
fn find_closest_match(candidates: Vec<String>, target: &str) -> Option<String> {
    let needle = target.to_lowercase();
    let mut best: Option<(usize, String)> = None;
    for candidate in candidates {
        let distance = datafusion_common::utils::datafusion_strsim::levenshtein(
            &candidate.to_lowercase(),
            &needle,
        );
        // Replace only on a strictly smaller distance so the first of
        // several equally-close candidates is kept.
        let improves = match &best {
            Some((best_distance, _)) => distance < *best_distance,
            None => true,
        };
        if improves {
            best = Some((distance, candidate));
        }
    }
    best.map(|(_, name)| name)
}
/// A SQL function call decomposed into the pieces the planner consumes:
/// the name, the plain argument list, and each modifier clause pulled out
/// of the sqlparser AST by [`FunctionArgs::try_new`].
#[derive(Debug)]
struct FunctionArgs {
    // Possibly multi-part function name, exactly as parsed.
    name: ObjectName,
    // Positional/named arguments inside the parentheses.
    args: Vec<FunctionArg>,
    // ORDER BY written inside the argument list (aggregate ordering).
    order_by: Vec<OrderByExpr>,
    // OVER clause, when the call is used as a window function.
    over: Option<WindowType>,
    // FILTER (WHERE ...) predicate for aggregates.
    filter: Option<Box<SQLExpr>>,
    // IGNORE NULLS / RESPECT NULLS, from either parse position.
    null_treatment: Option<NullTreatment>,
    // True only for an explicit DISTINCT; ALL and absence both mean false.
    distinct: bool,
}
impl FunctionArgs {
    /// Decompose a sqlparser [`SQLFunction`] into [`FunctionArgs`],
    /// returning `not_impl_err!` for any clause DataFusion does not
    /// support yet (LIMIT, ON OVERFLOW, HAVING, SEPARATOR, JSON NULL,
    /// WITHIN GROUP, or duplicated null-treatment/ORDER BY clauses).
    fn try_new(function: SQLFunction) -> Result<Self> {
        let SQLFunction {
            name,
            args,
            over,
            filter,
            // `mut` because an IGNORE/RESPECT NULLS clause found inside the
            // argument list below may fill this in.
            mut null_treatment,
            within_group,
            ..
        } = function;
        // A call without a parenthesized argument list has no arguments,
        // no inline ORDER BY, and cannot be DISTINCT.
        let FunctionArguments::List(args) = args else {
            return Ok(Self {
                name,
                args: vec![],
                order_by: vec![],
                over,
                filter,
                null_treatment,
                distinct: false,
            });
        };
        let FunctionArgumentList {
            duplicate_treatment,
            args,
            clauses,
        } = args;
        // ALL is the SQL default, so only an explicit DISTINCT is true.
        let distinct = match duplicate_treatment {
            Some(DuplicateTreatment::Distinct) => true,
            Some(DuplicateTreatment::All) => false,
            None => false,
        };
        // Scan the modifier clauses: each supported clause may appear at
        // most once; every unsupported clause is rejected explicitly.
        let mut order_by = None;
        for clause in clauses {
            match clause {
                FunctionArgumentClause::IgnoreOrRespectNulls(nt) => {
                    // Rejects a duplicate whether the first occurrence came
                    // from the outer parse position or an earlier clause.
                    if null_treatment.is_some() {
                        return not_impl_err!(
                            "Calling {name}: Duplicated null treatment clause"
                        );
                    }
                    null_treatment = Some(nt);
                }
                FunctionArgumentClause::OrderBy(oby) => {
                    if order_by.is_some() {
                        return not_impl_err!("Calling {name}: Duplicated ORDER BY clause in function arguments");
                    }
                    order_by = Some(oby);
                }
                FunctionArgumentClause::Limit(limit) => {
                    return not_impl_err!(
                        "Calling {name}: LIMIT not supported in function arguments: {limit}"
                    )
                }
                FunctionArgumentClause::OnOverflow(overflow) => {
                    return not_impl_err!(
                        "Calling {name}: ON OVERFLOW not supported in function arguments: {overflow}"
                    )
                }
                FunctionArgumentClause::Having(having) => {
                    return not_impl_err!(
                        "Calling {name}: HAVING not supported in function arguments: {having}"
                    )
                }
                FunctionArgumentClause::Separator(sep) => {
                    return not_impl_err!(
                        "Calling {name}: SEPARATOR not supported in function arguments: {sep}"
                    )
                }
                FunctionArgumentClause::JsonNullClause(jn) => {
                    return not_impl_err!(
                        "Calling {name}: JSON NULL clause not supported in function arguments: {jn}"
                    )
                }
            }
        }
        // WITHIN GROUP (ordered-set aggregates) is not supported yet.
        // NOTE(review): this check only runs when the arguments were a
        // `FunctionArguments::List`; the early return above skips it.
        if !within_group.is_empty() {
            return not_impl_err!("WITHIN GROUP is not supported yet: {within_group:?}");
        }
        let order_by = order_by.unwrap_or_default();
        Ok(Self {
            name,
            args,
            order_by,
            over,
            filter,
            null_treatment,
            distinct,
        })
    }
}
impl<S: ContextProvider> SqlToRel<'_, S> {
    /// Plan a SQL function call into a logical [`Expr`].
    ///
    /// Dispatch order: the `make_map` special form (offered to registered
    /// expression planners) → scalar UDF lookup → the `unnest` special form
    /// → window function (when an OVER clause is present) → aggregate UDF.
    /// If nothing matches, an error suggesting the closest registered
    /// function name is returned.
    pub(super) fn sql_function_to_expr(
        &self,
        function: SQLFunction,
        schema: &DFSchema,
        planner_context: &mut PlannerContext,
    ) -> Result<Expr> {
        let function_args = FunctionArgs::try_new(function)?;
        let FunctionArgs {
            name,
            args,
            order_by,
            over,
            filter,
            null_treatment,
            distinct,
        } = function_args;
        let is_function_window = over.is_some();
        // Multi-part names keep their qualified string form; bare names go
        // through the usual identifier normalization (case/quote handling).
        let name = if name.0.len() > 1 {
            name.to_string()
        } else {
            crate::utils::normalize_ident(name.0[0].clone())
        };
        // `make_map` is offered to each registered expression planner in
        // turn; the first to return Planned wins. If none claims it, the
        // rewritten `fn_args` are discarded and planning continues below
        // with a fresh conversion of the original (cloned) `args`.
        if name.eq("make_map") {
            let mut fn_args =
                self.function_args_to_expr(args.clone(), schema, planner_context)?;
            for planner in self.context_provider.get_expr_planners().iter() {
                match planner.plan_make_map(fn_args)? {
                    PlannerResult::Planned(expr) => return Ok(expr),
                    PlannerResult::Original(args) => fn_args = args,
                }
            }
        }
        // Scalar UDFs are checked before the remaining special forms.
        if let Some(fm) = self.context_provider.get_function_meta(&name) {
            let args = self.function_args_to_expr(args, schema, planner_context)?;
            return Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fm, args)));
        }
        // `unnest` becomes a dedicated Expr variant, not a scalar call;
        // it takes exactly one list- or struct-typed argument.
        if name.eq("unnest") {
            let mut exprs = self.function_args_to_expr(args, schema, planner_context)?;
            if exprs.len() != 1 {
                return plan_err!("unnest() requires exactly one argument");
            }
            let expr = exprs.swap_remove(0);
            Self::check_unnest_arg(&expr, schema)?;
            return Ok(Expr::Unnest(Unnest::new(expr)));
        }
        // Aggregate-style ORDER BY inside the argument list cannot be
        // combined with an OVER clause.
        if !order_by.is_empty() && is_function_window {
            return plan_err!(
                "Aggregate ORDER BY is not implemented for window functions"
            );
        }
        // OVER (...) with an inline window spec: plan a window function.
        if let Some(WindowType::WindowSpec(window)) = over {
            // Literal PARTITION BY expressions partition nothing, so they
            // are dropped before conversion.
            let partition_by = window
                .partition_by
                .into_iter()
                .filter(|e| !matches!(e, sqlparser::ast::Expr::Value { .. },))
                .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
                .collect::<Result<Vec<_>>>()?;
            let mut order_by = self.order_by_to_sort_expr(
                window.order_by,
                schema,
                planner_context,
                false,
                None,
            )?;
            // The ordering is treated as strict when the first sort key is a
            // column that single-handedly determines each row per the
            // schema's functional dependencies. Only the first key is ever
            // inspected: the closure returns Some for every element, so
            // `find_map` stops at index 0 (None only for an empty ORDER BY).
            let func_deps = schema.functional_dependencies();
            let is_ordering_strict = order_by.iter().find_map(|orderby_expr| {
                if let Expr::Column(col) = &orderby_expr.expr {
                    let idx = schema.index_of_column(col).ok()?;
                    return if func_deps.iter().any(|dep| {
                        dep.source_indices == vec![idx] && dep.mode == Dependency::Single
                    }) {
                        Some(true)
                    } else {
                        Some(false)
                    };
                }
                Some(false)
            });
            // An explicit frame is validated against (and may regularize)
            // the ORDER BY expressions.
            let window_frame = window
                .window_frame
                .as_ref()
                .map(|window_frame| {
                    let window_frame: WindowFrame = window_frame.clone().try_into()?;
                    window_frame
                        .regularize_order_bys(&mut order_by)
                        .map(|_| window_frame)
                })
                .transpose()?;
            // Without an explicit frame, pick a default: strictness of the
            // ordering decides the frame kind; with no ORDER BY at all the
            // frame is unbounded.
            let window_frame = if let Some(window_frame) = window_frame {
                window_frame
            } else if let Some(is_ordering_strict) = is_ordering_strict {
                WindowFrame::new(Some(is_ordering_strict))
            } else {
                WindowFrame::new((!order_by.is_empty()).then_some(false))
            };
            if let Ok(fun) = self.find_window_func(&name) {
                return Expr::WindowFunction(expr::WindowFunction::new(
                    fun,
                    self.function_args_to_expr(args, schema, planner_context)?,
                ))
                .partition_by(partition_by)
                .order_by(order_by)
                .window_frame(window_frame)
                .null_treatment(null_treatment)
                .build();
            }
        } else {
            // No OVER clause: try the name as an aggregate UDF.
            if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
                let order_by = self.order_by_to_sort_expr(
                    order_by,
                    schema,
                    planner_context,
                    true,
                    None,
                )?;
                // Empty ORDER BY is represented as None on the aggregate.
                let order_by = (!order_by.is_empty()).then_some(order_by);
                let args = self.function_args_to_expr(args, schema, planner_context)?;
                let filter: Option<Box<Expr>> = filter
                    .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
                    .transpose()?
                    .map(Box::new);
                return Ok(Expr::AggregateFunction(expr::AggregateFunction::new_udf(
                    fm,
                    args,
                    distinct,
                    filter,
                    order_by,
                    null_treatment,
                )));
            }
        }
        // Nothing matched: suggest the closest registered name, or report
        // an internal error when no functions are registered at all.
        if let Some(suggested_func_name) =
            suggest_valid_function(&name, is_function_window, self.context_provider)
        {
            plan_err!("Invalid function '{name}'.\nDid you mean '{suggested_func_name}'?")
        } else {
            internal_err!("No functions registered with this context.")
        }
    }
    /// Plan `expr` as the single argument of the scalar UDF `fn_name`.
    ///
    /// The function is expected to be registered; a missing registration is
    /// an internal (planner) error rather than a user-facing plan error.
    pub(super) fn sql_fn_name_to_expr(
        &self,
        expr: SQLExpr,
        fn_name: &str,
        schema: &DFSchema,
        planner_context: &mut PlannerContext,
    ) -> Result<Expr> {
        let fun = self
            .context_provider
            .get_function_meta(fn_name)
            .ok_or_else(|| {
                internal_datafusion_err!("Unable to find expected '{fn_name}' function")
            })?;
        let args = vec![self.sql_expr_to_logical_expr(expr, schema, planner_context)?];
        Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
    }
    /// Resolve `name` to a window function definition.
    ///
    /// Aggregate UDFs win, except for `first_value`/`last_value`/`nth_value`
    /// which are deliberately routed to their window-UDF counterparts.
    pub(super) fn find_window_func(
        &self,
        name: &str,
    ) -> Result<WindowFunctionDefinition> {
        let udaf = self.context_provider.get_aggregate_meta(name);
        if udaf.as_ref().is_some_and(|udaf| {
            udaf.name() != "first_value"
                && udaf.name() != "last_value"
                && udaf.name() != "nth_value"
        }) {
            Ok(WindowFunctionDefinition::AggregateUDF(udaf.unwrap()))
        } else {
            self.context_provider
                .get_window_meta(name)
                .map(WindowFunctionDefinition::WindowUDF)
                .ok_or_else(|| {
                    plan_datafusion_err!("There is no window function named {name}")
                })
        }
    }
    /// Convert one sqlparser function argument to a logical [`Expr`].
    ///
    /// Named arguments drop their name (only the expression is kept);
    /// `*` becomes a wildcard and `t.*` a qualified wildcard validated
    /// against the schema. Named qualified wildcards and other exotic
    /// forms fall through to the catch-all `not_impl_err!`.
    fn sql_fn_arg_to_logical_expr(
        &self,
        sql: FunctionArg,
        schema: &DFSchema,
        planner_context: &mut PlannerContext,
    ) -> Result<Expr> {
        match sql {
            FunctionArg::Named {
                name: _,
                arg: FunctionArgExpr::Expr(arg),
                operator: _,
            } => self.sql_expr_to_logical_expr(arg, schema, planner_context),
            FunctionArg::Named {
                name: _,
                arg: FunctionArgExpr::Wildcard,
                operator: _,
            } => Ok(wildcard()),
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
                self.sql_expr_to_logical_expr(arg, schema, planner_context)
            }
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(wildcard()),
            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(object_name)) => {
                let qualifier = self.object_name_to_table_reference(object_name)?;
                // The qualifier must match at least one field in scope.
                let qualified_indices = schema.fields_indices_with_qualified(&qualifier);
                if qualified_indices.is_empty() {
                    return plan_err!("Invalid qualifier {qualifier}");
                }
                Ok(qualified_wildcard(qualifier))
            }
            _ => not_impl_err!("Unsupported qualified wildcard argument: {sql:?}"),
        }
    }
    /// Convert a whole argument list, short-circuiting on the first error.
    pub(super) fn function_args_to_expr(
        &self,
        args: Vec<FunctionArg>,
        schema: &DFSchema,
        planner_context: &mut PlannerContext,
    ) -> Result<Vec<Expr>> {
        args.into_iter()
            .map(|a| self.sql_fn_arg_to_logical_expr(a, schema, planner_context))
            .collect::<Result<Vec<Expr>>>()
    }
    /// Validate that an `unnest()` argument resolves to a list or struct
    /// type in `schema`; Null gets a not-yet-implemented error and every
    /// other type a plan error.
    pub(crate) fn check_unnest_arg(arg: &Expr, schema: &DFSchema) -> Result<()> {
        match arg.get_type(schema)? {
            DataType::List(_)
            | DataType::LargeList(_)
            | DataType::FixedSizeList(_, _)
            | DataType::Struct(_) => Ok(()),
            DataType::Null => {
                not_impl_err!("unnest() does not support null yet")
            }
            _ => {
                plan_err!("unnest() can only be applied to array, struct and null")
            }
        }
    }
}