pub use super::Operator;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{window_frames, DFField, DFSchema, LogicalPlan};
use crate::physical_plan::{
aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
window_functions,
};
use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};
use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use std::collections::{HashMap, HashSet};
use std::convert::Infallible;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
/// A named reference to a column, optionally qualified by a relation name.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
/// Relation qualifier; `None` when the column is unqualified.
pub relation: Option<String>,
/// The column's own name.
pub name: String,
}
impl Column {
    /// Creates an unqualified column (no relation) called `name`.
    pub fn from_name(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            relation: None,
            name,
        }
    }

    /// Parses `flat_name` into a column.
    ///
    /// When the SQL tokenizer yields exactly `word . word`, the first word becomes
    /// the relation and the second the name. Any other token sequence, or a
    /// tokenizer error, yields an unqualified column named by the whole input.
    pub fn from_qualified_name(flat_name: &str) -> Self {
        use sqlparser::tokenizer::Token;

        let dialect = sqlparser::dialect::GenericDialect {};
        let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect, flat_name);
        let qualified = match tokenizer.tokenize() {
            Ok(tokens) => match tokens.as_slice() {
                [Token::Word(relation), Token::Period, Token::Word(name)] => Some(Column {
                    relation: Some(relation.value.clone()),
                    name: name.value.clone(),
                }),
                _ => None,
            },
            Err(_) => None,
        };
        match qualified {
            Some(column) => column,
            None => Column {
                relation: None,
                name: String::from(flat_name),
            },
        }
    }

    /// Returns `relation.name` for a qualified column, or just `name` otherwise.
    pub fn flat_name(&self) -> String {
        if let Some(relation) = &self.relation {
            format!("{}.{}", relation, self.name)
        } else {
            self.name.clone()
        }
    }

    /// Qualifies this column using the schemas and `USING` columns of `plan`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `plan.using_columns()` and fails when the column
    /// cannot be resolved in any schema of the plan.
    pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
        let plan_schemas = plan.all_schemas();
        let plan_using_columns = plan.using_columns()?;
        self.normalize_with_schemas(&plan_schemas, &plan_using_columns)
    }

    /// Qualifies this column against `schemas`, searched in order.
    ///
    /// An already-qualified column is returned as is. A schema with exactly one
    /// field of this name resolves the column. A schema with several such fields
    /// resolves it only when one set in `using_columns` contains all of them, in
    /// which case the first matching field is used.
    fn normalize_with_schemas(
        self,
        schemas: &[&Arc<DFSchema>],
        using_columns: &[HashSet<Column>],
    ) -> Result<Self> {
        if self.relation.is_some() {
            return Ok(self);
        }

        for schema in schemas {
            let candidates = schema.fields_with_unqualified_name(&self.name);
            if candidates.is_empty() {
                continue;
            }
            if candidates.len() == 1 {
                return Ok(candidates[0].qualified_column());
            }
            // Several fields share the name: accept only if a single USING set
            // covers every one of them.
            let covered_by_using = using_columns.iter().any(|using_col| {
                candidates
                    .iter()
                    .all(|f| using_col.contains(&f.qualified_column()))
            });
            if covered_by_using {
                return Ok(candidates[0].qualified_column());
            }
        }

        Err(DataFusionError::Plan(format!(
            "Column {} not found in provided schemas",
            self
        )))
    }
}
impl From<&str> for Column {
fn from(c: &str) -> Self {
Self::from_qualified_name(c)
}
}
impl FromStr for Column {
type Err = Infallible;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Ok(s.into())
}
}
impl fmt::Display for Column {
    /// Writes `#relation.name` for a qualified column and `#name` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(relation) = &self.relation {
            write!(f, "#{}.{}", relation, self.name)
        } else {
            write!(f, "#{}", self.name)
        }
    }
}
/// A logical expression tree node.
///
/// `Debug` is implemented by hand further down in this file and renders a
/// SQL-like form of the expression.
#[derive(Clone, PartialEq)]
pub enum Expr {
/// An expression together with an alias name.
Alias(Box<Expr>, String),
/// A reference to a column.
Column(Column),
/// A scalar variable identified by its name parts; `get_type` reports it as `Utf8`.
ScalarVariable(Vec<String>),
/// A constant value.
Literal(ScalarValue),
/// A binary operation `left op right`.
BinaryExpr {
/// Left operand.
left: Box<Expr>,
/// The operator applied to both operands.
op: Operator,
/// Right operand.
right: Box<Expr>,
},
/// `NOT expr`.
Not(Box<Expr>),
/// `expr IS NOT NULL`.
IsNotNull(Box<Expr>),
/// `expr IS NULL`.
IsNull(Box<Expr>),
/// Negation, rendered as `(- expr)`.
Negative(Box<Expr>),
/// `expr [NOT] BETWEEN low AND high`.
Between {
/// The value being tested.
expr: Box<Expr>,
/// `true` for the `NOT BETWEEN` form.
negated: bool,
/// Lower bound.
low: Box<Expr>,
/// Upper bound.
high: Box<Expr>,
},
/// `CASE [expr] WHEN .. THEN .. [ELSE ..] END`.
Case {
/// Optional base expression written directly after `CASE`.
expr: Option<Box<Expr>>,
/// The `(WHEN, THEN)` pairs, in order.
when_then_expr: Vec<(Box<Expr>, Box<Expr>)>,
/// Optional `ELSE` expression.
else_expr: Option<Box<Expr>>,
},
/// `CAST(expr AS data_type)`.
Cast {
/// The expression being cast.
expr: Box<Expr>,
/// The target type.
data_type: DataType,
},
/// `TRY_CAST(expr AS data_type)`; always reported as nullable by `nullable`.
TryCast {
/// The expression being cast.
expr: Box<Expr>,
/// The target type.
data_type: DataType,
},
/// A sort specification wrapping an expression.
Sort {
/// The expression to sort by.
expr: Box<Expr>,
/// `true` for ascending (`ASC`), `false` for descending (`DESC`).
asc: bool,
/// `true` for `NULLS FIRST`, `false` for `NULLS LAST`.
nulls_first: bool,
},
/// A call to a built-in scalar function.
ScalarFunction {
/// The built-in function.
fun: functions::BuiltinScalarFunction,
/// Call arguments.
args: Vec<Expr>,
},
/// A call to a [`ScalarUDF`].
ScalarUDF {
/// The function being called.
fun: Arc<ScalarUDF>,
/// Call arguments.
args: Vec<Expr>,
},
/// A call to a built-in aggregate function.
AggregateFunction {
/// The aggregate function.
fun: aggregates::AggregateFunction,
/// Call arguments.
args: Vec<Expr>,
/// `true` when the call is written with `DISTINCT`.
distinct: bool,
},
/// A call to a window function.
WindowFunction {
/// The window function.
fun: window_functions::WindowFunction,
/// Call arguments.
args: Vec<Expr>,
/// `PARTITION BY` expressions.
partition_by: Vec<Expr>,
/// `ORDER BY` expressions.
order_by: Vec<Expr>,
/// Optional window frame.
window_frame: Option<window_frames::WindowFrame>,
},
/// A call to an [`AggregateUDF`].
AggregateUDF {
/// The function being called.
fun: Arc<AggregateUDF>,
/// Call arguments.
args: Vec<Expr>,
},
/// `expr [NOT] IN (list)`.
InList {
/// The value being tested.
expr: Box<Expr>,
/// The candidate values.
list: Vec<Expr>,
/// `true` for the `NOT IN` form.
negated: bool,
},
/// The wildcard `*`; `get_type` and `nullable` reject it with an internal error.
Wildcard,
}
impl Expr {
pub fn get_type(&self, schema: &DFSchema) -> Result<DataType> {
match self {
Expr::Alias(expr, _) => expr.get_type(schema),
Expr::Column(c) => Ok(schema.field_from_column(c)?.data_type().clone()),
Expr::ScalarVariable(_) => Ok(DataType::Utf8),
Expr::Literal(l) => Ok(l.get_datatype()),
Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
Expr::Cast { data_type, .. } => Ok(data_type.clone()),
Expr::TryCast { data_type, .. } => Ok(data_type.clone()),
Expr::ScalarUDF { fun, args } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
Ok((fun.return_type)(&data_types)?.as_ref().clone())
}
Expr::ScalarFunction { fun, args } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
functions::return_type(fun, &data_types)
}
Expr::WindowFunction { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
window_functions::return_type(fun, &data_types)
}
Expr::AggregateFunction { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
aggregates::return_type(fun, &data_types)
}
Expr::AggregateUDF { fun, args, .. } => {
let data_types = args
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
Ok((fun.return_type)(&data_types)?.as_ref().clone())
}
Expr::Not(_) => Ok(DataType::Boolean),
Expr::Negative(expr) => expr.get_type(schema),
Expr::IsNull(_) => Ok(DataType::Boolean),
Expr::IsNotNull(_) => Ok(DataType::Boolean),
Expr::BinaryExpr {
ref left,
ref right,
ref op,
} => binary_operator_data_type(
&left.get_type(schema)?,
op,
&right.get_type(schema)?,
),
Expr::Sort { ref expr, .. } => expr.get_type(schema),
Expr::Between { .. } => Ok(DataType::Boolean),
Expr::InList { .. } => Ok(DataType::Boolean),
Expr::Wildcard => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
}
}
/// Reports whether this expression can evaluate to NULL against `input_schema`.
///
/// The answer is conservative: function calls, scalar variables and `TRY_CAST`
/// are always reported as nullable.
///
/// # Errors
///
/// Returns an error when a referenced column cannot be resolved in
/// `input_schema` or when the expression is [`Expr::Wildcard`].
pub fn nullable(&self, input_schema: &DFSchema) -> Result<bool> {
    match self {
        Expr::Alias(expr, _) => expr.nullable(input_schema),
        Expr::Column(c) => Ok(input_schema.field_from_column(c)?.is_nullable()),
        Expr::Literal(value) => Ok(value.is_null()),
        Expr::ScalarVariable(_) => Ok(true),
        Expr::Case {
            when_then_expr,
            else_expr,
            ..
        } => {
            let then_nullable = when_then_expr
                .iter()
                .map(|(_, t)| t.nullable(input_schema))
                .collect::<Result<Vec<_>>>()?;
            if then_nullable.contains(&true) {
                Ok(true)
            } else if let Some(e) = else_expr {
                e.nullable(input_schema)
            } else {
                // Without an ELSE branch the CASE yields NULL whenever no WHEN
                // branch matches, so the result is nullable.
                Ok(true)
            }
        }
        Expr::Cast { expr, .. } => expr.nullable(input_schema),
        Expr::TryCast { .. } => Ok(true),
        Expr::ScalarFunction { .. } => Ok(true),
        Expr::ScalarUDF { .. } => Ok(true),
        Expr::WindowFunction { .. } => Ok(true),
        Expr::AggregateFunction { .. } => Ok(true),
        Expr::AggregateUDF { .. } => Ok(true),
        Expr::Not(expr) => expr.nullable(input_schema),
        Expr::Negative(expr) => expr.nullable(input_schema),
        Expr::IsNull(_) => Ok(false),
        Expr::IsNotNull(_) => Ok(false),
        Expr::BinaryExpr { left, right, .. } => {
            Ok(left.nullable(input_schema)? || right.nullable(input_schema)?)
        }
        Expr::Sort { expr, .. } => expr.nullable(input_schema),
        // A NULL bound makes the comparison NULL, so the bounds count as well as
        // the tested value.
        Expr::Between {
            expr, low, high, ..
        } => Ok(expr.nullable(input_schema)?
            || low.nullable(input_schema)?
            || high.nullable(input_schema)?),
        // `x IN (.., NULL, ..)` is NULL when nothing matches, so the list items
        // count as well as the tested value.
        Expr::InList { expr, list, .. } => {
            if expr.nullable(input_schema)? {
                return Ok(true);
            }
            for item in list {
                if item.nullable(input_schema)? {
                    return Ok(true);
                }
            }
            Ok(false)
        }
        Expr::Wildcard => Err(DataFusionError::Internal(
            "Wildcard expressions are not valid in a logical query plan".to_owned(),
        )),
    }
}
/// Returns the name of this expression as produced by `create_name` for
/// `input_schema`.
pub fn name(&self, input_schema: &DFSchema) -> Result<String> {
    let this = self;
    create_name(this, input_schema)
}
/// Builds the [`DFField`] describing this expression's output.
///
/// A column keeps its own relation and name; any other expression becomes an
/// unqualified field named by [`Expr::name`].
pub fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
    if let Expr::Column(c) = self {
        let data_type = self.get_type(input_schema)?;
        let nullable = self.nullable(input_schema)?;
        return Ok(DFField::new(
            c.relation.as_deref(),
            &c.name,
            data_type,
            nullable,
        ));
    }

    let field_name = self.name(input_schema)?;
    let data_type = self.get_type(input_schema)?;
    let nullable = self.nullable(input_schema)?;
    Ok(DFField::new(None, &field_name, data_type, nullable))
}
/// Wraps this expression in a cast to `cast_to_type` when one is needed.
///
/// Returns `self` unchanged if it already has that type.
///
/// # Errors
///
/// Fails when the expression's type cannot be determined or when
/// `can_cast_types` reports that the conversion is not possible.
pub fn cast_to(self, cast_to_type: &DataType, schema: &DFSchema) -> Result<Expr> {
    let source_type = self.get_type(schema)?;
    if source_type == *cast_to_type {
        return Ok(self);
    }
    if !can_cast_types(&source_type, cast_to_type) {
        return Err(DataFusionError::Plan(format!(
            "Cannot automatically convert {:?} to {:?}",
            source_type, cast_to_type
        )));
    }
    Ok(Expr::Cast {
        expr: Box::new(self),
        data_type: cast_to_type.clone(),
    })
}
pub fn eq(self, other: Expr) -> Expr {
binary_expr(self, Operator::Eq, other)
}
pub fn not_eq(self, other: Expr) -> Expr {
binary_expr(self, Operator::NotEq, other)
}
pub fn gt(self, other: Expr) -> Expr {
binary_expr(self, Operator::Gt, other)
}
pub fn gt_eq(self, other: Expr) -> Expr {
binary_expr(self, Operator::GtEq, other)
}
pub fn lt(self, other: Expr) -> Expr {
binary_expr(self, Operator::Lt, other)
}
pub fn lt_eq(self, other: Expr) -> Expr {
binary_expr(self, Operator::LtEq, other)
}
pub fn and(self, other: Expr) -> Expr {
binary_expr(self, Operator::And, other)
}
pub fn or(self, other: Expr) -> Expr {
binary_expr(self, Operator::Or, other)
}
/// Wraps this expression in [`Expr::Not`].
#[allow(clippy::should_implement_trait)]
pub fn not(self) -> Expr {
    let operand = Box::new(self);
    Expr::Not(operand)
}
pub fn modulus(self, other: Expr) -> Expr {
binary_expr(self, Operator::Modulus, other)
}
pub fn like(self, other: Expr) -> Expr {
binary_expr(self, Operator::Like, other)
}
pub fn not_like(self, other: Expr) -> Expr {
binary_expr(self, Operator::NotLike, other)
}
/// Wraps this expression in [`Expr::Alias`] with the given `name`.
pub fn alias(self, name: &str) -> Expr {
    let aliased = Box::new(self);
    Expr::Alias(aliased, String::from(name))
}
/// Builds an [`Expr::InList`] testing this expression against `list`;
/// `negated` selects the `NOT IN` form.
pub fn in_list(self, list: Vec<Expr>, negated: bool) -> Expr {
    let expr = Box::new(self);
    Expr::InList {
        expr,
        list,
        negated,
    }
}
/// Wraps this expression in [`Expr::IsNull`].
#[allow(clippy::wrong_self_convention)]
pub fn is_null(self) -> Expr {
    let operand = Box::new(self);
    Expr::IsNull(operand)
}
/// Wraps this expression in [`Expr::IsNotNull`].
#[allow(clippy::wrong_self_convention)]
pub fn is_not_null(self) -> Expr {
    let operand = Box::new(self);
    Expr::IsNotNull(operand)
}
/// Builds an [`Expr::Sort`] over this expression with the given direction
/// (`asc`) and NULL placement (`nulls_first`).
pub fn sort(self, asc: bool, nulls_first: bool) -> Expr {
    let expr = Box::new(self);
    Expr::Sort {
        expr,
        asc,
        nulls_first,
    }
}
/// Walks this expression tree with `visitor`, threading the visitor by value
/// through every call and returning it at the end.
///
/// For each node `pre_visit` is called first. On `Recursion::Stop` the visitor
/// is returned immediately: the node's children are not visited and
/// `post_visit` is not called for it. On `Recursion::Continue` the children are
/// visited in the order written below and `post_visit` is then called on the
/// node itself.
///
/// # Errors
///
/// The first error returned by `pre_visit`, `post_visit` or a child traversal
/// aborts the walk and is returned.
pub fn accept<V: ExpressionVisitor>(&self, visitor: V) -> Result<V> {
// Let the visitor decide whether to descend into this node.
let visitor = match visitor.pre_visit(self)? {
Recursion::Continue(visitor) => visitor,
Recursion::Stop(visitor) => return Ok(visitor),
};
// Visit the children; leaf variants hand the visitor back untouched.
let visitor = match self {
Expr::Alias(expr, _) => expr.accept(visitor),
Expr::Column(_) => Ok(visitor),
Expr::ScalarVariable(..) => Ok(visitor),
Expr::Literal(..) => Ok(visitor),
Expr::BinaryExpr { left, right, .. } => {
let visitor = left.accept(visitor)?;
right.accept(visitor)
}
Expr::Not(expr) => expr.accept(visitor),
Expr::IsNotNull(expr) => expr.accept(visitor),
Expr::IsNull(expr) => expr.accept(visitor),
Expr::Negative(expr) => expr.accept(visitor),
Expr::Between {
expr, low, high, ..
} => {
let visitor = expr.accept(visitor)?;
let visitor = low.accept(visitor)?;
high.accept(visitor)
}
Expr::Case {
expr,
when_then_expr,
else_expr,
} => {
// Order: optional base expression, each WHEN followed by its THEN,
// then the optional ELSE.
let visitor = if let Some(expr) = expr.as_ref() {
expr.accept(visitor)
} else {
Ok(visitor)
}?;
let visitor = when_then_expr.iter().try_fold(
visitor,
|visitor, (when, then)| {
let visitor = when.accept(visitor)?;
then.accept(visitor)
},
)?;
if let Some(else_expr) = else_expr.as_ref() {
else_expr.accept(visitor)
} else {
Ok(visitor)
}
}
Expr::Cast { expr, .. } => expr.accept(visitor),
Expr::TryCast { expr, .. } => expr.accept(visitor),
Expr::Sort { expr, .. } => expr.accept(visitor),
Expr::ScalarFunction { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::ScalarUDF { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::WindowFunction {
args,
partition_by,
order_by,
..
} => {
// Arguments first, then PARTITION BY, then ORDER BY expressions.
let visitor = args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
let visitor = partition_by
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
let visitor = order_by
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
Ok(visitor)
}
Expr::AggregateFunction { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::AggregateUDF { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::InList { expr, list, .. } => {
let visitor = expr.accept(visitor)?;
list.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))
}
Expr::Wildcard => Ok(visitor),
}?;
// All children were visited successfully.
visitor.post_visit(self)
}
/// Rewrites this expression tree with `rewriter`, consuming `self`.
///
/// `rewriter.pre_visit` is called on the node first. If it returns `false` the
/// node is returned unchanged and `mutate` is not called for it or its
/// children. Otherwise every child expression is rewritten recursively, the
/// node is rebuilt from the rewritten children and the result is passed to
/// `rewriter.mutate`, so children are mutated before their parent.
///
/// The items of an `IN` list are rewritten like any other child, matching the
/// set of children visited by [`Expr::accept`].
///
/// # Errors
///
/// The first error returned by `pre_visit`, `mutate` or a child rewrite aborts
/// the rewrite and is returned.
pub fn rewrite<R>(self, rewriter: &mut R) -> Result<Self>
where
    R: ExprRewriter,
{
    if !rewriter.pre_visit(&self)? {
        return Ok(self);
    }

    let expr = match self {
        Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
        // Leaf nodes are moved through as they are; no clone is needed.
        Expr::Column(column) => Expr::Column(column),
        Expr::ScalarVariable(names) => Expr::ScalarVariable(names),
        Expr::Literal(value) => Expr::Literal(value),
        Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
            left: rewrite_boxed(left, rewriter)?,
            op,
            right: rewrite_boxed(right, rewriter)?,
        },
        Expr::Not(expr) => Expr::Not(rewrite_boxed(expr, rewriter)?),
        Expr::IsNotNull(expr) => Expr::IsNotNull(rewrite_boxed(expr, rewriter)?),
        Expr::IsNull(expr) => Expr::IsNull(rewrite_boxed(expr, rewriter)?),
        Expr::Negative(expr) => Expr::Negative(rewrite_boxed(expr, rewriter)?),
        Expr::Between {
            expr,
            low,
            high,
            negated,
        } => Expr::Between {
            expr: rewrite_boxed(expr, rewriter)?,
            low: rewrite_boxed(low, rewriter)?,
            high: rewrite_boxed(high, rewriter)?,
            negated,
        },
        Expr::Case {
            expr,
            when_then_expr,
            else_expr,
        } => {
            let expr = rewrite_option_box(expr, rewriter)?;
            let when_then_expr = when_then_expr
                .into_iter()
                .map(|(when, then)| {
                    Ok((
                        rewrite_boxed(when, rewriter)?,
                        rewrite_boxed(then, rewriter)?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?;
            let else_expr = rewrite_option_box(else_expr, rewriter)?;
            Expr::Case {
                expr,
                when_then_expr,
                else_expr,
            }
        }
        Expr::Cast { expr, data_type } => Expr::Cast {
            expr: rewrite_boxed(expr, rewriter)?,
            data_type,
        },
        Expr::TryCast { expr, data_type } => Expr::TryCast {
            expr: rewrite_boxed(expr, rewriter)?,
            data_type,
        },
        Expr::Sort {
            expr,
            asc,
            nulls_first,
        } => Expr::Sort {
            expr: rewrite_boxed(expr, rewriter)?,
            asc,
            nulls_first,
        },
        Expr::ScalarFunction { args, fun } => Expr::ScalarFunction {
            args: rewrite_vec(args, rewriter)?,
            fun,
        },
        Expr::ScalarUDF { args, fun } => Expr::ScalarUDF {
            args: rewrite_vec(args, rewriter)?,
            fun,
        },
        Expr::WindowFunction {
            args,
            fun,
            partition_by,
            order_by,
            window_frame,
        } => Expr::WindowFunction {
            args: rewrite_vec(args, rewriter)?,
            fun,
            partition_by: rewrite_vec(partition_by, rewriter)?,
            order_by: rewrite_vec(order_by, rewriter)?,
            window_frame,
        },
        Expr::AggregateFunction {
            args,
            fun,
            distinct,
        } => Expr::AggregateFunction {
            args: rewrite_vec(args, rewriter)?,
            fun,
            distinct,
        },
        Expr::AggregateUDF { args, fun } => Expr::AggregateUDF {
            args: rewrite_vec(args, rewriter)?,
            fun,
        },
        Expr::InList {
            expr,
            list,
            negated,
        } => Expr::InList {
            expr: rewrite_boxed(expr, rewriter)?,
            // The list items are children too and must be rewritten.
            list: rewrite_vec(list, rewriter)?,
            negated,
        },
        Expr::Wildcard => Expr::Wildcard,
    };

    rewriter.mutate(expr)
}
}
/// Rewrites the expression inside `boxed_expr` and boxes the result again.
#[allow(clippy::boxed_local)]
fn rewrite_boxed<R>(boxed_expr: Box<Expr>, rewriter: &mut R) -> Result<Box<Expr>>
where
    R: ExprRewriter,
{
    let inner: Expr = *boxed_expr;
    inner.rewrite(rewriter).map(Box::new)
}
/// Rewrites an optional boxed expression; `None` is passed through unchanged.
fn rewrite_option_box<R>(
    option_box: Option<Box<Expr>>,
    rewriter: &mut R,
) -> Result<Option<Box<Expr>>>
where
    R: ExprRewriter,
{
    match option_box {
        Some(boxed) => Ok(Some(rewrite_boxed(boxed, rewriter)?)),
        None => Ok(None),
    }
}
/// Rewrites every expression in `v` in order, stopping at the first error.
fn rewrite_vec<R>(v: Vec<Expr>, rewriter: &mut R) -> Result<Vec<Expr>>
where
    R: ExprRewriter,
{
    let mut rewritten = Vec::with_capacity(v.len());
    for expr in v {
        rewritten.push(expr.rewrite(rewriter)?);
    }
    Ok(rewritten)
}
/// Result of `ExpressionVisitor::pre_visit`: tells `Expr::accept` whether to
/// descend into the current node. Both variants hand the visitor back.
pub enum Recursion<V: ExpressionVisitor> {
/// Visit the node's children, then call `post_visit` on the node.
Continue(V),
/// Skip the node's children and its `post_visit`; `accept` returns at once.
Stop(V),
}
/// Visitor driven by `Expr::accept`. The visitor is passed by value into each
/// callback and returned from it.
pub trait ExpressionVisitor: Sized {
/// Called on a node before its children; the returned `Recursion` decides
/// whether the children are visited.
fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>>;
/// Called on a node after all of its children were visited. The default
/// implementation returns the visitor unchanged.
fn post_visit(self, _expr: &Expr) -> Result<Self> {
Ok(self)
}
}
/// Rewriter driven by `Expr::rewrite`.
pub trait ExprRewriter: Sized {
/// Called on a node before its children are rewritten. Returning `false`
/// makes `rewrite` return the node unchanged without calling `mutate`. The
/// default implementation returns `true`.
fn pre_visit(&mut self, _expr: &Expr) -> Result<bool> {
Ok(true)
}
/// Called on a node after its children were rewritten; the returned
/// expression replaces the node.
fn mutate(&mut self, expr: Expr) -> Result<Expr>;
}
/// Accumulates the parts of a `CASE` expression; see the `case` and `when`
/// functions for how a builder is started.
pub struct CaseBuilder {
// Optional base expression written directly after `CASE`.
expr: Option<Box<Expr>>,
// WHEN expressions; kept index-aligned with `then_expr`.
when_expr: Vec<Expr>,
// THEN expressions; kept index-aligned with `when_expr`.
then_expr: Vec<Expr>,
// Optional ELSE expression.
else_expr: Option<Box<Expr>>,
}
impl CaseBuilder {
    /// Appends a `WHEN when THEN then` branch to this builder and returns a
    /// copy of the updated builder.
    pub fn when(&mut self, when: Expr, then: Expr) -> CaseBuilder {
        self.when_expr.push(when);
        self.then_expr.push(then);

        let expr = self.expr.clone();
        let when_expr = self.when_expr.clone();
        let then_expr = self.then_expr.clone();
        let else_expr = self.else_expr.clone();
        CaseBuilder {
            expr,
            when_expr,
            then_expr,
            else_expr,
        }
    }

    /// Sets the `ELSE` expression and builds the `CASE` expression.
    pub fn otherwise(&mut self, else_expr: Expr) -> Result<Expr> {
        let boxed_else = Box::new(else_expr);
        self.else_expr = Some(boxed_else);
        self.build()
    }

    /// Builds the `CASE` expression from the parts collected so far.
    pub fn end(&self) -> Result<Expr> {
        self.build()
    }
}
impl CaseBuilder {
    /// Validates the collected branches and assembles the [`Expr::Case`].
    ///
    /// The type check only applies when every `THEN`/`ELSE` value is a literal:
    /// in that case all of them must share one data type. As soon as any value
    /// is not a literal the check is skipped.
    fn build(&self) -> Result<Expr> {
        // Literals have a type without a schema; anything else is recorded as
        // `DataType::Null`, which switches the check below off.
        let then_types = self
            .then_expr
            .iter()
            .chain(self.else_expr.as_deref())
            .map(|e| match e {
                Expr::Literal(_) => e.get_type(&DFSchema::empty()),
                _ => Ok(DataType::Null),
            })
            .collect::<Result<Vec<DataType>>>()?;

        if !then_types.contains(&DataType::Null) {
            let unique_types: HashSet<&DataType> = then_types.iter().collect();
            if unique_types.len() != 1 {
                return Err(DataFusionError::Plan(format!(
                    "CASE expression 'then' values had multiple data types: {:?}",
                    unique_types
                )));
            }
        }

        let when_then_expr = self
            .when_expr
            .iter()
            .cloned()
            .map(Box::new)
            .zip(self.then_expr.iter().cloned().map(Box::new))
            .collect();
        Ok(Expr::Case {
            expr: self.expr.clone(),
            when_then_expr,
            else_expr: self.else_expr.clone(),
        })
    }
}
/// Starts a [`CaseBuilder`] with `expr` as the base expression and no branches.
pub fn case(expr: Expr) -> CaseBuilder {
    let base = Box::new(expr);
    CaseBuilder {
        expr: Some(base),
        when_expr: Vec::new(),
        then_expr: Vec::new(),
        else_expr: None,
    }
}
/// Starts a [`CaseBuilder`] without a base expression, holding the single
/// branch `WHEN when THEN then`.
pub fn when(when: Expr, then: Expr) -> CaseBuilder {
    let when_expr = vec![when];
    let then_expr = vec![then];
    CaseBuilder {
        expr: None,
        when_expr,
        then_expr,
        else_expr: None,
    }
}
/// Builds the binary expression `l op r`.
pub fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
    let left = Box::new(l);
    let right = Box::new(r);
    Expr::BinaryExpr { left, op, right }
}
/// Builds the binary expression `left And right`.
pub fn and(left: Expr, right: Expr) -> Expr {
    let (left, right) = (Box::new(left), Box::new(right));
    Expr::BinaryExpr {
        left,
        op: Operator::And,
        right,
    }
}
/// Combines `filters` into one left-nested conjunction
/// (`((f0 And f1) And f2) ...`). Returns `None` for an empty slice.
pub fn combine_filters(filters: &[Expr]) -> Option<Expr> {
    let (first, rest) = filters.split_first()?;
    let mut combined = first.clone();
    for filter in rest {
        combined = and(combined, filter.clone());
    }
    Some(combined)
}
/// Builds the binary expression `left Or right`.
pub fn or(left: Expr, right: Expr) -> Expr {
    let (left, right) = (Box::new(left), Box::new(right));
    Expr::BinaryExpr {
        left,
        op: Operator::Or,
        right,
    }
}
pub fn col(ident: &str) -> Expr {
Expr::Column(ident.into())
}
/// Replaces `e` with a column reference when `input_schema` already has a field
/// with the expression's name.
///
/// Columns are returned unchanged and aliases are processed on their inner
/// expression. If the name cannot be computed, or no field of that name
/// exists, `e` is returned unchanged.
pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
    match e {
        Expr::Column(_) => e,
        Expr::Alias(inner_expr, name) => {
            let inner = columnize_expr(*inner_expr, input_schema);
            Expr::Alias(Box::new(inner), name)
        }
        _ => {
            if let Ok(name) = e.name(input_schema) {
                if let Ok(field) = input_schema.field_with_unqualified_name(&name) {
                    return Expr::Column(field.qualified_column());
                }
            }
            e
        }
    }
}
/// Rewrites `e`, replacing every column that is a key of `replace_map` with the
/// column it maps to. Other columns and expressions are left untouched.
pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<Expr> {
    struct ColumnReplacer<'a> {
        replace_map: &'a HashMap<&'a Column, &'a Column>,
    }

    impl<'a> ExprRewriter for ColumnReplacer<'a> {
        fn mutate(&mut self, expr: Expr) -> Result<Expr> {
            let replacement = match &expr {
                Expr::Column(c) => self.replace_map.get(c),
                _ => None,
            };
            Ok(match replacement {
                Some(new_c) => Expr::Column((*new_c).to_owned()),
                None => expr,
            })
        }
    }

    let mut replacer = ColumnReplacer { replace_map };
    e.rewrite(&mut replacer)
}
/// Qualifies every column in `expr` using the schemas and `USING` columns of
/// `plan`.
pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
    let schemas = plan.all_schemas();
    let using_columns = plan.using_columns()?;
    normalize_col_with_schemas(expr, &schemas, &using_columns)
}
/// Qualifies every column in `expr` against `schemas` and `using_columns` by
/// applying `Column::normalize_with_schemas` to each column node.
fn normalize_col_with_schemas(
    expr: Expr,
    schemas: &[&Arc<DFSchema>],
    using_columns: &[HashSet<Column>],
) -> Result<Expr> {
    struct ColumnNormalizer<'a> {
        schemas: &'a [&'a Arc<DFSchema>],
        using_columns: &'a [HashSet<Column>],
    }

    impl<'a> ExprRewriter for ColumnNormalizer<'a> {
        fn mutate(&mut self, expr: Expr) -> Result<Expr> {
            match expr {
                Expr::Column(c) => c
                    .normalize_with_schemas(self.schemas, self.using_columns)
                    .map(Expr::Column),
                other => Ok(other),
            }
        }
    }

    let mut normalizer = ColumnNormalizer {
        schemas,
        using_columns,
    };
    expr.rewrite(&mut normalizer)
}
/// Applies [`normalize_col`] to every expression, stopping at the first error.
#[inline]
pub fn normalize_cols(
    exprs: impl IntoIterator<Item = Expr>,
    plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
    let mut normalized = Vec::new();
    for e in exprs {
        normalized.push(normalize_col(e, plan)?);
    }
    Ok(normalized)
}
/// Strips the relation qualifier from every column in `expr`.
///
/// # Panics
///
/// Panics if the rewrite fails; the rewriter used here never returns an error.
pub fn unnormalize_col(expr: Expr) -> Expr {
    struct RemoveQualifier {}

    impl ExprRewriter for RemoveQualifier {
        fn mutate(&mut self, expr: Expr) -> Result<Expr> {
            Ok(match expr {
                Expr::Column(col) => Expr::Column(Column {
                    relation: None,
                    name: col.name,
                }),
                other => other,
            })
        }
    }

    let mut remover = RemoveQualifier {};
    expr.rewrite(&mut remover)
        .expect("Unnormalize is infallable")
}
#[inline]
pub fn unnormalize_cols(exprs: impl IntoIterator<Item = Expr>) -> Vec<Expr> {
exprs.into_iter().map(unnormalize_col).collect()
}
/// Builds a non-distinct `Min` aggregate over `expr`.
pub fn min(expr: Expr) -> Expr {
    let args = vec![expr];
    Expr::AggregateFunction {
        fun: aggregates::AggregateFunction::Min,
        distinct: false,
        args,
    }
}
/// Builds a non-distinct `Max` aggregate over `expr`.
pub fn max(expr: Expr) -> Expr {
    let args = vec![expr];
    Expr::AggregateFunction {
        fun: aggregates::AggregateFunction::Max,
        distinct: false,
        args,
    }
}
/// Builds a non-distinct `Sum` aggregate over `expr`.
pub fn sum(expr: Expr) -> Expr {
    let args = vec![expr];
    Expr::AggregateFunction {
        fun: aggregates::AggregateFunction::Sum,
        distinct: false,
        args,
    }
}
/// Builds a non-distinct `Avg` aggregate over `expr`.
pub fn avg(expr: Expr) -> Expr {
    let args = vec![expr];
    Expr::AggregateFunction {
        fun: aggregates::AggregateFunction::Avg,
        distinct: false,
        args,
    }
}
/// Builds a non-distinct `Count` aggregate over `expr`.
pub fn count(expr: Expr) -> Expr {
    let args = vec![expr];
    Expr::AggregateFunction {
        fun: aggregates::AggregateFunction::Count,
        distinct: false,
        args,
    }
}
/// Builds a `Count` aggregate over `expr` with the `distinct` flag set.
pub fn count_distinct(expr: Expr) -> Expr {
    let args = vec![expr];
    Expr::AggregateFunction {
        fun: aggregates::AggregateFunction::Count,
        distinct: true,
        args,
    }
}
/// Builds an [`Expr::InList`] testing `expr` against `list`; `negated` selects
/// the `NOT IN` form.
pub fn in_list(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr {
    let expr = Box::new(expr);
    Expr::InList {
        expr,
        list,
        negated,
    }
}
/// Conversion of a value into a literal expression; used by the `lit` function.
pub trait Literal {
/// Returns an expression representing this value as a literal.
fn lit(&self) -> Expr;
}
impl Literal for &str {
    /// Returns a `Utf8` literal holding a copy of this string slice.
    fn lit(&self) -> Expr {
        let owned = String::from(*self);
        Expr::Literal(ScalarValue::Utf8(Some(owned)))
    }
}
impl Literal for String {
    /// Returns a `Utf8` literal holding a copy of this string.
    fn lit(&self) -> Expr {
        let owned: String = self.clone();
        Expr::Literal(ScalarValue::Utf8(Some(owned)))
    }
}
impl Literal for ScalarValue {
    /// Returns a literal holding a copy of this scalar value.
    fn lit(&self) -> Expr {
        let value = self.clone();
        Expr::Literal(value)
    }
}
// Implements `Literal` for the primitive type `$TYPE` by wrapping a copy of the
// value in `ScalarValue::$SCALAR(Some(..))`. `$DOC` is attached to the generated
// impl as its doc attribute.
macro_rules! make_literal {
($TYPE:ty, $SCALAR:ident, $DOC: expr) => {
#[doc = $DOC]
impl Literal for $TYPE {
fn lit(&self) -> Expr {
Expr::Literal(ScalarValue::$SCALAR(Some(self.clone())))
}
}
};
}
// `Literal` implementations for the boolean, floating-point and integer types.
make_literal!(bool, Boolean, "literal expression containing a bool");
make_literal!(f32, Float32, "literal expression containing an f32");
make_literal!(f64, Float64, "literal expression containing an f64");
make_literal!(i8, Int8, "literal expression containing an i8");
make_literal!(i16, Int16, "literal expression containing an i16");
make_literal!(i32, Int32, "literal expression containing an i32");
make_literal!(i64, Int64, "literal expression containing an i64");
make_literal!(u8, UInt8, "literal expression containing a u8");
make_literal!(u16, UInt16, "literal expression containing a u16");
make_literal!(u32, UInt32, "literal expression containing a u32");
make_literal!(u64, UInt64, "literal expression containing a u64");
pub fn lit<T: Literal>(n: T) -> Expr {
n.lit()
}
/// Builds a call to the built-in `Concat` scalar function over copies of `args`.
pub fn concat(args: &[Expr]) -> Expr {
    let args = Vec::from(args);
    Expr::ScalarFunction {
        fun: functions::BuiltinScalarFunction::Concat,
        args,
    }
}
/// Builds a call to the built-in `ConcatWithSeparator` scalar function. The
/// separator is passed as a literal first argument, followed by copies of
/// `values`.
pub fn concat_ws(sep: impl Into<String>, values: &[Expr]) -> Expr {
    let mut args = Vec::with_capacity(values.len() + 1);
    args.push(lit(sep.into()));
    args.extend(values.iter().cloned());
    Expr::ScalarFunction {
        fun: functions::BuiltinScalarFunction::ConcatWithSeparator,
        args,
    }
}
/// Builds a call to the built-in `Random` scalar function, which takes no
/// arguments.
pub fn random() -> Expr {
    let args = Vec::new();
    Expr::ScalarFunction {
        fun: functions::BuiltinScalarFunction::Random,
        args,
    }
}
// Generates `pub fn $FUNC(e: Expr) -> Expr`, which builds a call to the
// built-in scalar function `BuiltinScalarFunction::$ENUM` with `e` as its only
// argument.
macro_rules! unary_scalar_expr {
($ENUM:ident, $FUNC:ident) => {
#[doc = "this scalar function is not documented yet"]
pub fn $FUNC(e: Expr) -> Expr {
Expr::ScalarFunction {
fun: functions::BuiltinScalarFunction::$ENUM,
args: vec![e],
}
}
};
}
// Generates `pub fn $FUNC(arg1: Expr, arg2: Expr) -> Expr`, which builds a call
// to the built-in scalar function `BuiltinScalarFunction::$ENUM` with the two
// arguments in that order.
macro_rules! binary_scalar_expr {
($ENUM:ident, $FUNC:ident) => {
#[doc = "this scalar function is not documented yet"]
pub fn $FUNC(arg1: Expr, arg2: Expr) -> Expr {
Expr::ScalarFunction {
fun: functions::BuiltinScalarFunction::$ENUM,
args: vec![arg1, arg2],
}
}
};
}
// Builder functions for the built-in scalar functions, one per line.
// NOTE(review): every `unary_scalar_expr!` line below generates a function with
// exactly one `Expr` parameter. That includes names such as `now`, `lpad`,
// `replace`, `split_part`, `substr` and `translate`, whose SQL counterparts
// presumably take a different number of arguments — confirm the generated
// arity is what callers need.
// Math functions.
unary_scalar_expr!(Sqrt, sqrt);
unary_scalar_expr!(Sin, sin);
unary_scalar_expr!(Cos, cos);
unary_scalar_expr!(Tan, tan);
unary_scalar_expr!(Asin, asin);
unary_scalar_expr!(Acos, acos);
unary_scalar_expr!(Atan, atan);
unary_scalar_expr!(Floor, floor);
unary_scalar_expr!(Ceil, ceil);
unary_scalar_expr!(Now, now);
unary_scalar_expr!(Round, round);
unary_scalar_expr!(Trunc, trunc);
unary_scalar_expr!(Abs, abs);
unary_scalar_expr!(Signum, signum);
unary_scalar_expr!(Exp, exp);
unary_scalar_expr!(Log2, log2);
unary_scalar_expr!(Log10, log10);
unary_scalar_expr!(Ln, ln);
// String, hashing and regular-expression functions.
unary_scalar_expr!(Ascii, ascii);
unary_scalar_expr!(BitLength, bit_length);
unary_scalar_expr!(Btrim, btrim);
unary_scalar_expr!(CharacterLength, character_length);
// `length` builds the same `CharacterLength` call as `character_length`.
unary_scalar_expr!(CharacterLength, length);
unary_scalar_expr!(Chr, chr);
unary_scalar_expr!(InitCap, initcap);
unary_scalar_expr!(Left, left);
unary_scalar_expr!(Lower, lower);
unary_scalar_expr!(Lpad, lpad);
unary_scalar_expr!(Ltrim, ltrim);
unary_scalar_expr!(MD5, md5);
unary_scalar_expr!(OctetLength, octet_length);
unary_scalar_expr!(RegexpMatch, regexp_match);
unary_scalar_expr!(RegexpReplace, regexp_replace);
unary_scalar_expr!(Replace, replace);
unary_scalar_expr!(Repeat, repeat);
unary_scalar_expr!(Reverse, reverse);
unary_scalar_expr!(Right, right);
unary_scalar_expr!(Rpad, rpad);
unary_scalar_expr!(Rtrim, rtrim);
unary_scalar_expr!(SHA224, sha224);
unary_scalar_expr!(SHA256, sha256);
unary_scalar_expr!(SHA384, sha384);
unary_scalar_expr!(SHA512, sha512);
unary_scalar_expr!(SplitPart, split_part);
unary_scalar_expr!(StartsWith, starts_with);
unary_scalar_expr!(Strpos, strpos);
unary_scalar_expr!(Substr, substr);
unary_scalar_expr!(ToHex, to_hex);
unary_scalar_expr!(Translate, translate);
unary_scalar_expr!(Trim, trim);
unary_scalar_expr!(Upper, upper);
// Two-argument date functions.
binary_scalar_expr!(DatePart, date_part);
binary_scalar_expr!(DateTrunc, date_trunc);
/// Builds a call to the built-in `Array` scalar function with `args`.
pub fn array(args: Vec<Expr>) -> Expr {
    let fun = functions::BuiltinScalarFunction::Array;
    Expr::ScalarFunction { fun, args }
}
/// Creates a [`ScalarUDF`] called `name` that accepts exactly `input_types`
/// and always reports `return_type` as its result type.
pub fn create_udf(
    name: &str,
    input_types: Vec<DataType>,
    return_type: Arc<DataType>,
    fun: ScalarFunctionImplementation,
) -> ScalarUDF {
    let signature = Signature::Exact(input_types);
    // The return type does not depend on the argument types.
    let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(return_type.clone()));
    ScalarUDF::new(name, &signature, &return_type, &fun)
}
/// Creates an [`AggregateUDF`] called `name` that accepts exactly one argument
/// of `input_type` and always reports `return_type` and `state_type` as its
/// result and state types.
#[allow(clippy::rc_buffer)]
pub fn create_udaf(
    name: &str,
    input_type: DataType,
    return_type: Arc<DataType>,
    accumulator: AccumulatorFunctionImplementation,
    state_type: Arc<Vec<DataType>>,
) -> AggregateUDF {
    let signature = Signature::Exact(vec![input_type]);
    // Neither the return type nor the state type depends on the argument types.
    let return_type: ReturnTypeFunction = Arc::new(move |_| Ok(return_type.clone()));
    let state_type: StateTypeFunction = Arc::new(move |_| Ok(state_type.clone()));
    AggregateUDF::new(name, &signature, &return_type, &accumulator, &state_type)
}
/// Writes a function call as `fun(args)` or `fun(DISTINCT args)`, with each
/// argument rendered through `Debug` and the arguments separated by `", "`.
fn fmt_function(
    f: &mut fmt::Formatter,
    fun: &str,
    distinct: bool,
    args: &[Expr],
) -> fmt::Result {
    let rendered_args = args
        .iter()
        .map(|arg| format!("{:?}", arg))
        .collect::<Vec<String>>()
        .join(", ");
    let distinct_str = if distinct { "DISTINCT " } else { "" };
    write!(f, "{}({}{})", fun, distinct_str, rendered_args)
}
/// Renders an expression in a SQL-like form. Sub-expressions are rendered
/// recursively through this same `Debug` implementation.
impl fmt::Debug for Expr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias),
// Columns use their `Display` form (`#name` or `#relation.name`).
Expr::Column(c) => write!(f, "{}", c),
Expr::ScalarVariable(var_names) => write!(f, "{}", var_names.join(".")),
Expr::Literal(v) => write!(f, "{:?}", v),
Expr::Case {
expr,
when_then_expr,
else_expr,
..
} => {
// CASE [expr] WHEN w THEN t ... [ELSE e] END
write!(f, "CASE ")?;
if let Some(e) = expr {
write!(f, "{:?} ", e)?;
}
for (w, t) in when_then_expr {
write!(f, "WHEN {:?} THEN {:?} ", w, t)?;
}
if let Some(e) = else_expr {
write!(f, "ELSE {:?} ", e)?;
}
write!(f, "END")
}
Expr::Cast { expr, data_type } => {
write!(f, "CAST({:?} AS {:?})", expr, data_type)
}
Expr::TryCast { expr, data_type } => {
write!(f, "TRY_CAST({:?} AS {:?})", expr, data_type)
}
Expr::Not(expr) => write!(f, "NOT {:?}", expr),
Expr::Negative(expr) => write!(f, "(- {:?})", expr),
Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr),
Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr),
// The operator is rendered with its `Debug` form, e.g. `Eq`.
Expr::BinaryExpr { left, op, right } => {
write!(f, "{:?} {:?} {:?}", left, op, right)
}
Expr::Sort {
expr,
asc,
nulls_first,
} => {
if *asc {
write!(f, "{:?} ASC", expr)?;
} else {
write!(f, "{:?} DESC", expr)?;
}
if *nulls_first {
write!(f, " NULLS FIRST")
} else {
write!(f, " NULLS LAST")
}
}
Expr::ScalarFunction { fun, args, .. } => {
fmt_function(f, &fun.to_string(), false, args)
}
Expr::ScalarUDF { fun, ref args, .. } => {
fmt_function(f, &fun.name, false, args)
}
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
} => {
// The call itself, followed by whichever window clauses are present.
fmt_function(f, &fun.to_string(), false, args)?;
if !partition_by.is_empty() {
write!(f, " PARTITION BY {:?}", partition_by)?;
}
if !order_by.is_empty() {
write!(f, " ORDER BY {:?}", order_by)?;
}
if let Some(window_frame) = window_frame {
write!(
f,
" {} BETWEEN {} AND {}",
window_frame.units,
window_frame.start_bound,
window_frame.end_bound
)?;
}
Ok(())
}
Expr::AggregateFunction {
fun,
distinct,
ref args,
..
} => fmt_function(f, &fun.to_string(), *distinct, args),
Expr::AggregateUDF { fun, ref args, .. } => {
fmt_function(f, &fun.name, false, args)
}
Expr::Between {
expr,
negated,
low,
high,
} => {
if *negated {
write!(f, "{:?} NOT BETWEEN {:?} AND {:?}", expr, low, high)
} else {
write!(f, "{:?} BETWEEN {:?} AND {:?}", expr, low, high)
}
}
Expr::InList {
expr,
list,
negated,
} => {
// The list is a `Vec`, so its `Debug` form adds square brackets
// inside the parentheses.
if *negated {
write!(f, "{:?} NOT IN ({:?})", expr, list)
} else {
write!(f, "{:?} IN ({:?})", expr, list)
}
}
Expr::Wildcard => write!(f, "*"),
}
}
}
/// Builds the name of a function call as `fun(a,b)` or `fun(DISTINCT a,b)`,
/// where each argument name comes from `create_name` and the names are joined
/// with `","`.
fn create_function_name(
    fun: &str,
    distinct: bool,
    args: &[Expr],
    input_schema: &DFSchema,
) -> Result<String> {
    let mut names: Vec<String> = Vec::with_capacity(args.len());
    for e in args {
        names.push(create_name(e, input_schema)?);
    }
    let distinct_str = if distinct { "DISTINCT " } else { "" };
    Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
}
/// Builds the name under which `e` appears in an output schema.
///
/// Aliases use the alias, columns use their flat name, and other expressions
/// use a SQL-like rendering assembled from the names of their children.
///
/// # Errors
///
/// Returns `DataFusionError::NotImplemented` for variants that have no name
/// rendering here (those falling through to the final arm, such as `Sort`,
/// `Between` and `Wildcard`), and propagates any error from a child.
fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
    match e {
        Expr::Alias(_, name) => Ok(name.clone()),
        Expr::Column(c) => Ok(c.flat_name()),
        Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")),
        Expr::Literal(value) => Ok(format!("{:?}", value)),
        Expr::BinaryExpr { left, op, right } => {
            let left = create_name(left, input_schema)?;
            let right = create_name(right, input_schema)?;
            Ok(format!("{} {:?} {}", left, op, right))
        }
        Expr::Case {
            expr,
            when_then_expr,
            else_expr,
        } => {
            // CASE parts are rendered with `Debug` rather than `create_name`.
            let mut name = "CASE ".to_string();
            if let Some(e) = expr {
                name += &format!("{:?} ", e);
            }
            for (w, t) in when_then_expr {
                name += &format!("WHEN {:?} THEN {:?} ", w, t);
            }
            if let Some(e) = else_expr {
                name += &format!("ELSE {:?} ", e);
            }
            name += "END";
            Ok(name)
        }
        Expr::Cast { expr, data_type } => {
            let expr = create_name(expr, input_schema)?;
            Ok(format!("CAST({} AS {:?})", expr, data_type))
        }
        Expr::TryCast { expr, data_type } => {
            let expr = create_name(expr, input_schema)?;
            Ok(format!("TRY_CAST({} AS {:?})", expr, data_type))
        }
        Expr::Not(expr) => {
            let expr = create_name(expr, input_schema)?;
            Ok(format!("NOT {}", expr))
        }
        Expr::Negative(expr) => {
            let expr = create_name(expr, input_schema)?;
            Ok(format!("(- {})", expr))
        }
        Expr::IsNull(expr) => {
            let expr = create_name(expr, input_schema)?;
            Ok(format!("{} IS NULL", expr))
        }
        Expr::IsNotNull(expr) => {
            let expr = create_name(expr, input_schema)?;
            Ok(format!("{} IS NOT NULL", expr))
        }
        Expr::ScalarFunction { fun, args, .. } => {
            create_function_name(&fun.to_string(), false, args, input_schema)
        }
        Expr::ScalarUDF { fun, args, .. } => {
            create_function_name(&fun.name, false, args, input_schema)
        }
        Expr::WindowFunction {
            fun,
            args,
            window_frame,
            partition_by,
            order_by,
        } => {
            let mut parts: Vec<String> = vec![create_function_name(
                &fun.to_string(),
                false,
                args,
                input_schema,
            )?];
            if !partition_by.is_empty() {
                parts.push(format!("PARTITION BY {:?}", partition_by));
            }
            if !order_by.is_empty() {
                parts.push(format!("ORDER BY {:?}", order_by));
            }
            if let Some(window_frame) = window_frame {
                parts.push(format!("{}", window_frame));
            }
            Ok(parts.join(" "))
        }
        Expr::AggregateFunction {
            fun,
            distinct,
            args,
            ..
        } => create_function_name(&fun.to_string(), *distinct, args, input_schema),
        Expr::AggregateUDF { fun, args } => {
            create_function_name(&fun.name, false, args, input_schema)
        }
        Expr::InList {
            expr,
            list,
            negated,
        } => {
            let expr = create_name(expr, input_schema)?;
            // Resolve every item name eagerly so that errors propagate and the
            // rendered name contains the items, not the `Debug` form of a lazy
            // iterator.
            let items = list
                .iter()
                .map(|item| create_name(item, input_schema))
                .collect::<Result<Vec<String>>>()?
                .join(", ");
            if *negated {
                Ok(format!("{} NOT IN ([{}])", expr, items))
            } else {
                Ok(format!("{} IN ([{}])", expr, items))
            }
        }
        other => Err(DataFusionError::NotImplemented(format!(
            "Create name does not support logical expression {:?}",
            other
        ))),
    }
}
/// Builds one [`DFField`] per expression by calling `Expr::to_field` against
/// `input_schema`, preserving the order of the input expressions.
///
/// # Errors
///
/// Returns the first error produced by `to_field`; expressions after the
/// failing one are not evaluated.
pub fn exprlist_to_fields<'a>(
    expr: impl IntoIterator<Item = &'a Expr>,
    input_schema: &DFSchema,
) -> Result<Vec<DFField>> {
    let mut fields = Vec::new();
    for item in expr {
        fields.push(item.to_field(input_schema)?);
    }
    Ok(fields)
}
#[cfg(test)]
mod tests {
use super::super::{col, lit, when};
use super::*;
/// A CASE expression whose THEN branches are all integer literals must build
/// without error.
#[test]
fn case_when_same_literal_then_types() -> Result<()> {
    let is_colorado = col("state").eq(lit("CO"));
    let is_new_york = col("state").eq(lit("NY"));
    // Only success matters here; the built expression itself is discarded.
    when(is_colorado, lit(303))
        .when(is_new_york, lit(212))
        .end()
        .map(|_| ())
}
/// A CASE expression mixing an integer THEN literal with a string THEN
/// literal must be rejected when it is finalized with `end()`.
#[test]
fn case_when_different_literal_then_types() {
    let is_colorado = col("state").eq(lit("CO"));
    let is_new_york = col("state").eq(lit("NY"));
    let outcome = when(is_colorado, lit(303))
        .when(is_new_york, lit("212"))
        .end();
    assert!(outcome.is_err());
}
/// Rewrites `state = "CO"` with a recording rewriter and checks the exact
/// sequence of `pre_visit` / `mutate` callbacks that were observed.
#[test]
fn rewriter_visit() {
    let mut recorder = RecordingRewriter::default();
    let predicate = col("state").eq(lit("CO"));
    predicate.rewrite(&mut recorder).unwrap();

    // Parent is pre-visited first and mutated last; each child is
    // pre-visited and mutated in between, left operand before right.
    let expected_trace = vec![
        "Previsited #state Eq Utf8(\"CO\")",
        "Previsited #state",
        "Mutated #state",
        "Previsited Utf8(\"CO\")",
        "Mutated Utf8(\"CO\")",
        "Mutated #state Eq Utf8(\"CO\")",
    ];
    assert_eq!(recorder.v, expected_trace);
}
/// Checks the `Debug` rendering of `IS NULL` and `IS NOT NULL` expressions.
#[test]
fn filter_is_null_and_is_not_null() {
    let rendered_null = format!("{:?}", col("col1").is_null());
    assert_eq!(rendered_null, "#col1 IS NULL");

    let rendered_not_null = format!("{:?}", col("col2").is_not_null());
    assert_eq!(rendered_not_null, "#col2 IS NOT NULL");
}
/// Test rewriter that leaves every expression unchanged and logs each
/// callback it receives, in order, into `v`.
#[derive(Default)]
struct RecordingRewriter {
    // One entry per callback: "Previsited <expr>" or "Mutated <expr>".
    v: Vec<String>,
}
impl ExprRewriter for RecordingRewriter {
    /// Logs the expression and always answers `Ok(true)`.
    fn pre_visit(&mut self, expr: &Expr) -> Result<bool> {
        let entry = format!("Previsited {:?}", expr);
        self.v.push(entry);
        Ok(true)
    }

    /// Logs the expression and hands it back untouched.
    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        let entry = format!("Mutated {:?}", expr);
        self.v.push(entry);
        Ok(expr)
    }
}
/// `FooBarRewriter` must turn the literal "foo" into "bar" and leave any
/// other string literal (here "baz") as it was.
#[test]
fn rewriter_rewrite() {
    let mut foo_to_bar = FooBarRewriter {};
    // (literal fed to the rewriter, literal expected afterwards)
    for &(input, expected) in &[("foo", "bar"), ("baz", "baz")] {
        let rewritten = col("state")
            .eq(lit(input))
            .rewrite(&mut foo_to_bar)
            .unwrap();
        assert_eq!(rewritten, col("state").eq(lit(expected)));
    }
}
/// Test rewriter that replaces the string literal "foo" with "bar".
struct FooBarRewriter {}
impl ExprRewriter for FooBarRewriter {
    /// Rebuilds non-null UTF-8 literals (swapping "foo" for "bar") and
    /// returns every other expression unchanged.
    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        let rewritten = match expr {
            Expr::Literal(ScalarValue::Utf8(Some(text))) => {
                let replacement = if text == "foo" {
                    "bar".to_string()
                } else {
                    text
                };
                lit(replacement)
            }
            // Non-string literals, null strings and non-literal expressions.
            untouched => untouched,
        };
        Ok(rewritten)
    }
}
/// Unqualified columns `a`, `b` and `c` must each be qualified with the
/// relation of the schema that contains them, wherever that schema sits in
/// the list and wherever the field sits inside the schema.
#[test]
fn normalize_cols() {
    let unqualified = col("a") + col("b") + col("c");

    // Lookup order handed to the normalizer: tableC, tableC (f/ff), tableB, tableA.
    let owned_schemas = vec![
        Arc::new(
            DFSchema::new(vec![make_field("tableC", "cc"), make_field("tableC", "c")])
                .unwrap(),
        ),
        Arc::new(
            DFSchema::new(vec![make_field("tableC", "f"), make_field("tableC", "ff")])
                .unwrap(),
        ),
        Arc::new(DFSchema::new(vec![make_field("tableB", "b")]).unwrap()),
        Arc::new(
            DFSchema::new(vec![make_field("tableA", "a"), make_field("tableA", "aa")])
                .unwrap(),
        ),
    ];
    let schema_refs: Vec<_> = owned_schemas.iter().collect();

    let resolved = normalize_col_with_schemas(unqualified, &schema_refs, &[]).unwrap();
    let expected = col("tableA.a") + col("tableB.b") + col("tableC.c");
    assert_eq!(resolved, expected);
}
/// When two schemas both contain a field named `a`, the schema listed first
/// (here `tableA2`) must supply the qualifier.
#[test]
fn normalize_cols_priority() {
    let unqualified = col("a") + col("b");

    // tableA2 is deliberately listed ahead of tableA.
    let owned_schemas = vec![
        Arc::new(DFSchema::new(vec![make_field("tableA2", "a")]).unwrap()),
        Arc::new(DFSchema::new(vec![make_field("tableB", "b")]).unwrap()),
        Arc::new(DFSchema::new(vec![make_field("tableA", "a")]).unwrap()),
    ];
    let schema_refs: Vec<_> = owned_schemas.iter().collect();

    let resolved = normalize_col_with_schemas(unqualified, &schema_refs, &[]).unwrap();
    assert_eq!(resolved, col("tableA2.a") + col("tableB.b"));
}
/// Normalizing a column that no provided schema contains must fail with a
/// planning error naming that column.
#[test]
fn normalize_cols_non_exist() {
    let unqualified = col("a") + col("b");
    let only_table_a = Arc::new(DFSchema::new(vec![make_field("tableA", "a")]).unwrap());
    let schema_refs = vec![&only_table_a];

    let message = normalize_col_with_schemas(unqualified, &schema_refs, &[])
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "Error during planning: Column #b not found in provided schemas"
    );
}
/// `unnormalize_col` must strip the relation qualifier from every column in
/// the expression.
#[test]
fn unnormalize_cols() {
    let qualified = col("tableA.a") + col("tableB.b");
    let expected = col("a") + col("b");
    assert_eq!(unnormalize_col(qualified), expected);
}
/// Test helper: builds an `Int8` field named `column`, qualified by
/// `relation`.
fn make_field(relation: &str, column: &str) -> DFField {
    let qualifier = Some(relation);
    // NOTE(review): the trailing `false` is presumably the nullable flag —
    // confirm against `DFField::new`.
    DFField::new(qualifier, column, DataType::Int8, false)
}
/// Asserts that the helper function `$FUNC`, called with a single column
/// argument, builds an `Expr::ScalarFunction` whose function is
/// `BuiltinScalarFunction::$ENUM` and which carries exactly one argument.
///
/// Panics, naming the helper and printing the expression it returned, if the
/// helper builds any other kind of expression.
macro_rules! test_unary_scalar_expr {
    ($ENUM:ident, $FUNC:ident) => {{
        match $FUNC(col("tableA.a")) {
            Expr::ScalarFunction { fun, args } => {
                let name = functions::BuiltinScalarFunction::$ENUM;
                assert_eq!(name, fun);
                assert_eq!(1, args.len());
            }
            // Fail with an explicit panic rather than `assert!(false, ..)`,
            // and say which helper misbehaved and what it returned.
            other => panic!(
                "{} did not build an Expr::ScalarFunction, got {:?}",
                stringify!($FUNC),
                other
            ),
        }
    }};
}
/// Checks each scalar-function helper listed below with
/// `test_unary_scalar_expr!`: called with one column argument, the helper
/// (second macro argument) must build an `Expr::ScalarFunction` carrying the
/// `BuiltinScalarFunction` variant named by the first macro argument and
/// exactly one argument.
#[test]
fn scalar_function_definitions() {
test_unary_scalar_expr!(Sqrt, sqrt);
test_unary_scalar_expr!(Sin, sin);
test_unary_scalar_expr!(Cos, cos);
test_unary_scalar_expr!(Tan, tan);
test_unary_scalar_expr!(Asin, asin);
test_unary_scalar_expr!(Acos, acos);
test_unary_scalar_expr!(Atan, atan);
test_unary_scalar_expr!(Floor, floor);
test_unary_scalar_expr!(Ceil, ceil);
test_unary_scalar_expr!(Now, now);
test_unary_scalar_expr!(Round, round);
test_unary_scalar_expr!(Trunc, trunc);
test_unary_scalar_expr!(Abs, abs);
test_unary_scalar_expr!(Signum, signum);
test_unary_scalar_expr!(Exp, exp);
test_unary_scalar_expr!(Log2, log2);
test_unary_scalar_expr!(Log10, log10);
test_unary_scalar_expr!(Ln, ln);
test_unary_scalar_expr!(Ascii, ascii);
test_unary_scalar_expr!(BitLength, bit_length);
test_unary_scalar_expr!(Btrim, btrim);
test_unary_scalar_expr!(CharacterLength, character_length);
// `length` is expected to build the same variant as `character_length`.
test_unary_scalar_expr!(CharacterLength, length);
test_unary_scalar_expr!(Chr, chr);
test_unary_scalar_expr!(InitCap, initcap);
test_unary_scalar_expr!(Left, left);
test_unary_scalar_expr!(Lower, lower);
test_unary_scalar_expr!(Lpad, lpad);
test_unary_scalar_expr!(Ltrim, ltrim);
test_unary_scalar_expr!(MD5, md5);
test_unary_scalar_expr!(OctetLength, octet_length);
test_unary_scalar_expr!(RegexpMatch, regexp_match);
test_unary_scalar_expr!(RegexpReplace, regexp_replace);
test_unary_scalar_expr!(Replace, replace);
test_unary_scalar_expr!(Repeat, repeat);
test_unary_scalar_expr!(Reverse, reverse);
test_unary_scalar_expr!(Right, right);
test_unary_scalar_expr!(Rpad, rpad);
test_unary_scalar_expr!(Rtrim, rtrim);
test_unary_scalar_expr!(SHA224, sha224);
test_unary_scalar_expr!(SHA256, sha256);
test_unary_scalar_expr!(SHA384, sha384);
test_unary_scalar_expr!(SHA512, sha512);
test_unary_scalar_expr!(SplitPart, split_part);
test_unary_scalar_expr!(StartsWith, starts_with);
test_unary_scalar_expr!(Strpos, strpos);
test_unary_scalar_expr!(Substr, substr);
test_unary_scalar_expr!(ToHex, to_hex);
test_unary_scalar_expr!(Translate, translate);
test_unary_scalar_expr!(Trim, trim);
test_unary_scalar_expr!(Upper, upper);
}
}