// Generates the public `expr_fn`-style wrapper functions for a list of UDFs.
//
// Each entry is `(name, "doc string", args...)`, where `args` takes one of three shapes:
//   - empty, e.g. `(uuid, "doc",)`: a function with no parameters;
//   - whitespace-separated identifiers, e.g. `(atan2, "doc", y x)`: one `Expr`
//     parameter per identifier;
//   - a single identifier followed by a comma, e.g. `(concat, "doc", args,)`: a
//     variadic function taking `Vec<Expr>`.
//
// Every generated `pub fn name(..)` forwards to `super::name().call(..)`. The macro must
// therefore be expanded in a child of the module that defines the UDF accessor functions.
// A trailing comma after the last entry is accepted.
macro_rules! export_functions {
    ($(($FUNC:ident, $DOC:expr, $($arg:tt)*)),* $(,)?) => {
        $(
            // Dispatch each entry to one of the single-function rules below.
            export_functions!(single $FUNC, $DOC, $($arg)*);
        )*
    };
    // Variadic rule: a lone identifier plus trailing comma receives all arguments as a `Vec`.
    (single $FUNC:ident, $DOC:expr, $arg:ident,) => {
        #[doc = $DOC]
        pub fn $FUNC($arg: Vec<datafusion_expr::Expr>) -> datafusion_expr::Expr {
            super::$FUNC().call($arg)
        }
    };
    // Fixed-arity rule: zero or more identifiers, each becoming one `Expr` parameter.
    (single $FUNC:ident, $DOC:expr, $($arg:ident)*) => {
        #[doc = $DOC]
        pub fn $FUNC($($arg: datafusion_expr::Expr),*) -> datafusion_expr::Expr {
            super::$FUNC().call(vec![$($arg),*])
        }
    };
}
// Defines a lazily-initialised, shared `ScalarUDF` for the implementation type `$UDF`.
//
// Expands to a private static cache `$GNAME` and a public accessor `$NAME()`. The first
// call builds the UDF from `<$UDF>::new()`, and every call returns a clone of that same `Arc`.
macro_rules! make_udf_function {
    ($UDF:ty, $GNAME:ident, $NAME:ident) => {
        static $GNAME: std::sync::OnceLock<std::sync::Arc<datafusion_expr::ScalarUDF>> =
            std::sync::OnceLock::new();

        #[doc = concat!(
            "Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) for ",
            stringify!($UDF)
        )]
        pub fn $NAME() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
            let shared = $GNAME.get_or_init(|| {
                let implementation = <$UDF>::new();
                std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(implementation))
            });
            std::sync::Arc::clone(shared)
        }
    };
}
// Defines a placeholder module `$name` that exists only when the cargo feature
// `$feature` is NOT enabled. Its `functions()` logs a debug message and returns an
// empty list, so callers can always call `$name::functions()` regardless of features.
macro_rules! make_stub_package {
    ($name:ident, $feature:literal) => {
        #[cfg(not(feature = $feature))]
        #[doc = concat!("Disabled. Enable via feature flag `", $feature, "`")]
        pub mod $name {
            use std::sync::Arc;

            use datafusion_expr::ScalarUDF;
            use log::debug;

            /// Returns no functions: this package was compiled without its feature flag.
            pub fn functions() -> Vec<Arc<ScalarUDF>> {
                debug!("{} functions disabled", stringify!($name));
                Vec::new()
            }
        }
    };
}
// Downcasts `$ARG` to the array type `$ARG_TYPE` and applies `$FUNC` to every element,
// collecting the results into `$RETURN_TYPE`.
//
// - `$NAME` only labels the argument in the downcast error message.
// - `$FUNC` is a block evaluating to something callable on one element value
//   (e.g. `{ f64::sqrt }`). It is only evaluated and called for non-null elements;
//   null elements stay null in the output.
// - Because `downcast_arg!` uses `?` on a `Result<_, DataFusionError>`, this must be
//   expanded inside a function whose error type implements `From<DataFusionError>`,
//   with `DataFusionError` in scope.
macro_rules! make_function_scalar_inputs_return_type {
    ($ARG: expr, $NAME:expr, $ARG_TYPE:ident, $RETURN_TYPE:ident, $FUNC: block) => {{
        let arg = downcast_arg!($ARG, $NAME, $ARG_TYPE);
        arg.iter()
            .map(|item| item.map(|value| $FUNC(value)))
            .collect::<$RETURN_TYPE>()
    }};
}
// Downcasts `$ARG` (anything with an `as_any()` method) to a reference of `$ARRAY_TYPE`.
//
// On failure it early-returns, via `?`, a `DataFusionError::Internal` whose message names
// the argument (`$NAME`) and the target type. The expansion site therefore needs
// `DataFusionError` in scope and an error type convertible from it.
macro_rules! downcast_arg {
    ($ARG:expr, $NAME:expr, $ARRAY_TYPE:ident) => {{
        $ARG.as_any()
            .downcast_ref::<$ARRAY_TYPE>()
            .ok_or_else(|| {
                let target = std::any::type_name::<$ARRAY_TYPE>();
                DataFusionError::Internal(format!("could not cast {} to {}", $NAME, target))
            })?
    }};
}
// Defines a unary floating-point math UDF.
//
// Expands to:
//   - a public accessor `$NAME()` returning a cached `Arc<ScalarUDF>` (via
//     `make_udf_function!`), and
//   - a module `$NAME` containing the `$UDF` struct and its `ScalarUDFImpl`.
//
// Parameters:
//   - `$UDF`: name of the generated struct.
//   - `$GNAME`: name of the static cache backing the accessor.
//   - `$NAME`: value returned by `name()`; also the accessor and module name.
//   - `$UNARY_FUNC`: method applied as `f64::$UNARY_FUNC` / `f32::$UNARY_FUNC`.
//   - `$OUTPUT_ORDERING`: called with `&[ExprProperties]`, must return `Result<SortProperties>`.
//   - `$EVALUATE_BOUNDS`: called with `&[&Interval]`, must return `Result<Interval>`.
macro_rules! make_math_unary_udf {
    ($UDF:ident, $GNAME:ident, $NAME:ident, $UNARY_FUNC:ident, $OUTPUT_ORDERING:expr, $EVALUATE_BOUNDS:expr) => {
        make_udf_function!($NAME::$UDF, $GNAME, $NAME);

        mod $NAME {
            use std::any::Any;
            use std::sync::Arc;

            use arrow::array::{ArrayRef, Float32Array, Float64Array};
            use arrow::datatypes::DataType;
            use datafusion_common::{exec_err, DataFusionError, Result};
            use datafusion_expr::interval_arithmetic::Interval;
            use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
            use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};

            #[derive(Debug)]
            pub struct $UDF {
                signature: Signature,
            }

            impl $UDF {
                // Exactly one argument, whose type is one of `Float64` / `Float32`.
                pub fn new() -> Self {
                    let valid_types = vec![DataType::Float64, DataType::Float32];
                    Self {
                        signature: Signature::uniform(1, valid_types, Volatility::Immutable),
                    }
                }
            }

            impl ScalarUDFImpl for $UDF {
                fn as_any(&self) -> &dyn Any {
                    self
                }

                fn name(&self) -> &str {
                    stringify!($NAME)
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                // `Float32` input keeps `Float32`; any other input type reports `Float64`.
                fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
                    let output = if matches!(arg_types[0], DataType::Float32) {
                        DataType::Float32
                    } else {
                        DataType::Float64
                    };
                    Ok(output)
                }

                fn output_ordering(
                    &self,
                    input: &[ExprProperties],
                ) -> Result<SortProperties> {
                    $OUTPUT_ORDERING(input)
                }

                fn evaluate_bounds(&self, inputs: &[&Interval]) -> Result<Interval> {
                    $EVALUATE_BOUNDS(inputs)
                }

                fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
                    // Every argument (array or scalar) is turned into an array first.
                    let arrays = ColumnarValue::values_to_arrays(args)?;
                    let operand = &arrays[0];
                    let result: ArrayRef = match operand.data_type() {
                        DataType::Float64 => Arc::new(make_function_scalar_inputs_return_type!(
                            operand,
                            self.name(),
                            Float64Array,
                            Float64Array,
                            { f64::$UNARY_FUNC }
                        )),
                        DataType::Float32 => Arc::new(make_function_scalar_inputs_return_type!(
                            operand,
                            self.name(),
                            Float32Array,
                            Float32Array,
                            { f32::$UNARY_FUNC }
                        )),
                        other => {
                            return exec_err!(
                                "Unsupported data type {other:?} for function {}",
                                self.name()
                            )
                        }
                    };
                    Ok(ColumnarValue::Array(result))
                }
            }
        }
    };
}
// Defines a binary floating-point math UDF.
//
// Expands to:
//   - a public accessor `$NAME()` returning a cached `Arc<ScalarUDF>` (via
//     `make_udf_function!`), and
//   - a module `$NAME` containing the `$UDF` struct and its `ScalarUDFImpl`.
//
// Parameters:
//   - `$UDF`: name of the generated struct.
//   - `$GNAME`: name of the static cache backing the accessor.
//   - `$NAME`: value returned by `name()`; also the accessor and module name.
//   - `$BINARY_FUNC`: method applied as `f64::$BINARY_FUNC` / `f32::$BINARY_FUNC`.
//   - `$OUTPUT_ORDERING`: called with `&[ExprProperties]`, must return `Result<SortProperties>`.
macro_rules! make_math_binary_udf {
    ($UDF:ident, $GNAME:ident, $NAME:ident, $BINARY_FUNC:ident, $OUTPUT_ORDERING:expr) => {
        make_udf_function!($NAME::$UDF, $GNAME, $NAME);

        mod $NAME {
            use std::any::Any;
            use std::sync::Arc;

            use arrow::array::{ArrayRef, Float32Array, Float64Array};
            use arrow::datatypes::DataType;
            use datafusion_common::{exec_err, DataFusionError, Result};
            use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
            use datafusion_expr::{
                ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility,
            };

            #[derive(Debug)]
            pub struct $UDF {
                signature: Signature,
            }

            impl $UDF {
                // Two `Float32` arguments, or two `Float64` arguments.
                pub fn new() -> Self {
                    let both_f32 =
                        TypeSignature::Exact(vec![DataType::Float32, DataType::Float32]);
                    let both_f64 =
                        TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]);
                    Self {
                        signature: Signature::one_of(
                            vec![both_f32, both_f64],
                            Volatility::Immutable,
                        ),
                    }
                }
            }

            impl ScalarUDFImpl for $UDF {
                fn as_any(&self) -> &dyn Any {
                    self
                }

                fn name(&self) -> &str {
                    stringify!($NAME)
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                // Decided by the first argument: `Float32` keeps `Float32`, otherwise `Float64`.
                fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
                    let output = if matches!(arg_types[0], DataType::Float32) {
                        DataType::Float32
                    } else {
                        DataType::Float64
                    };
                    Ok(output)
                }

                fn output_ordering(
                    &self,
                    input: &[ExprProperties],
                ) -> Result<SortProperties> {
                    $OUTPUT_ORDERING(input)
                }

                fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
                    // Every argument (array or scalar) is turned into an array first.
                    let arrays = ColumnarValue::values_to_arrays(args)?;
                    // Both accepted signatures pair identical types, so dispatch on the first.
                    // NOTE(review): the labels "y"/"x" used in downcast error messages are
                    // fixed for every `$NAME`; they only fit functions like `atan2(y, x)`.
                    let result: ArrayRef = match arrays[0].data_type() {
                        DataType::Float64 => Arc::new(make_function_inputs2!(
                            &arrays[0],
                            &arrays[1],
                            "y",
                            "x",
                            Float64Array,
                            { f64::$BINARY_FUNC }
                        )),
                        DataType::Float32 => Arc::new(make_function_inputs2!(
                            &arrays[0],
                            &arrays[1],
                            "y",
                            "x",
                            Float32Array,
                            { f32::$BINARY_FUNC }
                        )),
                        other => {
                            return exec_err!(
                                "Unsupported data type {other:?} for function {}",
                                self.name()
                            )
                        }
                    };
                    Ok(ColumnarValue::Array(result))
                }
            }
        }
    };
}
// Downcasts `$ARG` to `$ARRAY_TYPE` and applies `$FUNC` to each non-null element,
// producing an array of the same type. Null elements stay null.
//
// This is the special case of `make_function_scalar_inputs_return_type!` where the input
// and output array types coincide, so it forwards to that macro with both set to `$ARRAY_TYPE`.
macro_rules! make_function_scalar_inputs {
    ($ARG: expr, $NAME:expr, $ARRAY_TYPE:ident, $FUNC: block) => {{
        make_function_scalar_inputs_return_type!($ARG, $NAME, $ARRAY_TYPE, $ARRAY_TYPE, $FUNC)
    }};
}
// Downcasts two arrays and combines them element-wise with the binary function `$FUNC`.
//
// Forms:
//   - `(arg1, arg2, name1, name2, ArrayType, { f })`: both inputs and the output are
//     `ArrayType`;
//   - `(arg1, arg2, name1, name2, ArrayType1, ArrayType2, { f })`: `arg1` is an
//     `ArrayType1`, `arg2` is an `ArrayType2`, and the output is an `ArrayType1`.
//
// `$NAME1` / `$NAME2` only label the arguments in downcast error messages.
// An output element is null when either input element is null, or when the second value
// cannot be converted with `try_into` into the type `$FUNC` expects for its second
// parameter. Elements are paired with `zip`, so the output stops at the shorter input.
// The expansion site needs `TryInto` in scope (it is in the 2021 prelude).
macro_rules! make_function_inputs2 {
    ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE:ident, $FUNC: block) => {{
        // Same-type form: exactly the two-type form with both array types equal.
        make_function_inputs2!($ARG1, $ARG2, $NAME1, $NAME2, $ARRAY_TYPE, $ARRAY_TYPE, $FUNC)
    }};
    ($ARG1: expr, $ARG2: expr, $NAME1:expr, $NAME2: expr, $ARRAY_TYPE1:ident, $ARRAY_TYPE2:ident, $FUNC: block) => {{
        let lhs = downcast_arg!($ARG1, $NAME1, $ARRAY_TYPE1);
        let rhs = downcast_arg!($ARG2, $NAME2, $ARRAY_TYPE2);
        lhs.iter()
            .zip(rhs.iter())
            .map(|pair| match pair {
                // `?` inside this closure turns a failed conversion into a null element.
                (Some(left), Some(right)) => Some($FUNC(left, right.try_into().ok()?)),
                _ => None,
            })
            .collect::<$ARRAY_TYPE1>()
    }};
}