use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{not_impl_err, plan_err, Constraints, Result, ScalarValue};
use datafusion_expr::expr::Sort;
use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
Operator,
};
use sqlparser::ast::{
Expr as SQLExpr, Offset as SQLOffset, OrderBy, OrderByExpr, Query, SelectInto,
SetExpr, Value,
};
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(crate) fn query_to_plan(
&self,
query: Query,
outer_planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let mut query_plan_context = outer_planner_context.clone();
let planner_context = &mut query_plan_context;
if let Some(with) = query.with {
self.plan_with_clause(with, planner_context)?;
}
let set_expr = *query.body;
match set_expr {
SetExpr::Select(mut select) => {
let select_into = select.into.take();
let oby_exprs = to_order_by_exprs(query.order_by)?;
let plan = self.select_to_plan(*select, oby_exprs, planner_context)?;
let plan = self.limit(plan, query.offset, query.limit)?;
self.select_into(plan, select_into)
}
other => {
let plan = self.set_expr_to_plan(other, planner_context)?;
let oby_exprs = to_order_by_exprs(query.order_by)?;
let order_by_rex = self.order_by_to_sort_expr(
oby_exprs,
plan.schema(),
planner_context,
true,
None,
)?;
let plan = self.order_by(plan, order_by_rex)?;
self.limit(plan, query.offset, query.limit)
}
}
}
fn limit(
&self,
input: LogicalPlan,
skip: Option<SQLOffset>,
fetch: Option<SQLExpr>,
) -> Result<LogicalPlan> {
if skip.is_none() && fetch.is_none() {
return Ok(input);
}
let skip = match skip {
Some(skip_expr) => {
let expr = self.sql_to_expr(
skip_expr.value,
input.schema(),
&mut PlannerContext::new(),
)?;
let n = get_constant_result(&expr, "OFFSET")?;
convert_usize_with_check(n, "OFFSET")
}
_ => Ok(0),
}?;
let fetch = match fetch {
Some(limit_expr)
if limit_expr != sqlparser::ast::Expr::Value(Value::Null) =>
{
let expr = self.sql_to_expr(
limit_expr,
input.schema(),
&mut PlannerContext::new(),
)?;
let n = get_constant_result(&expr, "LIMIT")?;
Some(convert_usize_with_check(n, "LIMIT")?)
}
_ => None,
};
LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
}
pub(super) fn order_by(
&self,
plan: LogicalPlan,
order_by: Vec<Sort>,
) -> Result<LogicalPlan> {
if order_by.is_empty() {
return Ok(plan);
}
if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
let distinct_on = distinct_on.clone().with_sort_expr(order_by)?;
Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
} else {
LogicalPlanBuilder::from(plan).sort(order_by)?.build()
}
}
fn select_into(
&self,
plan: LogicalPlan,
select_into: Option<SelectInto>,
) -> Result<LogicalPlan> {
match select_into {
Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(into.name)?,
constraints: Constraints::empty(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
column_defaults: vec![],
},
))),
_ => Ok(plan),
}
}
}
fn get_constant_result(expr: &Expr, arg_name: &str) -> Result<i64> {
match expr {
Expr::Literal(ScalarValue::Int64(Some(s))) => Ok(*s),
Expr::BinaryExpr(binary_expr) => {
let lhs = get_constant_result(&binary_expr.left, arg_name)?;
let rhs = get_constant_result(&binary_expr.right, arg_name)?;
let res = match binary_expr.op {
Operator::Plus => lhs + rhs,
Operator::Minus => lhs - rhs,
Operator::Multiply => lhs * rhs,
_ => return plan_err!("Unsupported operator for {arg_name} clause"),
};
Ok(res)
}
_ => plan_err!("Unexpected expression in {arg_name} clause"),
}
}
fn convert_usize_with_check(n: i64, arg_name: &str) -> Result<usize> {
if n < 0 {
plan_err!("{arg_name} must be >= 0, '{n}' was provided.")
} else {
Ok(n as usize)
}
}
fn to_order_by_exprs(order_by: Option<OrderBy>) -> Result<Vec<OrderByExpr>> {
let Some(OrderBy { exprs, interpolate }) = order_by else {
return Ok(vec![]);
};
if let Some(_interpolate) = interpolate {
return not_impl_err!("ORDER BY INTERPOLATE is not supported");
}
Ok(exprs)
}