use core::fmt;
use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};
use super::expressions::PhysicalSortExpr;
use super::{
common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::error::Result;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use crate::execution::context::TaskContext;
use datafusion_common::DataFusionError;
use futures::Stream;
/// Execution plan for reading batches of data that already reside in memory.
pub struct MemoryExec {
/// The partitioned data: one `Vec<RecordBatch>` per output partition.
partitions: Vec<Vec<RecordBatch>>,
/// Schema of the unprojected batches, as supplied to `try_new`.
schema: SchemaRef,
/// Schema after applying `projection`; reported by `ExecutionPlan::schema`.
projected_schema: SchemaRef,
/// Optional column indices to project; `None` passes batches through unchanged.
projection: Option<Vec<usize>>,
}
impl fmt::Debug for MemoryExec {
    /// Debug-format the plan node.
    ///
    /// Partition contents are elided (`[...]`) since they may hold
    /// arbitrarily many batches.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "partitions: [...]")?;
        // Separate the fields with ", " so the output does not run together
        // (previously rendered as "partitions: [...]schema: …projection: …").
        write!(f, ", schema: {:?}", self.projected_schema)?;
        write!(f, ", projection: {:?}", self.projection)
    }
}
impl ExecutionPlan for MemoryExec {
    /// Return this node as `Any` so callers can downcast to `MemoryExec`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the batches this plan produces (post-projection).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }

    /// Leaf node: the data comes from memory, not from child plans.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// One output partition per inner `Vec<RecordBatch>`.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    /// No ordering is guaranteed for in-memory batches.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn relies_on_input_order(&self) -> bool {
        false
    }

    /// This node has no children, so replacing them is always an error.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Err(DataFusionError::Internal(format!(
            "Children cannot be replaced in {:?}",
            self
        )))
    }

    /// Stream the batches of the requested partition, applying the
    /// projection (if any) lazily per batch.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = MemoryStream::try_new(
            self.partitions[partition].clone(),
            Arc::clone(&self.projected_schema),
            self.projection.clone(),
        )?;
        Ok(Box::pin(stream))
    }

    /// Render a one-line summary for `EXPLAIN` output.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                // Batch count per partition, in partition order.
                let sizes: Vec<usize> =
                    self.partitions.iter().map(Vec::len).collect();
                write!(
                    f,
                    "MemoryExec: partitions={}, partition_sizes={:?}",
                    sizes.len(),
                    sizes
                )
            }
        }
    }

    /// Statistics are computed over the *unprojected* batches, then narrowed
    /// by the stored projection.
    fn statistics(&self) -> Statistics {
        common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        )
    }
}
impl MemoryExec {
    /// Create a new execution plan over the given partitions of in-memory
    /// record batches, optionally projecting to a subset of columns.
    ///
    /// # Errors
    /// Fails if `projection` contains indices invalid for `schema`.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        // Compute the output schema up front so invalid projections are
        // rejected at plan-construction time, not at execution time.
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            projected_schema,
            schema,
            partitions: partitions.to_vec(),
            projection,
        })
    }
}
/// Stream of record batches served directly from memory.
pub struct MemoryStream {
/// The batches to yield, in order.
data: Vec<RecordBatch>,
/// Schema reported via `RecordBatchStream::schema`.
schema: SchemaRef,
/// Optional column indices applied to each batch via `RecordBatch::project`.
projection: Option<Vec<usize>>,
/// Index into `data` of the next batch to yield.
index: usize,
}
impl MemoryStream {
    /// Create a stream over a set of in-memory batches.
    ///
    /// This constructor cannot currently fail; it returns `Result` to keep
    /// the signature uniform with other stream constructors.
    pub fn try_new(
        data: Vec<RecordBatch>,
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let stream = Self {
            index: 0,
            data,
            schema,
            projection,
        };
        Ok(stream)
    }
}
impl Stream for MemoryStream {
    type Item = ArrowResult<RecordBatch>;

    /// Yield the next batch, applying the optional projection.
    ///
    /// All data is already resident in memory, so this never returns
    /// `Poll::Pending`. Projection errors from `RecordBatch::project`
    /// surface as `Some(Err(..))` items via `?`.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Ready(if self.index < self.data.len() {
            self.index += 1;
            let batch = &self.data[self.index - 1];
            let batch = match self.projection.as_ref() {
                Some(columns) => batch.project(columns)?,
                None => batch.clone(),
            };
            Some(Ok(batch))
        } else {
            None
        })
    }

    /// Exact bounds on the number of *remaining* batches.
    ///
    /// `Stream::size_hint` is specified in terms of items not yet yielded,
    /// so batches already consumed (tracked by `self.index`) must be
    /// subtracted; previously the full `data.len()` was always reported.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.data.len() - self.index;
        (remaining, Some(remaining))
    }
}
impl RecordBatchStream for MemoryStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::from_slice::FromSlice;
    use crate::physical_plan::ColumnStatistics;
    use crate::prelude::SessionContext;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::StreamExt;

    /// Build a 4-column Int32 schema (a, b, c, d) and one 3-row batch;
    /// column "c" is nullable and contains two nulls.
    fn mock_data() -> Result<(SchemaRef, RecordBatch)> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Int32Array::from_slice(&[1, 2, 3])),
                Arc::new(Int32Array::from_slice(&[4, 5, 6])),
                Arc::new(Int32Array::from(vec![None, None, Some(9)])),
                Arc::new(Int32Array::from_slice(&[7, 8, 9])),
            ],
        )?;
        Ok((schema, batch))
    }

    /// Expected per-column statistics: only the null count is populated here.
    fn col_stats(null_count: usize) -> ColumnStatistics {
        ColumnStatistics {
            null_count: Some(null_count),
            max_value: None,
            min_value: None,
            distinct_count: None,
        }
    }

    #[tokio::test]
    async fn test_with_projection() -> Result<()> {
        let ctx = SessionContext::new();
        let task_ctx = ctx.task_ctx();
        let (schema, batch) = mock_data()?;

        // Project column "c" (index 2) then "b" (index 1), in that order.
        let exec = MemoryExec::try_new(&[vec![batch]], schema, Some(vec![2, 1]))?;

        let stats = exec.statistics();
        assert_eq!(stats.num_rows, Some(3));
        assert_eq!(
            stats.column_statistics,
            Some(vec![col_stats(2), col_stats(0)])
        );

        let mut stream = exec.execute(0, task_ctx)?;
        let projected = stream.next().await.unwrap()?;

        // Only the two projected columns remain, in projection order.
        assert_eq!(2, projected.schema().fields().len());
        assert_eq!("c", projected.schema().field(0).name());
        assert_eq!("b", projected.schema().field(1).name());
        assert_eq!(2, projected.num_columns());
        Ok(())
    }

    #[tokio::test]
    async fn test_without_projection() -> Result<()> {
        let ctx = SessionContext::new();
        let task_ctx = ctx.task_ctx();
        let (schema, batch) = mock_data()?;

        let exec = MemoryExec::try_new(&[vec![batch]], schema, None)?;

        let stats = exec.statistics();
        assert_eq!(stats.num_rows, Some(3));
        assert_eq!(
            stats.column_statistics,
            Some(vec![
                col_stats(0),
                col_stats(0),
                col_stats(2),
                col_stats(0),
            ])
        );

        let mut stream = exec.execute(0, task_ctx)?;
        let first = stream.next().await.unwrap()?;

        // All four columns pass through untouched.
        assert_eq!(4, first.schema().fields().len());
        assert_eq!(4, first.num_columns());
        Ok(())
    }
}