use std::sync::{Arc, Mutex};
use crate::error::Result;
use crate::execution::physical_plan::{ExecutionPlan, Partition};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatchReader;
/// Execution plan node backed by a fixed set of record batch readers,
/// one reader per partition.
pub struct DatasourceExec {
/// Schema handed back to callers by `ExecutionPlan::schema`.
schema: SchemaRef,
/// One shared, mutex-guarded reader per partition; each entry is wrapped in a
/// `DatasourcePartition` when the plan's partitions are requested.
partitions: Vec<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>,
}
impl DatasourceExec {
pub fn new(
schema: SchemaRef,
partitions: Vec<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>,
) -> Self {
Self { schema, partitions }
}
}
impl ExecutionPlan for DatasourceExec {
    /// Returns a clone of the schema this node was constructed with.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Wraps every stored reader in a fresh [`DatasourcePartition`].
    ///
    /// The readers themselves are not copied: each partition receives another
    /// `Arc` handle to the same mutex-guarded reader, so partitions obtained
    /// from separate calls share the underlying readers. Output order matches
    /// the order of the stored readers. This implementation always returns `Ok`.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        let mut wrapped: Vec<Arc<dyn Partition>> = Vec::with_capacity(self.partitions.len());
        for reader in &self.partitions {
            // Only the reference count is bumped; the reader is shared.
            wrapped.push(Arc::new(DatasourcePartition::new(Arc::clone(reader))));
        }
        Ok(wrapped)
    }
}
/// A single partition of a `DatasourceExec`, holding a shared handle to the
/// reader that produces its record batches.
pub struct DatasourcePartition {
/// Shared, mutex-guarded reader; a clone of this handle is returned by `execute`.
batch_iter: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>,
}
impl DatasourcePartition {
fn new(batch_iter: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>) -> Self {
Self { batch_iter }
}
}
impl Partition for DatasourcePartition {
    /// Hands out another handle to this partition's reader.
    ///
    /// No new reader is created: every call returns an `Arc` pointing at the
    /// same mutex-guarded reader, so callers share it. This implementation
    /// always returns `Ok`.
    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
        let shared_reader = Arc::clone(&self.batch_iter);
        Ok(shared_reader)
    }
}