use std::sync::Arc;
use crate::arrow::datatypes::{Field, Schema};
use crate::error::{ExecutionError, Result};
use crate::logicalplan::Expr::Literal;
use crate::logicalplan::ScalarValue;
use crate::logicalplan::{Expr, LogicalPlan};
use crate::table::*;
pub struct TableImpl {
plan: Arc<LogicalPlan>,
}
impl TableImpl {
pub fn new(plan: Arc<LogicalPlan>) -> Self {
Self { plan }
}
}
impl Table for TableImpl {
fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<Table>> {
let schema = self.plan.schema();
let mut projection_index: Vec<usize> = Vec::with_capacity(columns.len());
let mut expr: Vec<Expr> = Vec::with_capacity(columns.len());
for column in columns {
match schema.column_with_name(column) {
Some((i, _)) => {
projection_index.push(i);
expr.push(Expr::Column(i));
}
_ => {
return Err(ExecutionError::InvalidColumn(format!(
"No column named '{}'",
column
)));
}
}
}
Ok(Arc::new(TableImpl::new(Arc::new(
LogicalPlan::Projection {
expr,
input: self.plan.clone(),
schema: projection(&schema, &projection_index)?,
},
))))
}
fn limit(&self, n: usize) -> Result<Arc<Table>> {
Ok(Arc::new(TableImpl::new(Arc::new(LogicalPlan::Limit {
expr: Literal(ScalarValue::UInt32(n as u32)),
input: self.plan.clone(),
schema: self.plan.schema().clone(),
}))))
}
fn to_logical_plan(&self) -> Arc<LogicalPlan> {
self.plan.clone()
}
}
fn projection(schema: &Schema, projection: &Vec<usize>) -> Result<Arc<Schema>> {
let mut fields: Vec<Field> = Vec::with_capacity(projection.len());
for i in projection {
if *i < schema.fields().len() {
fields.push(schema.field(*i).clone());
} else {
return Err(ExecutionError::InvalidColumn(format!(
"Invalid column index {} in projection",
i
)));
}
}
Ok(Arc::new(Schema::new(fields)))
}