use std::any::Any;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use crate::expr::create_name;
use crate::interval_arithmetic::Interval;
use crate::simplify::{ExprSimplifyResult, SimplifyInfo};
use crate::sort_properties::{ExprProperties, SortProperties};
use crate::{
ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature,
};
use arrow::datatypes::DataType;
use datafusion_common::{not_impl_err, ExprSchema, Result};
/// Handle to a scalar user-defined function.
///
/// This is a thin wrapper around a shared [`ScalarUDFImpl`] trait object, so
/// cloning a `ScalarUDF` clones the `Arc` rather than the implementation.
#[derive(Debug, Clone)]
pub struct ScalarUDF {
/// The shared implementation every method of this type delegates to.
inner: Arc<dyn ScalarUDFImpl>,
}
/// Two `ScalarUDF`s compare equal when both their names and their signatures
/// are equal; nothing else about the implementation is compared.
impl PartialEq for ScalarUDF {
    fn eq(&self, other: &Self) -> bool {
        // Differing names settle the answer without looking at the signatures.
        if self.name() != other.name() {
            return false;
        }
        self.signature() == other.signature()
    }
}
// Marker impl: declares the name-and-signature `PartialEq` comparison to be a
// full equivalence relation, so `ScalarUDF` satisfies `Eq` bounds.
impl Eq for ScalarUDF {}
/// Hashes the name and then the signature — the same two properties that the
/// `PartialEq` impl compares.
impl std::hash::Hash for ScalarUDF {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Order matters for the resulting hash: name first, signature second.
        std::hash::Hash::hash(self.name(), state);
        std::hash::Hash::hash(self.signature(), state);
    }
}
impl ScalarUDF {
    /// Builds a `ScalarUDF` from a name, a signature, a return-type function
    /// and an implementation closure.
    ///
    /// The pieces are stored in a `ScalarUdfLegacyWrapper`: the name is copied,
    /// the signature is cloned, and the two `Arc`-backed callables are shared
    /// with the caller rather than duplicated.
    #[deprecated(since = "34.0.0", note = "please implement ScalarUDFImpl instead")]
    pub fn new(
        name: &str,
        signature: &Signature,
        return_type: &ReturnTypeFunction,
        fun: &ScalarFunctionImplementation,
    ) -> Self {
        Self::new_from_impl(ScalarUdfLegacyWrapper {
            name: name.to_owned(),
            signature: signature.clone(),
            return_type: Arc::clone(return_type),
            fun: Arc::clone(fun),
        })
    }

    /// Wraps any [`ScalarUDFImpl`] value in a new `ScalarUDF`.
    pub fn new_from_impl<F>(fun: F) -> ScalarUDF
    where
        F: ScalarUDFImpl + 'static,
    {
        Self {
            inner: Arc::new(fun),
        }
    }

    /// Returns the shared implementation object this handle delegates to.
    pub fn inner(&self) -> &Arc<dyn ScalarUDFImpl> {
        &self.inner
    }

    /// Consumes this UDF and returns one that additionally reports `aliases`.
    ///
    /// The result wraps the current implementation in an
    /// `AliasedScalarUDFImpl`, whose alias list is the implementation's
    /// existing aliases followed by the ones supplied here.
    pub fn with_aliases(self, aliases: impl IntoIterator<Item = &'static str>) -> Self {
        // `self` is owned, so the `Arc` is moved into the wrapper instead of
        // being cloned and then dropped.
        Self::new_from_impl(AliasedScalarUDFImpl::new(self.inner, aliases))
    }

    /// Builds an `Expr::ScalarFunction` that applies a clone of this UDF to
    /// `args`.
    pub fn call(&self, args: Vec<Expr>) -> Expr {
        Expr::ScalarFunction(crate::expr::ScalarFunction::new_udf(
            Arc::new(self.clone()),
            args,
        ))
    }

    /// Returns the name reported by the implementation.
    pub fn name(&self) -> &str {
        self.inner.name()
    }

    /// Returns the implementation's display name for a call with `args`.
    pub fn display_name(&self, args: &[Expr]) -> Result<String> {
        self.inner.display_name(args)
    }

    /// Returns the aliases reported by the implementation.
    pub fn aliases(&self) -> &[String] {
        self.inner.aliases()
    }

    /// Returns the signature reported by the implementation.
    pub fn signature(&self) -> &Signature {
        self.inner.signature()
    }

    /// Asks the implementation for the return type of a call, passing along
    /// the argument expressions, the schema and the argument types unchanged.
    pub fn return_type_from_exprs(
        &self,
        args: &[Expr],
        schema: &dyn ExprSchema,
        arg_types: &[DataType],
    ) -> Result<DataType> {
        self.inner.return_type_from_exprs(args, schema, arg_types)
    }

    /// Delegates simplification of a call with `args` to the implementation.
    pub fn simplify(
        &self,
        args: Vec<Expr>,
        info: &dyn SimplifyInfo,
    ) -> Result<ExprSimplifyResult> {
        self.inner.simplify(args, info)
    }

    /// Invokes the implementation on `args`.
    pub fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        self.inner.invoke(args)
    }

    /// Invokes the implementation's zero-argument entry point, forwarding
    /// `number_rows`.
    pub fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
        self.inner.invoke_no_args(number_rows)
    }

    /// Returns a closure that calls the implementation's `invoke`.
    ///
    /// The closure holds its own `Arc` to the implementation, so it remains
    /// usable independently of this `ScalarUDF` value.
    pub fn fun(&self) -> ScalarFunctionImplementation {
        let captured = Arc::clone(&self.inner);
        Arc::new(move |args| captured.invoke(args))
    }

    /// Returns the implementation's `short_circuits` flag.
    pub fn short_circuits(&self) -> bool {
        self.inner.short_circuits()
    }

    /// Delegates interval evaluation for `inputs` to the implementation.
    pub fn evaluate_bounds(&self, inputs: &[&Interval]) -> Result<Interval> {
        self.inner.evaluate_bounds(inputs)
    }

    /// Delegates constraint propagation for `interval` and `inputs` to the
    /// implementation.
    pub fn propagate_constraints(
        &self,
        interval: &Interval,
        inputs: &[&Interval],
    ) -> Result<Option<Vec<Interval>>> {
        self.inner.propagate_constraints(interval, inputs)
    }

    /// Returns the implementation's output ordering for the given input
    /// properties.
    pub fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties> {
        self.inner.output_ordering(inputs)
    }

    /// Delegates coercion of `arg_types` to the implementation.
    pub fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        self.inner.coerce_types(arg_types)
    }
}
impl<F> From<F> for ScalarUDF
where
F: ScalarUDFImpl + Send + Sync + 'static,
{
fn from(fun: F) -> Self {
Self::new_from_impl(fun)
}
}
/// Contract implemented by every scalar user-defined function.
///
/// `as_any`, `name`, `signature`, `return_type` and `invoke` are required.
/// Every other method has a default, described on the method itself.
pub trait ScalarUDFImpl: Debug + Send + Sync {
    /// Exposes the implementor as [`Any`].
    fn as_any(&self) -> &dyn Any;

    /// Name of the function.
    fn name(&self) -> &str;

    /// Name to display for a call of this function with `args`.
    ///
    /// The default produces `name(arg,arg,...)`: each argument is rendered
    /// with `create_name` and the renderings are joined by `,` without spaces.
    /// The first error returned by `create_name` is propagated.
    fn display_name(&self, args: &[Expr]) -> Result<String> {
        let mut rendered = Vec::with_capacity(args.len());
        for arg in args {
            rendered.push(create_name(arg)?);
        }
        let joined = rendered.join(",");
        Ok(format!("{}({})", self.name(), joined))
    }

    /// Signature of the function.
    fn signature(&self) -> &Signature;

    /// Return type of the function for the given argument types.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;

    /// Return type of the function for a specific call.
    ///
    /// The default ignores the argument expressions and the schema and simply
    /// answers with `return_type(arg_types)`.
    fn return_type_from_exprs(
        &self,
        _args: &[Expr],
        _schema: &dyn ExprSchema,
        arg_types: &[DataType],
    ) -> Result<DataType> {
        self.return_type(arg_types)
    }

    /// Evaluates the function on the given argument values.
    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue>;

    /// Evaluates the function when no argument values are supplied; the only
    /// input is `_number_rows`.
    ///
    /// The default returns a "not implemented" error naming the function.
    fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
        not_impl_err!(
            "Function {} does not implement invoke_no_args but called",
            self.name()
        )
    }

    /// Alternative names for the function. The default is an empty slice.
    fn aliases(&self) -> &[String] {
        &[]
    }

    /// Optionally rewrites a call with `args` into a simpler form.
    ///
    /// The default performs no rewrite and hands `args` back inside
    /// `ExprSimplifyResult::Original`.
    fn simplify(
        &self,
        args: Vec<Expr>,
        _info: &dyn SimplifyInfo,
    ) -> Result<ExprSimplifyResult> {
        let untouched = ExprSimplifyResult::Original(args);
        Ok(untouched)
    }

    /// Flag reported to callers through `ScalarUDF::short_circuits`.
    /// The default is `false`.
    fn short_circuits(&self) -> bool {
        false
    }

    /// Computes an output interval from the argument intervals.
    ///
    /// The default ignores its input and returns whatever
    /// `Interval::make_unbounded` yields for `DataType::Null`.
    fn evaluate_bounds(&self, _input: &[&Interval]) -> Result<Interval> {
        let null_type = DataType::Null;
        Interval::make_unbounded(&null_type)
    }

    /// Propagates a constraint on the output interval back to the inputs.
    ///
    /// The default ignores both arguments and returns `Some` of an empty
    /// vector.
    fn propagate_constraints(
        &self,
        _interval: &Interval,
        _inputs: &[&Interval],
    ) -> Result<Option<Vec<Interval>>> {
        Ok(Some(Vec::new()))
    }

    /// Ordering of the output given the properties of the inputs.
    /// The default is `SortProperties::Unordered`.
    fn output_ordering(&self, _inputs: &[ExprProperties]) -> Result<SortProperties> {
        Ok(SortProperties::Unordered)
    }

    /// Maps the given argument types to the types the function wants.
    ///
    /// The default returns a "not implemented" error naming the function.
    fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
        not_impl_err!("Function {} does not implement coerce_types", self.name())
    }
}
/// Private [`ScalarUDFImpl`] wrapper that pairs a shared implementation with
/// its own alias list. Created by `ScalarUDF::with_aliases`.
#[derive(Debug)]
struct AliasedScalarUDFImpl {
/// The wrapped implementation.
inner: Arc<dyn ScalarUDFImpl>,
/// Aliases reported by this wrapper: the wrapped implementation's aliases
/// followed by the ones passed to `new`.
aliases: Vec<String>,
}
impl AliasedScalarUDFImpl {
    /// Wraps `inner` and records its alias list.
    ///
    /// The stored list starts with a copy of whatever `inner.aliases()`
    /// reports and is then extended with `new_aliases`, in iteration order.
    pub fn new(
        inner: Arc<dyn ScalarUDFImpl>,
        new_aliases: impl IntoIterator<Item = &'static str>,
    ) -> Self {
        // Copy the existing aliases first so they keep their leading position.
        let mut aliases: Vec<String> = inner.aliases().iter().cloned().collect();
        aliases.extend(new_aliases.into_iter().map(String::from));
        Self { inner, aliases }
    }
}
impl ScalarUDFImpl for AliasedScalarUDFImpl {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
self.inner.name()
}
fn signature(&self) -> &Signature {
self.inner.signature()
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
self.inner.return_type(arg_types)
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
self.inner.invoke(args)
}
fn aliases(&self) -> &[String] {
&self.aliases
}
}
/// Private adapter that implements [`ScalarUDFImpl`] on top of the separate
/// pieces accepted by the deprecated `ScalarUDF::new`.
struct ScalarUdfLegacyWrapper {
/// Function name.
name: String,
/// Function signature.
signature: Signature,
/// Callable used to answer `return_type`.
return_type: ReturnTypeFunction,
/// Callable used to answer `invoke`.
fun: ScalarFunctionImplementation,
}
/// Hand-written `Debug`: prints under the struct name `ScalarUDF` with the
/// `name` and `signature` fields and a `"<FUNC>"` placeholder for `fun`.
/// The `return_type` callable is not printed.
impl Debug for ScalarUdfLegacyWrapper {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let mut out = f.debug_struct("ScalarUDF");
        out.field("name", &self.name);
        out.field("signature", &self.signature);
        // The callable itself is not printed; a fixed placeholder stands in.
        out.field("fun", &"<FUNC>");
        out.finish()
    }
}
impl ScalarUDFImpl for ScalarUdfLegacyWrapper {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
&self.name
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
let res = (self.return_type)(arg_types)?;
Ok(res.as_ref().clone())
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
(self.fun)(args)
}
fn aliases(&self) -> &[String] {
&[]
}
}