use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use sqlparser::ast::{Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, Value};
use sqlparser::parser::ParserError::ParserError;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(crate) fn query_to_plan(
&self,
query: Query,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
self.query_to_plan_with_schema(query, planner_context)
}
fn query_to_plan_with_schema(
&self,
query: Query,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let set_expr = query.body;
if let Some(with) = query.with {
if with.recursive {
return Err(DataFusionError::NotImplemented(
"Recursive CTEs are not supported".to_string(),
));
}
for cte in with.cte_tables {
let cte_name = self.normalizer.normalize(cte.alias.name.clone());
if planner_context.contains_cte(&cte_name) {
return Err(DataFusionError::SQL(ParserError(format!(
"WITH query name {cte_name:?} specified more than once"
))));
}
let logical_plan =
self.query_to_plan(*cte.query, &mut planner_context.clone())?;
let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?;
planner_context.insert_cte(cte_name, logical_plan);
}
}
let plan = self.set_expr_to_plan(*set_expr, planner_context)?;
let plan = self.order_by(plan, query.order_by, planner_context)?;
self.limit(plan, query.offset, query.limit)
}
fn limit(
&self,
input: LogicalPlan,
skip: Option<SQLOffset>,
fetch: Option<SQLExpr>,
) -> Result<LogicalPlan> {
if skip.is_none() && fetch.is_none() {
return Ok(input);
}
let skip = match skip {
Some(skip_expr) => match self.sql_to_expr(
skip_expr.value,
input.schema(),
&mut PlannerContext::new(),
)? {
Expr::Literal(ScalarValue::Int64(Some(s))) => {
if s < 0 {
return Err(DataFusionError::Plan(format!(
"Offset must be >= 0, '{s}' was provided."
)));
}
Ok(s as usize)
}
_ => Err(DataFusionError::Plan(
"Unexpected expression in OFFSET clause".to_string(),
)),
}?,
_ => 0,
};
let fetch = match fetch {
Some(limit_expr)
if limit_expr != sqlparser::ast::Expr::Value(Value::Null) =>
{
let n = match self.sql_to_expr(
limit_expr,
input.schema(),
&mut PlannerContext::new(),
)? {
Expr::Literal(ScalarValue::Int64(Some(n))) if n >= 0 => {
Ok(n as usize)
}
_ => Err(DataFusionError::Plan(
"LIMIT must not be negative".to_string(),
)),
}?;
Some(n)
}
_ => None,
};
LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
}
fn order_by(
&self,
plan: LogicalPlan,
order_by: Vec<OrderByExpr>,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
if order_by.is_empty() {
return Ok(plan);
}
let order_by_rex = order_by
.into_iter()
.map(|e| self.order_by_to_sort_expr(e, plan.schema(), planner_context))
.collect::<Result<Vec<_>>>()?;
LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
}
}