use std::collections::HashMap;
use std::io::Error;
use std::io::BufReader;
use std::io::prelude::*;
use std::iter::Iterator;
use std::fs::File;
use std::string::String;
use std::convert::*;
extern crate csv;
use super::csv::StringRecord;
use super::api::*;
use super::rel::*;
use super::parser::*;
use super::sqltorel::*;
use super::dataframe::*;
use super::functions::math::*;
use super::functions::geospatial::*;
/// Errors produced while planning or executing a query.
#[derive(Debug)]
pub enum ExecutionError {
    /// An underlying `std::io` failure (for example, opening a data file).
    IoError(Error),
    /// A failure reported by the `csv` crate while reading records.
    CsvError(csv::Error),
    /// Any other failure, described by a free-form message.
    Custom(String),
}
impl From<Error> for ExecutionError {
fn from(e: Error) -> Self {
ExecutionError::IoError(e)
}
}
/// Wraps a plain message string as `ExecutionError::Custom`.
impl From<String> for ExecutionError {
    fn from(message: String) -> ExecutionError {
        ExecutionError::Custom(message)
    }
}
/// A relation whose rows are read from an open CSV file.
#[derive(Debug)]
pub struct CsvRelation {
    /// Handle to the CSV file that is scanned.
    file: File,
    /// Column definitions used to type each field of a record.
    schema: Schema,
}
/// A relation that passes through only those rows of `input` that satisfy `expr`.
pub struct FilterRelation {
    /// Schema reported by this relation.
    schema: Schema,
    /// Upstream relation whose rows are filtered.
    input: Box<SimpleRelation>,
    /// Predicate expression evaluated against each row.
    expr: Expr,
}
/// A relation that computes a new row from each row of `input`, one value per expression.
pub struct ProjectRelation {
    /// Schema reported by this relation.
    schema: Schema,
    /// Upstream relation whose rows are projected.
    input: Box<SimpleRelation>,
    /// Expressions evaluated, in order, to build each output row.
    expr: Vec<Expr>,
}
impl<'a> CsvRelation {
pub fn open(file: File, schema: Schema) -> Result<Self,ExecutionError> {
Ok(CsvRelation { file, schema })
}
fn create_tuple(&self, r: &StringRecord) -> Result<Row,ExecutionError> {
assert_eq!(self.schema.columns.len(), r.len());
let values = self.schema.columns.iter().zip(r.into_iter()).map(|(c,s)| match c.data_type {
DataType::UnsignedLong => Value::UnsignedLong(s.parse::<u64>().unwrap()),
DataType::String => Value::String(s.to_string()),
DataType::Double => Value::Double(s.parse::<f64>().unwrap()),
_ => panic!("csv unsupported type")
}).collect();
Ok(Row::new(values))
}
}
/// A source of rows that can be scanned within an execution context.
pub trait SimpleRelation {
    /// Returns an iterator over this relation's rows; each item is either a
    /// row or the error encountered while producing it.
    fn scan<'a>(
        &'a self,
        ctx: &'a ExecutionContext,
    ) -> Box<Iterator<Item = Result<Row, ExecutionError>> + 'a>;

    /// Returns the schema describing this relation.
    fn schema(&self) -> &Schema;
}
impl SimpleRelation for CsvRelation {
    /// Reads the underlying file through a `csv::Reader` and yields one typed
    /// row per record; CSV-level failures are wrapped in
    /// `ExecutionError::CsvError`.
    fn scan<'a>(
        &'a self,
        _ctx: &'a ExecutionContext,
    ) -> Box<Iterator<Item = Result<Row, ExecutionError>> + 'a> {
        let records = csv::Reader::from_reader(BufReader::new(&self.file)).into_records();
        Box::new(records.map(move |rec| {
            rec.map_err(ExecutionError::CsvError)
                .and_then(|record| self.create_tuple(&record))
        }))
    }

    /// Returns the schema supplied when the relation was created.
    fn schema(&self) -> &Schema {
        &self.schema
    }
}
impl SimpleRelation for FilterRelation {
fn scan<'a>(&'a self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Row, ExecutionError>> + 'a> {
Box::new(self.input.scan(ctx).filter(move|t|
match t {
&Ok(ref tuple) => match ctx.evaluate(tuple, &self.schema, &self.expr) {
Ok(Value::Boolean(b)) => b,
_ => panic!("Predicate expression evaluated to non-boolean value")
},
_ => true }
))
}
fn schema<'a>(&'a self) -> &'a Schema {
&self.schema
}
}
impl SimpleRelation for ProjectRelation {
fn scan<'a>(&'a self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Row, ExecutionError>> + 'a> {
let foo = self.input.scan(ctx).map(move|r| match r {
Ok(tuple) => {
let values = self.expr.iter()
.map(|e| match e {
&Expr::TupleValue(i) => tuple.values[i].clone(),
_ => ctx.evaluate(&tuple,&self.schema, e).unwrap() })
.collect();
Ok(Row::new(values))
},
Err(_) => r
});
Box::new(foo)
}
fn schema<'a>(&'a self) -> &'a Schema {
&self.schema
}
}
/// Holds the table schemas and function metadata known to the query engine.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Registered schemas, keyed by table name.
    schemas: HashMap<String, Schema>,
    /// Registered function metadata, keyed by lower-cased function name.
    functions: HashMap<String, FunctionMeta>,
}
impl ExecutionContext {
    /// Creates a context with no registered schemas or functions.
    pub fn new() -> Self {
        ExecutionContext {
            schemas: HashMap::new(),
            functions: HashMap::new(),
        }
    }

    /// Registers a copy of `schema` under the table name `name`.
    pub fn define_schema(&mut self, name: &str, schema: &Schema) {
        self.schemas.insert(name.to_string(), schema.clone());
    }

    /// Records the metadata (name, arguments, return type) of `func`, keyed by
    /// its lower-cased name.
    pub fn define_function(&mut self, func: &ScalarFunction) {
        let fm = FunctionMeta {
            name: func.name(),
            args: func.args(),
            return_type: func.return_type(),
        };
        self.functions.insert(fm.name.to_lowercase(), fm);
    }

    /// Parses `sql`, plans it against the registered schemas and returns a
    /// data frame wrapping the resulting plan.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::Custom` when the statement cannot be parsed or
    /// planned.
    pub fn sql(&self, sql: &str) -> Result<Box<DataFrame>, ExecutionError> {
        let ast = Parser::parse_sql(String::from(sql))
            .map_err(|e| ExecutionError::Custom(format!("Failed to parse SQL: {:?}", e)))?;
        let query_planner = SqlToRel::new(self.schemas.clone());
        let plan = query_planner
            .sql_to_rel(&ast)
            .map_err(|e| ExecutionError::Custom(format!("Failed to plan SQL query: {:?}", e)))?;
        Ok(Box::new(DF {
            ctx: Box::new(self.clone()),
            plan,
        }))
    }

    /// Returns a data frame that reads the CSV file `filename` using `schema`.
    /// The file is not opened until an execution plan is created.
    pub fn load(&self, filename: &str, schema: &Schema) -> Result<Box<DataFrame>, ExecutionError> {
        let plan = Rel::CsvFile {
            filename: filename.to_string(),
            schema: schema.clone(),
        };
        Ok(Box::new(DF {
            ctx: Box::new(self.clone()),
            plan: Box::new(plan),
        }))
    }

    /// Registers `schema` under the table name `name`, taking ownership of both.
    pub fn register_table(&mut self, name: String, schema: Schema) {
        self.schemas.insert(name, schema);
    }

    /// Turns a logical plan into a tree of executable relations.
    ///
    /// # Errors
    ///
    /// Returns an error when a data file cannot be opened, when the plan is an
    /// empty relation, or when a projection contains an unsupported expression
    /// or an out-of-range column reference.
    pub fn create_execution_plan(&self, plan: &Rel) -> Result<Box<SimpleRelation>, ExecutionError> {
        match *plan {
            Rel::EmptyRelation => Err(ExecutionError::Custom(
                "Cannot create an execution plan for an empty relation".to_string(),
            )),
            Rel::TableScan { ref table_name, ref schema, .. } => {
                // Named tables are resolved to CSV files under `test/data`.
                let file = File::open(format!("test/data/{}.csv", table_name))?;
                let rel = CsvRelation::open(file, schema.clone())?;
                Ok(Box::new(rel))
            }
            Rel::CsvFile { ref filename, ref schema } => {
                let file = File::open(filename)?;
                let rel = CsvRelation::open(file, schema.clone())?;
                Ok(Box::new(rel))
            }
            Rel::Selection { ref expr, ref input, ref schema } => {
                let input_rel = self.create_execution_plan(input)?;
                let rel = FilterRelation {
                    input: input_rel,
                    expr: expr.clone(),
                    schema: schema.clone(),
                };
                Ok(Box::new(rel))
            }
            Rel::Projection { ref expr, ref input, .. } => {
                let input_rel = self.create_execution_plan(input)?;
                let input_schema = input_rel.schema().clone();

                // Derive the output schema from the projected expressions.
                let project_columns = expr
                    .iter()
                    .map(|e| match e {
                        &Expr::TupleValue(i) => {
                            input_schema.columns.get(i).cloned().ok_or_else(|| {
                                ExecutionError::Custom(format!(
                                    "Projection references column {} but the input has {} columns",
                                    i,
                                    input_schema.columns.len()
                                ))
                            })
                        }
                        // NOTE: every scalar function is typed as a nullable Double here,
                        // regardless of the function's declared return type.
                        &Expr::ScalarFunction { ref name, .. } => Ok(Field {
                            name: name.clone(),
                            data_type: DataType::Double,
                            nullable: true,
                        }),
                        _ => Err(ExecutionError::Custom(
                            "Unsupported projection expression".to_string(),
                        )),
                    })
                    .collect::<Result<Vec<Field>, ExecutionError>>()?;

                let rel = ProjectRelation {
                    input: input_rel,
                    expr: expr.clone(),
                    schema: Schema { columns: project_columns },
                };
                Ok(Box::new(rel))
            }
        }
    }

    /// Evaluates the expression `rex` against the row `tuple`.
    ///
    /// `tt` is only forwarded to recursive calls.
    ///
    /// # Errors
    ///
    /// Returns an error when a column index is out of range for `tuple`, when
    /// a function name is unknown, or when a function fails to execute.
    pub fn evaluate(&self, tuple: &Row, tt: &Schema, rex: &Expr) -> Result<Value, Box<ExecutionError>> {
        match *rex {
            Expr::BinaryExpr { ref left, ref op, ref right } => {
                let left_value = self.evaluate(tuple, tt, left)?;
                let right_value = self.evaluate(tuple, tt, right)?;
                let outcome = match *op {
                    Operator::Eq => left_value == right_value,
                    Operator::NotEq => left_value != right_value,
                    Operator::Lt => left_value < right_value,
                    Operator::LtEq => left_value <= right_value,
                    Operator::Gt => left_value > right_value,
                    Operator::GtEq => left_value >= right_value,
                };
                Ok(Value::Boolean(outcome))
            }
            Expr::TupleValue(index) => tuple.values.get(index).cloned().ok_or_else(|| {
                Box::new(ExecutionError::Custom(format!(
                    "Column index {} is out of range for a row with {} values",
                    index,
                    tuple.values.len()
                )))
            }),
            Expr::Literal(ref value) => Ok(value.clone()),
            Expr::ScalarFunction { ref name, ref args } => {
                // Evaluate arguments first; the first failure aborts the call.
                let arg_values = args
                    .iter()
                    .map(|a| self.evaluate(tuple, tt, a))
                    .collect::<Result<Vec<Value>, Box<ExecutionError>>>()?;
                let func = self.load_function_impl(name)?;
                match func.execute(arg_values) {
                    Ok(value) => Ok(value),
                    Err(_) => Err(Box::new(ExecutionError::Custom(format!(
                        "Function {} failed to execute",
                        name
                    )))),
                }
            }
        }
    }

    /// Resolves a function name (case-insensitively) to its implementation.
    ///
    /// Only the names matched below are resolved; the `functions` map filled
    /// by `define_function` is not consulted here.
    fn load_function_impl(&self, function_name: &str) -> Result<Box<ScalarFunction>, Box<ExecutionError>> {
        match function_name.to_lowercase().as_ref() {
            "sqrt" => Ok(Box::new(SqrtFunction {})),
            "st_point" => Ok(Box::new(STPointFunc {})),
            "st_astext" => Ok(Box::new(STAsText {})),
            _ => Err(Box::new(ExecutionError::Custom(format!(
                "Unknown function {}",
                function_name
            )))),
        }
    }
}
/// A data frame: a logical plan paired with the execution context used to run it.
pub struct DF {
    /// Context used to build and run the execution plan.
    ctx: Box<ExecutionContext>,
    /// Logical plan describing how this data frame's rows are produced.
    plan: Box<Rel>,
}
impl DataFrame for DF {
    /// Returns a new data frame whose plan projects `expr` over this frame's plan.
    fn select(&self, expr: Vec<Expr>) -> Result<Box<DataFrame>, DataFrameError> {
        let plan = Rel::Projection {
            expr,
            input: self.plan.clone(),
            schema: self.plan.schema().clone(),
        };
        Ok(Box::new(DF {
            ctx: self.ctx.clone(),
            plan: Box::new(plan),
        }))
    }

    /// Returns a new data frame whose plan keeps only rows satisfying `expr`.
    fn filter(&self, expr: Expr) -> Result<Box<DataFrame>, DataFrameError> {
        let plan = Rel::Selection {
            expr,
            input: self.plan.clone(),
            schema: self.plan.schema().clone(),
        };
        Ok(Box::new(DF {
            ctx: self.ctx.clone(),
            plan: Box::new(plan),
        }))
    }

    /// Executes the plan and writes each resulting row, followed by a newline,
    /// to `filename`.
    ///
    /// # Errors
    ///
    /// Returns an error when the execution plan cannot be built, when the file
    /// cannot be created or written, or when any row fails to be produced.
    fn write(&self, filename: &str) -> Result<(), DataFrameError> {
        let execution_plan = self.ctx.create_execution_plan(&self.plan)?;
        println!("Writing csv to {}", filename);
        // Buffer output so each row does not cost a separate write call.
        let mut writer = ::std::io::BufWriter::new(File::create(filename)?);
        for t in execution_plan.scan(&self.ctx) {
            let tuple = t?;
            let csv = format!("{}\n", tuple.to_string());
            // `write_all` retries partial writes; plain `write` may drop bytes.
            writer.write_all(csv.as_bytes())?;
        }
        // Flush explicitly: errors during the implicit flush on drop are lost.
        writer.flush()?;
        Ok(())
    }

    /// Looks up `column_name` in the schema and returns an expression
    /// referring to it by position.
    ///
    /// Only plans that are a direct CSV file scan are supported; other plans
    /// yield `DataFrameError::NotImplemented`.
    fn col(&self, column_name: &str) -> Result<Expr, DataFrameError> {
        match *self.plan {
            Rel::CsvFile { ref schema, .. } => schema
                .column(column_name)
                .map(|(i, _)| Expr::TupleValue(i))
                .ok_or_else(|| DataFrameError::InvalidColumn(column_name.to_string())),
            _ => Err(DataFrameError::NotImplemented),
        }
    }

    /// Returns a copy of the plan's schema.
    fn schema(&self) -> Schema {
        self.plan.schema().clone()
    }

    /// Repartitioning is not supported; always returns
    /// `DataFrameError::NotImplemented`.
    fn repartition(&self, _n: u32) -> Result<Box<DataFrame>, DataFrameError> {
        Err(DataFrameError::NotImplemented)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a context with the `people` and `uk_cities` table schemas registered.
    fn create_context() -> ExecutionContext {
        let people = Schema::new(vec![
            Field::new("id", DataType::UnsignedLong, false),
            Field::new("name", DataType::String, false),
        ]);
        let uk_cities = Schema::new(vec![
            Field::new("city", DataType::String, false),
            Field::new("lat", DataType::Double, false),
            Field::new("lng", DataType::Double, false),
        ]);

        let mut ctx = ExecutionContext::new();
        ctx.define_schema("people", &people);
        ctx.define_schema("uk_cities", &uk_cities);
        ctx
    }

    /// Runs a SQL query that calls `sqrt` and writes the result to a file.
    #[test]
    fn test_sqrt() {
        let mut ctx = create_context();
        ctx.define_function(&SqrtFunction {});

        let df = ctx.sql("SELECT id, sqrt(id) FROM people").unwrap();
        df.write("_sqrt_out.csv").unwrap();
    }

    /// Runs a SQL query that calls `ST_Point` and writes the result to a file.
    #[test]
    fn test_sql_udf_udt() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});

        let df = ctx.sql("SELECT ST_Point(lat, lng) FROM uk_cities").unwrap();
        df.write("_uk_cities_sql.csv").unwrap();
    }

    /// Builds the `ST_Point` projection through the data frame API instead of SQL.
    #[test]
    fn test_df_udf_udt() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});

        let schema = Schema::new(vec![
            Field::new("city", DataType::String, false),
            Field::new("lat", DataType::Double, false),
            Field::new("lng", DataType::Double, false),
        ]);
        let df = ctx.load("test/data/uk_cities.csv", &schema).unwrap();

        let lat = df.col("lat").unwrap();
        let lng = df.col("lng").unwrap();
        let func_expr = Expr::ScalarFunction {
            name: "ST_Point".to_string(),
            args: vec![lat, lng],
        };

        let df2 = df.select(vec![func_expr]).unwrap();
        df2.write("_uk_cities_df.csv").unwrap();
    }

    /// Runs a SQL query that nests one function call inside another.
    #[test]
    fn test_chaining_functions() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});

        let df = ctx
            .sql("SELECT ST_AsText(ST_Point(lat, lng)) FROM uk_cities")
            .unwrap();
        df.write("_uk_cities_wkt.csv").unwrap();
    }
}