use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use arrow::datatypes::Schema;
use datafusion_common::{
not_impl_err, plan_err, sql_err, Constraints, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, SetOperator,
SetQuantifier, Value,
};
use sqlparser::parser::ParserError::ParserError;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
    /// Generates a logical plan from a SQL [`Query`].
    ///
    /// Thin entry point that delegates to `query_to_plan_with_schema`.
    pub(crate) fn query_to_plan(
        &self,
        query: Query,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        self.query_to_plan_with_schema(query, planner_context)
    }

    /// Plans a full SQL query.
    ///
    /// Steps, in order:
    /// 1. every CTE of an optional `WITH` clause is planned and inserted into
    ///    `planner_context` under its normalized name, so the query body and the
    ///    subsequent CTEs are planned with it in scope;
    /// 2. the query body is planned, then `ORDER BY` and `OFFSET`/`LIMIT` are applied;
    /// 3. a `SELECT ... INTO <table>` body is wrapped in a `CreateMemoryTable` DDL plan.
    ///
    /// # Errors
    /// - a SQL error if a CTE name is already present in `planner_context`;
    /// - a "not implemented" error for `WITH RECURSIVE` when recursive CTEs are
    ///   disabled in the execution options;
    /// - any error raised while planning the CTEs, the body, the ordering or the limit.
    fn query_to_plan_with_schema(
        &self,
        query: Query,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        let set_expr = query.body;

        if let Some(with) = query.with {
            let is_recursive = with.recursive;
            for cte in with.cte_tables {
                let cte_name = self.normalizer.normalize(cte.alias.name.clone());
                if planner_context.contains_cte(&cte_name) {
                    return sql_err!(ParserError(format!(
                        "WITH query name {cte_name:?} specified more than once"
                    )));
                }
                let cte_plan = if is_recursive {
                    self.recursive_cte_to_plan(&cte_name, *cte.query, planner_context)?
                } else {
                    // Plan against a copy so nothing registered while planning the CTE
                    // body leaks into the enclosing context.
                    self.query_to_plan(*cte.query, &mut planner_context.clone())?
                };
                let cte_plan = self.apply_table_alias(cte_plan, cte.alias)?;
                planner_context.insert_cte(cte_name, cte_plan);
            }
        }

        // Capture the `SELECT ... INTO` target up front so the body can be moved into
        // the planner instead of deep-cloning the whole AST.
        let select_into = match set_expr.as_ref() {
            SetExpr::Select(select) => select.into.clone(),
            _ => None,
        };

        let plan = self.set_expr_to_plan(*set_expr, planner_context)?;
        let plan = self.order_by(plan, query.order_by, planner_context)?;
        let plan = self.limit(plan, query.offset, query.limit)?;

        match select_into {
            Some(select_into) => {
                let name = self.object_name_to_table_reference(select_into.name)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        name,
                        constraints: Constraints::empty(),
                        input: Arc::new(plan),
                        if_not_exists: false,
                        or_replace: false,
                        column_defaults: vec![],
                    },
                )))
            }
            None => Ok(plan),
        }
    }

    /// Plans the query of one CTE declared in a `WITH RECURSIVE` clause.
    ///
    /// A body of the form `<static term> UNION [ALL | DISTINCT] <recursive term>` is
    /// planned as a recursive query:
    /// - the static term is planned first;
    /// - a work table with the static term's schema is inserted into `planner_context`
    ///   under `cte_name`;
    /// - the recursive term is planned with that work table in scope.
    ///
    /// Any other body shape is planned like a CTE of a plain `WITH` clause.
    ///
    /// The CTE's table alias is NOT applied here; the caller does that.
    ///
    /// # Errors
    /// Returns a "not implemented" error when recursive CTEs are disabled in the
    /// execution options, plus any error raised while planning the terms.
    fn recursive_cte_to_plan(
        &self,
        cte_name: &str,
        mut cte_query: Query,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        if !self
            .context_provider
            .options()
            .execution
            .enable_recursive_ctes
        {
            return not_impl_err!("Recursive CTEs are not enabled");
        }

        let (static_term, recursive_term, set_quantifier) = match *cte_query.body {
            SetExpr::SetOperation {
                op: SetOperator::Union,
                left,
                right,
                set_quantifier,
            } => (left, right, set_quantifier),
            other => {
                // Only a UNION can form a recursive query. Plan anything else like a
                // plain CTE; `cte_name` is not in `planner_context` at this point, so a
                // self-reference will not resolve to this CTE.
                cte_query.body = Box::new(other);
                return self.query_to_plan(cte_query, &mut planner_context.clone());
            }
        };

        // NOTE(review): any ORDER BY / LIMIT / OFFSET / nested WITH attached to the
        // CTE's query is not applied on this path; confirm whether it should be rejected.
        let distinct = set_quantifier != SetQuantifier::All;

        // Plan the static term before the work table is registered, so it cannot see
        // the CTE itself.
        let static_plan =
            self.set_expr_to_plan(*static_term, &mut planner_context.clone())?;

        let work_table_source = self.context_provider.create_cte_work_table(
            cte_name,
            Arc::new(Schema::from(static_plan.schema().as_ref())),
        )?;
        let work_table_plan =
            LogicalPlanBuilder::scan(cte_name.to_string(), work_table_source, None)?
                .build()?;
        // The caller later overwrites this entry with the finished recursive plan.
        planner_context.insert_cte(cte_name.to_string(), work_table_plan);

        let recursive_plan =
            self.set_expr_to_plan(*recursive_term, &mut planner_context.clone())?;

        LogicalPlanBuilder::from(static_plan)
            .to_recursive_query(cte_name.to_string(), recursive_plan, distinct)?
            .build()
    }

    /// Wraps `input` in a limit plan for the query's `OFFSET` / `LIMIT` clauses.
    ///
    /// Behavior:
    /// - Returns `input` unchanged when neither clause is present.
    /// - `LIMIT NULL` means "no limit".
    /// - Both clauses must plan to a non-negative `Int64` literal.
    /// - Values too large for `usize` on the current target are clamped to
    ///   `usize::MAX`.
    ///
    /// # Errors
    /// - `Offset must be >= 0, ...` for a negative offset;
    /// - `LIMIT must not be negative` for a negative limit;
    /// - `Unexpected expression in OFFSET clause` / `Unexpected expression in LIMIT
    ///   clause` when the clause does not plan to an `Int64` literal.
    fn limit(
        &self,
        input: LogicalPlan,
        skip: Option<SQLOffset>,
        fetch: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        /// Converts a non-negative `i64` to `usize`, saturating instead of silently
        /// truncating on targets where `usize` is narrower than 64 bits.
        fn saturating_usize(n: i64) -> usize {
            usize::try_from(n).unwrap_or(usize::MAX)
        }

        if skip.is_none() && fetch.is_none() {
            return Ok(input);
        }

        let skip = match skip {
            Some(skip_expr) => match self.sql_to_expr(
                skip_expr.value,
                input.schema(),
                &mut PlannerContext::new(),
            )? {
                Expr::Literal(ScalarValue::Int64(Some(s))) if s < 0 => {
                    return plan_err!("Offset must be >= 0, '{s}' was provided.");
                }
                Expr::Literal(ScalarValue::Int64(Some(s))) => saturating_usize(s),
                _ => return plan_err!("Unexpected expression in OFFSET clause"),
            },
            None => 0,
        };

        let fetch = match fetch {
            // `LIMIT NULL` is treated the same as having no LIMIT clause.
            Some(SQLExpr::Value(Value::Null)) | None => None,
            Some(limit_expr) => match self.sql_to_expr(
                limit_expr,
                input.schema(),
                &mut PlannerContext::new(),
            )? {
                Expr::Literal(ScalarValue::Int64(Some(n))) if n >= 0 => {
                    Some(saturating_usize(n))
                }
                Expr::Literal(ScalarValue::Int64(Some(_))) => {
                    return plan_err!("LIMIT must not be negative");
                }
                _ => return plan_err!("Unexpected expression in LIMIT clause"),
            },
        };

        LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
    }

    /// Applies the query's `ORDER BY` clause to `plan`.
    ///
    /// Returns `plan` unchanged when `order_by` is empty. For a `DISTINCT ON` plan the
    /// sort expressions are attached to the `DistinctOn` node itself; for any other
    /// plan a sort is added on top.
    fn order_by(
        &self,
        plan: LogicalPlan,
        order_by: Vec<OrderByExpr>,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        if order_by.is_empty() {
            return Ok(plan);
        }

        let order_by_rex =
            self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?;

        match plan {
            // Take ownership of the `DistinctOn` node rather than cloning it.
            LogicalPlan::Distinct(Distinct::On(distinct_on)) => Ok(LogicalPlan::Distinct(
                Distinct::On(distinct_on.with_sort_expr(order_by_rex)?),
            )),
            other => LogicalPlanBuilder::from(other).sort(order_by_rex)?.build(),
        }
    }
}