use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use arrow::array::*;
use arrow::compute::array_ops::limit;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use crate::error::{ExecutionError, Result};
use crate::execution::relation::Relation;
/// Relation that caps the number of rows produced from an input relation.
///
/// Batches are pulled from `input` and passed along until `limit` rows have
/// been emitted; the batch that reaches the limit is truncated to fit.
pub(super) struct LimitRelation {
/// Upstream relation that batches are pulled from.
input: Rc<RefCell<Relation>>,
/// Schema reported by `schema()` and used when building truncated batches.
schema: Arc<Schema>,
/// Maximum total number of rows this relation may emit.
limit: usize,
/// Running count of rows emitted so far; starts at zero.
num_consumed_rows: usize,
}
impl LimitRelation {
pub fn new(input: Rc<RefCell<Relation>>, limit: usize, schema: Arc<Schema>) -> Self {
Self {
input,
schema,
limit,
num_consumed_rows: 0,
}
}
}
impl Relation for LimitRelation {
    /// Returns the next batch, never emitting more than `limit` rows in total.
    ///
    /// * Returns `Ok(None)` once the limit has been reached or the input is
    ///   exhausted. When the limit has already been reached the input is not
    ///   polled at all.
    /// * A batch with fewer rows than the remaining capacity is passed through
    ///   unchanged.
    /// * Otherwise every column is truncated to the remaining capacity and a
    ///   new batch is built with this relation's schema.
    ///
    /// # Errors
    ///
    /// Propagates errors from the input relation, from the `limit` kernel, and
    /// from `RecordBatch::try_new`.
    fn next(&mut self) -> Result<Option<RecordBatch>> {
        // Check the remaining budget *before* touching the input so that an
        // exhausted limit does not pull (and discard) further upstream batches.
        // `saturating_sub` keeps this well-defined even if the counter were
        // ever to exceed the limit.
        let capacity = self.limit.saturating_sub(self.num_consumed_rows);
        if capacity == 0 {
            return Ok(None);
        }

        let batch = match self.input.borrow_mut().next()? {
            Some(batch) => batch,
            None => return Ok(None),
        };

        // Whole batch fits strictly under the remaining budget: forward as-is.
        if batch.num_rows() < capacity {
            self.num_consumed_rows += batch.num_rows();
            return Ok(Some(batch));
        }

        // This batch reaches the limit: keep only the first `capacity` rows of
        // each column.
        let limited_columns = (0..batch.num_columns())
            .map(|i| limit(batch.column(i), capacity).map_err(ExecutionError::from))
            .collect::<Result<Vec<ArrayRef>>>()?;
        let limited_batch = RecordBatch::try_new(self.schema.clone(), limited_columns)?;

        self.num_consumed_rows += capacity;
        Ok(Some(limited_batch))
    }

    /// Returns a reference to the schema stored on this relation.
    fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }
}