extern crate futures;
extern crate hyper;
extern crate tokio_core;
extern crate serde;
extern crate serde_json;
use std::str;
use std::time::Instant;
use futures::{Future, Stream};
use hyper::Client;
use tokio_core::reactor::Core;
use hyper::{Method, Request};
use hyper::header::{ContentLength, ContentType};
extern crate rprompt;
extern crate datafusion;
use datafusion::exec::*;
use datafusion::parser::*;
use datafusion::sql::ASTNode::*;
fn main() {
println!("DataFusion Console");
let mut console = Console { ctx: ExecutionContext::new() };
loop {
match rprompt::prompt_reply_stdout("$ ") {
Ok(command) => match command.as_ref() {
"exit" | "quit" => break,
_ => console.execute(&command)
},
Err(e) => println!("Error parsing command: {:?}", e)
}
}
}
struct Console {
ctx: ExecutionContext
}
impl Console {
fn execute(&mut self, sql: &str) {
println!("Executing query ...");
let timer = Instant::now();
match Parser::parse_sql(String::from(sql)) {
Ok(ast) => match ast {
SQLCreateTable { .. } => {
self.ctx.sql(&sql).unwrap();
println!("Registered schema with execution context");
()
},
_ => self.execute_in_worker(&sql)
}
Err(e) => println!("Error: {:?}", e)
}
let elapsed = timer.elapsed();
let elapsed_seconds = elapsed.as_secs() as f64
+ elapsed.subsec_nanos() as f64 / 1000000000.0;
println!("Query executed in {} seconds", elapsed_seconds);
}
fn execute_in_worker(&mut self, sql: &str) {
let mut core = Core::new().unwrap();
let client = Client::new(&core.handle());
let uri = "http://localhost:8080".parse().unwrap();
match self.ctx.create_logical_plan(&sql) {
Ok(logical_plan) => {
let execution_plan = ExecutionPlan::Interactive { plan: logical_plan };
match serde_json::to_string(&execution_plan) {
Ok(json) => {
let mut req = Request::new(Method::Post, uri);
req.headers_mut().set(ContentType::json());
req.headers_mut().set(ContentLength(json.len() as u64));
req.set_body(json);
let post = client.request(req).and_then(|res| {
res.body().concat2()
});
match core.run(post) {
Ok(result) => {
let result = str::from_utf8(&result).unwrap();
println!("{}", result);
}
Err(e) => println!("Failed to serialize plan: {:?}", e)
}
}
Err(e) => println!("Failed to serialize plan: {:?}", e)
}
}
Err(e) => println!("Query failed: {:?}", e)
}
}
}