use std::collections::{BTreeSet, HashMap, HashSet};
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{Column, DFSchema, DFSchemaRef, Result};
use datafusion_expr::expr_rewriter::replace_col;
use datafusion_expr::utils as expr_utils;
use datafusion_expr::{logical_plan::LogicalPlan, Expr, Operator};
use log::{debug, trace};
/// Applies `optimizer` to each direct input of `plan` and, when at least one
/// input was changed, rebuilds `plan` on top of the new inputs.
///
/// For every input the rule is invoked through `rewrite` if it reports
/// `supports_rewrite()`, and through the legacy `try_optimize` entry point
/// otherwise. Inputs the rule leaves alone are carried over as clones.
///
/// # Returns
///
/// * `Ok(Some(new_plan))` if any input was changed; `new_plan` is produced by
///   `plan.with_new_exprs` using the plan's current expressions.
/// * `Ok(None)` if no input was changed.
///
/// # Errors
///
/// Propagates any error returned by the rule or by `with_new_exprs`.
#[deprecated(
    since = "40.0.0",
    note = "please use OptimizerRule::apply_order with ApplyOrder::BottomUp instead"
)]
pub fn optimize_children(
    optimizer: &impl OptimizerRule,
    plan: &LogicalPlan,
    config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
    let children = plan.inputs();
    let mut rewritten = Vec::with_capacity(children.len());
    let mut any_changed = false;

    for child in children {
        // Each branch yields the child to keep plus whether the rule touched it.
        let (next_child, changed) = if optimizer.supports_rewrite() {
            let outcome = optimizer.rewrite(child.clone(), config)?;
            (outcome.data, outcome.transformed)
        } else {
            #[allow(deprecated)]
            let optimized = optimizer.try_optimize(child, config)?;
            match optimized {
                Some(new_child) => (new_child, true),
                None => (child.clone(), false),
            }
        };
        any_changed |= changed;
        rewritten.push(next_child);
    }

    if !any_changed {
        return Ok(None);
    }
    plan.with_new_exprs(plan.expressions(), rewritten).map(Some)
}
/// Returns `true` when every column referenced by `expr` is contained in
/// `schema_cols`.
///
/// An expression that references no columns satisfies the check trivially.
pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>) -> bool {
    let referenced = expr.column_refs();
    // Both collections are sets, so "every reference is a member of
    // `schema_cols`" is the same condition as the intersection having as many
    // elements as `referenced`.
    referenced.iter().all(|col| schema_cols.contains(*col))
}
/// Gathers the columns referenced by `exprs` that `subquery_schema` reports
/// as present (via `has_column`).
///
/// The result is an ordered set, so a column referenced by several
/// expressions appears only once.
pub(crate) fn collect_subquery_cols(
    exprs: &[Expr],
    subquery_schema: DFSchemaRef,
) -> Result<BTreeSet<Column>> {
    let mut found = BTreeSet::new();
    for expr in exprs {
        found.extend(
            expr.column_refs()
                .into_iter()
                .filter(|candidate| subquery_schema.has_column(candidate))
                .cloned(),
        );
    }
    Ok(found)
}
/// Rewrites `expr` so that every column in `cols` is replaced by a column of
/// the same name qualified with `subquery_alias`.
///
/// The replacement column is built with `Column::new`, which takes the
/// original column name verbatim. Formatting `"{alias}.{name}"` and parsing it
/// back as a qualified SQL identifier would instead re-interpret the name, so
/// names containing `.` or upper-case characters would not survive the round
/// trip.
///
/// # Errors
///
/// Propagates any error returned by `replace_col`.
pub(crate) fn replace_qualified_name(
    expr: Expr,
    cols: &BTreeSet<Column>,
    subquery_alias: &str,
) -> Result<Expr> {
    // The aliased columns must outlive `replace_map`, which only borrows them.
    let alias_cols: Vec<Column> = cols
        .iter()
        .map(|col| Column::new(Some(subquery_alias), &col.name))
        .collect();
    let replace_map: HashMap<&Column, &Column> =
        cols.iter().zip(alias_cols.iter()).collect();

    replace_col(expr, &replace_map)
}
/// Logs `plan` under the heading `description`.
///
/// The rendering from `display_indent` is emitted at `debug` level and the
/// rendering from `display_indent_schema` at `trace` level.
pub fn log_plan(description: &str, plan: &LogicalPlan) {
debug!("{description}:\n{}\n", plan.display_indent());
trace!("{description}::\n{}\n", plan.display_indent_schema());
}
/// Deprecated forwarding shim: passes `expr` to
/// `datafusion_expr::utils::split_conjunction` and returns its result
/// unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::split_conjunction` instead"
)]
pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
expr_utils::split_conjunction(expr)
}
/// Deprecated forwarding shim: passes the owned `expr` to
/// `datafusion_expr::utils::split_conjunction_owned` and returns its result
/// unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::split_conjunction_owned` instead"
)]
pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
expr_utils::split_conjunction_owned(expr)
}
/// Deprecated forwarding shim: passes the owned `expr` and the operator `op`
/// to `datafusion_expr::utils::split_binary_owned` and returns its result
/// unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::split_binary_owned` instead"
)]
pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec<Expr> {
expr_utils::split_binary_owned(expr, op)
}
/// Deprecated forwarding shim: passes `expr` and the operator `op` to
/// `datafusion_expr::utils::split_binary` and returns its result unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::split_binary` instead"
)]
pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> {
expr_utils::split_binary(expr, op)
}
/// Deprecated forwarding shim: passes `filters` to
/// `datafusion_expr::utils::conjunction` and returns its result unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::conjunction` instead"
)]
pub fn conjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
expr_utils::conjunction(filters)
}
/// Deprecated forwarding shim: passes `filters` to
/// `datafusion_expr::utils::disjunction` and returns its result unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::disjunction` instead"
)]
pub fn disjunction(filters: impl IntoIterator<Item = Expr>) -> Option<Expr> {
expr_utils::disjunction(filters)
}
/// Deprecated forwarding shim: passes `plan` and `predicates` to
/// `datafusion_expr::utils::add_filter` and returns its result, including any
/// error, unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::add_filter` instead"
)]
pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> Result<LogicalPlan> {
expr_utils::add_filter(plan, predicates)
}
/// Deprecated forwarding shim: passes `exprs` to
/// `datafusion_expr::utils::find_join_exprs` and returns its pair of
/// expression lists, or its error, unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::find_join_exprs` instead"
)]
pub fn find_join_exprs(exprs: Vec<&Expr>) -> Result<(Vec<Expr>, Vec<Expr>)> {
expr_utils::find_join_exprs(exprs)
}
/// Deprecated forwarding shim: passes `slice` to
/// `datafusion_expr::utils::only_or_err` and returns its result, including
/// any error, unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::only_or_err` instead"
)]
pub fn only_or_err<T>(slice: &[T]) -> Result<&T> {
expr_utils::only_or_err(slice)
}
/// Deprecated forwarding shim: passes `inputs` to
/// `datafusion_expr::utils::merge_schema` and returns the resulting schema
/// unchanged.
#[deprecated(
since = "34.0.0",
note = "use `datafusion_expr::utils::merge_schema` instead"
)]
pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema {
expr_utils::merge_schema(inputs)
}
/// Helper for keeping an expression's name stable across a rewrite.
///
/// Call `save` on the expression before rewriting it, then call
/// `SavedName::restore` on the rewritten expression.
pub struct NamePreserver {
/// When `true`, `save` records the expression's name; when `false`, `save`
/// records nothing and restoring returns the expression as-is.
use_alias: bool,
}
/// Name captured by `NamePreserver::save`; holds `None` when no name was
/// recorded.
pub struct SavedName(Option<String>);
impl NamePreserver {
    /// Creates a preserver for expressions that belong to `plan`.
    ///
    /// Names are not recorded when `plan` is a `Filter` or a `Join`; for every
    /// other plan variant they are.
    pub fn new(plan: &LogicalPlan) -> Self {
        let skip_alias = matches!(plan, LogicalPlan::Filter(_) | LogicalPlan::Join(_));
        Self {
            use_alias: !skip_alias,
        }
    }

    /// Creates a preserver that always records names.
    pub fn new_for_projection() -> Self {
        let use_alias = true;
        Self { use_alias }
    }

    /// Captures the current name of `expr` so it can be restored later.
    ///
    /// When this preserver does not record names, the returned [`SavedName`]
    /// is empty and `expr` is not inspected.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Expr::name_for_alias`.
    pub fn save(&self, expr: &Expr) -> Result<SavedName> {
        if !self.use_alias {
            return Ok(SavedName(None));
        }
        let recorded = expr.name_for_alias()?;
        Ok(SavedName(Some(recorded)))
    }
}
impl SavedName {
    /// Re-applies the saved name to the rewritten `expr`.
    ///
    /// If a name was recorded, the work is delegated to
    /// `Expr::alias_if_changed`; otherwise `expr` is returned untouched.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Expr::alias_if_changed`.
    pub fn restore(self, expr: Expr) -> Result<Expr> {
        if let Some(saved) = self.0 {
            expr.alias_if_changed(saved)
        } else {
            Ok(expr)
        }
    }
}