use super::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::with_new_children_if_necessary;
use crate::{
error::Result,
physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
};
use std::sync::Arc;
/// Physical optimizer rule that wraps a child plan in a
/// `CoalescePartitionsExec` when its parent requires
/// `Distribution::SinglePartition` and the child's output partition count is
/// not exactly one.
///
/// The rule carries no state; all of its behavior lives in its
/// `PhysicalOptimizerRule` implementation.
#[derive(Default)]
pub struct AddCoalescePartitionsExec {}
impl AddCoalescePartitionsExec {
    /// Creates a new instance of the rule.
    ///
    /// The rule has no configuration, so this yields the same value as
    /// `AddCoalescePartitionsExec::default()`.
    pub fn new() -> Self {
        Self {}
    }
}
impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
    /// Rewrites `plan` bottom-up so that every node requiring
    /// `Distribution::SinglePartition` receives children that report exactly
    /// one output partition.
    ///
    /// Children are optimized recursively first. If the node then requires a
    /// single partition, each child whose output partition count is not `1`
    /// is wrapped in a `CoalescePartitionsExec`; for the other distribution
    /// requirements the optimized children are passed through unchanged.
    /// Nodes without children are returned as-is.
    ///
    /// # Errors
    ///
    /// Propagates the first error produced while optimizing a child, and any
    /// error returned by `with_new_children_if_necessary`.
    fn optimize(
        &self,
        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
        config: &crate::execution::context::SessionConfig,
    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
        // Query the children once and reuse the result for both the leaf
        // check and the recursive descent.
        let children = plan.children();
        if children.is_empty() {
            // `plan` is already owned, so it can be handed back without
            // bumping the reference count.
            return Ok(plan);
        }

        let optimized = children
            .iter()
            .map(|child| self.optimize(Arc::clone(child), config))
            .collect::<Result<Vec<_>>>()?;

        let new_children: Vec<Arc<dyn crate::physical_plan::ExecutionPlan>> =
            match plan.required_child_distribution() {
                // Neither requirement needs a coalescing step.
                Distribution::UnspecifiedDistribution | Distribution::HashPartitioned(_) => {
                    optimized
                }
                Distribution::SinglePartition => optimized
                    .into_iter()
                    .map(|child| {
                        if child.output_partitioning().partition_count() == 1 {
                            child
                        } else {
                            Arc::new(CoalescePartitionsExec::new(child))
                        }
                    })
                    .collect(),
            };

        with_new_children_if_necessary(plan, new_children)
    }

    /// Returns the name of this rule, `"add_merge_exec"`.
    ///
    /// NOTE(review): the string does not mirror the struct name; presumably
    /// other code or users refer to the rule by this name, so confirm before
    /// changing it.
    fn name(&self) -> &str {
        "add_merge_exec"
    }
}