use crate::{
Expr, PartitionEvaluator, PartitionEvaluatorFactory, ReturnTypeFunction, Signature,
WindowFrame,
};
use arrow::datatypes::DataType;
use datafusion_common::Result;
use std::{
any::Any,
fmt::{self, Debug, Display, Formatter},
sync::Arc,
};
/// Handle to a user-defined window function.
///
/// This is a thin wrapper around a shared [`WindowUDFImpl`] trait object;
/// the name, aliases, signature, return type and partition evaluator are all
/// obtained by delegating to that implementation.
#[derive(Debug, Clone)]
pub struct WindowUDF {
// Shared implementation; cloning a `WindowUDF` clones this `Arc`, not the
// implementation behind it.
inner: Arc<dyn WindowUDFImpl>,
}
/// Displays a [`WindowUDF`] as its bare function name.
impl Display for WindowUDF {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // Only the name is written; signature and aliases are not shown.
        f.write_str(self.name())
    }
}
/// Two [`WindowUDF`]s compare equal when both their names and their
/// signatures are equal; nothing else is compared.
impl PartialEq for WindowUDF {
    fn eq(&self, other: &Self) -> bool {
        // Check the name first; the signature is only compared on a match.
        if self.name() != other.name() {
            return false;
        }
        self.signature() == other.signature()
    }
}
// Marker impl: declares the name-and-signature comparison implemented by
// `PartialEq for WindowUDF` to be a full equivalence relation.
impl Eq for WindowUDF {}
/// Hashes the name followed by the signature, the same two properties the
/// `PartialEq` impl compares.
impl std::hash::Hash for WindowUDF {
    fn hash<H>(&self, state: &mut H)
    where
        H: std::hash::Hasher,
    {
        std::hash::Hash::hash(self.name(), state);
        std::hash::Hash::hash(self.signature(), state);
    }
}
impl WindowUDF {
    /// Creates a `WindowUDF` from a name, signature, return-type function and
    /// partition-evaluator factory.
    ///
    /// All four arguments are cloned into a [`WindowUDFLegacyWrapper`], which
    /// is then wrapped via [`Self::new_from_impl`].
    #[deprecated(since = "34.0.0", note = "please implement WindowUDFImpl instead")]
    pub fn new(
        name: &str,
        signature: &Signature,
        return_type: &ReturnTypeFunction,
        partition_evaluator_factory: &PartitionEvaluatorFactory,
    ) -> Self {
        Self::new_from_impl(WindowUDFLegacyWrapper {
            name: name.to_owned(),
            signature: signature.clone(),
            return_type: return_type.clone(),
            partition_evaluator_factory: partition_evaluator_factory.clone(),
        })
    }

    /// Creates a `WindowUDF` that owns `fun` behind an [`Arc`].
    pub fn new_from_impl<F>(fun: F) -> WindowUDF
    where
        F: WindowUDFImpl + 'static,
    {
        Self {
            inner: Arc::new(fun),
        }
    }

    /// Returns a new handle to the wrapped [`WindowUDFImpl`].
    ///
    /// Only the `Arc` is cloned; the implementation itself is shared.
    pub fn inner(&self) -> Arc<dyn WindowUDFImpl> {
        Arc::clone(&self.inner)
    }

    /// Returns a `WindowUDF` that behaves like `self` but additionally reports
    /// `aliases`, appended after any aliases the implementation already has.
    pub fn with_aliases(self, aliases: impl IntoIterator<Item = &'static str>) -> Self {
        // `self` is consumed, so the implementation handle is moved rather
        // than cloned; this avoids a needless reference-count bump.
        Self::new_from_impl(AliasedWindowUDFImpl::new(self.inner, aliases))
    }

    /// Builds an [`Expr::WindowFunction`] that invokes this function.
    ///
    /// `args`, `partition_by`, `order_by` and `window_frame` are stored in the
    /// expression as given; `null_treatment` is set to `None`. The expression
    /// holds a clone of this `WindowUDF`.
    pub fn call(
        &self,
        args: Vec<Expr>,
        partition_by: Vec<Expr>,
        order_by: Vec<Expr>,
        window_frame: WindowFrame,
    ) -> Expr {
        let fun = crate::WindowFunctionDefinition::WindowUDF(Arc::new(self.clone()));
        Expr::WindowFunction(crate::expr::WindowFunction {
            fun,
            args,
            partition_by,
            order_by,
            window_frame,
            null_treatment: None,
        })
    }

    /// Returns the name reported by the underlying implementation.
    pub fn name(&self) -> &str {
        self.inner.name()
    }

    /// Returns the aliases reported by the underlying implementation.
    pub fn aliases(&self) -> &[String] {
        self.inner.aliases()
    }

    /// Returns the signature reported by the underlying implementation.
    pub fn signature(&self) -> &Signature {
        self.inner.signature()
    }

    /// Asks the underlying implementation for the return type produced for
    /// the argument types `args`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the implementation.
    pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
        self.inner.return_type(args)
    }

    /// Returns the result of the implementation's
    /// [`WindowUDFImpl::partition_evaluator`].
    ///
    /// Despite the method name, the value returned is a
    /// [`PartitionEvaluator`], not a factory.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the implementation.
    pub fn partition_evaluator_factory(&self) -> Result<Box<dyn PartitionEvaluator>> {
        self.inner.partition_evaluator()
    }
}
impl<F> From<F> for WindowUDF
where
F: WindowUDFImpl + Send + Sync + 'static,
{
fn from(fun: F) -> Self {
Self::new_from_impl(fun)
}
}
/// Behaviour of a user-defined window function.
///
/// A [`WindowUDF`] holds one of these behind an `Arc` and forwards its
/// accessor methods to it. Implementations must be `Debug + Send + Sync`.
pub trait WindowUDFImpl: Debug + Send + Sync {
/// Returns this value as [`Any`]; typically used to downcast to the
/// concrete implementing type.
fn as_any(&self) -> &dyn Any;
/// Returns the function's name.
fn name(&self) -> &str;
/// Returns the function's [`Signature`].
fn signature(&self) -> &Signature;
/// Returns the [`DataType`] this function produces for the given argument
/// types, or an error.
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
/// Returns a boxed [`PartitionEvaluator`] for this function, or an error.
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
/// Returns the function's aliases.
///
/// The default implementation returns an empty slice.
fn aliases(&self) -> &[String] {
&[]
}
}
/// A [`WindowUDFImpl`] that wraps another implementation and reports its own
/// alias list; every other method is forwarded to the wrapped implementation.
#[derive(Debug)]
struct AliasedWindowUDFImpl {
// The wrapped implementation that all non-alias calls are forwarded to.
inner: Arc<dyn WindowUDFImpl>,
// The complete alias list returned by `aliases()`.
aliases: Vec<String>,
}
impl AliasedWindowUDFImpl {
    /// Wraps `inner`, giving it the aliases it already reports followed by
    /// `new_aliases`, in that order.
    pub fn new(
        inner: Arc<dyn WindowUDFImpl>,
        new_aliases: impl IntoIterator<Item = &'static str>,
    ) -> Self {
        // Existing aliases come first so previously registered names keep
        // their position; the additions are appended after them.
        let aliases: Vec<String> = inner
            .aliases()
            .iter()
            .cloned()
            .chain(new_aliases.into_iter().map(String::from))
            .collect();
        Self { inner, aliases }
    }
}
/// Forwards everything except `as_any` and `aliases` to the wrapped
/// implementation.
impl WindowUDFImpl for AliasedWindowUDFImpl {
    /// Returns the extended alias list held by this wrapper.
    fn aliases(&self) -> &[String] {
        self.aliases.as_slice()
    }

    /// Returns the wrapper itself, not the wrapped implementation.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Forwards to the wrapped implementation.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Forwards to the wrapped implementation.
    fn signature(&self) -> &Signature {
        self.inner.signature()
    }

    /// Forwards to the wrapped implementation.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        self.inner.return_type(arg_types)
    }

    /// Forwards to the wrapped implementation.
    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
        self.inner.partition_evaluator()
    }
}
/// A [`WindowUDFImpl`] assembled from separately supplied parts: a name, a
/// signature and two function values.
///
/// This is the type built by the deprecated `WindowUDF::new` constructor.
pub struct WindowUDFLegacyWrapper {
// Name returned by `name()`.
name: String,
// Signature returned by `signature()`.
signature: Signature,
// Called with the argument types to compute the return type.
return_type: ReturnTypeFunction,
// Called with no arguments to produce a `PartitionEvaluator`.
partition_evaluator_factory: PartitionEvaluatorFactory,
}
/// Hand-written `Debug`: the two function-valued fields are rendered as the
/// placeholder string `"<func>"` instead of their contents.
impl Debug for WindowUDFLegacyWrapper {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let opaque = "<func>";
        let mut out = f.debug_struct("WindowUDF");
        out.field("name", &self.name);
        out.field("signature", &self.signature);
        out.field("return_type", &opaque);
        out.field("partition_evaluator_factory", &opaque);
        out.finish_non_exhaustive()
    }
}
impl WindowUDFImpl for WindowUDFLegacyWrapper {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
&self.name
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
let res = (self.return_type)(arg_types)?;
Ok(res.as_ref().clone())
}
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
(self.partition_evaluator_factory)()
}
}