use std::fs::File;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::thread;
use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::common;
use crate::execution::physical_plan::{ExecutionPlan, Partition};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use parquet::file::reader::SerializedFileReader;
use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
/// Execution plan node that scans one or more Parquet files.
pub struct ParquetExec {
/// Paths of the `.parquet` files to scan (one partition per file).
filenames: Vec<String>,
/// Schema of the output batches, after the projection is applied.
schema: SchemaRef,
/// Indices of the columns to read from each file.
projection: Vec<usize>,
/// Maximum number of rows per record batch.
batch_size: usize,
}
impl ParquetExec {
    /// Create a Parquet scan over `path`.
    ///
    /// `path` may be a single file or a directory searched for `.parquet`
    /// files (per `common::build_file_list`). The schema is read from the
    /// first file found; the remaining files are assumed to share it —
    /// NOTE(review): confirm callers guarantee a homogeneous schema.
    ///
    /// `projection` selects the column indices to read; `None` selects all
    /// columns. Returns an error if no files are found or if a projection
    /// index is out of range for the file schema.
    pub fn try_new(
        path: &str,
        projection: Option<Vec<usize>>,
        batch_size: usize,
    ) -> Result<Self> {
        let mut filenames: Vec<String> = vec![];
        common::build_file_list(path, &mut filenames, ".parquet")?;
        if filenames.is_empty() {
            return Err(ExecutionError::General("No files found".to_string()));
        }
        // Read the schema from the first file only.
        let file = File::open(&filenames[0])?;
        let file_reader = Rc::new(SerializedFileReader::new(file)?);
        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
        let schema = arrow_reader.get_schema()?;
        let projection = match projection {
            Some(p) => p,
            None => (0..schema.fields().len()).collect(),
        };
        // Validate indices up front so a bad projection surfaces as an
        // error here instead of a panic inside `Schema::field` below.
        if let Some(&bad) = projection.iter().find(|&&i| i >= schema.fields().len()) {
            return Err(ExecutionError::General(format!(
                "Projection index {} is out of range for schema with {} fields",
                bad,
                schema.fields().len()
            )));
        }
        let projected_schema = Schema::new(
            projection
                .iter()
                .map(|i| schema.field(*i).clone())
                .collect(),
        );
        Ok(Self {
            filenames,
            schema: Arc::new(projected_schema),
            projection,
            batch_size,
        })
    }
}
impl ExecutionPlan for ParquetExec {
    /// Schema of the batches produced by this scan (after projection).
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Build one partition per input file, in file-list order.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        let mut partitions: Vec<Arc<dyn Partition>> =
            Vec::with_capacity(self.filenames.len());
        for file in &self.filenames {
            let partition = ParquetPartition::new(
                file,
                self.projection.clone(),
                self.schema.clone(),
                self.batch_size,
            );
            partitions.push(Arc::new(partition));
        }
        Ok(partitions)
    }
}
/// One scan partition: streams batches of a single Parquet file that is
/// read on a background thread.
struct ParquetPartition {
/// Shared iterator handed out by `execute`; receives batches over a
/// channel fed by the reader thread.
iterator: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>,
}
impl ParquetPartition {
    /// Create a partition that streams batches from a single Parquet file.
    ///
    /// A background reader thread is spawned immediately — at construction
    /// time, not when `execute` is called (NOTE(review): confirm this eager
    /// start is intended). It pushes results into a bounded channel of
    /// capacity 2, so the reader runs at most two batches ahead of the
    /// consumer.
    pub fn new(
        filename: &str,
        projection: Vec<usize>,
        schema: SchemaRef,
        batch_size: usize,
    ) -> Self {
        // Each message is either a batch, `Ok(None)` for end-of-file, or
        // an error forwarded from the reader thread.
        let (response_tx, response_rx): (
            Sender<ArrowResult<Option<RecordBatch>>>,
            Receiver<ArrowResult<Option<RecordBatch>>>,
        ) = bounded(2);
        let filename = filename.to_string();
        thread::spawn(move || {
            if let Err(e) = read_file(&filename, projection, batch_size, response_tx) {
                // Diagnostics go to stderr so they are not mixed into
                // anything written to stdout.
                eprintln!("Parquet reader thread terminated due to error: {:?}", e);
            }
        });
        let iterator = Arc::new(Mutex::new(ParquetIterator {
            schema,
            response_rx,
        }));
        Self { iterator }
    }
}
/// Forward one result to the consuming iterator, converting a channel send
/// failure (receiver dropped) into an `ExecutionError`.
fn send_result(
    response_tx: &Sender<ArrowResult<Option<RecordBatch>>>,
    result: ArrowResult<Option<RecordBatch>>,
) -> Result<()> {
    response_tx
        .send(result)
        .map_err(|e| ExecutionError::ExecutionError(format!("{:?}", e)))
}
/// Read every batch from one Parquet file and send each over `response_tx`.
///
/// Sends `Ok(None)` to mark end-of-file. On a read error the error is both
/// forwarded to the consumer (as an `ArrowError`) and returned to the
/// caller, terminating the reader thread.
fn read_file(
    filename: &str,
    projection: Vec<usize>,
    batch_size: usize,
    response_tx: Sender<ArrowResult<Option<RecordBatch>>>,
) -> Result<()> {
    let file = File::open(&filename)?;
    let file_reader = Rc::new(SerializedFileReader::new(file)?);
    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
    // `projection` is owned and not used again, so move it into the reader
    // instead of cloning (the original cloned it needlessly).
    let mut batch_reader =
        arrow_reader.get_record_reader_by_columns(projection, batch_size)?;
    loop {
        match batch_reader.next_batch() {
            Ok(Some(batch)) => send_result(&response_tx, Ok(Some(batch)))?,
            Ok(None) => {
                // End of file: tell the consumer and stop.
                send_result(&response_tx, Ok(None))?;
                break;
            }
            Err(e) => {
                let err_msg = format!("Error reading batch from {}: {:?}", filename, e);
                // Let the consumer see the failure too, then bail out.
                send_result(
                    &response_tx,
                    Err(ArrowError::ParquetError(err_msg.clone())),
                )?;
                return Err(ExecutionError::ExecutionError(err_msg));
            }
        }
    }
    Ok(())
}
impl Partition for ParquetPartition {
    /// Hand out the shared batch iterator for this partition.
    ///
    /// Repeated calls return handles to the same underlying iterator.
    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
        let iterator = Arc::clone(&self.iterator);
        Ok(iterator)
    }
}
/// `RecordBatchReader` that pulls batches off the channel fed by the
/// background reader thread.
struct ParquetIterator {
/// Schema reported to consumers (the projected schema).
schema: SchemaRef,
/// Receiving side of the channel written by `read_file`.
response_rx: Receiver<ArrowResult<Option<RecordBatch>>>,
}
impl RecordBatchReader for ParquetIterator {
    /// Schema of the batches this iterator yields.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Block until the reader thread delivers the next result.
    ///
    /// A disconnected channel (the reader thread has finished and dropped
    /// its sender) is treated as end of stream.
    fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
        self.response_rx.recv().unwrap_or(Ok(None))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    #[test]
    fn test() -> Result<()> {
        let testdata =
            env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
        let filename = format!("{}/alltypes_plain.parquet", testdata);
        // Project the first three columns and scan with a 1024-row batch size.
        let parquet_exec = ParquetExec::try_new(&filename, Some(vec![0, 1, 2]), 1024)?;

        // A single input file yields exactly one partition.
        let partitions = parquet_exec.partitions()?;
        assert_eq!(partitions.len(), 1);

        let iterator = partitions[0].execute()?;
        let mut iterator = iterator.lock().unwrap();

        // The first batch holds all eight rows of the projected columns.
        let first = iterator.next_batch()?.unwrap();
        assert_eq!(8, first.num_rows());
        assert_eq!(3, first.num_columns());
        let schema = first.schema();
        let field_names: Vec<&str> =
            schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names);

        // End of stream is sticky: repeated polls keep returning None.
        for _ in 0..3 {
            assert!(iterator.next_batch()?.is_none());
        }
        Ok(())
    }
}