use std::clone::Clone;
use std::collections::HashMap;
use std::io::{BufWriter, Error};
use std::io::prelude::*;
use std::iter::Iterator;
use std::fs::File;
use std::rc::Rc;
use std::str;
use std::string::String;
use std::convert::*;
extern crate futures;
extern crate hyper;
extern crate serde;
extern crate serde_json;
extern crate tokio_core;
use self::futures::{Future, Stream};
use self::hyper::Client;
use self::tokio_core::reactor::Core;
use self::hyper::{Method, Request};
use self::hyper::header::{ContentLength, ContentType};
use super::api::*;
use super::datasource::csv::CsvRelation;
use super::rel::*;
use super::sql::ASTNode::*;
use super::sqltorel::*;
use super::parser::*;
use super::cluster::*;
use super::dataframe::*;
use super::functions::math::*;
use super::functions::geospatial::*;
/// Selects where a physical plan is executed.
#[derive(Debug,Clone)]
pub enum DFConfig {
/// Execute in this process. Table scans open `<data_dir>/<table_name>.csv`.
Local { data_dir: String },
/// Execute on a remote worker; `etcd` is handed to `get_worker_list` to find one.
Remote { etcd: String },
}
/// Errors that can surface while planning or executing a query.
#[derive(Debug)]
pub enum ExecutionError {
/// An I/O failure, wrapped from `std::io::Error`.
IoError(Error),
/// A failure reported by the SQL parser.
ParserError(ParserError),
/// Any other failure, described by a free-form message.
Custom(String)
}
impl From<Error> for ExecutionError {
fn from(e: Error) -> Self {
ExecutionError::IoError(e)
}
}
/// Lets `?` lift a plain message into an [`ExecutionError::Custom`].
impl From<String> for ExecutionError {
    fn from(message: String) -> ExecutionError {
        ExecutionError::Custom(message)
    }
}
/// Lets `?` lift a parser failure into an [`ExecutionError::ParserError`].
impl From<ParserError> for ExecutionError {
    fn from(parse_error: ParserError) -> ExecutionError {
        ExecutionError::ParserError(parse_error)
    }
}
/// A set of columns that are processed together.
pub trait Batch {
/// Number of columns in the batch.
fn col_count(&self) -> usize;
/// Number of rows in the batch.
fn row_count(&self) -> usize;
/// Borrows the column at position `index`.
fn column(&self, index: usize) -> &Rc<ColumnData>;
/// Returns the values of row `index`, one per column.
fn row_slice(&self, index: usize) -> Vec<Value>;
}
/// A [`Batch`] that stores its data as a vector of reference-counted columns.
pub struct ColumnBatch {
/// The columns of the batch, in positional order.
pub columns: Vec<Rc<ColumnData>>
}
/// [`Batch`] implementation backed by the in-memory `columns` vector.
impl Batch for ColumnBatch {
    /// Number of columns held by this batch.
    fn col_count(&self) -> usize {
        self.columns.len()
    }

    /// Number of rows, taken from the length of the first column.
    ///
    /// A batch without any columns has no rows, so `0` is returned instead of
    /// panicking on an out-of-bounds index.
    fn row_count(&self) -> usize {
        self.columns.first().map_or(0, |column| column.len())
    }

    /// Borrows the column at position `index`.
    ///
    /// # Panics
    /// Panics if `index` is not smaller than `col_count()`.
    fn column(&self, index: usize) -> &Rc<ColumnData> {
        &self.columns[index]
    }

    /// Materializes row `index` by reading that position from every column.
    fn row_slice(&self, index: usize) -> Vec<Value> {
        self.columns
            .iter()
            .map(|column| column.get_value(index))
            .collect()
    }
}
/// Column-oriented storage for the values of one column of a batch.
#[derive(Debug,Clone)]
pub enum ColumnData {
/// A single value standing in for every row (`get_value` returns it for any index).
BroadcastVariable(Value),
/// Boolean values; also used as the row mask accepted by `filter`.
Boolean(Vec<bool>),
/// `f32` values.
Float(Vec<f32>),
/// `f64` values.
Double(Vec<f64>),
/// `i32` values.
Int(Vec<i32>),
/// `u32` values.
UnsignedInt(Vec<u32>),
/// `i64` values.
Long(Vec<i64>),
/// `u64` values.
UnsignedLong(Vec<u64>),
/// String values.
String(Vec<String>),
/// A composite column made of one nested column per field.
ComplexValue(Vec<ColumnData>)
}
/// Expands to the body of an element-wise comparison between two columns.
///
/// `$method` is the method name reported in the panic message and `$op` is the
/// comparison operator. For every supported element type two arms are
/// generated: column vs. broadcast value, and column vs. column (pairwise,
/// stopping at the shorter of the two). Any other combination panics.
macro_rules! compare_columns {
    (@arms $method:expr, $lhs:expr, $rhs:expr, $op:tt, $($variant:ident),+) => {
        match ($lhs, $rhs) {
            // Column compared against a single broadcast value of the same type.
            $(
                (&ColumnData::$variant(ref l), &ColumnData::BroadcastVariable(Value::$variant(ref b))) => {
                    l.iter().map(|a| a $op b).collect()
                }
            )+
            // Column compared pairwise against another column of the same type.
            $(
                (&ColumnData::$variant(ref l), &ColumnData::$variant(ref r)) => {
                    l.iter().zip(r.iter()).map(|(a, b)| a $op b).collect()
                }
            )+
            (l, r) => panic!("ColumnData.{}() Type mismatch: {:?} vs {:?}", $method, l, r),
        }
    };
    ($method:expr, $lhs:expr, $rhs:expr, $op:tt) => {
        compare_columns!(@arms $method, $lhs, $rhs, $op,
            Float, Double, Int, UnsignedInt, Long, UnsignedLong, String)
    };
}

/// Returns the elements of `values` whose corresponding `mask` entry is `true`.
///
/// Pairs are formed positionally; iteration stops at the shorter slice.
fn filter_values<T: Clone>(values: &[T], mask: &[bool]) -> Vec<T> {
    values
        .iter()
        .zip(mask.iter())
        .filter(|&(_, keep)| *keep)
        .map(|(value, _)| value.clone())
        .collect()
}

impl ColumnData {
    /// Number of values stored in the column.
    ///
    /// A broadcast variable counts as one value. A complex column reports the
    /// length of its first field, or `0` when it has no fields at all.
    pub fn len(&self) -> usize {
        match *self {
            ColumnData::BroadcastVariable(_) => 1,
            ColumnData::Boolean(ref v) => v.len(),
            ColumnData::Float(ref v) => v.len(),
            ColumnData::Double(ref v) => v.len(),
            ColumnData::Int(ref v) => v.len(),
            ColumnData::UnsignedInt(ref v) => v.len(),
            ColumnData::Long(ref v) => v.len(),
            ColumnData::UnsignedLong(ref v) => v.len(),
            ColumnData::String(ref v) => v.len(),
            ColumnData::ComplexValue(ref fields) => fields.first().map_or(0, |field| field.len()),
        }
    }

    /// Element-wise `==` against `other` (a same-typed column or broadcast value).
    ///
    /// # Panics
    /// Panics when the two operands do not have matching, comparable types.
    pub fn eq(&self, other: &ColumnData) -> Vec<bool> {
        compare_columns!("eq", self, other, ==)
    }

    /// Element-wise `!=` against `other` (a same-typed column or broadcast value).
    ///
    /// # Panics
    /// Panics when the two operands do not have matching, comparable types.
    pub fn not_eq(&self, other: &ColumnData) -> Vec<bool> {
        compare_columns!("not_eq", self, other, !=)
    }

    /// Element-wise `<` against `other` (a same-typed column or broadcast value).
    ///
    /// # Panics
    /// Panics when the two operands do not have matching, comparable types.
    pub fn lt(&self, other: &ColumnData) -> Vec<bool> {
        compare_columns!("lt", self, other, <)
    }

    /// Element-wise `<=` against `other` (a same-typed column or broadcast value).
    ///
    /// # Panics
    /// Panics when the two operands do not have matching, comparable types.
    pub fn lt_eq(&self, other: &ColumnData) -> Vec<bool> {
        compare_columns!("lt_eq", self, other, <=)
    }

    /// Element-wise `>` against `other` (a same-typed column or broadcast value).
    ///
    /// # Panics
    /// Panics when the two operands do not have matching, comparable types.
    pub fn gt(&self, other: &ColumnData) -> Vec<bool> {
        compare_columns!("gt", self, other, >)
    }

    /// Element-wise `>=` against `other` (a same-typed column or broadcast value).
    ///
    /// # Panics
    /// Panics when the two operands do not have matching, comparable types.
    pub fn gt_eq(&self, other: &ColumnData) -> Vec<bool> {
        compare_columns!("gt_eq", self, other, >=)
    }

    /// Reads the value at row `index`.
    ///
    /// A broadcast variable yields its single value for every index; a complex
    /// column yields a `Value::ComplexValue` built from the same row of each field.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds for a vector-backed column.
    pub fn get_value(&self, index: usize) -> Value {
        match *self {
            ColumnData::BroadcastVariable(ref v) => v.clone(),
            ColumnData::Boolean(ref v) => Value::Boolean(v[index]),
            ColumnData::Float(ref v) => Value::Float(v[index]),
            ColumnData::Double(ref v) => Value::Double(v[index]),
            ColumnData::Int(ref v) => Value::Int(v[index]),
            ColumnData::UnsignedInt(ref v) => Value::UnsignedInt(v[index]),
            ColumnData::Long(ref v) => Value::Long(v[index]),
            ColumnData::UnsignedLong(ref v) => Value::UnsignedLong(v[index]),
            ColumnData::String(ref v) => Value::String(v[index].clone()),
            ColumnData::ComplexValue(ref fields) => {
                Value::ComplexValue(fields.iter().map(|field| field.get_value(index)).collect())
            }
        }
    }

    /// Returns a new column holding only the rows whose entry in `bools` is `true`.
    ///
    /// A broadcast variable is returned unchanged because it stands for every
    /// row; a complex column is filtered field by field with the same mask.
    ///
    /// # Panics
    /// Panics if `bools` is not a `ColumnData::Boolean` column.
    pub fn filter(&self, bools: &ColumnData) -> ColumnData {
        let mask = match *bools {
            ColumnData::Boolean(ref mask) => mask,
            ref other => panic!("ColumnData.filter() expects a Boolean mask but got {:?}", other),
        };
        match *self {
            ColumnData::BroadcastVariable(ref v) => ColumnData::BroadcastVariable(v.clone()),
            ColumnData::Boolean(ref v) => ColumnData::Boolean(filter_values(v, mask)),
            ColumnData::Float(ref v) => ColumnData::Float(filter_values(v, mask)),
            ColumnData::Double(ref v) => ColumnData::Double(filter_values(v, mask)),
            ColumnData::Int(ref v) => ColumnData::Int(filter_values(v, mask)),
            ColumnData::UnsignedInt(ref v) => ColumnData::UnsignedInt(filter_values(v, mask)),
            ColumnData::Long(ref v) => ColumnData::Long(filter_values(v, mask)),
            ColumnData::UnsignedLong(ref v) => ColumnData::UnsignedLong(filter_values(v, mask)),
            ColumnData::String(ref v) => ColumnData::String(filter_values(v, mask)),
            ColumnData::ComplexValue(ref fields) => {
                ColumnData::ComplexValue(fields.iter().map(|field| field.filter(bools)).collect())
            }
        }
    }
}
/// A compiled expression: a boxed closure that is evaluated against a batch and returns the resulting column.
pub type CompiledExpr = Box<Fn(&Batch) -> Rc<ColumnData>>;
/// Compiles a logical expression into a closure that can be evaluated per batch.
///
/// * Literals become a broadcast variable holding a clone of the literal.
/// * Column references hand back the batch's column at that index.
/// * Binary comparisons evaluate both operands and produce a Boolean column.
/// * Sort expressions compile to their inner expression.
/// * Scalar functions evaluate every argument and call the function loaded
///   from `ctx`; a failure reported by the function at evaluation time panics.
///
/// # Errors
/// Returns `ExecutionError::Custom` for an unsupported binary operator, and
/// forwards any error from compiling sub-expressions or loading a function.
pub fn compile_expr(ctx: &ExecutionContext, expr: &Expr) -> Result<CompiledExpr, ExecutionError> {
    match *expr {
        Expr::Literal(ref lit) => {
            let constant = lit.clone();
            Ok(Box::new(move |_| {
                Rc::new(ColumnData::BroadcastVariable(constant.clone()))
            }))
        }
        Expr::Column(index) => Ok(Box::new(move |batch: &Batch| {
            Rc::clone(batch.column(index))
        })),
        Expr::BinaryExpr { ref left, ref op, ref right } => {
            // Operands are compiled before the operator is inspected, so their
            // errors take precedence over an unsupported-operator error.
            let lhs = compile_expr(ctx, left)?;
            let rhs = compile_expr(ctx, right)?;
            let compare: fn(&ColumnData, &ColumnData) -> Vec<bool> = match *op {
                Operator::Eq => ColumnData::eq,
                Operator::NotEq => ColumnData::not_eq,
                Operator::Lt => ColumnData::lt,
                Operator::LtEq => ColumnData::lt_eq,
                Operator::Gt => ColumnData::gt,
                Operator::GtEq => ColumnData::gt_eq,
                _ => return Err(ExecutionError::Custom(format!("Unsupported binary operator '{:?}'", op))),
            };
            Ok(Box::new(move |batch: &Batch| {
                let left_values = lhs(batch);
                let right_values = rhs(batch);
                Rc::new(ColumnData::Boolean(compare(&left_values, &right_values)))
            }))
        }
        Expr::Sort { ref expr, .. } => compile_expr(ctx, expr),
        Expr::ScalarFunction { ref name, ref args } => {
            let mut compiled_args: Vec<CompiledExpr> = Vec::new();
            for arg in args.iter() {
                compiled_args.push(compile_expr(ctx, arg)?);
            }
            let func = ctx.load_function_impl(name.as_ref())?;
            Ok(Box::new(move |batch| {
                let arg_values: Vec<Rc<ColumnData>> = compiled_args
                    .iter()
                    .map(|compiled| compiled(batch))
                    .collect();
                func.execute(arg_values).unwrap()
            }))
        }
    }
}
/// Relation that filters the rows of `input` using a compiled predicate.
pub struct FilterRelation {
/// Schema reported by `schema()`.
schema: Schema,
/// Upstream relation whose batches are filtered.
input: Box<SimpleRelation>,
/// Compiled predicate; its per-batch result is used as the row mask.
expr: CompiledExpr
}
/// Relation that evaluates a list of expressions against every batch of `input`.
pub struct ProjectRelation {
/// Schema reported by `schema()`.
schema: Schema,
/// Upstream relation whose batches are projected.
input: Box<SimpleRelation>,
/// One compiled expression per output column.
expr: Vec<CompiledExpr>
}
/// Relation that truncates the output of `input`.
pub struct LimitRelation {
/// Schema reported by `schema()`.
schema: Schema,
/// Upstream relation whose output is truncated.
input: Box<SimpleRelation>,
/// The limit applied when scanning `input`.
limit: usize,
}
/// A relation that can be scanned batch by batch.
pub trait SimpleRelation {
/// Returns an iterator over the relation's batches; each item is either a batch or an error.
/// The iterator borrows both the relation and the execution context for `'a`.
fn scan<'a>(&'a mut self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Box<Batch>,ExecutionError>> + 'a>;
/// Returns the schema of the relation.
fn schema<'a>(&'a self) -> &'a Schema;
}
impl SimpleRelation for FilterRelation {
    /// Scans the input and, for each batch, keeps only the rows selected by the
    /// compiled predicate.
    ///
    /// The predicate is evaluated once per batch and its result is used as the
    /// mask for every column. Errors produced by the input are passed through
    /// unchanged so the caller can handle them, rather than aborting with a panic.
    fn scan<'a>(&'a mut self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Box<Batch>, ExecutionError>> + 'a> {
        let filter_expr = &self.expr;
        Box::new(self.input.scan(ctx).map(move |result| {
            match result {
                Ok(batch) => {
                    let mask: Rc<ColumnData> = (*filter_expr)(batch.as_ref());
                    let filtered_columns: Vec<Rc<ColumnData>> = (0..batch.col_count())
                        .map(|column_index| Rc::new(batch.column(column_index).filter(&mask)))
                        .collect();
                    let filtered_batch: Box<Batch> = Box::new(ColumnBatch { columns: filtered_columns });
                    Ok(filtered_batch)
                }
                Err(e) => Err(e),
            }
        }))
    }

    /// Returns the schema this relation was created with.
    fn schema<'a>(&'a self) -> &'a Schema {
        &self.schema
    }
}
impl SimpleRelation for ProjectRelation {
    /// Scans the input and replaces each batch with one whose columns are the
    /// results of the compiled projection expressions, in order.
    ///
    /// Errors produced by the input are passed through unchanged.
    fn scan<'a>(&'a mut self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Box<Batch>, ExecutionError>> + 'a> {
        let expressions = &self.expr;
        Box::new(self.input.scan(ctx).map(move |result| {
            result.map(|batch| {
                let columns: Vec<Rc<ColumnData>> = expressions
                    .iter()
                    .map(|compiled| (*compiled)(batch.as_ref()))
                    .collect();
                let projected: Box<Batch> = Box::new(ColumnBatch { columns: columns });
                projected
            })
        }))
    }

    /// Returns the schema this relation was created with.
    fn schema<'a>(&'a self) -> &'a Schema {
        &self.schema
    }
}
impl SimpleRelation for LimitRelation {
fn scan<'a>(&'a mut self, ctx: &'a ExecutionContext) -> Box<Iterator<Item=Result<Box<Batch>, ExecutionError>> + 'a> {
Box::new(self.input.scan(ctx).take(self.limit))
}
fn schema<'a>(&'a self) -> &'a Schema {
&self.schema
}
}
/// A logical plan paired with what should happen to its results.
#[derive(Debug,Clone,Serialize,Deserialize)]
pub enum PhysicalPlan {
/// Run the plan interactively; local execution currently answers this with a "not implemented" error.
Interactive { plan: Box<LogicalPlan> },
/// Run the plan and write its rows to `filename` as comma-separated lines.
Write { plan: Box<LogicalPlan>, filename: String },
}
/// Outcome of executing a physical plan.
#[derive(Debug,Clone)]
pub enum ExecutionResult {
/// The plan ran and there is nothing to report (returned by remote execution).
Unit,
/// The number of rows that were written (returned by local `Write` execution).
Count(usize),
}
/// Entry point for defining schemas and functions, planning SQL and executing plans.
#[derive(Debug,Clone)]
pub struct ExecutionContext {
/// Known table schemas, keyed by table name.
schemas: HashMap<String, Schema>,
/// Metadata of registered functions, keyed by lower-cased function name.
functions: HashMap<String, FunctionMeta>,
/// Whether plans run locally or on a remote worker.
config: DFConfig
}
impl ExecutionContext {
/// Creates a context that executes plans in this process, with `data_dir` as
/// the directory used for table scans. No schemas or functions are defined yet.
pub fn local(data_dir: String) -> Self {
    let config = DFConfig::Local { data_dir: data_dir };
    ExecutionContext {
        schemas: HashMap::new(),
        functions: HashMap::new(),
        config: config,
    }
}
/// Creates a context that executes plans on a remote worker located through
/// `etcd`. No schemas or functions are defined yet.
pub fn remote(etcd: String) -> Self {
    let config = DFConfig::Remote { etcd: etcd };
    ExecutionContext {
        schemas: HashMap::new(),
        functions: HashMap::new(),
        config: config,
    }
}
/// Stores a copy of `schema` under `name`, replacing any schema already
/// registered with that name.
pub fn define_schema(&mut self, name: &str, schema: &Schema) {
    let table_name = String::from(name);
    let owned_schema = schema.clone();
    self.schemas.insert(table_name, owned_schema);
}
/// Records the name, argument list and return type of `func`, keyed by the
/// lower-cased function name.
pub fn define_function(&mut self, func: &ScalarFunction) {
    let meta = FunctionMeta {
        name: func.name(),
        args: func.args(),
        return_type: func.return_type()
    };
    let key = meta.name.to_lowercase();
    self.functions.insert(key, meta);
}
/// Parses `sql` and converts the resulting AST into a logical plan, using a
/// copy of the schemas currently defined on this context.
///
/// # Errors
/// Returns an error if parsing or planning fails.
pub fn create_logical_plan(&self, sql: &str) -> Result<Box<LogicalPlan>, ExecutionError> {
    let ast = Parser::parse_sql(sql.to_string())?;
    let planner = SqlToRel::new(self.schemas.clone());
    let plan = planner.sql_to_rel(&ast)?;
    Ok(plan)
}
/// Parses `sql` and returns a data frame for it.
///
/// A `CREATE TABLE` statement registers the described schema on this context
/// and yields a data frame over an empty relation; any other statement is
/// planned against the currently defined schemas.
///
/// # Errors
/// Returns an error if parsing or planning fails.
pub fn sql(&mut self, sql: &str) -> Result<Box<DataFrame>, ExecutionError> {
    let ast = Parser::parse_sql(sql.to_string())?;
    match ast {
        SQLCreateTable { name, columns } => {
            let mut fields: Vec<Field> = Vec::new();
            for column in columns.iter() {
                fields.push(Field::new(&column.name, convert_data_type(&column.data_type), column.allow_null));
            }
            let table_schema = Schema::new(fields);
            self.define_schema(&name, &table_schema);
            let empty = DF { plan: Box::new(LogicalPlan::EmptyRelation) };
            Ok(Box::new(empty))
        },
        _ => {
            let planner = SqlToRel::new(self.schemas.clone());
            let logical_plan = planner.sql_to_rel(&ast)?;
            Ok(Box::new(DF { plan: logical_plan }))
        }
    }
}
/// Builds a data frame that reads the CSV file `filename` with the given
/// `schema`. The file is not opened here; this only creates the plan.
pub fn load(&self, filename: &str, schema: &Schema) -> Result<Box<DataFrame>, ExecutionError> {
    let csv_plan = Box::new(LogicalPlan::CsvFile {
        filename: String::from(filename),
        schema: schema.clone(),
    });
    Ok(Box::new(DF { plan: csv_plan }))
}
/// Registers `schema` under `name`, taking ownership of both; a schema already
/// stored under that name is replaced.
pub fn register_table(&mut self, name: String, schema: Schema) {
    let _replaced = self.schemas.insert(name, schema);
}
/// Turns a logical plan into a tree of scannable relations.
///
/// * `TableScan` opens `<data_dir>/<table_name>.csv`; `CsvFile` opens the
///   named file directly.
/// * `Selection`, `Projection` and `Limit` first build the relation for their
///   input and then wrap it.
///
/// # Errors
/// Returns an error when a file cannot be opened, an expression cannot be
/// compiled, or the plan (or a projection expression) is of a kind that is not
/// supported yet. Unsupported input is reported as `ExecutionError::Custom`
/// instead of panicking, since this function already returns a `Result`.
pub fn create_execution_plan(&self, data_dir: String, plan: &LogicalPlan) -> Result<Box<SimpleRelation>,ExecutionError> {
    match *plan {
        LogicalPlan::EmptyRelation => {
            Err(ExecutionError::Custom(String::from("empty relation is not implemented yet")))
        },
        LogicalPlan::TableScan { ref table_name, ref schema, .. } => {
            let filename = format!("{}/{}.csv", data_dir, table_name);
            let file = File::open(filename)?;
            let rel = CsvRelation::open(file, schema.clone())?;
            Ok(Box::new(rel))
        },
        LogicalPlan::CsvFile { ref filename, ref schema } => {
            let file = File::open(filename)?;
            let rel = CsvRelation::open(file, schema.clone())?;
            Ok(Box::new(rel))
        },
        LogicalPlan::Selection { ref expr, ref input, ref schema } => {
            let input_rel = self.create_execution_plan(data_dir, input)?;
            let rel = FilterRelation {
                input: input_rel,
                expr: compile_expr(self, expr)?,
                schema: schema.clone()
            };
            Ok(Box::new(rel))
        },
        LogicalPlan::Projection { ref expr, ref input, .. } => {
            let input_rel = self.create_execution_plan(data_dir, input)?;
            let input_schema = input_rel.schema().clone();
            // The output schema is derived from the expressions: columns keep
            // their input field, scalar functions are described as a nullable
            // Double field named after the function.
            let project_columns: Result<Vec<Field>, ExecutionError> = expr.iter().map(|e| {
                match *e {
                    Expr::Column(i) => Ok(input_schema.columns[i].clone()),
                    Expr::ScalarFunction { ref name, .. } => Ok(Field {
                        name: name.clone(),
                        data_type: DataType::Double, nullable: true
                    }),
                    _ => Err(ExecutionError::Custom(String::from("Unsupported projection expression")))
                }
            }).collect();
            let project_schema = Schema { columns: project_columns? };
            let compiled_expr: Result<Vec<CompiledExpr>, ExecutionError> = expr.iter()
                .map(|e| compile_expr(self, e))
                .collect();
            let rel = ProjectRelation {
                input: input_rel,
                expr: compiled_expr?,
                schema: project_schema,
            };
            Ok(Box::new(rel))
        }
        LogicalPlan::Limit { limit, ref input, ref schema, .. } => {
            let input_rel = self.create_execution_plan(data_dir, input)?;
            let rel = LimitRelation {
                input: input_rel,
                limit: limit,
                schema: schema.clone()
            };
            Ok(Box::new(rel))
        }
        _ => Err(ExecutionError::Custom(format!("Unsupported logical plan: {:?}", plan)))
    }
}
/// Looks up the built-in implementation of a scalar function by name,
/// ignoring case.
///
/// # Errors
/// Returns `ExecutionError::Custom` when the name is not one of the built-ins.
fn load_function_impl(&self, function_name: &str) -> Result<Box<ScalarFunction>,ExecutionError> {
    let lowered = function_name.to_lowercase();
    let function: Box<ScalarFunction> = match lowered.as_str() {
        "sqrt" => Box::new(SqrtFunction {}),
        "st_point" => Box::new(STPointFunc {}),
        "st_astext" => Box::new(STAsText {}),
        _ => return Err(ExecutionError::Custom(format!("Unknown function {}", function_name))),
    };
    Ok(function)
}
/// Builds a scalar-function call expression for `name` applied to `args`.
pub fn udf(&self, name: &str, args: Vec<Expr>) -> Expr {
    let function_name = String::from(name);
    Expr::ScalarFunction { name: function_name, args: args }
}
/// Executes the plan behind `df`, writing its rows to `filename`, and returns
/// the number of rows reported by the execution.
///
/// # Errors
/// Forwards execution failures (converted by `?` into `DataFrameError`) and
/// returns `DataFrameError::NotImplemented` when execution does not report a
/// row count.
pub fn write(&self, df: Box<DataFrame>, filename: &str) -> Result<usize, DataFrameError> {
    let write_plan = PhysicalPlan::Write {
        plan: df.plan(),
        filename: String::from(filename),
    };
    let outcome = self.execute(&write_plan)?;
    if let ExecutionResult::Count(rows) = outcome {
        Ok(rows)
    } else {
        Err(DataFrameError::NotImplemented)
    }
}
/// Executes `physical_plan` locally or remotely, depending on how this
/// context was configured.
pub fn execute(&self, physical_plan: &PhysicalPlan) -> Result<ExecutionResult, ExecutionError> {
    match self.config {
        DFConfig::Local { ref data_dir } => {
            let dir = data_dir.clone();
            self.execute_local(physical_plan, dir)
        }
        DFConfig::Remote { ref etcd } => {
            let endpoint = etcd.clone();
            self.execute_remote(physical_plan, endpoint)
        }
    }
}
fn execute_local(&self, physical_plan: &PhysicalPlan, data_dir: String) -> Result<ExecutionResult, ExecutionError> {
match physical_plan {
&PhysicalPlan::Interactive { .. } => {
Err(ExecutionError::Custom(format!("not implemented")))
}
&PhysicalPlan::Write { ref plan, ref filename} => {
let file = File::create(filename)?;
let mut writer = BufWriter::with_capacity(8*1024*1024,file);
let mut execution_plan = self.create_execution_plan(data_dir, plan)?;
let it = execution_plan.scan(self);
let mut count : usize = 0;
it.for_each(|t| {
match t {
Ok(ref batch) => {
for i in 0 .. batch.row_count() {
let row = batch.row_slice(i);
let csv = row.into_iter().map(|v| v.to_string()).collect::<Vec<String>>().join(",");
writer.write(&csv.into_bytes()).unwrap();
writer.write(b"\n").unwrap();
count += 1;
}
},
Err(e) => panic!(format!("Error processing row: {:?}", e)) }
});
Ok(ExecutionResult::Count(count))
}
}
}
fn execute_remote(&self, physical_plan: &PhysicalPlan, etcd: String) -> Result<ExecutionResult, ExecutionError> {
let workers = get_worker_list(&etcd);
match workers {
Ok(ref list) if list.len() > 0 => {
let worker_uri = format!("http://{}", list[0]);
match worker_uri.parse() {
Ok(uri) => {
let mut core = Core::new().unwrap();
let client = Client::new(&core.handle());
match serde_json::to_string(&physical_plan) {
Ok(json) => {
let mut req = Request::new(Method::Post, uri);
req.headers_mut().set(ContentType::json());
req.headers_mut().set(ContentLength(json.len() as u64));
req.set_body(json);
let post = client.request(req).and_then(|res| {
res.body().concat2()
});
match core.run(post) {
Ok(result) => {
let result = str::from_utf8(&result).unwrap();
println!("{}", result);
Ok(ExecutionResult::Unit)
}
Err(e) => Err(ExecutionError::Custom(format!("error: {}", e)))
}
}
Err(e) => Err(ExecutionError::Custom(format!("error: {}", e)))
}
}
Err(e) => Err(ExecutionError::Custom(format!("error: {}", e)))
}
}
Ok(_) => Err(ExecutionError::Custom(format!("No workers found in cluster"))),
Err(e) => Err(ExecutionError::Custom(format!("Failed to find a worker node: {}", e)))
}
}
}
/// `DataFrame` implementation that wraps a logical plan.
pub struct DF {
/// The logical plan this data frame represents.
pub plan: Box<LogicalPlan>
}
/// Wraps `plan` in a new [`DF`] and boxes it as a `DataFrame`.
fn boxed_df(plan: LogicalPlan) -> Result<Box<DataFrame>, DataFrameError> {
    Ok(Box::new(DF { plan: Box::new(plan) }))
}

/// Every transformation returns a new data frame whose plan wraps a clone of
/// the current plan; the receiver itself is never modified.
impl DataFrame for DF {
    /// Returns a data frame that projects `expr` over this one.
    /// The new plan node carries a clone of this plan's schema.
    fn select(&self, expr: Vec<Expr>) -> Result<Box<DataFrame>, DataFrameError> {
        boxed_df(LogicalPlan::Projection {
            expr: expr,
            input: self.plan.clone(),
            schema: self.plan.schema().clone(),
        })
    }

    /// Returns a data frame that sorts this one by `expr`.
    fn sort(&self, expr: Vec<Expr>) -> Result<Box<DataFrame>, DataFrameError> {
        boxed_df(LogicalPlan::Sort {
            expr: expr,
            input: self.plan.clone(),
            schema: self.plan.schema().clone(),
        })
    }

    /// Returns a data frame that keeps only the rows selected by `expr`.
    fn filter(&self, expr: Expr) -> Result<Box<DataFrame>, DataFrameError> {
        boxed_df(LogicalPlan::Selection {
            expr: expr,
            input: self.plan.clone(),
            schema: self.plan.schema().clone(),
        })
    }

    /// Resolves `column_name` against this data frame's schema and returns a
    /// column expression for its position.
    ///
    /// # Errors
    /// Returns `DataFrameError::InvalidColumn` when the schema has no such column.
    fn col(&self, column_name: &str) -> Result<Expr, DataFrameError> {
        if let Some((index, _)) = self.plan.schema().column(column_name) {
            Ok(Expr::Column(index))
        } else {
            Err(DataFrameError::InvalidColumn(column_name.to_string()))
        }
    }

    /// Returns a copy of the schema of the underlying plan.
    fn schema(&self) -> Schema {
        let plan_schema = self.plan.schema();
        plan_schema.clone()
    }

    /// Not implemented yet; calling this panics.
    fn repartition(&self, _n: u32) -> Result<Box<DataFrame>, DataFrameError> {
        unimplemented!()
    }

    /// Returns a copy of the underlying logical plan.
    fn plan(&self) -> Box<LogicalPlan> {
        self.plan.clone()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema of the `uk_cities` data set, shared by the tests below.
    fn uk_cities_schema() -> Schema {
        Schema::new(vec![
            Field::new("city", DataType::String, false),
            Field::new("lat", DataType::Double, false),
            Field::new("lng", DataType::Double, false),
        ])
    }

    /// Local context rooted at `./test/data` with the `people` and
    /// `uk_cities` tables defined.
    fn create_context() -> ExecutionContext {
        let mut ctx = ExecutionContext::local("./test/data".to_string());
        let people = Schema::new(vec![
            Field::new("id", DataType::UnsignedLong, false),
            Field::new("name", DataType::String, false),
        ]);
        ctx.define_schema("people", &people);
        ctx.define_schema("uk_cities", &uk_cities_schema());
        ctx
    }

    #[test]
    fn test_sqrt() {
        let mut ctx = create_context();
        ctx.define_function(&SqrtFunction {});
        let df = ctx.sql("SELECT id, sqrt(id) FROM people").unwrap();
        ctx.write(df, "_sqrt_out.csv").unwrap();
    }

    #[test]
    fn test_sql_udf_udt() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let df = ctx.sql("SELECT ST_Point(lat, lng) FROM uk_cities").unwrap();
        ctx.write(df, "_uk_cities_sql.csv").unwrap();
    }

    #[test]
    fn test_df_udf_udt() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let schema = uk_cities_schema();
        let df = ctx.load("test/data/uk_cities.csv", &schema).unwrap();
        let lat = df.col("lat").unwrap();
        let lng = df.col("lng").unwrap();
        let func_expr = ctx.udf("ST_Point", vec![lat, lng]);
        let projected = df.select(vec![func_expr]).unwrap();
        ctx.write(projected, "_uk_cities_df.csv").unwrap();
    }

    #[test]
    fn test_filter() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let schema = uk_cities_schema();
        let df = ctx.load("test/data/uk_cities.csv", &schema).unwrap();
        // Keep the rows whose `lat` (column 1) is greater than 52.0.
        let predicate = Expr::BinaryExpr {
            left: Box::new(Expr::Column(1)),
            op: Operator::Gt,
            right: Box::new(Expr::Literal(Value::Double(52.0))),
        };
        let filtered = df.filter(predicate).unwrap();
        ctx.write(filtered, "_uk_cities_filtered_gt_52.csv").unwrap();
    }

    #[test]
    fn test_chaining_functions() {
        let mut ctx = create_context();
        ctx.define_function(&STPointFunc {});
        let df = ctx.sql("SELECT ST_AsText(ST_Point(lat, lng)) FROM uk_cities").unwrap();
        ctx.write(df, "_uk_cities_wkt.csv").unwrap();
    }
}