use std::collections::{BTreeSet, HashMap, HashSet};
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{Column, DFSchema, Result};
use datafusion_expr::expr_rewriter::replace_col;
use datafusion_expr::{logical_plan::LogicalPlan, Expr};
use log::{debug, trace};
pub use datafusion_expr::expr_rewriter::NamePreserver;
/// Applies `optimizer` to each direct input of `plan` and rebuilds `plan`
/// around the results.
///
/// For every input, the rule's `rewrite` method is used when
/// `supports_rewrite()` reports `true`; otherwise the older `try_optimize`
/// entry point is used and an unchanged input is carried over as a clone.
///
/// Returns `Ok(Some(new_plan))` when at least one input was changed, and
/// `Ok(None)` when no input was changed.
///
/// # Errors
///
/// Propagates any error returned by the rule or by
/// `LogicalPlan::with_new_exprs`.
#[deprecated(
    since = "40.0.0",
    note = "please use OptimizerRule::apply_order with ApplyOrder::BottomUp instead"
)]
pub fn optimize_children(
    optimizer: &impl OptimizerRule,
    plan: &LogicalPlan,
    config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
    let children = plan.inputs();
    let mut any_child_changed = false;
    let mut rewritten_children = Vec::with_capacity(children.len());

    for child in children {
        // Each branch yields the (possibly new) child plus a "was it changed" flag.
        let (rewritten, changed) = if optimizer.supports_rewrite() {
            let outcome = optimizer.rewrite(child.clone(), config)?;
            (outcome.data, outcome.transformed)
        } else {
            #[allow(deprecated)]
            let outcome = optimizer.try_optimize(child, config)?;
            match outcome {
                Some(new_child) => (new_child, true),
                // `None` means "no change": keep the original input.
                None => (child.clone(), false),
            }
        };
        any_child_changed |= changed;
        rewritten_children.push(rewritten);
    }

    if !any_child_changed {
        return Ok(None);
    }

    plan.with_new_exprs(plan.expressions(), rewritten_children)
        .map(Some)
}
/// Returns `true` when every column referenced by `expr` is present in
/// `schema_cols`.
///
/// An expression that references no columns yields `true`.
///
/// The check walks the expression's own column references and stops at the
/// first one missing from `schema_cols`, rather than scanning the whole
/// schema set and comparing counts.
pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>) -> bool {
    expr.column_refs()
        .into_iter()
        .all(|col| schema_cols.contains(col))
}
/// Gathers the columns referenced by `exprs` for which
/// `subquery_schema.has_column` returns `true`.
///
/// The columns are cloned into a `BTreeSet`, so duplicates across
/// expressions collapse into a single entry. This function always returns
/// `Ok`; the `Result` wrapper is part of the signature only.
pub(crate) fn collect_subquery_cols(
    exprs: &[Expr],
    subquery_schema: &DFSchema,
) -> Result<BTreeSet<Column>> {
    let subquery_cols: BTreeSet<Column> = exprs
        .iter()
        .flat_map(|expr| expr.column_refs())
        // Keep only the references the subquery schema knows about.
        .filter(|col| subquery_schema.has_column(*col))
        .cloned()
        .collect();
    Ok(subquery_cols)
}
/// Rewrites `expr` by handing `replace_col` a mapping from each column in
/// `cols` to a column with the same name qualified by `subquery_alias`.
///
/// Returns whatever `replace_col` returns, including its error.
pub(crate) fn replace_qualified_name(
    expr: Expr,
    cols: &BTreeSet<Column>,
    subquery_alias: &str,
) -> Result<Expr> {
    // Owned storage for the re-qualified columns; the map below only borrows.
    let requalified: Vec<Column> = cols
        .iter()
        .map(|original| Column::new(Some(subquery_alias), &original.name))
        .collect();

    let mut replace_map: HashMap<&Column, &Column> = HashMap::with_capacity(cols.len());
    for (original, renamed) in cols.iter().zip(&requalified) {
        replace_map.insert(original, renamed);
    }

    replace_col(expr, &replace_map)
}
/// Logs `plan` under the heading `description` using the `log` macros.
///
/// Two records are emitted:
/// * `debug` level: the heading, a single colon, then `plan.display_indent()`.
/// * `trace` level: the heading, a double colon, then
///   `plan.display_indent_schema()`.
pub fn log_plan(description: &str, plan: &LogicalPlan) {
// Debug-level record: `display_indent` rendering of the plan.
debug!("{description}:\n{}\n", plan.display_indent());
// Trace-level record: `display_indent_schema` rendering; note the `::` separator.
trace!("{description}::\n{}\n", plan.display_indent_schema());
}