use crate::{Accumulator, Expr};
use crate::{
AccumulatorFactoryFunction, ReturnTypeFunction, Signature, StateTypeFunction,
};
use arrow::datatypes::DataType;
use datafusion_common::Result;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
#[derive(Clone)]
pub struct AggregateUDF {
name: String,
signature: Signature,
return_type: ReturnTypeFunction,
accumulator: AccumulatorFactoryFunction,
state_type: StateTypeFunction,
}
impl Debug for AggregateUDF {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("AggregateUDF")
.field("name", &self.name)
.field("signature", &self.signature)
.field("fun", &"<FUNC>")
.finish()
}
}
impl PartialEq for AggregateUDF {
fn eq(&self, other: &Self) -> bool {
self.name == other.name && self.signature == other.signature
}
}
impl Eq for AggregateUDF {}
impl std::hash::Hash for AggregateUDF {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.signature.hash(state);
}
}
impl AggregateUDF {
pub fn new(
name: &str,
signature: &Signature,
return_type: &ReturnTypeFunction,
accumulator: &AccumulatorFactoryFunction,
state_type: &StateTypeFunction,
) -> Self {
Self {
name: name.to_owned(),
signature: signature.clone(),
return_type: return_type.clone(),
accumulator: accumulator.clone(),
state_type: state_type.clone(),
}
}
pub fn call(&self, args: Vec<Expr>) -> Expr {
Expr::AggregateFunction(crate::expr::AggregateFunction::new_udf(
Arc::new(self.clone()),
args,
false,
None,
None,
))
}
pub fn name(&self) -> &str {
&self.name
}
pub fn signature(&self) -> &Signature {
&self.signature
}
pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
let res = (self.return_type)(args)?;
Ok(res.as_ref().clone())
}
pub fn accumulator(&self, return_type: &DataType) -> Result<Box<dyn Accumulator>> {
(self.accumulator)(return_type)
}
pub fn state_type(&self, return_type: &DataType) -> Result<Vec<DataType>> {
let res = (self.state_type)(return_type)?;
Ok(Arc::try_unwrap(res).unwrap_or_else(|res| res.as_ref().clone()))
}
}