use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, Result};
use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType, LogicalPlan};
use sqlparser::ast::{self};
use crate::unparser::utils::unproject_agg_exprs;
use super::{
ast::{
BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
},
utils::find_agg_node_within_select,
Unparser,
};
/// Unparse a [`LogicalPlan`] into a SQL [`ast::Statement`].
///
/// Convenience entry point that builds an [`Unparser`] with its `Default`
/// configuration and delegates to [`Unparser::plan_to_sql`].
pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
    Unparser::default().plan_to_sql(plan)
}
impl Unparser<'_> {
    /// Converts a [`LogicalPlan`] into a SQL [`ast::Statement`].
    ///
    /// Query-shaped plan variants are rendered as a `SELECT` query by
    /// `select_to_sql`, `Dml` plans are routed to `dml_to_sql`, and all other
    /// variants are rejected.
    ///
    /// # Errors
    ///
    /// Returns a "not implemented" error for plan variants (or operators
    /// nested inside a query) that cannot be unparsed, and propagates any
    /// error raised while converting expressions or building AST nodes.
    pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
        match plan {
            LogicalPlan::Projection(_)
            | LogicalPlan::Filter(_)
            | LogicalPlan::Window(_)
            | LogicalPlan::Aggregate(_)
            | LogicalPlan::Sort(_)
            | LogicalPlan::Join(_)
            | LogicalPlan::CrossJoin(_)
            | LogicalPlan::Repartition(_)
            | LogicalPlan::Union(_)
            | LogicalPlan::TableScan(_)
            | LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Subquery(_)
            | LogicalPlan::SubqueryAlias(_)
            | LogicalPlan::Limit(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Distinct(_) => self.select_to_sql(plan),
            LogicalPlan::Dml(_) => self.dml_to_sql(plan),
            LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Prepare(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::RecursiveQuery(_)
            | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
        }
    }

    /// Builds one `SELECT` query statement for `plan`.
    ///
    /// The query, select and relation builders are filled in by
    /// `select_to_sql_recursively`. The collected relation is then attached
    /// to the single `FROM` entry seeded here, and the statement is assembled.
    fn select_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
        let mut query_builder = QueryBuilder::default();
        let mut select_builder = SelectBuilder::default();
        // Seed the FROM list with one entry; joins found while walking the
        // plan are appended to this entry.
        select_builder.push_from(TableWithJoinsBuilder::default());
        let mut relation_builder = RelationBuilder::default();
        self.select_to_sql_recursively(
            plan,
            &mut query_builder,
            &mut select_builder,
            &mut relation_builder,
        )?;

        let Some(mut twj) = select_builder.pop_from() else {
            return internal_err!("Expected a FROM entry after unparsing a SELECT");
        };
        twj.relation(relation_builder);
        select_builder.push_from(twj);

        let body = ast::SetExpr::Select(Box::new(select_builder.build()?));
        let query = query_builder.body(Box::new(body)).build()?;
        Ok(ast::Statement::Query(Box::new(query)))
    }

    /// Walks `plan` top-down and records each operator into the builders of
    /// the current `SELECT`.
    ///
    /// * `query` receives query-level clauses (`ORDER BY`, `LIMIT`, `OFFSET`).
    /// * `select` receives the projection, `WHERE`, `GROUP BY`, `HAVING` and
    ///   the `FROM` list (joins are appended to its last entry).
    /// * `relation` receives the table / derived relation of the current
    ///   `FROM` item, plus its alias.
    ///
    /// # Errors
    ///
    /// Returns a "not implemented" error for operators that cannot be
    /// unparsed, and an internal error if a builder invariant is violated.
    fn select_to_sql_recursively(
        &self,
        plan: &LogicalPlan,
        query: &mut QueryBuilder,
        select: &mut SelectBuilder,
        relation: &mut RelationBuilder,
    ) -> Result<()> {
        match plan {
            LogicalPlan::TableScan(scan) => {
                let mut builder = TableRelationBuilder::default();
                // Emit `schema.table` when the reference carries a schema.
                let mut table_parts = vec![];
                if let Some(schema_name) = scan.table_name.schema() {
                    table_parts.push(self.new_ident(schema_name.to_string()));
                }
                table_parts.push(self.new_ident(scan.table_name.table().to_string()));
                builder.name(ast::ObjectName(table_parts));
                relation.table(builder);
                Ok(())
            }
            LogicalPlan::Projection(p) => {
                if select.already_projected() {
                    // This SELECT already has a projection, so the nested
                    // projection is unparsed on its own and used as a derived
                    // relation (subquery) in FROM.
                    let subquery = match self.plan_to_sql(plan)? {
                        ast::Statement::Query(inner_query) => inner_query,
                        other => {
                            return internal_err!(
                                "Subquery must be a Query, but found {other:?}"
                            );
                        }
                    };
                    let mut derived_builder = DerivedRelationBuilder::default();
                    derived_builder.lateral(false).alias(None).subquery(subquery);
                    relation.derived(derived_builder);
                    return Ok(());
                }

                if let Some(agg) = find_agg_node_within_select(plan, true) {
                    // Projection over an aggregate: rewrite the projected
                    // expressions in terms of the aggregate's inputs and emit
                    // the aggregate's grouping expressions as GROUP BY.
                    let items = p
                        .expr
                        .iter()
                        .map(|proj_expr| {
                            let unproj = unproject_agg_exprs(proj_expr, agg)?;
                            self.select_item_to_sql(&unproj)
                        })
                        .collect::<Result<Vec<_>>>()?;
                    select.projection(items);
                    select.group_by(ast::GroupByExpr::Expressions(
                        agg.group_expr
                            .iter()
                            .map(|expr| self.expr_to_sql(expr))
                            .collect::<Result<Vec<_>>>()?,
                    ));
                } else {
                    let items = p
                        .expr
                        .iter()
                        .map(|e| self.select_item_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    select.projection(items);
                }
                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
            }
            LogicalPlan::Filter(filter) => {
                // A filter above an aggregate (within this SELECT) becomes
                // HAVING; otherwise it becomes WHERE.
                // NOTE(review): if both sides of a join contain a Filter, the
                // second `selection` call may replace the first — confirm
                // against `SelectBuilder::selection`.
                if let Some(agg) =
                    find_agg_node_within_select(plan, select.already_projected())
                {
                    let unprojected = unproject_agg_exprs(&filter.predicate, agg)?;
                    let filter_expr = self.expr_to_sql(&unprojected)?;
                    select.having(Some(filter_expr));
                } else {
                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
                    select.selection(Some(filter_expr));
                }
                self.select_to_sql_recursively(
                    filter.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Limit(limit) => {
                let number = |n: usize| {
                    ast::Expr::Value(ast::Value::Number(n.to_string(), false))
                };
                if let Some(fetch) = limit.fetch {
                    query.limit(Some(number(fetch)));
                }
                // A non-zero skip must be rendered as OFFSET; dropping it
                // would make the generated SQL return different rows.
                if limit.skip > 0 {
                    query.offset(Some(ast::Offset {
                        value: number(limit.skip),
                        rows: ast::OffsetRows::None,
                    }));
                }
                self.select_to_sql_recursively(
                    limit.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Sort(sort) => {
                // NOTE(review): `sort.fetch` is not emitted here; this relies
                // on an enclosing Limit carrying the row limit — confirm.
                query.order_by(self.sort_to_sql(sort.expr.clone())?);
                self.select_to_sql_recursively(
                    sort.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Aggregate(agg) => {
                // The aggregate itself is rendered by the Projection /
                // Filter above it (GROUP BY / HAVING); just keep walking.
                self.select_to_sql_recursively(
                    agg.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Join(join) => {
                match join.join_constraint {
                    JoinConstraint::On => {}
                    JoinConstraint::Using => {
                        return not_impl_err!(
                            "Unsupported join constraint: {:?}",
                            join.join_constraint
                        )
                    }
                }

                // Combine the non-equi filter and the equi-join keys into
                // a single ON expression (filter AND keys).
                let join_filter = match &join.filter {
                    Some(filter) => Some(self.expr_to_sql(filter)?),
                    None => None,
                };
                let eq_op = ast::BinaryOperator::Eq;
                let join_on = self.join_conditions_to_sql(&join.on, eq_op)?;
                let join_expr = match (join_filter, join_on) {
                    (Some(filter), Some(on)) => Some(self.and_op_to_sql(filter, on)),
                    (Some(filter), None) => Some(filter),
                    (None, Some(on)) => Some(on),
                    (None, None) => None,
                };
                let join_constraint = match join_expr {
                    Some(expr) => ast::JoinConstraint::On(expr),
                    None => ast::JoinConstraint::None,
                };

                // The left side fills the current relation; the right side
                // is collected separately and appended as a JOIN.
                let mut right_relation = RelationBuilder::default();
                self.select_to_sql_recursively(
                    join.left.as_ref(),
                    query,
                    select,
                    relation,
                )?;
                self.select_to_sql_recursively(
                    join.right.as_ref(),
                    query,
                    select,
                    &mut right_relation,
                )?;

                let ast_join = ast::Join {
                    relation: right_relation.build()?,
                    join_operator: self
                        .join_operator_to_sql(join.join_type, join_constraint),
                };
                let Some(mut from) = select.pop_from() else {
                    return internal_err!("Expected a FROM entry to attach a JOIN to");
                };
                from.push_join(ast_join);
                select.push_from(from);
                Ok(())
            }
            LogicalPlan::SubqueryAlias(plan_alias) => {
                self.select_to_sql_recursively(
                    plan_alias.input.as_ref(),
                    query,
                    select,
                    relation,
                )?;
                relation.alias(Some(
                    self.new_table_alias(plan_alias.alias.table().to_string()),
                ));
                Ok(())
            }
            // Distinct, Union, Window, Extension and every other variant
            // reaching this point are not supported yet.
            _ => not_impl_err!("Unsupported operator: {plan:?}"),
        }
    }

    /// Converts a projection expression into a SELECT item, keeping a
    /// top-level `Alias` as `expr AS name`.
    fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
        match expr {
            Expr::Alias(Alias { expr, name, .. }) => {
                let inner = self.expr_to_sql(expr)?;
                Ok(ast::SelectItem::ExprWithAlias {
                    expr: inner,
                    alias: self.new_ident(name.to_string()),
                })
            }
            _ => {
                let inner = self.expr_to_sql(expr)?;
                Ok(ast::SelectItem::UnnamedExpr(inner))
            }
        }
    }

    /// Converts sort expressions into `ORDER BY` items.
    ///
    /// # Errors
    ///
    /// Returns a plan error if any expression is not an `Expr::Sort`.
    fn sort_to_sql(&self, sort_exprs: Vec<Expr>) -> Result<Vec<ast::OrderByExpr>> {
        sort_exprs
            .iter()
            .map(|expr: &Expr| match expr {
                Expr::Sort(sort_expr) => {
                    let col = self.expr_to_sql(&sort_expr.expr)?;
                    Ok(ast::OrderByExpr {
                        asc: Some(sort_expr.asc),
                        expr: col,
                        nulls_first: Some(sort_expr.nulls_first),
                    })
                }
                _ => plan_err!("Expecting Sort expr"),
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Maps a DataFusion [`JoinType`] onto the matching sqlparser join
    /// operator, attaching `constraint`.
    fn join_operator_to_sql(
        &self,
        join_type: JoinType,
        constraint: ast::JoinConstraint,
    ) -> ast::JoinOperator {
        match join_type {
            JoinType::Inner => ast::JoinOperator::Inner(constraint),
            JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
            JoinType::Right => ast::JoinOperator::RightOuter(constraint),
            JoinType::Full => ast::JoinOperator::FullOuter(constraint),
            JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
            JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
            JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
            JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
        }
    }

    /// Renders join key pairs as `left <eq_op> right` comparisons joined by
    /// `AND` (left-associative). Returns `None` when there are no pairs.
    fn join_conditions_to_sql(
        &self,
        join_conditions: &[(Expr, Expr)],
        eq_op: ast::BinaryOperator,
    ) -> Result<Option<ast::Expr>> {
        let exprs = join_conditions
            .iter()
            .map(|(left, right)| {
                let l = self.expr_to_sql(left)?;
                let r = self.expr_to_sql(right)?;
                Ok(self.binary_op_to_sql(l, r, eq_op.clone()))
            })
            .collect::<Result<Vec<ast::Expr>>>()?;
        Ok(exprs
            .into_iter()
            .reduce(|acc, next| self.and_op_to_sql(acc, next)))
    }

    /// Builds `lhs AND rhs`.
    fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
        self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
    }

    /// Builds a table alias with the given name and no column list.
    fn new_table_alias(&self, alias: String) -> ast::TableAlias {
        ast::TableAlias {
            name: self.new_ident(alias),
            columns: Vec::new(),
        }
    }

    /// DML unparsing is not supported yet; always returns a
    /// "not implemented" error.
    fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
        not_impl_err!("Unsupported plan: {plan:?}")
    }
}
impl From<BuilderError> for DataFusionError {
fn from(e: BuilderError) -> Self {
DataFusionError::External(Box::new(e))
}
}