use std::sync::Arc;
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::stack::StackGuard;
use datafusion_common::{not_impl_err, Constraints, DFSchema, Result};
use datafusion_expr::expr::Sort;
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
CreateMemoryTable, DdlStatement, Distinct, LogicalPlan, LogicalPlanBuilder,
};
use sqlparser::ast::{
Expr as SQLExpr, Offset as SQLOffset, OrderBy, OrderByExpr, OrderByKind, Query,
SelectInto, SetExpr,
};
impl<S: ContextProvider> SqlToRel<'_, S> {
pub(crate) fn query_to_plan(
&self,
query: Query,
outer_planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let mut query_plan_context = outer_planner_context.clone();
let planner_context = &mut query_plan_context;
if let Some(with) = query.with {
self.plan_with_clause(with, planner_context)?;
}
let set_expr = *query.body;
match set_expr {
SetExpr::Select(mut select) => {
let select_into = select.into.take();
let plan =
self.select_to_plan(*select, query.order_by, planner_context)?;
let plan =
self.limit(plan, query.offset, query.limit, planner_context)?;
self.select_into(plan, select_into)
}
other => {
let plan = {
let _guard = StackGuard::new(256 * 1024);
self.set_expr_to_plan(other, planner_context)
}?;
let oby_exprs = to_order_by_exprs(query.order_by)?;
let order_by_rex = self.order_by_to_sort_expr(
oby_exprs,
plan.schema(),
planner_context,
true,
None,
)?;
let plan = self.order_by(plan, order_by_rex)?;
self.limit(plan, query.offset, query.limit, planner_context)
}
}
}
fn limit(
&self,
input: LogicalPlan,
skip: Option<SQLOffset>,
fetch: Option<SQLExpr>,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
if skip.is_none() && fetch.is_none() {
return Ok(input);
}
let empty_schema = DFSchema::empty();
let skip = skip
.map(|o| self.sql_to_expr(o.value, &empty_schema, planner_context))
.transpose()?;
let fetch = fetch
.map(|e| self.sql_to_expr(e, &empty_schema, planner_context))
.transpose()?;
LogicalPlanBuilder::from(input)
.limit_by_expr(skip, fetch)?
.build()
}
pub(super) fn order_by(
&self,
plan: LogicalPlan,
order_by: Vec<Sort>,
) -> Result<LogicalPlan> {
if order_by.is_empty() {
return Ok(plan);
}
if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
let distinct_on = distinct_on.clone().with_sort_expr(order_by)?;
Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
} else {
LogicalPlanBuilder::from(plan).sort(order_by)?.build()
}
}
fn select_into(
&self,
plan: LogicalPlan,
select_into: Option<SelectInto>,
) -> Result<LogicalPlan> {
match select_into {
Some(into) => Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
CreateMemoryTable {
name: self.object_name_to_table_reference(into.name)?,
constraints: Constraints::empty(),
input: Arc::new(plan),
if_not_exists: false,
or_replace: false,
temporary: false,
column_defaults: vec![],
},
))),
_ => Ok(plan),
}
}
}
fn to_order_by_exprs(order_by: Option<OrderBy>) -> Result<Vec<OrderByExpr>> {
to_order_by_exprs_with_select(order_by, None)
}
/// Extracts the `ORDER BY` expressions from an optional [`OrderBy`] clause.
///
/// `_select_exprs` is accepted but currently unused.
///
/// Returns an empty vector when there is no `ORDER BY` clause, and the
/// clause's expression list when it is given as explicit expressions.
///
/// # Errors
/// Returns a "not implemented" error when the clause carries an `INTERPOLATE`
/// part (checked first) or when it is `ORDER BY ALL`.
pub(crate) fn to_order_by_exprs_with_select(
    order_by: Option<OrderBy>,
    _select_exprs: Option<&Vec<SelectExpr>>,
) -> Result<Vec<OrderByExpr>> {
    match order_by {
        // No ORDER BY clause: nothing to sort by.
        None => Ok(Vec::new()),
        // INTERPOLATE is rejected regardless of the kind of ORDER BY.
        Some(OrderBy {
            interpolate: Some(_),
            ..
        }) => not_impl_err!("ORDER BY INTERPOLATE is not supported"),
        Some(OrderBy {
            kind: OrderByKind::All(_),
            ..
        }) => not_impl_err!("ORDER BY ALL is not supported"),
        Some(OrderBy {
            kind: OrderByKind::Expressions(exprs),
            ..
        }) => Ok(exprs),
    }
}