use chrono::{DateTime, Utc};
use datafusion_common::Result;
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace};
use std::sync::Arc;
pub trait OptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan>;
fn name(&self) -> &str;
}
#[derive(Debug)]
pub struct OptimizerConfig {
pub query_execution_start_time: DateTime<Utc>,
}
impl OptimizerConfig {
pub fn new() -> Self {
Self {
query_execution_start_time: chrono::Utc::now(),
}
}
}
impl Default for OptimizerConfig {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone)]
pub struct Optimizer {
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
impl Optimizer {
pub fn new(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
Self { rules }
}
pub fn optimize<F>(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let mut new_plan = plan.clone();
debug!("Input logical plan:\n{}\n", plan.display_indent());
trace!("Full input logical plan:\n{:?}", plan);
for rule in &self.rules {
new_plan = rule.optimize(&new_plan, optimizer_config)?;
observer(&new_plan, rule.as_ref());
debug!("After apply {} rule:\n", rule.name());
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
}
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
trace!("Full Optimized logical plan:\n {:?}", new_plan);
Ok(new_plan)
}
}