use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use arrow::array::*;
use arrow::compute::array_ops::filter;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use crate::error::{ExecutionError, Result};
use crate::execution::expression::CompiledExpr;
use crate::execution::relation::Relation;
/// Relation that wraps another relation and applies a compiled filter
/// expression to every batch the wrapped relation produces.
pub(super) struct FilterRelation {
/// Schema attached to every filtered batch and returned by `schema()`.
schema: Arc<Schema>,
/// Upstream relation that `next` pulls record batches from.
input: Rc<RefCell<Relation>>,
/// Compiled expression invoked on each batch; its result must downcast to
/// a `BooleanArray`, otherwise `next` returns an error.
expr: CompiledExpr,
}
impl FilterRelation {
pub fn new(
input: Rc<RefCell<Relation>>,
expr: CompiledExpr,
schema: Arc<Schema>,
) -> Self {
Self {
schema,
input,
expr,
}
}
}
impl Relation for FilterRelation {
    /// Pulls the next batch from the input relation and filters its columns.
    ///
    /// The compiled expression is invoked on the batch, its result is
    /// downcast to a `BooleanArray`, and that array is passed as the mask to
    /// arrow's `filter` kernel for every column. The filtered columns are
    /// assembled into a new `RecordBatch` using this relation's schema.
    ///
    /// Returns `Ok(None)` when the input relation yields `None`.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * the input relation's `next` fails,
    /// * invoking the expression fails,
    /// * the expression result is not a `BooleanArray`,
    /// * the `filter` kernel fails for any column, or
    /// * `RecordBatch::try_new` rejects the filtered columns.
    fn next(&mut self) -> Result<Option<RecordBatch>> {
        // Bind the upstream result in its own statement so the `RefMut` guard
        // on the input is dropped here instead of staying alive while the
        // expression and the filter kernel run.
        let upstream = self.input.borrow_mut().next()?;
        let batch = match upstream {
            Some(batch) => batch,
            None => return Ok(None),
        };

        // Keep the evaluated array alive in a local so the downcast reference
        // below can borrow from it.
        let predicate = self.expr.invoke(&batch)?;
        let mask = predicate
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| {
                ExecutionError::ExecutionError(
                    "Filter expression did not evaluate to boolean".to_string(),
                )
            })?;

        // Collecting into `Result` stops at the first column that fails.
        let filtered_columns = (0..batch.num_columns())
            .map(|i| filter(batch.column(i).as_ref(), mask).map_err(ExecutionError::from))
            .collect::<Result<Vec<ArrayRef>>>()?;

        let filtered_batch = RecordBatch::try_new(self.schema.clone(), filtered_columns)?;
        Ok(Some(filtered_batch))
    }

    /// Returns the schema this relation attaches to the batches it emits.
    fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }
}