/// Generates the public entry points for a package of scalar functions.
///
/// Invocation shape: `export_functions!((func, arg1 arg2, "doc text"), ...)`.
///
/// The expansion contains:
///
/// * `pub mod expr_fn`, holding for every tuple a
///   `pub fn func(arg1: Expr, arg2: Expr) -> Expr` that forwards its arguments
///   to `super::func().call(..)` and carries `"doc text"` as its rustdoc;
/// * `pub fn functions()`, returning `vec![func(), ...]` for every listed name.
///
/// Each `func` must therefore resolve, in the invoking module, to a function
/// returning `Arc<ScalarUDF>`.
macro_rules! export_functions {
    ($(($FUNC:ident, $($arg:ident)*, $DOC:expr)),*) => {
        /// `Expr`-building wrappers, one per function exported by this package.
        pub mod expr_fn {
            $(
                // Only `$DOC` is attached as documentation. Metavariables are not
                // expanded inside `///` text, so a templated doc line would show
                // up verbatim in the rendered docs.
                #[doc = $DOC]
                pub fn $FUNC($($arg: datafusion_expr::Expr),*) -> datafusion_expr::Expr {
                    super::$FUNC().call(vec![$($arg),*])
                }
            )*
        }

        /// Returns every function listed in this package's `export_functions!`
        /// invocation, in declaration order.
        pub fn functions() -> Vec<std::sync::Arc<datafusion_expr::ScalarUDF>> {
            vec![
                $(
                    $FUNC(),
                )*
            ]
        }
    };
}
/// Declares a lazily-initialised, process-wide instance of a scalar UDF.
///
/// * `$UDF`   - implementation type; it must provide an argument-less `new()`.
/// * `$GNAME` - name of the private `static` that caches the instance.
/// * `$NAME`  - name of the generated public accessor function.
///
/// The accessor builds the `ScalarUDF` on first use via
/// `ScalarUDF::new_from_impl(<$UDF>::new())` and afterwards hands out clones of
/// the same `Arc`.
macro_rules! make_udf_function {
    ($UDF:ty, $GNAME:ident, $NAME:ident) => {
        /// Cache for the single shared instance of this function.
        static $GNAME: std::sync::OnceLock<std::sync::Arc<datafusion_expr::ScalarUDF>> =
            std::sync::OnceLock::new();

        /// Returns the shared `ScalarUDF`, creating it on the first call.
        pub fn $NAME() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
            let shared = $GNAME.get_or_init(|| {
                let udf = datafusion_expr::ScalarUDF::new_from_impl(<$UDF>::new());
                std::sync::Arc::new(udf)
            });
            // Cloning the `Arc` only bumps the reference count.
            std::sync::Arc::clone(shared)
        }
    };
}
/// Emits a placeholder module for a function package whose cargo feature is
/// switched off.
///
/// * `$name`    - name of the module to generate.
/// * `$feature` - feature-flag literal; the module is compiled only when that
///   feature is NOT enabled.
///
/// The stub exposes a `functions()` that logs a debug message and returns an
/// empty list.
macro_rules! make_stub_package {
    ($name:ident, $feature:literal) => {
        #[cfg(not(feature = $feature))]
        #[doc = concat!("Disabled. Enable via feature flag `", $feature, "`")]
        pub mod $name {
            /// Returns an empty list; this package is compiled out.
            pub fn functions() -> Vec<std::sync::Arc<datafusion_expr::ScalarUDF>> {
                log::debug!("{} functions disabled", stringify!($name));
                Vec::new()
            }
        }
    };
}
/// Applies a unary function to every non-null element of an array and collects
/// the results into a new array.
///
/// * `$ARG`         - expression yielding the input array.
/// * `$NAME`        - name passed on to `downcast_arg!` for its error message.
/// * `$ARG_TYPE`    - concrete array type the input is downcast to.
/// * `$RETURN_TYPE` - array type the mapped values are collected into.
/// * `$FUNC`        - block evaluating to the function applied to each value.
///
/// Null slots (`None`) are passed through unchanged. The downcast goes through
/// `downcast_arg!`, so that macro must be in scope at the call site.
macro_rules! make_function_scalar_inputs_return_type {
    ($ARG: expr, $NAME:expr, $ARG_TYPE:ident, $RETURN_TYPE:ident, $FUNC: block) => {{
        let typed_array = downcast_arg!($ARG, $NAME, $ARG_TYPE);
        typed_array
            .iter()
            // `Option::map` keeps `None` as-is and transforms only present values.
            .map(|slot| slot.map(|value| $FUNC(value)))
            .collect::<$RETURN_TYPE>()
    }};
}
/// Downcasts an array-like value to a concrete type, or returns early with an
/// internal error.
///
/// * `$ARG`        - expression whose `as_any()` yields a `&dyn Any`.
/// * `$NAME`       - display name used in the error message.
/// * `$ARRAY_TYPE` - concrete type to downcast to.
///
/// Evaluates to `&$ARRAY_TYPE` on success. On a type mismatch it propagates
/// `DataFusionError::Internal("could not cast <name> to <type>")` with `?`, so
/// it can only be used where `?` on that error type is valid and
/// `DataFusionError` is in scope.
macro_rules! downcast_arg {
    ($ARG:expr, $NAME:expr, $ARRAY_TYPE:ident) => {{
        match $ARG.as_any().downcast_ref::<$ARRAY_TYPE>() {
            Some(typed) => typed,
            // `?` (rather than a bare `return`) keeps the usual error conversion
            // into the enclosing function's error type.
            None => Err(DataFusionError::Internal(format!(
                "could not cast {} to {}",
                $NAME,
                std::any::type_name::<$ARRAY_TYPE>()
            )))?,
        }
    }};
}
/// Defines a scalar UDF that applies a unary floating-point function
/// element-wise.
///
/// * `$UDF`        - name of the generated implementation struct.
/// * `$GNAME`      - name of the static caching the shared instance.
/// * `$NAME`       - function name; also names the private module and the
///   public accessor created through `make_udf_function!`.
/// * `$UNARY_FUNC` - method name looked up on both `f64` and `f32`.
///
/// The generated function accepts exactly one `Float64` or `Float32` argument,
/// is declared `Volatility::Immutable`, and reports `Float32` as its return
/// type for `Float32` input and `Float64` otherwise.
macro_rules! make_math_unary_udf {
    ($UDF:ident, $GNAME:ident, $NAME:ident, $UNARY_FUNC:ident) => {
        make_udf_function!($NAME::$UDF, $GNAME, $NAME);

        mod $NAME {
            use arrow::array::{ArrayRef, Float32Array, Float64Array};
            use arrow::datatypes::DataType;
            use datafusion_common::{exec_err, DataFusionError, Result};
            use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
            use std::any::Any;
            use std::sync::Arc;

            /// UDF implementation; holds only the accepted signature.
            #[derive(Debug)]
            pub struct $UDF {
                signature: Signature,
            }

            impl $UDF {
                /// Builds the UDF with a one-argument `Float64`/`Float32` signature.
                pub fn new() -> Self {
                    let accepted_types = vec![DataType::Float64, DataType::Float32];
                    let signature =
                        Signature::uniform(1, accepted_types, Volatility::Immutable);
                    Self { signature }
                }
            }

            impl ScalarUDFImpl for $UDF {
                fn as_any(&self) -> &dyn Any {
                    self
                }

                fn name(&self) -> &str {
                    stringify!($NAME)
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                /// `Float32` input keeps its width; everything else maps to `Float64`.
                fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
                    let output_type = if matches!(&arg_types[0], DataType::Float32) {
                        DataType::Float32
                    } else {
                        DataType::Float64
                    };
                    Ok(output_type)
                }

                /// Converts the arguments to arrays, applies the function to each
                /// non-null value of the first one, and returns the result as an
                /// array. Input types other than `Float32`/`Float64` produce an
                /// execution error.
                fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
                    let arrays = ColumnarValue::values_to_arrays(args)?;
                    let input = &arrays[0];
                    let output: ArrayRef = match input.data_type() {
                        DataType::Float32 => {
                            Arc::new(make_function_scalar_inputs_return_type!(
                                input,
                                self.name(),
                                Float32Array,
                                Float32Array,
                                { f32::$UNARY_FUNC }
                            ))
                        }
                        DataType::Float64 => {
                            Arc::new(make_function_scalar_inputs_return_type!(
                                input,
                                self.name(),
                                Float64Array,
                                Float64Array,
                                { f64::$UNARY_FUNC }
                            ))
                        }
                        other => {
                            return exec_err!(
                                "Unsupported data type {other:?} for function {}",
                                self.name()
                            )
                        }
                    };
                    Ok(ColumnarValue::Array(output))
                }
            }
        }
    };
}