use std::sync::Arc;
use std::{any::Any, time::Instant};
use crate::{
error::{DataFusionError, Result},
physical_plan::{
display::DisplayableExecutionPlan, DisplayFormatType, ExecutionPlan,
Partitioning, Statistics,
},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;
use super::expressions::PhysicalSortExpr;
use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
#[derive(Debug, Clone)]
pub struct AnalyzeExec {
verbose: bool,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
}
impl AnalyzeExec {
pub fn new(verbose: bool, input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
AnalyzeExec {
verbose,
input,
schema,
}
}
}
impl ExecutionPlan for AnalyzeExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
fn relies_on_input_order(&self) -> bool {
false
}
fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::new(
self.verbose,
children.pop().unwrap(),
self.schema.clone(),
)))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"AnalyzeExec invalid partition. Expected 0, got {}",
partition
)));
}
let input_partitions = self.input.output_partitioning().partition_count();
if input_partitions != 1 {
return Err(DataFusionError::Internal(format!(
"AnalyzeExec invalid number of input partitions. Expected 1, got {}",
input_partitions
)));
}
let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
let captured_input = self.input.clone();
let mut input_stream = captured_input.execute(0, context)?;
let captured_schema = self.schema.clone();
let verbose = self.verbose;
let join_handle = tokio::task::spawn(async move {
let start = Instant::now();
let mut total_rows = 0;
while let Some(b) = input_stream.next().await {
match b {
Ok(batch) => {
total_rows += batch.num_rows();
}
b @ Err(_) => {
if tx.send(b).await.is_err() {
return;
}
}
}
}
let end = Instant::now();
let mut type_builder = StringBuilder::new(1);
let mut plan_builder = StringBuilder::new(1);
type_builder.append_value("Plan with Metrics").unwrap();
let annotated_plan =
DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
.indent()
.to_string();
plan_builder.append_value(annotated_plan).unwrap();
if verbose {
type_builder.append_value("Plan with Full Metrics").unwrap();
let annotated_plan =
DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
.indent()
.to_string();
plan_builder.append_value(annotated_plan).unwrap();
type_builder.append_value("Output Rows").unwrap();
plan_builder.append_value(total_rows.to_string()).unwrap();
type_builder.append_value("Duration").unwrap();
plan_builder
.append_value(format!("{:?}", end - start))
.unwrap();
}
let maybe_batch = RecordBatch::try_new(
captured_schema,
vec![
Arc::new(type_builder.finish()),
Arc::new(plan_builder.finish()),
],
);
tx.send(maybe_batch).await.ok();
});
Ok(RecordBatchReceiverStream::create(
&self.schema,
rx,
join_handle,
))
}
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(f, "AnalyzeExec verbose={}", self.verbose)
}
}
}
fn statistics(&self) -> Statistics {
Statistics::default()
}
}
#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use futures::FutureExt;
use crate::prelude::SessionContext;
use crate::{
physical_plan::collect,
test::{
assert_is_pending,
exec::{assert_strong_count_converges_to_zero, BlockingExec},
},
};
use super::*;
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
let refs = blocking_exec.refs();
let analyze_exec = Arc::new(AnalyzeExec::new(true, blocking_exec, schema));
let fut = collect(analyze_exec, task_ctx);
let mut fut = fut.boxed();
assert_is_pending(&mut fut);
drop(fut);
assert_strong_count_converges_to_zero(refs).await;
Ok(())
}
}