use std::sync::{Arc, Mutex};
use crate::error::Result;
use crate::execution::physical_plan::{ExecutionPlan, Partition};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
/// Execution plan that reads record batches already held in memory.
///
/// Each inner `Vec<RecordBatch>` of `partitions` is exposed as one partition.
pub struct MemoryExec {
    /// Schema reported by the plan and handed to every partition it creates.
    schema: SchemaRef,
    /// Optional column indices selected from each batch when it is read.
    projection: Option<Vec<usize>>,
    /// Batches grouped by partition.
    partitions: Vec<Vec<RecordBatch>>,
}
impl ExecutionPlan for MemoryExec {
    /// Returns the schema stored on this plan.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Creates one `MemoryPartition` per group of batches.
    ///
    /// Each partition gets its own copy of its batches, along with the plan's
    /// schema and projection.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        let mut out: Vec<Arc<dyn Partition>> = Vec::with_capacity(self.partitions.len());
        for batches in &self.partitions {
            let partition = MemoryPartition::new(
                batches.clone(),
                Arc::clone(&self.schema),
                self.projection.clone(),
            );
            out.push(Arc::new(partition));
        }
        Ok(out)
    }
}
impl MemoryExec {
    /// Creates a plan over in-memory `partitions`, one inner `Vec` per partition.
    ///
    /// The batches are copied into the plan. `schema` is stored as-is and passed
    /// to every partition. If `projection` is set, it lists the column indices
    /// each emitted batch will contain.
    ///
    /// NOTE(review): `schema` is not projected here. Readers build projected
    /// batches against it unchanged, so callers presumably must pass a schema
    /// that already describes the projected columns — confirm with callers.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        Ok(Self {
            // Accepting a slice instead of `&Vec` is more general. Existing
            // `&Vec<Vec<RecordBatch>>` arguments still work via deref coercion.
            partitions: partitions.to_vec(),
            schema,
            projection,
        })
    }
}
/// A single partition of a `MemoryExec`: its batches plus the settings
/// needed to read them.
struct MemoryPartition {
    /// Schema passed to each reader created from this partition.
    schema: SchemaRef,
    /// Optional column indices applied to each batch when read.
    projection: Option<Vec<usize>>,
    /// This partition's batches.
    data: Vec<RecordBatch>,
}
impl MemoryPartition {
fn new(
data: Vec<RecordBatch>,
schema: SchemaRef,
projection: Option<Vec<usize>>,
) -> Self {
Self {
data,
schema,
projection,
}
}
}
impl Partition for MemoryPartition {
    /// Returns a new reader over a copy of this partition's batches.
    ///
    /// The reader starts at the first batch. Each call produces an independent
    /// reader wrapped in `Arc<Mutex<_>>`.
    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
        let reader = MemoryIterator::try_new(
            self.data.clone(),
            Arc::clone(&self.schema),
            self.projection.clone(),
        )?;
        Ok(Arc::new(Mutex::new(reader)))
    }
}
/// Reader that yields a partition's in-memory batches one at a time.
struct MemoryIterator {
    /// Position in `data` of the next batch to return.
    index: usize,
    /// Schema reported by the reader and used to build projected batches.
    schema: SchemaRef,
    /// Optional column indices selected from each batch.
    projection: Option<Vec<usize>>,
    /// Batches to yield, in order.
    data: Vec<RecordBatch>,
}
impl MemoryIterator {
    /// Creates a reader positioned before the first batch of `data`.
    ///
    /// `data`, `schema` and `projection` are owned arguments and are moved
    /// straight into the reader. Cloning them again here only cost an extra
    /// `Vec` allocation and a reference-count bump per batch.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    pub fn try_new(
        data: Vec<RecordBatch>,
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        Ok(Self {
            data,
            schema,
            projection,
            index: 0,
        })
    }
}
impl RecordBatchReader for MemoryIterator {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
if self.index < self.data.len() {
self.index += 1;
let batch = &self.data[self.index - 1];
match &self.projection {
Some(columns) => Ok(Some(RecordBatch::try_new(
self.schema.clone(),
columns.iter().map(|i| batch.column(*i).clone()).collect(),
)?)),
None => Ok(Some(batch.clone())),
}
} else {
Ok(None)
}
}
}