use std::cmp::Ordering::*;
use std::collections::HashMap;
use std::io::Error;
use std::io::BufReader;
use std::io::prelude::*;
use std::iter::Iterator;
use std::fs::File;
use std::string::String;
use std::convert::*;
extern crate csv;
use super::csv::StringRecord;
use super::api::*;
use super::rel::*;
use super::sql::ASTNode::*;
use super::sqltorel::*;
use super::parser::*;
use super::dataframe::*;
use super::functions::math::*;
use super::functions::geospatial::*;
/// Every failure that can surface while planning or executing a query.
#[derive(Debug)]
pub enum ExecutionError {
    /// An I/O failure, e.g. while opening a data file.
    IoError(Error),
    /// A record-level error reported by the `csv` reader.
    CsvError(csv::Error),
    /// The SQL text could not be parsed.
    ParserError(ParserError),
    /// Any other failure, described by a human-readable message.
    Custom(String),
}
impl From<Error> for ExecutionError {
fn from(e: Error) -> Self {
ExecutionError::IoError(e)
}
}
impl From<String> for ExecutionError {
fn from(e: String) -> Self {
ExecutionError::Custom(e)
}
}
impl From<ParserError> for ExecutionError {
fn from(e: ParserError) -> Self {
ExecutionError::ParserError(e)
}
}
/// A relation whose rows are read from a CSV file and typed by `schema`.
#[derive(Debug)]
pub struct CsvRelation {
    /// Handle to the CSV data.
    /// NOTE(review): `scan` reads through `&File`, so the OS file cursor is shared and
    /// not rewound between scans — confirm a relation is only scanned once.
    file: File,
    /// Column names and types used to convert each CSV record into a `Row`.
    schema: Schema,
}
/// Physical operator that keeps only the input rows matching a predicate.
pub struct FilterRelation {
    /// Schema of the rows produced (the same rows as the input).
    schema: Schema,
    /// Upstream relation supplying the rows to filter.
    input: Box<SimpleRelation>,
    /// Predicate, expected to evaluate to `Value::Boolean`.
    expr: Expr,
}
/// Physical operator that computes one output value per expression for every input row.
pub struct ProjectRelation {
    /// Schema of the projected rows.
    schema: Schema,
    /// Upstream relation supplying the rows to project.
    input: Box<SimpleRelation>,
    /// Expressions producing the output columns, in order.
    expr: Vec<Expr>,
}
/// Physical operator that materializes its input and emits it in sorted order.
pub struct SortRelation {
    /// Schema of the rows produced (the same rows as the input).
    schema: Schema,
    /// Upstream relation supplying the rows to sort.
    input: Box<SimpleRelation>,
    /// Sort keys; each entry is expected to be an `Expr::Sort`.
    expr: Vec<Expr>,
}
/// Physical operator that passes through at most `limit` items from its input.
pub struct LimitRelation {
    /// Schema of the rows produced (the same rows as the input).
    schema: Schema,
    /// Upstream relation supplying the rows.
    input: Box<SimpleRelation>,
    /// Maximum number of items to emit.
    limit: usize,
}
impl<'a> CsvRelation {
pub fn open(file: File, schema: Schema) -> Result<Self,ExecutionError> {
Ok(CsvRelation { file, schema })
}
fn create_tuple(&self, r: &StringRecord) -> Result<Row,ExecutionError> {
assert_eq!(self.schema.columns.len(), r.len());
let values = self.schema.columns.iter().zip(r.into_iter()).map(|(c,s)| match c.data_type {
DataType::UnsignedLong => Value::UnsignedLong(s.parse::<u64>().unwrap()),
DataType::String => Value::String(s.to_string()),
DataType::Double => Value::Double(s.parse::<f64>().unwrap()),
_ => panic!("csv unsupported type")
}).collect();
Ok(Row::new(values))
}
}
/// A physical operator that can be scanned to produce rows.
pub trait SimpleRelation {
/// Returns an iterator over this relation's output. Each item is either a row or the
/// error that prevented producing it; `ctx` is available for evaluating expressions.
fn scan<'a>(&'a self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Row,ExecutionError>> + 'a>;
/// Returns the schema stored on this relation.
fn schema<'a>(&'a self) -> &'a Schema;
}
impl SimpleRelation for CsvRelation {
    /// Streams the file's CSV records, converting each into a typed `Row`.
    ///
    /// Reader errors become `ExecutionError::CsvError`; conversion errors come from
    /// `create_tuple`. The context is not needed for a plain file scan.
    fn scan<'a>(&'a self, _ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Row,ExecutionError>> + 'a> {
        let reader = csv::Reader::from_reader(BufReader::new(&self.file));
        let rows = reader.into_records().map(move |record| {
            record
                .map_err(ExecutionError::CsvError)
                .and_then(|rec| self.create_tuple(&rec))
        });
        Box::new(rows)
    }

    /// Returns the schema the file's records are parsed with.
    fn schema<'a>(&'a self) -> &'a Schema {
        &self.schema
    }
}
impl SimpleRelation for FilterRelation {
fn scan<'a>(&'a self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Row, ExecutionError>> + 'a> {
Box::new(self.input.scan(ctx).filter(move|t|
match t {
&Ok(ref tuple) => match ctx.evaluate(tuple, &self.schema, &self.expr) {
Ok(Value::Boolean(b)) => b,
_ => panic!("Predicate expression evaluated to non-boolean value")
},
_ => true }
))
}
fn schema<'a>(&'a self) -> &'a Schema {
&self.schema
}
}
impl SimpleRelation for SortRelation {
fn scan<'a>(&'a self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Row, ExecutionError>> + 'a> {
let it = self.input.scan(ctx);
let mut v : Vec<Row> = vec![];
it.for_each(|item| v.push(item.unwrap()));
v.sort_by(|a,b| {
for e in &self.expr {
match e {
&Expr::Sort { ref expr, asc } => {
let a_value = ctx.evaluate(a, &self.schema, expr).unwrap();
let b_value = ctx.evaluate(b, &self.schema, expr).unwrap();
if a_value < b_value {
return if asc { Less } else { Greater };
} else if a_value > b_value {
return if asc { Greater } else { Less };
}
},
_ => panic!("wrong expression type for sort")
}
}
Equal
});
Box::new(v.into_iter().map(|r|Ok(r)))
}
fn schema<'a>(&'a self) -> &'a Schema {
&self.schema
}
}
impl SimpleRelation for ProjectRelation {
fn scan<'a>(&'a self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Row, ExecutionError>> + 'a> {
let foo = self.input.scan(ctx).map(move|r| match r {
Ok(tuple) => {
let values = self.expr.iter()
.map(|e| match e {
&Expr::TupleValue(i) => tuple.values[i].clone(),
_ => ctx.evaluate(&tuple,&self.schema, e).unwrap() })
.collect();
Ok(Row::new(values))
},
Err(_) => r
});
Box::new(foo)
}
fn schema<'a>(&'a self) -> &'a Schema {
&self.schema
}
}
impl SimpleRelation for LimitRelation {
    /// Yields at most `limit` items from the input. Error items count toward the limit.
    fn scan<'a>(&'a self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Row, ExecutionError>> + 'a> {
        let upstream = self.input.scan(ctx);
        let limited = upstream.take(self.limit);
        Box::new(limited)
    }

    /// Returns the schema of the limited rows.
    fn schema<'a>(&'a self) -> &'a Schema {
        let schema = &self.schema;
        schema
    }
}
/// Describes how a logical plan is to be executed. Serializable, presumably so it can be
/// sent elsewhere for execution — TODO confirm against its users.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExecutionPlan {
    /// Execute `plan` as a single unit.
    Interactive {
        plan: Box<LogicalPlan>,
    },
    /// Execute `plan` split into `partition_count` partitions, keyed by `partition_expr`.
    Partition {
        plan: Box<LogicalPlan>,
        partition_count: usize,
        partition_expr: Expr,
    },
}
/// Registry of table schemas and scalar-function metadata used to plan and run queries.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Table name -> schema, filled by `define_schema` / `register_table`.
    schemas: HashMap<String, Schema>,
    /// Lower-cased function name -> metadata, filled by `define_function`.
    functions: HashMap<String, FunctionMeta>,
}
impl ExecutionContext {
    /// Creates a context with no registered tables or functions.
    pub fn new() -> Self {
        ExecutionContext {
            schemas: HashMap::new(),
            functions: HashMap::new(),
        }
    }

    /// Registers (or replaces) the schema for the table called `name`.
    pub fn define_schema(&mut self, name: &str, schema: &Schema) {
        self.schemas.insert(name.to_string(), schema.clone());
    }

    /// Records the metadata (name, arguments, return type) of a scalar function,
    /// keyed by its lower-cased name.
    ///
    /// NOTE(review): `load_function_impl` resolves implementations from a hard-coded
    /// list and does not consult this map.
    pub fn define_function(&mut self, func: &ScalarFunction) {
        let fm = FunctionMeta {
            name: func.name(),
            args: func.args(),
            return_type: func.return_type(),
        };
        self.functions.insert(fm.name.to_lowercase(), fm);
    }

    /// Parses `sql` and converts it into a logical plan using the registered schemas.
    pub fn create_logical_plan(&self, sql: &str) -> Result<Box<LogicalPlan>, ExecutionError> {
        let ast = Parser::parse_sql(String::from(sql))?;
        let query_planner = SqlToRel::new(self.schemas.clone());
        Ok(query_planner.sql_to_rel(&ast)?)
    }

    /// Runs a SQL statement.
    ///
    /// `CREATE TABLE` registers the described schema and returns a `DF` over an empty
    /// relation. Any other statement is planned and returned as a `DF` wrapping that
    /// plan; nothing is executed until the `DF` is consumed.
    pub fn sql(&mut self, sql: &str) -> Result<Box<DataFrame>, ExecutionError> {
        let ast = Parser::parse_sql(String::from(sql))?;
        match ast {
            SQLCreateTable { name, columns } => {
                let fields: Vec<Field> = columns.iter()
                    .map(|c| Field::new(&c.name, convert_data_type(&c.data_type), c.allow_null))
                    .collect();
                let schema = Schema::new(fields);
                self.define_schema(&name, &schema);
                Ok(Box::new(DF { ctx: Box::new(self.clone()), plan: Box::new(LogicalPlan::EmptyRelation) }))
            },
            _ => {
                let query_planner = SqlToRel::new(self.schemas.clone());
                let plan = query_planner.sql_to_rel(&ast)?;
                Ok(Box::new(DF { ctx: Box::new(self.clone()), plan }))
            }
        }
    }

    /// Returns a `DF` that reads `filename` as CSV using `schema`.
    pub fn load(&self, filename: &str, schema: &Schema) -> Result<Box<DataFrame>, ExecutionError> {
        let plan = LogicalPlan::CsvFile { filename: filename.to_string(), schema: schema.clone() };
        Ok(Box::new(DF { ctx: Box::new(self.clone()), plan: Box::new(plan) }))
    }

    /// Registers `schema` under `name`, taking ownership of both.
    pub fn register_table(&mut self, name: String, schema: Schema) {
        self.schemas.insert(name, schema);
    }

    /// Translates a logical plan into a tree of executable relations.
    ///
    /// # Errors
    ///
    /// Fails for `EmptyRelation` (not implemented), when a data file cannot be opened,
    /// when a projection references a column index outside the input schema, or when a
    /// projection contains an expression kind that is not supported.
    pub fn create_execution_plan(&self, plan: &LogicalPlan) -> Result<Box<SimpleRelation>, ExecutionError> {
        match *plan {
            LogicalPlan::EmptyRelation => {
                Err(ExecutionError::Custom(String::from("empty relation is not implemented yet")))
            },
            LogicalPlan::TableScan { ref table_name, ref schema, .. } => {
                // NOTE(review): table data is always read from this fixed relative path.
                let file = File::open(format!("test/data/{}.csv", table_name))?;
                let rel = CsvRelation::open(file, schema.clone())?;
                Ok(Box::new(rel))
            },
            LogicalPlan::CsvFile { ref filename, ref schema } => {
                let file = File::open(filename)?;
                let rel = CsvRelation::open(file, schema.clone())?;
                Ok(Box::new(rel))
            },
            LogicalPlan::Selection { ref expr, ref input, ref schema } => {
                let input_rel = self.create_execution_plan(input)?;
                let rel = FilterRelation {
                    input: input_rel,
                    expr: expr.clone(),
                    schema: schema.clone(),
                };
                Ok(Box::new(rel))
            },
            LogicalPlan::Projection { ref expr, ref input, .. } => {
                let input_rel = self.create_execution_plan(input)?;
                // Cloned so the borrow of `input_rel` ends before it is moved below.
                let input_schema = input_rel.schema().clone();
                let project_columns = expr.iter()
                    .map(|e| match *e {
                        Expr::TupleValue(i) => input_schema.columns.get(i).cloned().ok_or_else(|| {
                            ExecutionError::Custom(format!(
                                "Projection column index {} out of range for input with {} columns",
                                i,
                                input_schema.columns.len()
                            ))
                        }),
                        // NOTE(review): function results are always typed as nullable Double,
                        // regardless of the function's declared return type.
                        Expr::ScalarFunction { ref name, .. } => Ok(Field {
                            name: name.clone(),
                            data_type: DataType::Double,
                            nullable: true,
                        }),
                        _ => Err(ExecutionError::Custom(String::from(
                            "Unsupported projection expression",
                        ))),
                    })
                    .collect::<Result<Vec<Field>, ExecutionError>>()?;
                let project_schema = Schema { columns: project_columns };
                let rel = ProjectRelation {
                    input: input_rel,
                    expr: expr.clone(),
                    schema: project_schema,
                };
                Ok(Box::new(rel))
            },
            LogicalPlan::Sort { ref expr, ref input, ref schema } => {
                let input_rel = self.create_execution_plan(input)?;
                let rel = SortRelation {
                    input: input_rel,
                    expr: expr.clone(),
                    schema: schema.clone(),
                };
                Ok(Box::new(rel))
            },
            LogicalPlan::Limit { limit, ref input, ref schema, .. } => {
                let input_rel = self.create_execution_plan(input)?;
                let rel = LimitRelation {
                    input: input_rel,
                    limit,
                    schema: schema.clone(),
                };
                Ok(Box::new(rel))
            }
        }
    }

    /// Evaluates `expr` against `row`.
    ///
    /// `schema` is only forwarded to nested evaluations; it is not otherwise read here.
    /// Comparison operators produce `Value::Boolean`. A `Sort` wrapper evaluates its
    /// inner expression. Scalar functions are resolved by name through
    /// `load_function_impl` and applied to their evaluated arguments.
    ///
    /// # Errors
    ///
    /// Returns an error for an out-of-range column index, an unknown function, or a
    /// function that reports a failure.
    pub fn evaluate(&self, row: &Row, schema: &Schema, expr: &Expr) -> Result<Value, Box<ExecutionError>> {
        match expr {
            &Expr::BinaryExpr { ref left, ref op, ref right } => {
                let left_value = self.evaluate(row, schema, left)?;
                let right_value = self.evaluate(row, schema, right)?;
                match op {
                    &Operator::Eq => Ok(Value::Boolean(left_value == right_value)),
                    &Operator::NotEq => Ok(Value::Boolean(left_value != right_value)),
                    &Operator::Lt => Ok(Value::Boolean(left_value < right_value)),
                    &Operator::LtEq => Ok(Value::Boolean(left_value <= right_value)),
                    &Operator::Gt => Ok(Value::Boolean(left_value > right_value)),
                    &Operator::GtEq => Ok(Value::Boolean(left_value >= right_value)),
                }
            },
            &Expr::TupleValue(index) => match row.values.get(index) {
                Some(value) => Ok(value.clone()),
                None => Err(Box::new(ExecutionError::Custom(format!(
                    "Column index {} out of range for row with {} values",
                    index,
                    row.values.len()
                )))),
            },
            &Expr::Literal(ref value) => Ok(value.clone()),
            &Expr::Sort { ref expr, .. } => self.evaluate(row, schema, expr),
            &Expr::ScalarFunction { ref name, ref args } => {
                let arg_values: Vec<Value> = args.iter()
                    .map(|a| self.evaluate(row, schema, &a))
                    .collect::<Result<Vec<Value>, Box<ExecutionError>>>()?;
                let func = self.load_function_impl(name.as_ref())?;
                match func.execute(arg_values) {
                    Ok(value) => Ok(value),
                    Err(e) => Err(Box::new(ExecutionError::Custom(
                        format!("Function returned error {:?}", e)))),
                }
            }
        }
    }

    /// Returns the built-in implementation for `function_name` (case-insensitive).
    fn load_function_impl(&self, function_name: &str) -> Result<Box<ScalarFunction>, Box<ExecutionError>> {
        match function_name.to_lowercase().as_ref() {
            "sqrt" => Ok(Box::new(SqrtFunction {})),
            "st_point" => Ok(Box::new(STPointFunc {})),
            "st_astext" => Ok(Box::new(STAsText {})),
            _ => Err(Box::new(ExecutionError::Custom(format!("Unknown function {}", function_name)))),
        }
    }

    /// Builds an expression that calls the scalar function `name` with `args`.
    pub fn udf(&self, name: &str, args: Vec<Expr>) -> Expr {
        // `args` is already owned, so it is moved in rather than cloned.
        Expr::ScalarFunction { name: name.to_string(), args }
    }
}
/// `DataFrame` implementation that pairs a logical plan with the context used to run it.
pub struct DF {
    /// Schemas and functions available when the plan is executed.
    ctx: Box<ExecutionContext>,
    /// Logical plan describing this frame's rows.
    plan: Box<LogicalPlan>,
}
impl DataFrame for DF {
fn select(&self, expr: Vec<Expr>) -> Result<Box<DataFrame>, DataFrameError> {
let plan = LogicalPlan::Projection {
expr: expr,
input: self.plan.clone(),
schema: self.plan.schema().clone()
};
Ok(Box::new(DF { ctx: self.ctx.clone(), plan: Box::new(plan) }))
}
fn sort(&self, expr: Vec<Expr>) -> Result<Box<DataFrame>, DataFrameError> {
let plan = LogicalPlan::Sort {
expr: expr,
input: self.plan.clone(),
schema: self.plan.schema().clone()
};
Ok(Box::new(DF { ctx: self.ctx.clone(), plan: Box::new(plan) }))
}
fn filter(&self, expr: Expr) -> Result<Box<DataFrame>, DataFrameError> {
let plan = LogicalPlan::Selection {
expr: expr,
input: self.plan.clone(),
schema: self.plan.schema().clone()
};
Ok(Box::new(DF { ctx: self.ctx.clone(), plan: Box::new(plan) }))
}
fn write(&self, filename: &str) -> Result<(), DataFrameError> {
let execution_plan = self.ctx.create_execution_plan(&self.plan)?;
let mut file = File::create(filename)?;
let it = execution_plan.scan(&self.ctx);
it.for_each(|t| {
match t {
Ok(tuple) => {
let csv = format!("{}\n", tuple.to_string());
file.write(&csv.into_bytes()).unwrap(); },
Err(e) => panic!(format!("Error processing tuple: {:?}", e)) }
});
Ok(())
}
fn col(&self, column_name: &str) -> Result<Expr, DataFrameError> {
match self.plan.schema().column(column_name) {
Some((i,_)) => Ok(Expr::TupleValue(i)),
_ => Err(DataFrameError::InvalidColumn(column_name.to_string()))
}
}
fn schema(&self) -> Schema {
self.plan.schema().clone()
}
fn repartition(&self, _n: u32) -> Result<Box<DataFrame>, DataFrameError> {
unimplemented!()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema of `test/data/uk_cities.csv`: city name plus latitude/longitude.
    fn uk_cities_schema() -> Schema {
        Schema::new(vec![
            Field::new("city", DataType::String, false),
            Field::new("lat", DataType::Double, false),
            Field::new("lng", DataType::Double, false),
        ])
    }

    /// Builds a context with the `people` and `uk_cities` tables registered.
    fn create_context() -> ExecutionContext {
        let people = Schema::new(vec![
            Field::new("id", DataType::UnsignedLong, false),
            Field::new("name", DataType::String, false),
        ]);
        let mut ctx = ExecutionContext::new();
        ctx.define_schema("people", &people);
        ctx.define_schema("uk_cities", &uk_cities_schema());
        ctx
    }

    #[test]
    fn test_sqrt() {
        let mut ctx = create_context();
        ctx.define_function(&SqrtFunction {});
        let frame = ctx.sql("SELECT id, sqrt(id) FROM people").unwrap();
        frame.write("_sqrt_out.csv").unwrap();
    }

    #[test]
    fn test_sql_udf_udt() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let frame = ctx.sql("SELECT ST_Point(lat, lng) FROM uk_cities").unwrap();
        frame.write("_uk_cities_sql.csv").unwrap();
    }

    #[test]
    fn test_df_udf_udt() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let cities = ctx.load("test/data/uk_cities.csv", &uk_cities_schema()).unwrap();
        let lat = cities.col("lat").unwrap();
        let lng = cities.col("lng").unwrap();
        let point = ctx.udf("ST_Point", vec![lat, lng]);
        let points = cities.select(vec![point]).unwrap();
        points.write("_uk_cities_df.csv").unwrap();
    }

    #[test]
    fn test_sort() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let cities = ctx.load("test/data/uk_cities.csv", &uk_cities_schema()).unwrap();
        let by_lat = Expr::Sort { expr: Box::new(Expr::TupleValue(1)), asc: true };
        let by_lng = Expr::Sort { expr: Box::new(Expr::TupleValue(2)), asc: true };
        let sorted = cities.sort(vec![by_lat, by_lng]).unwrap();
        sorted.write("_uk_cities_sorted_by_lat_lng.csv").unwrap();
    }

    #[test]
    fn test_chaining_functions() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let frame = ctx.sql("SELECT ST_AsText(ST_Point(lat, lng)) FROM uk_cities").unwrap();
        frame.write("_uk_cities_wkt.csv").unwrap();
    }
}