use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{not_impl_err, plan_err, DFSchema, Result, TableReference};
use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
use sqlparser::ast::{FunctionArg, FunctionArgExpr, TableFactor};
mod join;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fn create_relation(
&self,
relation: TableFactor,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
TableFactor::Table {
name, alias, args, ..
} => {
if let Some(func_args) = args {
let tbl_func_name = name.0.first().unwrap().value.to_string();
let args = func_args
.into_iter()
.flat_map(|arg| {
if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) = arg
{
self.sql_expr_to_logical_expr(
expr,
&DFSchema::empty(),
planner_context,
)
} else {
plan_err!("Unsupported function argument type: {:?}", arg)
}
})
.collect::<Vec<_>>();
let provider = self
.context_provider
.get_table_function_source(&tbl_func_name, args)?;
let plan = LogicalPlanBuilder::scan(
TableReference::Bare {
table: "tmp_table".into(),
},
provider,
None,
)?
.build()?;
(plan, alias)
} else {
let table_ref = self.object_name_to_table_reference(name)?;
let table_name = table_ref.to_string();
let cte = planner_context.get_cte(&table_name);
(
match (
cte,
self.context_provider.get_table_source(table_ref.clone()),
) {
(Some(cte_plan), _) => Ok(cte_plan.clone()),
(_, Ok(provider)) => {
LogicalPlanBuilder::scan(table_ref, provider, None)?
.build()
}
(None, Err(e)) => Err(e),
}?,
alias,
)
}
}
TableFactor::Derived {
subquery, alias, ..
} => {
let logical_plan = self.query_to_plan(*subquery, planner_context)?;
(logical_plan, alias)
}
TableFactor::NestedJoin {
table_with_joins,
alias,
} => (
self.plan_table_with_joins(*table_with_joins, planner_context)?,
alias,
),
TableFactor::UNNEST {
alias,
array_exprs,
with_offset: false,
with_offset_alias: None,
} => {
let schema = DFSchema::empty();
let input = LogicalPlanBuilder::empty(true).build()?;
let unnest_exprs = array_exprs
.into_iter()
.map(|sql_expr| {
let expr = self.sql_expr_to_logical_expr(
sql_expr,
&schema,
planner_context,
)?;
Self::check_unnest_arg(&expr, &schema)?;
Ok(Expr::Unnest(Unnest::new(expr)))
})
.collect::<Result<Vec<_>>>()?;
if unnest_exprs.is_empty() {
return plan_err!("UNNEST must have at least one argument");
}
let logical_plan = self.try_process_unnest(input, unnest_exprs)?;
(logical_plan, alias)
}
TableFactor::UNNEST { .. } => {
return not_impl_err!(
"UNNEST table factor with offset is not supported yet"
);
}
_ => {
return not_impl_err!(
"Unsupported ast node {relation:?} in create_relation"
);
}
};
if let Some(alias) = alias {
self.apply_table_alias(plan, alias)
} else {
Ok(plan)
}
}
}