use arrow::compute::SortOptions;
use std::cmp::Ordering;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{
any::Any,
fmt::{self, Debug, Display, Formatter},
sync::Arc,
};
use arrow::datatypes::{DataType, FieldRef};
use crate::expr::WindowFunction;
use crate::{
function::WindowFunctionSimplification, Expr, PartitionEvaluator, Signature,
};
use datafusion_common::{not_impl_err, Result};
use datafusion_doc::Documentation;
use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// A user-defined window function (UDWF): a handle around a shared
/// [`WindowUDFImpl`]. Cloning only clones the inner [`Arc`].
///
/// Equality and hashing are delegated to [`WindowUDFImpl::equals`] and
/// [`WindowUDFImpl::hash_value`]; the derived ordering uses the `PartialOrd`
/// impl for `dyn WindowUDFImpl`.
#[derive(Debug, Clone, PartialOrd)]
pub struct WindowUDF {
    // The shared implementation every `WindowUDF` method delegates to.
    inner: Arc<dyn WindowUDFImpl>,
}
/// Displays a UDWF as its primary name.
impl Display for WindowUDF {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let name = self.name();
        f.write_str(name)
    }
}
impl PartialEq for WindowUDF {
    /// Two UDWFs are equal when each implementation's
    /// [`WindowUDFImpl::equals`] accepts the other.
    ///
    /// Both directions are checked because `equals` may be overridden on only
    /// one side: the alias wrapper rejects any non-wrapper, while a plain
    /// implementation's default `equals` compares only name and signature.
    /// A one-way check would make `==` asymmetric (breaking [`Eq`]) and would
    /// let values with different [`WindowUDFImpl::hash_value`]s compare equal
    /// (breaking the [`Hash`] contract).
    fn eq(&self, other: &Self) -> bool {
        self.inner.equals(other.inner.as_ref()) && other.inner.equals(self.inner.as_ref())
    }
}

// NOTE(review): `Eq` additionally assumes every `WindowUDFImpl::equals`
// implementation is reflexive and transitive.
impl Eq for WindowUDF {}
/// Hashes the UDWF through [`WindowUDFImpl::hash_value`], matching the
/// implementation-defined equality used by `PartialEq`.
impl Hash for WindowUDF {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let digest: u64 = self.inner.hash_value();
        digest.hash(state);
    }
}
impl WindowUDF {
    /// Creates a new [`WindowUDF`] from a [`WindowUDFImpl`] value.
    ///
    /// The value is moved into a new [`Arc`]; use
    /// [`Self::new_from_shared_impl`] when one is already available.
    pub fn new_from_impl<F>(fun: F) -> WindowUDF
    where
        F: WindowUDFImpl + 'static,
    {
        Self::new_from_shared_impl(Arc::new(fun))
    }

    /// Creates a new [`WindowUDF`] that shares an existing implementation.
    pub fn new_from_shared_impl(fun: Arc<dyn WindowUDFImpl>) -> WindowUDF {
        Self { inner: fun }
    }

    /// Returns the underlying implementation.
    pub fn inner(&self) -> &Arc<dyn WindowUDFImpl> {
        &self.inner
    }

    /// Returns a new [`WindowUDF`] that wraps this function's implementation
    /// and additionally reports `aliases` (appended after the aliases the
    /// implementation already has).
    pub fn with_aliases(self, aliases: impl IntoIterator<Item = &'static str>) -> Self {
        // `self` is consumed, so its `Arc` can be moved into the wrapper
        // rather than cloned.
        Self::new_from_impl(AliasedWindowUDFImpl::new(self.inner, aliases))
    }

    /// Builds an [`Expr`] that invokes this window function with `args`.
    pub fn call(&self, args: Vec<Expr>) -> Expr {
        let fun = crate::WindowFunctionDefinition::WindowUDF(Arc::new(self.clone()));
        Expr::from(WindowFunction::new(fun, args))
    }

    /// Returns the function's primary name.
    pub fn name(&self) -> &str {
        self.inner.name()
    }

    /// Returns the additional names the function is known by.
    pub fn aliases(&self) -> &[String] {
        self.inner.aliases()
    }

    /// Returns the function's [`Signature`].
    pub fn signature(&self) -> &Signature {
        self.inner.signature()
    }

    /// Returns the implementation's optional simplification.
    pub fn simplify(&self) -> Option<WindowFunctionSimplification> {
        self.inner.simplify()
    }

    /// Returns the physical expressions chosen by
    /// [`WindowUDFImpl::expressions`].
    pub fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
        self.inner.expressions(expr_args)
    }

    /// Creates a new [`PartitionEvaluator`] via
    /// [`WindowUDFImpl::partition_evaluator`].
    pub fn partition_evaluator_factory(
        &self,
        partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>> {
        self.inner.partition_evaluator(partition_evaluator_args)
    }

    /// Returns the [`FieldRef`] reported by [`WindowUDFImpl::field`].
    pub fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
        self.inner.field(field_args)
    }

    /// Returns the implementation's optional [`SortOptions`].
    pub fn sort_options(&self) -> Option<SortOptions> {
        self.inner.sort_options()
    }

    /// Coerces `arg_types` via [`WindowUDFImpl::coerce_types`].
    ///
    /// # Errors
    /// Propagates the implementation's error; the trait's default
    /// implementation returns a not-implemented error.
    pub fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        self.inner.coerce_types(arg_types)
    }

    /// Returns how the function can be reversed, per
    /// [`WindowUDFImpl::reverse_expr`].
    pub fn reverse_expr(&self) -> ReversedUDWF {
        self.inner.reverse_expr()
    }

    /// Returns the function's documentation, if it provides any.
    pub fn documentation(&self) -> Option<&Documentation> {
        self.inner.documentation()
    }
}
impl<F> From<F> for WindowUDF
where
F: WindowUDFImpl + Send + Sync + 'static,
{
fn from(fun: F) -> Self {
Self::new_from_impl(fun)
}
}
/// Behavior of a user-defined window function (UDWF).
///
/// Implementors must provide [`Self::as_any`], [`Self::name`],
/// [`Self::signature`], [`Self::partition_evaluator`] and [`Self::field`];
/// every other method has a default. Wrap an implementation in a
/// [`WindowUDF`] (via [`WindowUDF::new_from_impl`] or `From`) to use it.
pub trait WindowUDFImpl: Debug + Send + Sync {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the function's primary name.
    fn name(&self) -> &str;

    /// Returns the [`Signature`] describing accepted argument types.
    fn signature(&self) -> &Signature;

    /// Returns the physical expressions for this function.
    ///
    /// The default passes every input expression in `expr_args` through
    /// unchanged.
    fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
        expr_args.input_exprs().into()
    }

    /// Creates a new [`PartitionEvaluator`] for this function.
    fn partition_evaluator(
        &self,
        partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>>;

    /// Returns additional names for this function; none by default.
    fn aliases(&self) -> &[String] {
        &[]
    }

    /// Returns an optional simplification; `None` by default.
    fn simplify(&self) -> Option<WindowFunctionSimplification> {
        None
    }

    /// Returns `true` when `other` represents the same function as `self`.
    ///
    /// The default compares only name and signature. Implementations with
    /// extra state should override this together with [`Self::hash_value`]
    /// so that values which compare equal also hash equally.
    fn equals(&self, other: &dyn WindowUDFImpl) -> bool {
        let same_name = self.name() == other.name();
        same_name && self.signature() == other.signature()
    }

    /// Returns a hash that should agree with [`Self::equals`].
    ///
    /// The default feeds the name and signature into a [`DefaultHasher`].
    fn hash_value(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.name().hash(&mut hasher);
        self.signature().hash(&mut hasher);
        hasher.finish()
    }

    /// Returns the [`FieldRef`] this function produces for `field_args`
    /// (presumably its output column — confirm against callers).
    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef>;

    /// Returns optional [`SortOptions`] for the function; `None` by default.
    fn sort_options(&self) -> Option<SortOptions> {
        None
    }

    /// Coerces `arg_types` into types the function accepts.
    ///
    /// # Errors
    /// The default implementation always returns a not-implemented error.
    fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
        not_impl_err!("Function {} does not implement coerce_types", self.name())
    }

    /// Reports whether and how the function can be reversed; the default is
    /// [`ReversedUDWF::NotSupported`].
    fn reverse_expr(&self) -> ReversedUDWF {
        ReversedUDWF::NotSupported
    }

    /// Returns user-facing documentation; `None` by default.
    fn documentation(&self) -> Option<&Documentation> {
        None
    }
}
/// Value returned by `WindowUDFImpl::reverse_expr`, describing whether and how
/// a window function can be reversed.
/// NOTE(review): variant meanings below are inferred from their names —
/// confirm against the planner code that consumes them.
pub enum ReversedUDWF {
    // Presumably: reversing leaves the function unchanged, so it can be reused as-is.
    Identical,
    // The function cannot be reversed; this is the trait's default.
    NotSupported,
    // A replacement window function to use in the reversed case.
    Reversed(Arc<WindowUDF>),
}
impl PartialEq for dyn WindowUDFImpl {
    /// Equal when each side's [`WindowUDFImpl::equals`] accepts the other.
    ///
    /// Checking both directions keeps `==` symmetric when only one side
    /// overrides `equals`. For example, the alias wrapper rejects any
    /// non-wrapper, while a plain implementation's default `equals` compares
    /// only name and signature.
    fn eq(&self, other: &Self) -> bool {
        self.equals(other) && other.equals(self)
    }
}
impl PartialOrd for dyn WindowUDFImpl {
    /// Orders implementations by name, then by signature.
    ///
    /// `PartialOrd` requires `partial_cmp(a, b) == Some(Equal)` exactly when
    /// `a == b`. Name and signature alone cannot guarantee that, because
    /// `equals` may look at more state (e.g. the alias wrapper compares alias
    /// lists). When name and signature tie but the two values are not mutually
    /// `equals`, the aliases break the tie. If even those are identical, the
    /// values are reported as incomparable (`None`) rather than falsely
    /// `Equal`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let by_name_then_signature = match self.name().partial_cmp(other.name()) {
            Some(Ordering::Equal) => self.signature().partial_cmp(other.signature()),
            cmp => cmp,
        };
        match by_name_then_signature {
            Some(Ordering::Equal) if !(self.equals(other) && other.equals(self)) => self
                .aliases()
                .partial_cmp(other.aliases())
                .filter(|ordering| *ordering != Ordering::Equal),
            cmp => cmp,
        }
    }
}
/// A [`WindowUDFImpl`] wrapper that adds extra aliases to an inner
/// implementation; constructed by [`WindowUDF::with_aliases`].
#[derive(Debug)]
struct AliasedWindowUDFImpl {
    // The wrapped implementation; most trait methods delegate to it.
    inner: Arc<dyn WindowUDFImpl>,
    // The inner implementation's aliases followed by the newly added ones.
    aliases: Vec<String>,
}
impl AliasedWindowUDFImpl {
    /// Wraps `inner`, recording its existing aliases followed by
    /// `new_aliases` (in iteration order).
    pub fn new(
        inner: Arc<dyn WindowUDFImpl>,
        new_aliases: impl IntoIterator<Item = &'static str>,
    ) -> Self {
        let combined: Vec<String> = inner
            .aliases()
            .iter()
            .cloned()
            .chain(new_aliases.into_iter().map(String::from))
            .collect();
        Self {
            inner,
            aliases: combined,
        }
    }
}
impl WindowUDFImpl for AliasedWindowUDFImpl {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
self.inner.name()
}
fn signature(&self) -> &Signature {
self.inner.signature()
}
fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
expr_args
.input_exprs()
.first()
.map_or(vec![], |expr| vec![Arc::clone(expr)])
}
fn partition_evaluator(
&self,
partition_evaluator_args: PartitionEvaluatorArgs,
) -> Result<Box<dyn PartitionEvaluator>> {
self.inner.partition_evaluator(partition_evaluator_args)
}
fn aliases(&self) -> &[String] {
&self.aliases
}
fn simplify(&self) -> Option<WindowFunctionSimplification> {
self.inner.simplify()
}
fn equals(&self, other: &dyn WindowUDFImpl) -> bool {
if let Some(other) = other.as_any().downcast_ref::<AliasedWindowUDFImpl>() {
self.inner.equals(other.inner.as_ref()) && self.aliases == other.aliases
} else {
false
}
}
fn hash_value(&self) -> u64 {
let hasher = &mut DefaultHasher::new();
self.inner.hash_value().hash(hasher);
self.aliases.hash(hasher);
hasher.finish()
}
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
self.inner.field(field_args)
}
fn sort_options(&self) -> Option<SortOptions> {
self.inner.sort_options()
}
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}
fn documentation(&self) -> Option<&Documentation> {
self.inner.documentation()
}
}
/// Documentation sections used to group window functions.
pub mod window_doc_sections {
    use datafusion_doc::DocSection;

    /// Section listing aggregate functions usable as window functions.
    pub const DOC_SECTION_AGGREGATE: DocSection = DocSection {
        include: true,
        label: "Aggregate Functions",
        description: Some("All aggregate functions can be used as window functions."),
    };

    /// Section for ranking window functions.
    pub const DOC_SECTION_RANKING: DocSection = DocSection {
        include: true,
        label: "Ranking Functions",
        description: None,
    };

    /// Section for analytical window functions.
    pub const DOC_SECTION_ANALYTICAL: DocSection = DocSection {
        include: true,
        label: "Analytical Functions",
        description: None,
    };

    /// Returns all window-function doc sections in order: aggregate, ranking,
    /// analytical.
    pub fn doc_sections() -> Vec<DocSection> {
        Vec::from([
            DOC_SECTION_AGGREGATE,
            DOC_SECTION_RANKING,
            DOC_SECTION_ANALYTICAL,
        ])
    }
}
#[cfg(test)]
mod test {
    use crate::{PartitionEvaluator, WindowUDF, WindowUDFImpl};
    use arrow::datatypes::{DataType, FieldRef};
    use datafusion_common::Result;
    use datafusion_expr_common::signature::{Signature, Volatility};
    use datafusion_functions_window_common::field::WindowUDFFieldArgs;
    use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
    use std::any::Any;
    use std::cmp::Ordering;

    /// Signature shared by both test UDWFs: one immutable `Int32` argument.
    fn unary_int32_signature() -> Signature {
        Signature::uniform(1, vec![DataType::Int32], Volatility::Immutable)
    }

    /// Minimal UDWF named "a"; only name/signature are exercised.
    #[derive(Debug, Clone)]
    struct AWindowUDF {
        signature: Signature,
    }

    impl AWindowUDF {
        fn new() -> Self {
            let signature = unary_int32_signature();
            Self { signature }
        }
    }

    impl WindowUDFImpl for AWindowUDF {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn name(&self) -> &str {
            "a"
        }

        fn signature(&self) -> &Signature {
            &self.signature
        }

        fn partition_evaluator(
            &self,
            _partition_evaluator_args: PartitionEvaluatorArgs,
        ) -> Result<Box<dyn PartitionEvaluator>> {
            unimplemented!()
        }

        fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
            unimplemented!()
        }
    }

    /// Minimal UDWF named "b" with the same signature as [`AWindowUDF`].
    #[derive(Debug, Clone)]
    struct BWindowUDF {
        signature: Signature,
    }

    impl BWindowUDF {
        fn new() -> Self {
            let signature = unary_int32_signature();
            Self { signature }
        }
    }

    impl WindowUDFImpl for BWindowUDF {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn name(&self) -> &str {
            "b"
        }

        fn signature(&self) -> &Signature {
            &self.signature
        }

        fn partition_evaluator(
            &self,
            _partition_evaluator_args: PartitionEvaluatorArgs,
        ) -> Result<Box<dyn PartitionEvaluator>> {
            unimplemented!()
        }

        fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
            unimplemented!()
        }
    }

    #[test]
    fn test_partial_ord() {
        // Two instances of the same implementation compare Equal.
        let a1 = WindowUDF::from(AWindowUDF::new());
        let a2 = WindowUDF::from(AWindowUDF::new());
        assert_eq!(a1.partial_cmp(&a2), Some(Ordering::Equal));

        // Different names order by name and are not equal.
        let b1 = WindowUDF::from(BWindowUDF::new());
        assert!(a1 < b1);
        assert_ne!(a1, b1);
    }
}