use arrow::datatypes::SchemaRef;
use std::sync::Arc;
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{ExecutionPlan, Partitioning};
use crate::scheduler::pipeline::{
execution::ExecutionPipeline, repartition::RepartitionPipeline, Pipeline,
};
/// Identifies where a pipeline's output is routed: input number `child` of the
/// pipeline stored at index `pipeline` in [`PipelinePlan::pipelines`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OutputLink {
    /// Index of the destination pipeline.
    pub pipeline: usize,
    /// Which input (child position) of the destination pipeline receives the output.
    pub child: usize,
}
/// A [`Pipeline`] together with the location its output should be routed to.
#[derive(Debug)]
pub struct RoutablePipeline {
    /// The pipeline itself.
    pub pipeline: Box<dyn Pipeline>,
    /// Destination of this pipeline's output. `None` only for the pipeline that
    /// produces the root plan's output, since the planner seeds the root with `None`.
    pub output: Option<OutputLink>,
}
/// The result of [`PipelinePlanner::build`]: an execution plan split into
/// pipelines that are linked through [`OutputLink`]s.
#[derive(Debug)]
pub struct PipelinePlan {
    /// Schema of the root plan's output.
    pub schema: SchemaRef,
    /// Number of output partitions of the root plan.
    pub output_partitions: usize,
    /// Pipelines in the order they were created; [`OutputLink::pipeline`]
    /// values index into this vector.
    pub pipelines: Vec<RoutablePipeline>,
}
/// A run of operators, connected through single-child links, that is being
/// accumulated so it can be flushed as one [`ExecutionPipeline`].
struct OperatorGroup {
    /// Where the flushed pipeline's output is routed.
    output: Option<OutputLink>,
    /// Topmost operator of the group.
    root: Arc<dyn ExecutionPlan>,
    /// Number of operators added to the group below `root` (0 = only `root`).
    depth: usize,
}
/// Splits an [`ExecutionPlan`] tree into a [`PipelinePlan`].
///
/// `RepartitionExec` / `CoalescePartitionsExec` operators become
/// [`RepartitionPipeline`]s; chains of other operators are grouped into
/// [`ExecutionPipeline`]s.
pub struct PipelinePlanner {
    /// Context passed to every [`ExecutionPipeline`] that is created.
    task_context: Arc<TaskContext>,
    /// Schema of the root plan.
    schema: SchemaRef,
    /// Output partition count of the root plan.
    output_partitions: usize,
    /// Pipelines created so far, in creation order.
    completed: Vec<RoutablePipeline>,
    /// Stack (LIFO) of operators still to visit, each paired with where its output goes.
    to_visit: Vec<(Arc<dyn ExecutionPlan>, Option<OutputLink>)>,
    /// Operators buffered for the next [`ExecutionPipeline`], if any.
    execution_operators: Option<OperatorGroup>,
}
impl PipelinePlanner {
    /// Creates a planner for `plan`.
    ///
    /// The schema and output partition count reported by the resulting
    /// [`PipelinePlan`] are taken from the root `plan`. No planning happens until
    /// [`PipelinePlanner::build`] is called.
    pub fn new(plan: Arc<dyn ExecutionPlan>, task_context: Arc<TaskContext>) -> Self {
        let schema = plan.schema();
        let output_partitions = plan.output_partitioning().partition_count();
        Self {
            completed: vec![],
            // The root has no downstream pipeline, hence `None`.
            to_visit: vec![(plan, None)],
            task_context,
            execution_operators: None,
            schema,
            output_partitions,
        }
    }

    /// Turns the buffered [`OperatorGroup`] into an [`ExecutionPipeline`] and
    /// appends it to `completed`, returning the new pipeline's index.
    ///
    /// # Panics
    ///
    /// Panics if no operators are buffered. Every caller either checks for, or has
    /// just established, a buffered group before calling this.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`ExecutionPipeline::new`].
    fn flush_exec(&mut self) -> Result<usize> {
        let group = self
            .execution_operators
            .take()
            .expect("flush_exec called without buffered execution operators");
        let node_idx = self.completed.len();
        self.completed.push(RoutablePipeline {
            pipeline: Box::new(ExecutionPipeline::new(
                group.root,
                self.task_context.clone(),
                group.depth,
            )?),
            output: group.output,
        });
        Ok(node_idx)
    }

    /// Adds `plan` to the current operator group, starting a new group rooted at
    /// `plan` if none is buffered.
    ///
    /// An operator with exactly one child extends the group: its child is enqueued
    /// with the same `parent`. An operator with zero or several children ends the
    /// group. The group is flushed and the children are enqueued as inputs of the
    /// flushed pipeline.
    fn visit_exec(
        &mut self,
        plan: Arc<dyn ExecutionPlan>,
        parent: Option<OutputLink>,
    ) -> Result<()> {
        let children = plan.children();

        match self.execution_operators.as_mut() {
            Some(buffer) => {
                // Every operator in a group shares the group's output link.
                assert_eq!(parent, buffer.output, "QueryBuilder out of sync");
                buffer.depth += 1;
            }
            None => {
                self.execution_operators = Some(OperatorGroup {
                    output: parent,
                    root: plan,
                    depth: 0,
                })
            }
        }

        match children.len() {
            1 => {
                // Keep grouping: the child inherits the group's output link.
                let child = children
                    .into_iter()
                    .next()
                    .expect("length checked to be exactly one");
                self.to_visit.push((child, parent))
            }
            _ => {
                // Grouping only continues through single-child operators, so the
                // group ends here and the children feed the flushed pipeline.
                let node = self.flush_exec()?;
                self.enqueue_children(children, node);
            }
        }
        Ok(())
    }

    /// Pushes each child onto the visit stack, routed to input `child_idx` of
    /// pipeline `parent_node_idx`.
    fn enqueue_children(
        &mut self,
        children: Vec<Arc<dyn ExecutionPlan>>,
        parent_node_idx: usize,
    ) {
        for (child_idx, child) in children.into_iter().enumerate() {
            self.to_visit.push((
                child,
                Some(OutputLink {
                    pipeline: parent_node_idx,
                    child: child_idx,
                }),
            ))
        }
    }

    /// Appends `node` to `completed` and enqueues `children` as its inputs.
    fn push_pipeline(
        &mut self,
        node: RoutablePipeline,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) {
        let node_idx = self.completed.len();
        self.completed.push(node);
        self.enqueue_children(children, node_idx)
    }

    /// Emits a [`RepartitionPipeline`] from `input` partitioning to `output`
    /// partitioning, then enqueues `children` as its inputs.
    ///
    /// If operators are buffered, they are flushed first and the repartition feeds
    /// that flushed pipeline. Otherwise it outputs directly to `parent`.
    ///
    /// # Errors
    ///
    /// Propagates errors from flushing the buffer or from
    /// [`RepartitionPipeline::try_new`].
    fn push_repartition(
        &mut self,
        input: Partitioning,
        output: Partitioning,
        parent: Option<OutputLink>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<()> {
        let parent = match &self.execution_operators {
            Some(buffer) => {
                assert_eq!(buffer.output, parent, "QueryBuilder out of sync");
                Some(OutputLink {
                    pipeline: self.flush_exec()?,
                    // Groups only extend through single-child operators, so this
                    // repartition is the sole (first) input of the flushed pipeline.
                    child: 0,
                })
            }
            None => parent,
        };

        let node = Box::new(RepartitionPipeline::try_new(input, output)?);
        self.push_pipeline(
            RoutablePipeline {
                pipeline: node,
                output: parent,
            },
            children,
        );
        Ok(())
    }

    /// Dispatches `plan` by operator type.
    ///
    /// - `RepartitionExec` becomes a repartition pipeline.
    /// - `CoalescePartitionsExec` becomes a repartition pipeline targeting a
    ///   single round-robin partition.
    /// - Every other operator goes through [`PipelinePlanner::visit_exec`].
    fn visit_operator(
        &mut self,
        plan: Arc<dyn ExecutionPlan>,
        parent: Option<OutputLink>,
    ) -> Result<()> {
        if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
            self.push_repartition(
                repartition.input().output_partitioning(),
                repartition.output_partitioning(),
                parent,
                repartition.children(),
            )
        } else if let Some(coalesce) =
            plan.as_any().downcast_ref::<CoalescePartitionsExec>()
        {
            self.push_repartition(
                coalesce.input().output_partitioning(),
                Partitioning::RoundRobinBatch(1),
                parent,
                coalesce.children(),
            )
        } else {
            self.visit_exec(plan, parent)
        }
    }

    /// Consumes the planner and walks the plan, returning the resulting
    /// [`PipelinePlan`].
    ///
    /// The walk pops from a stack, so each subtree is fully processed before its
    /// siblings.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while constructing a pipeline.
    pub fn build(mut self) -> Result<PipelinePlan> {
        while let Some((plan, parent)) = self.to_visit.pop() {
            self.visit_operator(plan, parent)?;
        }

        // Flush any operators still buffered when the walk ends.
        if self.execution_operators.is_some() {
            self.flush_exec()?;
        }

        Ok(PipelinePlan {
            schema: self.schema,
            output_partitions: self.output_partitions,
            pipelines: self.completed,
        })
    }
}