/// Generates public wrapper functions that build a `datafusion_expr::Expr`
/// by calling a UDF accessor defined in the parent module.
///
/// Every generated wrapper forwards to `super::$FUNC().call(..)`, so the macro
/// must be expanded inside a module whose parent provides a zero-argument
/// function named `$FUNC`.
///
/// Input is a comma-separated list of parenthesised entries
/// `(name, doc, args)`; a trailing comma after the last entry is accepted.
/// `args` takes one of two shapes:
///
/// * `a b c` (space-separated identifiers): the wrapper takes one `Expr`
///   parameter per identifier and passes them on as a `Vec`.
/// * `args,` (one identifier followed by a comma): the wrapper takes a single
///   `Vec<Expr>` parameter and passes it on unchanged.
macro_rules! export_functions {
    ($(($FUNC:ident, $DOC:expr, $($arg:tt)*)),* $(,)?) => {
        $(
            // Hand each entry to one of the `single` rules below; which one
            // matches depends on the shape of the argument tokens.
            export_functions!(single $FUNC, $DOC, $($arg)*);
        )*
    };

    // One identifier followed by a comma: the wrapper takes a `Vec<Expr>`.
    (single $FUNC:ident, $DOC:expr, $arg:ident,) => {
        #[doc = $DOC]
        pub fn $FUNC($arg: Vec<datafusion_expr::Expr>) -> datafusion_expr::Expr {
            super::$FUNC().call($arg)
        }
    };

    // Zero or more space-separated identifiers: one `Expr` parameter each.
    (single $FUNC:ident, $DOC:expr, $($arg:ident)*) => {
        #[doc = $DOC]
        pub fn $FUNC($($arg: datafusion_expr::Expr),*) -> datafusion_expr::Expr {
            super::$FUNC().call(vec![$($arg),*])
        }
    };
}
/// Defines a public accessor function `$NAME()` that returns a shared
/// `Arc<datafusion_expr::ScalarUDF>` wrapping `<$UDF>::new()`.
///
/// The UDF is stored in a function-local `LazyLock` static, so it is built on
/// first access and every call returns a clone of the same `Arc`.
///
/// * `$UDF`  - type providing an associated `new()` function; its result is
///   passed to `ScalarUDF::new_from_impl`.
/// * `$NAME` - name of the generated accessor function.
macro_rules! make_udf_function {
    ($UDF:ty, $NAME:ident) => {
        #[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of ", stringify!($NAME))]
        pub fn $NAME() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
            // Initializer for `SHARED_UDF`; wraps a fresh `$UDF` in a `ScalarUDF`.
            fn build_udf() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
                let udf = datafusion_expr::ScalarUDF::new_from_impl(<$UDF>::new());
                std::sync::Arc::new(udf)
            }

            static SHARED_UDF: std::sync::LazyLock<
                std::sync::Arc<datafusion_expr::ScalarUDF>,
            > = std::sync::LazyLock::new(build_udf);

            // Cloning the `Arc` only bumps the reference count.
            std::sync::Arc::clone(&SHARED_UDF)
        }
    };
}
/// Emits a placeholder public module `$name` that is compiled only when the
/// cargo feature `$feature` is NOT enabled.
///
/// The module exposes a single `functions()` that logs a debug message and
/// returns an empty list of UDFs.
///
/// * `$name`    - identifier of the generated module.
/// * `$feature` - string literal naming the feature flag.
macro_rules! make_stub_package {
    ($name:ident, $feature:literal) => {
        #[cfg(not(feature = $feature))]
        #[doc = concat!("Disabled. Enable via feature flag `", $feature, "`")]
        pub mod $name {
            /// Logs that this package is disabled and returns no functions.
            pub fn functions() -> Vec<std::sync::Arc<datafusion_expr::ScalarUDF>> {
                log::debug!("{} functions disabled", stringify!($name));
                Vec::new()
            }
        }
    };
}
/// Downcasts `$ARG` to a reference to the concrete type `$ARRAY_TYPE`.
///
/// Calls `$ARG.as_any().downcast_ref::<$ARRAY_TYPE>()`. On success the macro
/// evaluates to the downcast reference; on failure it propagates (via `?`) an
/// error built with `internal_datafusion_err!` whose message names `$NAME`
/// and the target type.
///
/// The expansion uses `?` and invokes `internal_datafusion_err!` by bare name,
/// so the call site must be in a context where `?` is valid and where that
/// macro is in scope.
///
/// * `$ARG`        - expression with an `as_any()` method.
/// * `$NAME`       - value shown in the error message to identify the argument.
/// * `$ARRAY_TYPE` - identifier of the target type.
#[macro_export]
macro_rules! downcast_named_arg {
    ($ARG:expr, $NAME:expr, $ARRAY_TYPE:ident) => {{
        // Kept as a single expression so that any temporary created by `$ARG`
        // lives as long as the reference produced here.
        match $ARG.as_any().downcast_ref::<$ARRAY_TYPE>() {
            Some(typed) => typed,
            None => Err(internal_datafusion_err!(
                "could not cast {} to {}",
                $NAME,
                std::any::type_name::<$ARRAY_TYPE>()
            ))?,
        }
    }};
}
/// Downcasts `$ARG` to a reference to `$ARRAY_TYPE`, propagating an error via
/// `?` when the downcast fails.
///
/// Shorthand for `downcast_named_arg!` with an empty argument name, so the
/// error message carries no name for the offending argument.
///
/// The helper macro is addressed through `$crate` so that this exported macro
/// still expands when a caller has imported only `downcast_arg` by path and
/// `downcast_named_arg` is not in scope at the call site.
///
/// * `$ARG`        - expression with an `as_any()` method.
/// * `$ARRAY_TYPE` - identifier of the target type.
#[macro_export]
macro_rules! downcast_arg {
    ($ARG:expr, $ARRAY_TYPE:ident) => {{
        $crate::downcast_named_arg!($ARG, "", $ARRAY_TYPE)
    }};
}
/// Generates a one-argument floating-point math UDF.
///
/// The expansion consists of a private module `$NAME` holding the struct
/// `$UDF` together with its `ScalarUDFImpl`, plus a `$NAME()` accessor created
/// through `make_udf_function!`.
///
/// * `$UDF`             - name of the generated struct.
/// * `$NAME`            - name of the generated module and accessor; also the
///   string returned by `name()`.
/// * `$UNARY_FUNC`      - function name called as `f64::$UNARY_FUNC` and
///   `f32::$UNARY_FUNC` on every element.
/// * `$OUTPUT_ORDERING` - callable invoked with the `&[ExprProperties]` input
///   of `output_ordering`.
/// * `$EVALUATE_BOUNDS` - callable invoked with the `&[&Interval]` input of
///   `evaluate_bounds`.
/// * `$GET_DOC`         - zero-argument callable whose result is returned,
///   wrapped in `Some`, by `documentation()`.
macro_rules! make_math_unary_udf {
    ($UDF:ident, $NAME:ident, $UNARY_FUNC:ident, $OUTPUT_ORDERING:expr, $EVALUATE_BOUNDS:expr, $GET_DOC:expr) => {
        make_udf_function!($NAME::$UDF, $NAME);

        mod $NAME {
            use std::any::Any;
            use std::sync::Arc;

            use arrow::array::{ArrayRef, AsArray};
            use arrow::datatypes::{DataType, Float32Type, Float64Type};
            use datafusion_common::{exec_err, Result};
            use datafusion_expr::interval_arithmetic::Interval;
            use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
            use datafusion_expr::{
                ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
            };

            #[derive(Debug)]
            pub struct $UDF {
                signature: Signature,
            }

            impl $UDF {
                /// Creates the UDF with a uniform one-argument signature over
                /// `Float64` and `Float32`.
                pub fn new() -> Self {
                    let accepted = vec![DataType::Float64, DataType::Float32];
                    Self {
                        signature: Signature::uniform(1, accepted, Volatility::Immutable),
                    }
                }
            }

            impl ScalarUDFImpl for $UDF {
                fn as_any(&self) -> &dyn Any {
                    self
                }

                fn name(&self) -> &str {
                    stringify!($NAME)
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
                    // `Float32` input keeps its width; every other input type
                    // is reported as `Float64`.
                    let output = if matches!(arg_types[0], DataType::Float32) {
                        DataType::Float32
                    } else {
                        DataType::Float64
                    };
                    Ok(output)
                }

                fn output_ordering(
                    &self,
                    input: &[ExprProperties],
                ) -> Result<SortProperties> {
                    $OUTPUT_ORDERING(input)
                }

                fn evaluate_bounds(&self, inputs: &[&Interval]) -> Result<Interval> {
                    $EVALUATE_BOUNDS(inputs)
                }

                fn invoke_batch(
                    &self,
                    args: &[ColumnarValue],
                    _number_rows: usize,
                ) -> Result<ColumnarValue> {
                    let arrays = ColumnarValue::values_to_arrays(args)?;
                    let input = &arrays[0];

                    // Apply the function element-wise at the input's own
                    // precision; the result is always returned as an array.
                    let output: ArrayRef = match input.data_type() {
                        DataType::Float64 => {
                            let values = input.as_primitive::<Float64Type>();
                            Arc::new(
                                values.unary::<_, Float64Type>(|v: f64| f64::$UNARY_FUNC(v)),
                            ) as ArrayRef
                        }
                        DataType::Float32 => {
                            let values = input.as_primitive::<Float32Type>();
                            Arc::new(
                                values.unary::<_, Float32Type>(|v: f32| f32::$UNARY_FUNC(v)),
                            ) as ArrayRef
                        }
                        other => {
                            return exec_err!(
                                "Unsupported data type {other:?} for function {}",
                                self.name()
                            )
                        }
                    };

                    Ok(ColumnarValue::Array(output))
                }

                fn documentation(&self) -> Option<&Documentation> {
                    Some($GET_DOC())
                }
            }
        }
    };
}
/// Generates a two-argument floating-point math UDF.
///
/// The expansion consists of a private module `$NAME` holding the struct
/// `$UDF` together with its `ScalarUDFImpl`, plus a `$NAME()` accessor created
/// through `make_udf_function!`.
///
/// * `$UDF`             - name of the generated struct.
/// * `$NAME`            - name of the generated module and accessor; also the
///   string returned by `name()`.
/// * `$BINARY_FUNC`     - function name called as `f64::$BINARY_FUNC` and
///   `f32::$BINARY_FUNC` with (first argument, second argument) per row.
/// * `$OUTPUT_ORDERING` - callable invoked with the `&[ExprProperties]` input
///   of `output_ordering`.
/// * `$GET_DOC`         - zero-argument callable whose result is returned,
///   wrapped in `Some`, by `documentation()`.
macro_rules! make_math_binary_udf {
    ($UDF:ident, $NAME:ident, $BINARY_FUNC:ident, $OUTPUT_ORDERING:expr, $GET_DOC:expr) => {
        make_udf_function!($NAME::$UDF, $NAME);

        mod $NAME {
            use std::any::Any;
            use std::sync::Arc;

            use arrow::array::{ArrayRef, AsArray};
            use arrow::datatypes::{DataType, Float32Type, Float64Type};
            use datafusion_common::{exec_err, Result};
            use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
            use datafusion_expr::TypeSignature;
            use datafusion_expr::{
                ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
            };

            #[derive(Debug)]
            pub struct $UDF {
                signature: Signature,
            }

            impl $UDF {
                /// Creates the UDF accepting exactly `(Float32, Float32)` or
                /// `(Float64, Float64)`.
                pub fn new() -> Self {
                    let both_f32 =
                        TypeSignature::Exact(vec![DataType::Float32, DataType::Float32]);
                    let both_f64 =
                        TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]);
                    Self {
                        signature: Signature::one_of(
                            vec![both_f32, both_f64],
                            Volatility::Immutable,
                        ),
                    }
                }
            }

            impl ScalarUDFImpl for $UDF {
                fn as_any(&self) -> &dyn Any {
                    self
                }

                fn name(&self) -> &str {
                    stringify!($NAME)
                }

                fn signature(&self) -> &Signature {
                    &self.signature
                }

                fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
                    // Only the first argument's type is inspected: `Float32`
                    // keeps its width, anything else is reported as `Float64`.
                    let output = if matches!(arg_types[0], DataType::Float32) {
                        DataType::Float32
                    } else {
                        DataType::Float64
                    };
                    Ok(output)
                }

                fn output_ordering(
                    &self,
                    input: &[ExprProperties],
                ) -> Result<SortProperties> {
                    $OUTPUT_ORDERING(input)
                }

                fn invoke_batch(
                    &self,
                    args: &[ColumnarValue],
                    _number_rows: usize,
                ) -> Result<ColumnarValue> {
                    let arrays = ColumnarValue::values_to_arrays(args)?;
                    let first = &arrays[0];
                    let second = &arrays[1];

                    // The first argument's type selects the precision used
                    // for both inputs; the result is always an array.
                    let output: ArrayRef = match first.data_type() {
                        DataType::Float64 => {
                            let lhs = first.as_primitive::<Float64Type>();
                            let rhs = second.as_primitive::<Float64Type>();
                            let computed = arrow::compute::binary::<_, _, _, Float64Type>(
                                lhs,
                                rhs,
                                |a, b| f64::$BINARY_FUNC(a, b),
                            )?;
                            Arc::new(computed) as ArrayRef
                        }
                        DataType::Float32 => {
                            let lhs = first.as_primitive::<Float32Type>();
                            let rhs = second.as_primitive::<Float32Type>();
                            let computed = arrow::compute::binary::<_, _, _, Float32Type>(
                                lhs,
                                rhs,
                                |a, b| f32::$BINARY_FUNC(a, b),
                            )?;
                            Arc::new(computed) as ArrayRef
                        }
                        other => {
                            return exec_err!(
                                "Unsupported data type {other:?} for function {}",
                                self.name()
                            )
                        }
                    };

                    Ok(ColumnarValue::Array(output))
                }

                fn documentation(&self) -> Option<&Documentation> {
                    Some($GET_DOC())
                }
            }
        }
    };
}