use arrow::compute::SortOptions;
use arrow::datatypes::DataType;
use std::cmp::Ordering;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{
any::Any,
fmt::{self, Debug, Display, Formatter},
sync::Arc,
};
use datafusion_common::{not_impl_err, Result};
use crate::expr::WindowFunction;
use crate::{
function::WindowFunctionSimplification, Expr, PartitionEvaluator,
PartitionEvaluatorFactory, ReturnTypeFunction, Signature,
};
/// A user-defined window function.
///
/// The actual behavior lives in a [`WindowUDFImpl`] held behind an [`Arc`],
/// so cloning a `WindowUDF` only bumps a reference count. Equality and hashing
/// are delegated to [`WindowUDFImpl::equals`] and [`WindowUDFImpl::hash_value`];
/// the derived ordering goes through the `PartialOrd` impl on
/// `dyn WindowUDFImpl`.
#[derive(Clone, Debug, PartialOrd)]
pub struct WindowUDF {
    /// Shared implementation that the accessor methods forward to.
    inner: Arc<dyn WindowUDFImpl>,
}
/// Displays a window function as its bare [`WindowUDF::name`].
impl Display for WindowUDF {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // Writes the raw name; formatter width/fill flags are not applied.
        f.write_str(self.name())
    }
}
/// Two `WindowUDF`s are equal when the left-hand implementation's
/// [`WindowUDFImpl::equals`] accepts the right-hand implementation.
impl PartialEq for WindowUDF {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.inner.as_ref(), other.inner.as_ref());
        lhs.equals(rhs)
    }
}

/// Marker impl: relies on [`WindowUDFImpl::equals`] being reflexive,
/// symmetric and transitive.
impl Eq for WindowUDF {}
/// Feeds the implementation's [`WindowUDFImpl::hash_value`] into `state`.
///
/// Implementations must keep `hash_value` consistent with `equals`, because
/// `PartialEq` for `WindowUDF` is built on `equals`.
impl Hash for WindowUDF {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let digest: u64 = self.inner.hash_value();
        digest.hash(state);
    }
}
impl WindowUDF {
#[deprecated(since = "34.0.0", note = "please implement WindowUDFImpl instead")]
pub fn new(
name: &str,
signature: &Signature,
return_type: &ReturnTypeFunction,
partition_evaluator_factory: &PartitionEvaluatorFactory,
) -> Self {
Self::new_from_impl(WindowUDFLegacyWrapper {
name: name.to_owned(),
signature: signature.clone(),
return_type: Arc::clone(return_type),
partition_evaluator_factory: Arc::clone(partition_evaluator_factory),
})
}
pub fn new_from_impl<F>(fun: F) -> WindowUDF
where
F: WindowUDFImpl + 'static,
{
Self {
inner: Arc::new(fun),
}
}
pub fn inner(&self) -> &Arc<dyn WindowUDFImpl> {
&self.inner
}
pub fn with_aliases(self, aliases: impl IntoIterator<Item = &'static str>) -> Self {
Self::new_from_impl(AliasedWindowUDFImpl::new(Arc::clone(&self.inner), aliases))
}
pub fn call(&self, args: Vec<Expr>) -> Expr {
let fun = crate::WindowFunctionDefinition::WindowUDF(Arc::new(self.clone()));
Expr::WindowFunction(WindowFunction::new(fun, args))
}
pub fn name(&self) -> &str {
self.inner.name()
}
pub fn aliases(&self) -> &[String] {
self.inner.aliases()
}
pub fn signature(&self) -> &Signature {
self.inner.signature()
}
pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
self.inner.return_type(args)
}
pub fn simplify(&self) -> Option<WindowFunctionSimplification> {
self.inner.simplify()
}
pub fn partition_evaluator_factory(&self) -> Result<Box<dyn PartitionEvaluator>> {
self.inner.partition_evaluator()
}
pub fn nullable(&self) -> bool {
self.inner.nullable()
}
pub fn sort_options(&self) -> Option<SortOptions> {
self.inner.sort_options()
}
pub fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}
}
impl<F> From<F> for WindowUDF
where
F: WindowUDFImpl + Send + Sync + 'static,
{
fn from(fun: F) -> Self {
Self::new_from_impl(fun)
}
}
/// The trait user-defined window functions implement.
///
/// An implementation is wrapped into a [`WindowUDF`] with
/// [`WindowUDF::new_from_impl`] (or `From`). Only `as_any`, `name`,
/// `signature`, `return_type` and `partition_evaluator` are required; every
/// other method has a default.
pub trait WindowUDFImpl: Debug + Send + Sync {
    /// Returns `self` as [`Any`], so that `equals` overrides can downcast
    /// `other` to a concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the function's name.
    fn name(&self) -> &str;

    /// Returns the function's [`Signature`].
    fn signature(&self) -> &Signature;

    /// Computes the output [`DataType`] for the given argument types.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;

    /// Creates a [`PartitionEvaluator`] for this function.
    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;

    /// Returns additional names for this function. Defaults to none.
    fn aliases(&self) -> &[String] {
        &[]
    }

    /// Optional simplification hook. Defaults to `None`.
    fn simplify(&self) -> Option<WindowFunctionSimplification> {
        None
    }

    /// Returns `true` if `self` and `other` should be treated as the same
    /// function.
    ///
    /// The default compares `name`, `signature` and `aliases`. Comparing the
    /// aliases keeps equality symmetric with the wrapper produced by
    /// [`WindowUDF::with_aliases`], whose `equals` also checks aliases and
    /// rejects every other type. Without it, `f == f.with_aliases(..)` would
    /// be `true` while the reverse comparison is `false`.
    ///
    /// `WindowUDF`'s `PartialEq` and `Hash` are built on this method and
    /// [`Self::hash_value`]. Overrides must keep them consistent: values that
    /// are `equals` must produce the same `hash_value`.
    fn equals(&self, other: &dyn WindowUDFImpl) -> bool {
        self.name() == other.name()
            && self.signature() == other.signature()
            && self.aliases() == other.aliases()
    }

    /// Returns a hash consistent with [`Self::equals`].
    ///
    /// The default hashes `name` and `signature` with the std
    /// [`DefaultHasher`]. That is a subset of what the default `equals`
    /// compares, so equal values still hash equally.
    fn hash_value(&self) -> u64 {
        let hasher = &mut DefaultHasher::new();
        self.name().hash(hasher);
        self.signature().hash(hasher);
        hasher.finish()
    }

    /// Whether the function's output may contain nulls. Defaults to `true`.
    fn nullable(&self) -> bool {
        true
    }

    /// Optional [`SortOptions`] associated with this function's output.
    /// Defaults to `None`.
    fn sort_options(&self) -> Option<SortOptions> {
        None
    }

    /// Coerces `_arg_types` to the types this function accepts.
    ///
    /// # Errors
    /// The default returns a not-implemented error naming the function.
    fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
        not_impl_err!("Function {} does not implement coerce_types", self.name())
    }
}
impl PartialEq for dyn WindowUDFImpl {
fn eq(&self, other: &Self) -> bool {
self.equals(other)
}
}
/// Orders trait objects by `name`, then by `signature`.
///
/// This key covers only part of what [`WindowUDFImpl::equals`] may compare
/// (e.g. aliases, or extra state in an `equals` override). Two
/// implementations can therefore tie on name and signature while still being
/// unequal. Reporting `Some(Ordering::Equal)` for such a pair would break the
/// `PartialOrd` contract (`partial_cmp(a, b) == Some(Equal)` iff `a == b`),
/// so those pairs are reported as incomparable (`None`) instead.
impl PartialOrd for dyn WindowUDFImpl {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = match self.name().partial_cmp(other.name()) {
            Some(Ordering::Equal) => self.signature().partial_cmp(other.signature()),
            cmp => cmp,
        };
        // Only claim `Equal` when the values are actually equal.
        ordering.filter(|cmp| *cmp != Ordering::Equal || self.equals(other))
    }
}
/// Decorator created by [`WindowUDF::with_aliases`]. It forwards every call to
/// `inner` except `aliases`, for which it reports an extended list.
#[derive(Debug)]
struct AliasedWindowUDFImpl {
    /// The wrapped implementation.
    inner: Arc<dyn WindowUDFImpl>,
    /// `inner`'s own aliases followed by the newly added ones.
    aliases: Vec<String>,
}
impl AliasedWindowUDFImpl {
    /// Wraps `inner`, keeping its existing aliases and appending `new_aliases`
    /// after them in iteration order.
    pub fn new(
        inner: Arc<dyn WindowUDFImpl>,
        new_aliases: impl IntoIterator<Item = &'static str>,
    ) -> Self {
        let aliases: Vec<String> = inner
            .aliases()
            .iter()
            .cloned()
            .chain(new_aliases.into_iter().map(String::from))
            .collect();
        Self { inner, aliases }
    }
}
/// Pure delegation to `inner`, except that `aliases`, `equals` and
/// `hash_value` also take the extended alias list into account.
impl WindowUDFImpl for AliasedWindowUDFImpl {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        self.inner.name()
    }

    fn signature(&self) -> &Signature {
        self.inner.signature()
    }

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        self.inner.return_type(arg_types)
    }

    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
        self.inner.partition_evaluator()
    }

    fn aliases(&self) -> &[String] {
        self.aliases.as_slice()
    }

    fn simplify(&self) -> Option<WindowFunctionSimplification> {
        self.inner.simplify()
    }

    /// Equal only to another alias wrapper whose wrapped implementation is
    /// equal and whose alias list is identical.
    fn equals(&self, other: &dyn WindowUDFImpl) -> bool {
        match other.as_any().downcast_ref::<Self>() {
            Some(that) => {
                self.inner.equals(that.inner.as_ref()) && self.aliases == that.aliases
            }
            None => false,
        }
    }

    /// Combines the wrapped implementation's hash with the alias list.
    fn hash_value(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        let inner_hash = self.inner.hash_value();
        inner_hash.hash(&mut hasher);
        self.aliases.hash(&mut hasher);
        hasher.finish()
    }

    fn nullable(&self) -> bool {
        self.inner.nullable()
    }

    fn sort_options(&self) -> Option<SortOptions> {
        self.inner.sort_options()
    }

    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        self.inner.coerce_types(arg_types)
    }
}
/// Adapter that implements [`WindowUDFImpl`] on top of the closure-based
/// parts accepted by the deprecated [`WindowUDF::new`].
pub struct WindowUDFLegacyWrapper {
    /// Name returned by `name()`.
    name: String,
    /// Signature returned by `signature()`.
    signature: Signature,
    /// Closure invoked by `return_type()`.
    return_type: ReturnTypeFunction,
    /// Closure invoked by `partition_evaluator()`.
    partition_evaluator_factory: PartitionEvaluatorFactory,
}
/// Debug output shows the name and signature; the two closures are printed
/// as `"<func>"` placeholders.
impl Debug for WindowUDFLegacyWrapper {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        const OPAQUE: &str = "<func>";
        let mut builder = f.debug_struct("WindowUDF");
        builder.field("name", &self.name);
        builder.field("signature", &self.signature);
        builder.field("return_type", &OPAQUE);
        builder.field("partition_evaluator_factory", &OPAQUE);
        builder.finish_non_exhaustive()
    }
}
/// Implements the required trait methods by reading the stored fields and
/// calling the stored closures; all optional methods keep their defaults.
impl WindowUDFImpl for WindowUDFLegacyWrapper {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Calls the legacy return-type closure and clones the shared type it
    /// yields into an owned [`DataType`].
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let shared = (self.return_type)(arg_types)?;
        let data_type: &DataType = shared.as_ref();
        Ok(data_type.clone())
    }

    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
        let factory = &self.partition_evaluator_factory;
        factory()
    }
}
#[cfg(test)]
mod test {
    use crate::{PartitionEvaluator, WindowUDF, WindowUDFImpl};
    use arrow::datatypes::DataType;
    use datafusion_common::Result;
    use datafusion_expr_common::signature::{Signature, Volatility};
    use std::any::Any;
    use std::cmp::Ordering;
    use std::hash::{DefaultHasher, Hash, Hasher};

    /// Test UDF named "a" taking a single `Int32` argument.
    #[derive(Debug, Clone)]
    struct AWindowUDF {
        signature: Signature,
    }

    impl AWindowUDF {
        fn new() -> Self {
            Self {
                signature: Signature::uniform(
                    1,
                    vec![DataType::Int32],
                    Volatility::Immutable,
                ),
            }
        }
    }

    impl WindowUDFImpl for AWindowUDF {
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn name(&self) -> &str {
            "a"
        }
        fn signature(&self) -> &Signature {
            &self.signature
        }
        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
            unimplemented!()
        }
        fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
            unimplemented!()
        }
    }

    /// Test UDF named "b" with the same signature as `AWindowUDF`.
    #[derive(Debug, Clone)]
    struct BWindowUDF {
        signature: Signature,
    }

    impl BWindowUDF {
        fn new() -> Self {
            Self {
                signature: Signature::uniform(
                    1,
                    vec![DataType::Int32],
                    Volatility::Immutable,
                ),
            }
        }
    }

    impl WindowUDFImpl for BWindowUDF {
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn name(&self) -> &str {
            "b"
        }
        fn signature(&self) -> &Signature {
            &self.signature
        }
        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
            unimplemented!()
        }
        fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
            unimplemented!()
        }
    }

    /// Hashes `udf` through its `Hash` impl with a fresh `DefaultHasher`.
    fn hash_of(udf: &WindowUDF) -> u64 {
        let mut hasher = DefaultHasher::new();
        udf.hash(&mut hasher);
        hasher.finish()
    }

    #[test]
    fn test_partial_ord() {
        let a1 = WindowUDF::from(AWindowUDF::new());
        let a2 = WindowUDF::from(AWindowUDF::new());
        assert_eq!(a1.partial_cmp(&a2), Some(Ordering::Equal));
        let b1 = WindowUDF::from(BWindowUDF::new());
        assert!(a1 < b1);
        assert_ne!(a1, b1);
    }

    #[test]
    fn test_eq_and_hash_agree() {
        let a1 = WindowUDF::from(AWindowUDF::new());
        let a2 = WindowUDF::from(AWindowUDF::new());
        assert_eq!(a1, a2);
        assert_eq!(hash_of(&a1), hash_of(&a2));
    }

    #[test]
    fn test_display_is_name() {
        let a = WindowUDF::from(AWindowUDF::new());
        assert_eq!(a.to_string(), "a");
    }

    #[test]
    fn test_with_aliases() {
        let a = WindowUDF::from(AWindowUDF::new());
        let aliased1 = a.clone().with_aliases(["x"]);
        let aliased2 = a.clone().with_aliases(["x"]);

        // Aliasing keeps the name and appends the new aliases.
        assert_eq!(aliased1.name(), "a");
        assert_eq!(aliased1.aliases(), &["x".to_string()]);

        // Identically aliased functions are equal, hash alike and tie.
        assert_eq!(aliased1, aliased2);
        assert_eq!(hash_of(&aliased1), hash_of(&aliased2));
        assert_eq!(aliased1.partial_cmp(&aliased2), Some(Ordering::Equal));

        // An aliased wrapper never equals the un-aliased original.
        assert_ne!(aliased1, a);
    }
}