use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{Constraints, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
CreateMemoryTable, DdlStatement, Expr, LogicalPlan, LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, Value,
};
use datafusion_common::plan_err;
use sqlparser::parser::ParserError::ParserError;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(crate) fn query_to_plan(
&self,
query: Query,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
self.query_to_plan_with_schema(query, planner_context)
}
fn query_to_plan_with_schema(
&self,
query: Query,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let set_expr = query.body;
if let Some(with) = query.with {
if with.recursive {
return Err(DataFusionError::NotImplemented(
"Recursive CTEs are not supported".to_string(),
));
}
for cte in with.cte_tables {
let cte_name = self.normalizer.normalize(cte.alias.name.clone());
if planner_context.contains_cte(&cte_name) {
return Err(DataFusionError::SQL(ParserError(format!(
"WITH query name {cte_name:?} specified more than once"
))));
}
let logical_plan =
self.query_to_plan(*cte.query, &mut planner_context.clone())?;
let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?;
planner_context.insert_cte(cte_name, logical_plan);
}
}
let plan = self.set_expr_to_plan(*(set_expr.clone()), planner_context)?;
let plan = self.order_by(plan, query.order_by, planner_context)?;
let plan = self.limit(plan, query.offset, query.limit)?;
let plan = match *set_expr {
SetExpr::Select(select) if select.into.is_some() => {
let select_into = select.into.unwrap();
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
name: self.object_name_to_table_reference(select_into.name)?,
constraints: Constraints::empty(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
}))
}
_ => plan,
};
Ok(plan)
}
fn limit(
&self,
input: LogicalPlan,
skip: Option<SQLOffset>,
fetch: Option<SQLExpr>,
) -> Result<LogicalPlan> {
if skip.is_none() && fetch.is_none() {
return Ok(input);
}
let skip = match skip {
Some(skip_expr) => match self.sql_to_expr(
skip_expr.value,
input.schema(),
&mut PlannerContext::new(),
)? {
Expr::Literal(ScalarValue::Int64(Some(s))) => {
if s < 0 {
return plan_err!("Offset must be >= 0, '{s}' was provided.");
}
Ok(s as usize)
}
_ => plan_err!("Unexpected expression in OFFSET clause"),
}?,
_ => 0,
};
let fetch = match fetch {
Some(limit_expr)
if limit_expr != sqlparser::ast::Expr::Value(Value::Null) =>
{
let n = match self.sql_to_expr(
limit_expr,
input.schema(),
&mut PlannerContext::new(),
)? {
Expr::Literal(ScalarValue::Int64(Some(n))) if n >= 0 => {
Ok(n as usize)
}
_ => plan_err!("LIMIT must not be negative"),
}?;
Some(n)
}
_ => None,
};
LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
}
fn order_by(
&self,
plan: LogicalPlan,
order_by: Vec<OrderByExpr>,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
if order_by.is_empty() {
return Ok(plan);
}
let order_by_rex =
self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context)?;
LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
}
}