use std::fs::File;
use std::sync::{Arc, Mutex};
use crate::error::Result;
use crate::execution::physical_plan::common;
use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
use arrow::csv;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
/// Execution plan that scans the CSV files found at `path`.
///
/// Each discovered file becomes its own [`CsvPartition`]; every partition
/// shares this plan's schema, header flag, projection and batch size.
pub struct CsvExec {
    /// Location handed to `common::build_file_list` to discover input files.
    path: String,
    /// Schema given to every partition's CSV reader.
    schema: Arc<Schema>,
    /// Forwarded to the arrow CSV reader; presumably `true` when each file
    /// begins with a header row (TODO confirm against arrow docs).
    has_header: bool,
    /// Optional index list forwarded to the arrow CSV reader — presumably the
    /// columns to read (TODO confirm against arrow docs).
    projection: Option<Vec<usize>>,
    /// Batch size forwarded to the arrow CSV reader.
    batch_size: usize,
}

impl ExecutionPlan for CsvExec {
    /// Returns the schema this plan was constructed with.
    fn schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }

    /// Discovers the input files and creates one [`CsvPartition`] per file.
    ///
    /// File discovery is delegated to `common::build_file_list`, called with
    /// the `.csv` extension.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `common::build_file_list`.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        let mut filenames: Vec<String> = vec![];
        common::build_file_list(&self.path, &mut filenames, ".csv")?;
        let partitions = filenames
            .iter()
            .map(|filename| {
                Arc::new(CsvPartition::new(
                    filename,
                    self.schema.clone(),
                    self.has_header,
                    self.projection.clone(),
                    self.batch_size,
                )) as Arc<dyn Partition>
            })
            .collect();
        Ok(partitions)
    }
}

impl CsvExec {
    /// Creates a CSV scan over `path` with the given reader settings.
    ///
    /// No files are touched here; discovery happens in
    /// [`ExecutionPlan::partitions`]. The current implementation performs no
    /// validation and always returns `Ok`.
    pub fn try_new(
        path: &str,
        schema: Arc<Schema>,
        has_header: bool,
        projection: Option<Vec<usize>>,
        batch_size: usize,
    ) -> Result<Self> {
        Ok(Self {
            path: path.to_string(),
            schema,
            has_header,
            projection,
            batch_size,
        })
    }
}
/// One CSV file scanned as a single partition.
///
/// Holds the reader settings so the file is only opened when
/// [`Partition::execute`] is called.
struct CsvPartition {
    /// Path of the CSV file this partition reads.
    path: String,
    /// Schema handed to the CSV reader.
    schema: Arc<Schema>,
    /// Header flag handed to the CSV reader.
    has_header: bool,
    /// Optional projection handed to the CSV reader.
    projection: Option<Vec<usize>>,
    /// Batch size handed to the CSV reader.
    batch_size: usize,
}

impl CsvPartition {
    /// Records the file path and reader settings for later execution.
    fn new(
        path: &str,
        schema: Arc<Schema>,
        has_header: bool,
        projection: Option<Vec<usize>>,
        batch_size: usize,
    ) -> Self {
        let owned_path = path.to_owned();
        Self {
            path: owned_path,
            schema,
            has_header,
            projection,
            batch_size,
        }
    }
}

impl Partition for CsvPartition {
    /// Opens this partition's file and returns a shareable batch iterator over it.
    ///
    /// # Errors
    ///
    /// Fails when `CsvIterator::try_new` fails, e.g. if the file cannot be opened.
    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
        let iterator = CsvIterator::try_new(
            &self.path,
            Arc::clone(&self.schema),
            self.has_header,
            &self.projection,
            self.batch_size,
        )?;
        let shared: Arc<Mutex<dyn BatchIterator>> = Arc::new(Mutex::new(iterator));
        Ok(shared)
    }
}
/// Batch iterator over a single CSV file, backed by arrow's CSV reader.
struct CsvIterator {
    /// Arrow CSV reader over the opened file.
    reader: csv::Reader<File>,
}

impl CsvIterator {
    /// Opens `filename` and builds an arrow CSV reader over it.
    ///
    /// `schema`, `has_header` and `batch_size` are passed through unchanged to
    /// `arrow::csv::Reader::new`, together with a copy of `projection`.
    ///
    /// # Errors
    ///
    /// Returns an error if `filename` cannot be opened.
    pub fn try_new(
        filename: &str,
        schema: Arc<Schema>,
        has_header: bool,
        projection: &Option<Vec<usize>>,
        batch_size: usize,
    ) -> Result<Self> {
        let file = File::open(filename)?;
        // `schema` is owned and unused after this call, so it is moved into the
        // reader rather than cloned.
        let reader = csv::Reader::new(file, schema, has_header, batch_size, projection.clone());
        Ok(Self { reader })
    }
}

impl BatchIterator for CsvIterator {
    /// Returns the schema reported by the underlying arrow reader.
    fn schema(&self) -> Arc<Schema> {
        self.reader.schema()
    }

    /// Returns the reader's next batch; `None` presumably signals end of input.
    ///
    /// Reader errors are converted into this crate's error type by `?`.
    fn next(&mut self) -> Result<Option<RecordBatch>> {
        Ok(self.reader.next()?)
    }
}