use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::{
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs,
};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
};
use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::Expr::Alias;
use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions, WindowType};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};
use std::collections::HashSet;
use std::sync::Arc;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn select_to_plan(
&self,
mut select: Select,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
if !select.cluster_by.is_empty() {
return Err(DataFusionError::NotImplemented("CLUSTER BY".to_string()));
}
if !select.lateral_views.is_empty() {
return Err(DataFusionError::NotImplemented("LATERAL VIEWS".to_string()));
}
if select.qualify.is_some() {
return Err(DataFusionError::NotImplemented("QUALIFY".to_string()));
}
if select.top.is_some() {
return Err(DataFusionError::NotImplemented("TOP".to_string()));
}
if !select.sort_by.is_empty() {
return Err(DataFusionError::NotImplemented("SORT BY".to_string()));
}
let plan = self.plan_from_tables(select.from, planner_context)?;
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
let plan = self.plan_selection(select.selection, plan, planner_context)?;
check_conflicting_windows(&select.named_window)?;
match_window_definitions(&mut select.projection, &select.named_window)?;
let select_exprs = self.prepare_select_exprs(
&plan,
select.projection,
empty_from,
planner_context,
)?;
let projected_plan = self.project(plan.clone(), select_exprs.clone())?;
let mut combined_schema = (**projected_plan.schema()).clone();
combined_schema.merge(plan.schema());
let alias_map = extract_aliases(&select_exprs);
let having_expr_opt = select
.having
.map::<Result<Expr>, _>(|having_expr| {
let having_expr = self.sql_expr_to_logical_expr(
having_expr,
&combined_schema,
planner_context,
)?;
let having_expr = resolve_aliases_to_exprs(&having_expr, &alias_map)?;
normalize_col(having_expr, &projected_plan)
})
.transpose()?;
let mut aggr_expr_haystack = select_exprs.clone();
if let Some(having_expr) = &having_expr_opt {
aggr_expr_haystack.push(having_expr.clone());
}
let aggr_exprs = find_aggregate_exprs(&aggr_expr_haystack);
let group_by_exprs = select
.group_by
.into_iter()
.map(|e| {
let group_by_expr =
self.sql_expr_to_logical_expr(e, &combined_schema, planner_context)?;
let mut alias_map = alias_map.clone();
for f in plan.schema().fields() {
alias_map.remove(f.name());
}
let group_by_expr = resolve_aliases_to_exprs(&group_by_expr, &alias_map)?;
let group_by_expr =
resolve_positions_to_exprs(&group_by_expr, &select_exprs)
.unwrap_or(group_by_expr);
let group_by_expr = normalize_col(group_by_expr, &projected_plan)?;
self.validate_schema_satisfies_exprs(
plan.schema(),
&[group_by_expr.clone()],
)?;
Ok(group_by_expr)
})
.collect::<Result<Vec<Expr>>>()?;
let (plan, mut select_exprs_post_aggr, having_expr_post_aggr) = if !group_by_exprs
.is_empty()
|| !aggr_exprs.is_empty()
{
self.aggregate(
plan,
&select_exprs,
having_expr_opt.as_ref(),
group_by_exprs,
aggr_exprs,
)?
} else {
match having_expr_opt {
Some(having_expr) => return Err(DataFusionError::Plan(
format!("HAVING clause references: {having_expr} must appear in the GROUP BY clause or be used in an aggregate function"))),
None => (plan, select_exprs, having_expr_opt)
}
};
let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr {
LogicalPlanBuilder::from(plan)
.filter(having_expr_post_aggr)?
.build()?
} else {
plan
};
let window_func_exprs = find_window_exprs(&select_exprs_post_aggr);
let plan = if window_func_exprs.is_empty() {
plan
} else {
let plan = LogicalPlanBuilder::window_plan(plan, window_func_exprs.clone())?;
select_exprs_post_aggr = select_exprs_post_aggr
.iter()
.map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
.collect::<Result<Vec<Expr>>>()?;
plan
};
let plan = project(plan, select_exprs_post_aggr)?;
let distinct = select
.distinct
.map(|distinct| match distinct {
Distinct::Distinct => Ok(true),
Distinct::On(_) => Err(DataFusionError::NotImplemented(
"DISTINCT ON Exprs not supported".to_string(),
)),
})
.transpose()?
.unwrap_or(false);
let plan = if distinct {
LogicalPlanBuilder::from(plan).distinct()?.build()
} else {
Ok(plan)
}?;
let plan = if !select.distribute_by.is_empty() {
let x = select
.distribute_by
.iter()
.map(|e| {
self.sql_expr_to_logical_expr(
e.clone(),
&combined_schema,
planner_context,
)
})
.collect::<Result<Vec<_>>>()?;
LogicalPlanBuilder::from(plan)
.repartition(Partitioning::DistributeBy(x))?
.build()?
} else {
plan
};
Ok(plan)
}
fn plan_selection(
&self,
selection: Option<SQLExpr>,
plan: LogicalPlan,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
match selection {
Some(predicate_expr) => {
let fallback_schemas = plan.fallback_normalize_schemas();
let outer_query_schema = planner_context.outer_query_schema().cloned();
let outer_query_schema_vec = outer_query_schema
.as_ref()
.map(|schema| vec![schema])
.unwrap_or_else(Vec::new);
let filter_expr =
self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;
let mut using_columns = HashSet::new();
expr_to_columns(&filter_expr, &mut using_columns)?;
let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
filter_expr,
&[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec],
&[using_columns],
)?;
Ok(LogicalPlan::Filter(Filter::try_new(
filter_expr,
Arc::new(plan),
)?))
}
None => Ok(plan),
}
}
pub(crate) fn plan_from_tables(
&self,
mut from: Vec<TableWithJoins>,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
match from.len() {
0 => Ok(LogicalPlanBuilder::empty(true).build()?),
1 => {
let from = from.remove(0);
self.plan_table_with_joins(from, planner_context)
}
_ => {
let mut plans = from
.into_iter()
.map(|t| self.plan_table_with_joins(t, planner_context));
let mut left = LogicalPlanBuilder::from(plans.next().unwrap()?);
for right in plans {
left = left.cross_join(right?)?;
}
Ok(left.build()?)
}
}
}
fn prepare_select_exprs(
&self,
plan: &LogicalPlan,
projection: Vec<SelectItem>,
empty_from: bool,
planner_context: &mut PlannerContext,
) -> Result<Vec<Expr>> {
projection
.into_iter()
.map(|expr| self.sql_select_to_rex(expr, plan, empty_from, planner_context))
.flat_map(|result| match result {
Ok(vec) => vec.into_iter().map(Ok).collect(),
Err(err) => vec![Err(err)],
})
.collect::<Result<Vec<Expr>>>()
}
fn sql_select_to_rex(
&self,
sql: SelectItem,
plan: &LogicalPlan,
empty_from: bool,
planner_context: &mut PlannerContext,
) -> Result<Vec<Expr>> {
match sql {
SelectItem::UnnamedExpr(expr) => {
let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?;
let col = normalize_col_with_schemas_and_ambiguity_check(
expr,
&[&[plan.schema()]],
&plan.using_columns()?,
)?;
Ok(vec![col])
}
SelectItem::ExprWithAlias { expr, alias } => {
let select_expr =
self.sql_to_expr(expr, plan.schema(), planner_context)?;
let col = normalize_col_with_schemas_and_ambiguity_check(
select_expr,
&[&[plan.schema()]],
&plan.using_columns()?,
)?;
let expr = Alias(Box::new(col), self.normalizer.normalize(alias));
Ok(vec![expr])
}
SelectItem::Wildcard(options) => {
Self::check_wildcard_options(&options)?;
if empty_from {
return Err(DataFusionError::Plan(
"SELECT * with no tables specified is not valid".to_string(),
));
}
expand_wildcard(plan.schema().as_ref(), plan, Some(options))
}
SelectItem::QualifiedWildcard(ref object_name, options) => {
Self::check_wildcard_options(&options)?;
let qualifier = format!("{object_name}");
expand_qualified_wildcard(
&qualifier,
plan.schema().as_ref(),
Some(options),
)
}
}
}
fn check_wildcard_options(options: &WildcardAdditionalOptions) -> Result<()> {
let WildcardAdditionalOptions {
opt_exclude: _opt_exclude,
opt_except: _opt_except,
opt_rename,
opt_replace,
} = options;
if opt_rename.is_some() || opt_replace.is_some() {
Err(DataFusionError::NotImplemented(
"wildcard * with RENAME or REPLACE not supported ".to_string(),
))
} else {
Ok(())
}
}
fn project(&self, input: LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan> {
self.validate_schema_satisfies_exprs(input.schema(), &expr)?;
LogicalPlanBuilder::from(input).project(expr)?.build()
}
fn aggregate(
&self,
input: LogicalPlan,
select_exprs: &[Expr],
having_expr_opt: Option<&Expr>,
group_by_exprs: Vec<Expr>,
aggr_exprs: Vec<Expr>,
) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
let plan = LogicalPlanBuilder::from(input.clone())
.aggregate(group_by_exprs.clone(), aggr_exprs.clone())?
.build()?;
let mut aggr_projection_exprs = vec![];
for expr in &group_by_exprs {
match expr {
Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
aggr_projection_exprs.extend_from_slice(exprs)
}
Expr::GroupingSet(GroupingSet::Cube(exprs)) => {
aggr_projection_exprs.extend_from_slice(exprs)
}
Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
for exprs in lists_of_exprs {
aggr_projection_exprs.extend_from_slice(exprs)
}
}
_ => aggr_projection_exprs.push(expr.clone()),
}
}
aggr_projection_exprs.extend_from_slice(&aggr_exprs);
let aggr_projection_exprs = aggr_projection_exprs
.iter()
.map(|expr| resolve_columns(expr, &input))
.collect::<Result<Vec<Expr>>>()?;
let column_exprs_post_aggr = aggr_projection_exprs
.iter()
.map(|expr| expr_as_column_expr(expr, &input))
.collect::<Result<Vec<Expr>>>()?;
let select_exprs_post_aggr = select_exprs
.iter()
.map(|expr| rebase_expr(expr, &aggr_projection_exprs, &input))
.collect::<Result<Vec<Expr>>>()?;
check_columns_satisfy_exprs(
&column_exprs_post_aggr,
&select_exprs_post_aggr,
"Projection references non-aggregate values",
)?;
let having_expr_post_aggr = if let Some(having_expr) = having_expr_opt {
let having_expr_post_aggr =
rebase_expr(having_expr, &aggr_projection_exprs, &input)?;
check_columns_satisfy_exprs(
&column_exprs_post_aggr,
&[having_expr_post_aggr.clone()],
"HAVING clause references non-aggregate values",
)?;
Some(having_expr_post_aggr)
} else {
None
};
Ok((plan, select_exprs_post_aggr, having_expr_post_aggr))
}
}
/// Fails if two entries of the `WINDOW` clause share the same name.
///
/// The reported name is the one from the earliest definition that is repeated
/// later in the list.
///
/// # Errors
///
/// [`DataFusionError::Plan`] naming the duplicated window.
fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> {
    // Walk the list, each time comparing the head against everything after it.
    let mut remaining = window_defs;
    while let Some((head, tail)) = remaining.split_first() {
        let repeated_later = tail.iter().any(|later| later.0 == head.0);
        if repeated_later {
            return Err(DataFusionError::Plan(format!(
                "The window {} is defined multiple times!",
                head.0
            )));
        }
        remaining = tail;
    }
    Ok(())
}
/// Replaces `OVER <name>` on top-level function calls in the projection list
/// with the window specification defined under that name in the `WINDOW` clause.
///
/// Only projection items that are themselves a function call (optionally
/// aliased) are inspected; calls nested inside larger expressions are left as-is.
/// If several definitions share a name, the first one is used.
///
/// # Errors
///
/// [`DataFusionError::Plan`] if a referenced window name has no definition.
fn match_window_definitions(
    projection: &mut [SelectItem],
    named_windows: &[NamedWindowDefinition],
) -> Result<()> {
    for item in projection.iter_mut() {
        let func = match item {
            SelectItem::UnnamedExpr(SQLExpr::Function(func))
            | SelectItem::ExprWithAlias {
                expr: SQLExpr::Function(func),
                ..
            } => func,
            _ => continue,
        };

        // Look up the named window (if any) while `func.over` is only borrowed,
        // then assign the resolved specification afterwards.
        let resolved_spec = match &func.over {
            Some(WindowType::NamedWindow(name)) => {
                let definition = named_windows
                    .iter()
                    .find(|NamedWindowDefinition(def_name, _)| def_name == name);
                match definition {
                    Some(NamedWindowDefinition(_, spec)) => Some(spec.clone()),
                    None => {
                        return Err(DataFusionError::Plan(format!(
                            "The window {name} is not defined!"
                        )))
                    }
                }
            }
            _ => None,
        };

        if let Some(spec) = resolved_spec {
            func.over = Some(WindowType::WindowSpec(spec));
        }
    }
    Ok(())
}