use crate::planner::{
idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel,
};
use crate::utils::normalize_ident;
use datafusion_common::{
Column, DFSchema, DataFusionError, OwnedTableReference, Result, ScalarValue,
};
use datafusion_expr::{Case, Expr, GetIndexedField};
use sqlparser::ast::{Expr as SQLExpr, Ident};
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn sql_identifier_to_expr(&self, id: Ident) -> Result<Expr> {
if id.value.starts_with('@') {
let var_names = vec![id.value];
let ty = self
.schema_provider
.get_variable_type(&var_names)
.ok_or_else(|| {
DataFusionError::Execution(format!(
"variable {var_names:?} has no type information"
))
})?;
Ok(Expr::ScalarVariable(ty, var_names))
} else {
Ok(Expr::Column(Column {
relation: None,
name: normalize_ident(id),
}))
}
}
pub(super) fn sql_compound_identifier_to_expr(
&self,
ids: Vec<Ident>,
schema: &DFSchema,
) -> Result<Expr> {
if ids[0].value.starts_with('@') {
let var_names: Vec<_> = ids.into_iter().map(normalize_ident).collect();
let ty = self
.schema_provider
.get_variable_type(&var_names)
.ok_or_else(|| {
DataFusionError::Execution(format!(
"variable {var_names:?} has no type information"
))
})?;
Ok(Expr::ScalarVariable(ty, var_names))
} else {
let (name, relation) = match idents_to_table_reference(
ids,
self.options.enable_ident_normalization,
)? {
OwnedTableReference::Partial { schema, table } => (table, schema),
r @ OwnedTableReference::Bare { .. }
| r @ OwnedTableReference::Full { .. } => {
return Err(DataFusionError::Plan(format!(
"Unsupported compound identifier '{r:?}'",
)));
}
};
match schema.field_with_qualified_name(&relation, &name) {
Ok(_) => {
Ok(Expr::Column(Column {
relation: Some(relation),
name,
}))
}
Err(_) => {
if let Some(field) =
schema.fields().iter().find(|f| f.name().eq(&relation))
{
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(Expr::Column(field.qualified_column())),
ScalarValue::Utf8(Some(name)),
)))
} else {
Ok(Expr::Column(Column {
relation: Some(relation),
name,
}))
}
}
}
}
}
pub(super) fn sql_case_identifier_to_expr(
&self,
operand: Option<Box<SQLExpr>>,
conditions: Vec<SQLExpr>,
results: Vec<SQLExpr>,
else_result: Option<Box<SQLExpr>>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let expr = if let Some(e) = operand {
Some(Box::new(self.sql_expr_to_logical_expr(
*e,
schema,
planner_context,
)?))
} else {
None
};
let when_expr = conditions
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
let then_expr = results
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
let else_expr = if let Some(e) = else_result {
Some(Box::new(self.sql_expr_to_logical_expr(
*e,
schema,
planner_context,
)?))
} else {
None
};
Ok(Expr::Case(Case::new(
expr,
when_expr
.iter()
.zip(then_expr.iter())
.map(|(w, t)| (Box::new(w.to_owned()), Box::new(t.to_owned())))
.collect(),
else_expr,
)))
}
}