use datafusion_expr::logical_plan::FileType;
use sqlparser::{
ast::{ColumnDef, ColumnOptionDef, Statement as SQLStatement, TableConstraint},
dialect::{keywords::Keyword, Dialect, GenericDialect},
parser::{Parser, ParserError},
tokenizer::{Token, Tokenizer},
};
use std::collections::VecDeque;
/// Shorthand that expands to an `Err` holding a `ParserError::ParserError`
/// whose message is the stringified macro argument.
macro_rules! parser_err {
    ($message:expr) => {
        Err(ParserError::ParserError(ToString::to_string(&$message)))
    };
}
/// Maps a file-format name onto the matching [`FileType`], ignoring case.
///
/// # Errors
/// Returns `ParserError::ParserError` quoting the upper-cased input when it
/// is not one of `PARQUET`, `NDJSON`, `CSV` or `AVRO`.
fn parse_file_type(s: &str) -> Result<FileType, ParserError> {
    // Upper-cased once: used both for matching and in the error text.
    let upper = s.to_uppercase();
    let file_type = match upper.as_str() {
        "PARQUET" => FileType::Parquet,
        "NDJSON" => FileType::NdJson,
        "CSV" => FileType::CSV,
        "AVRO" => FileType::Avro,
        _ => {
            return Err(ParserError::ParserError(format!(
                "expect one of PARQUET, AVRO, NDJSON, or CSV, found: {}",
                upper
            )));
        }
    };
    Ok(file_type)
}
/// Parsed form of a `CREATE EXTERNAL TABLE` statement, as built by
/// `DFParser::parse_create_external_table`.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateExternalTable {
/// Table name, rendered to a string from the parsed object name.
pub name: String,
/// Column definitions from the optional parenthesized list; empty when no
/// list was given.
pub columns: Vec<ColumnDef>,
/// File format named after `STORED AS`.
pub file_type: FileType,
/// `true` when the `WITH HEADER ROW` clause was present.
pub has_header: bool,
/// Character supplied by the `DELIMITER` clause; `,` when the clause is absent.
pub delimiter: char,
/// String literal that follows the `LOCATION` keyword.
pub location: String,
/// Names listed in the `PARTITIONED BY (...)` clause; empty when the clause
/// is absent.
pub table_partition_cols: Vec<String>,
/// `true` when `IF NOT EXISTS` was present.
pub if_not_exists: bool,
}
/// Parsed form of a `DESCRIBE <table>` statement, as built by
/// `DFParser::parse_describe`.
#[derive(Debug, Clone, PartialEq)]
pub struct DescribeTable {
/// Name of the table, rendered to a string from the parsed object name.
pub table_name: String,
}
/// A statement produced by [`DFParser`]: either one parsed by the underlying
/// sqlparser parser, or one of the extensions handled in this file.
#[derive(Debug, Clone, PartialEq)]
pub enum Statement {
/// A statement parsed by the wrapped sqlparser `Parser` (boxed).
Statement(Box<SQLStatement>),
/// A `CREATE EXTERNAL TABLE` statement.
CreateExternalTable(CreateExternalTable),
/// A `DESCRIBE <table>` statement.
DescribeTable(DescribeTable),
}
/// SQL parser that wraps a sqlparser [`Parser`] and adds handling for
/// `CREATE EXTERNAL TABLE` and `DESCRIBE` statements.
pub struct DFParser<'a> {
/// Underlying sqlparser parser, constructed from the tokenized SQL text.
parser: Parser<'a>,
}
impl<'a> DFParser<'a> {
pub fn new(sql: &str) -> Result<Self, ParserError> {
let dialect = &GenericDialect {};
DFParser::new_with_dialect(sql, dialect)
}
pub fn new_with_dialect(
sql: &str,
dialect: &'a dyn Dialect,
) -> Result<Self, ParserError> {
let mut tokenizer = Tokenizer::new(dialect, sql);
let tokens = tokenizer.tokenize()?;
Ok(DFParser {
parser: Parser::new(tokens, dialect),
})
}
pub fn parse_sql(sql: &str) -> Result<VecDeque<Statement>, ParserError> {
let dialect = &GenericDialect {};
DFParser::parse_sql_with_dialect(sql, dialect)
}
/// Parses every statement in `sql` with the given dialect, in source order.
///
/// Statements must be separated by `;`. Runs of semicolons (including
/// leading and trailing ones) are skipped.
///
/// # Errors
/// Returns an error when tokenizing fails, when a statement fails to parse,
/// or when a statement is followed by something other than `;` or the end
/// of input.
pub fn parse_sql_with_dialect(
    sql: &str,
    dialect: &dyn Dialect,
) -> Result<VecDeque<Statement>, ParserError> {
    let mut df_parser = DFParser::new_with_dialect(sql, dialect)?;
    let mut statements = VecDeque::new();
    // Set after each parsed statement; cleared once a `;` has been consumed.
    let mut needs_delimiter = false;
    loop {
        // Swallow any run of semicolons.
        while df_parser.parser.consume_token(&Token::SemiColon) {
            needs_delimiter = false;
        }
        match df_parser.parser.peek_token() {
            Token::EOF => break,
            unexpected if needs_delimiter => {
                return df_parser.expected("end of statement", unexpected);
            }
            _ => {}
        }
        statements.push_back(df_parser.parse_statement()?);
        needs_delimiter = true;
    }
    Ok(statements)
}
fn expected<T>(&self, expected: &str, found: Token) -> Result<T, ParserError> {
parser_err!(format!("Expected {}, found: {}", expected, found))
}
/// Parses one statement.
///
/// `CREATE` and `DESCRIBE` are consumed here and routed to
/// [`DFParser::parse_create`] and [`DFParser::parse_describe`]. Anything
/// else is handed to the wrapped sqlparser parser untouched.
///
/// # Errors
/// Propagates the error of whichever parser handled the statement.
pub fn parse_statement(&mut self) -> Result<Statement, ParserError> {
    // Only word tokens can carry a keyword worth dispatching on.
    let leading_keyword = match self.parser.peek_token() {
        Token::Word(word) => Some(word.keyword),
        _ => None,
    };
    match leading_keyword {
        Some(Keyword::CREATE) => {
            self.parser.next_token();
            self.parse_create()
        }
        Some(Keyword::DESCRIBE) => {
            self.parser.next_token();
            self.parse_describe()
        }
        _ => {
            let statement = self.parser.parse_statement()?;
            Ok(Statement::Statement(Box::from(statement)))
        }
    }
}
/// Parses the table name that follows an already-consumed `DESCRIBE`.
///
/// # Errors
/// Fails when the next tokens do not form an object name.
pub fn parse_describe(&mut self) -> Result<Statement, ParserError> {
    let table_name = self.parser.parse_object_name()?.to_string();
    Ok(Statement::DescribeTable(DescribeTable { table_name }))
}
/// Parses the remainder of a statement whose `CREATE` keyword has already
/// been consumed.
///
/// `CREATE EXTERNAL ...` is handled by this parser; every other `CREATE`
/// form is delegated to the wrapped sqlparser parser.
///
/// # Errors
/// Propagates the error of whichever parser handled the statement.
pub fn parse_create(&mut self) -> Result<Statement, ParserError> {
    if !self.parser.parse_keyword(Keyword::EXTERNAL) {
        let statement = self.parser.parse_create()?;
        return Ok(Statement::Statement(Box::from(statement)));
    }
    self.parse_create_external_table()
}
/// Parses an optional parenthesized, comma-separated list of partition
/// column names.
///
/// Returns an empty list when there is no opening parenthesis or when the
/// list is `()`. A comma directly before the closing parenthesis is accepted.
///
/// # Errors
/// Fails when an entry is not a word token, or when an entry is followed by
/// neither `,` nor `)`.
fn parse_partitions(&mut self) -> Result<Vec<String>, ParserError> {
    let mut names = Vec::new();
    let has_open_paren = self.parser.consume_token(&Token::LParen);
    if !has_open_paren || self.parser.consume_token(&Token::RParen) {
        return Ok(names);
    }
    loop {
        match self.parser.peek_token() {
            Token::Word(_) => {
                let ident = self.parser.parse_identifier()?;
                names.push(ident.to_string());
            }
            other => return self.expected("partition name", other),
        }
        // The comma is consumed first so that `name,)` also ends the list.
        let saw_comma = self.parser.consume_token(&Token::Comma);
        if self.parser.consume_token(&Token::RParen) {
            return Ok(names);
        }
        if !saw_comma {
            return self.expected(
                "',' or ')' after partition definition",
                self.parser.peek_token(),
            );
        }
    }
}
/// Parses an optional parenthesized list of column definitions and table
/// constraints.
///
/// Returns two empty lists when there is no opening parenthesis or when the
/// list is `()`. A comma directly before the closing parenthesis is accepted.
///
/// # Errors
/// Fails when an entry is neither a table constraint nor starts with a word
/// token, when a column definition fails to parse, or when an entry is
/// followed by neither `,` nor `)`.
fn parse_columns(
    &mut self,
) -> Result<(Vec<ColumnDef>, Vec<TableConstraint>), ParserError> {
    let mut column_defs = Vec::new();
    let mut table_constraints = Vec::new();
    let has_open_paren = self.parser.consume_token(&Token::LParen);
    if !has_open_paren || self.parser.consume_token(&Token::RParen) {
        return Ok((column_defs, table_constraints));
    }
    loop {
        // Constraints are tried first; only then is the entry treated as a column.
        match self.parser.parse_optional_table_constraint()? {
            Some(constraint) => table_constraints.push(constraint),
            None => match self.parser.peek_token() {
                Token::Word(_) => column_defs.push(self.parse_column_def()?),
                other => {
                    return self
                        .expected("column name or constraint definition", other);
                }
            },
        }
        let saw_comma = self.parser.consume_token(&Token::Comma);
        if self.parser.consume_token(&Token::RParen) {
            return Ok((column_defs, table_constraints));
        }
        if !saw_comma {
            return self.expected(
                "',' or ')' after column definition",
                self.parser.peek_token(),
            );
        }
    }
}
/// Parses a single column definition: name, data type, optional `COLLATE`
/// clause, then any number of column options, each optionally introduced by
/// `CONSTRAINT <name>`.
///
/// # Errors
/// Fails when the name, data type, collation or an option cannot be parsed,
/// or when `CONSTRAINT <name>` is not followed by a column option.
fn parse_column_def(&mut self) -> Result<ColumnDef, ParserError> {
    let name = self.parser.parse_identifier()?;
    let data_type = self.parser.parse_data_type()?;

    let mut collation = None;
    if self.parser.parse_keyword(Keyword::COLLATE) {
        collation = Some(self.parser.parse_object_name()?);
    }

    let mut options = Vec::new();
    loop {
        let constraint_name = if self.parser.parse_keyword(Keyword::CONSTRAINT) {
            Some(self.parser.parse_identifier()?)
        } else {
            None
        };
        match self.parser.parse_optional_column_option()? {
            Some(option) => options.push(ColumnOptionDef {
                name: constraint_name,
                option,
            }),
            // A named constraint must be followed by its definition.
            None if constraint_name.is_some() => {
                return self.expected(
                    "constraint details after CONSTRAINT <name>",
                    self.parser.peek_token(),
                );
            }
            None => break,
        }
    }

    Ok(ColumnDef {
        name,
        data_type,
        collation,
        options,
    })
}
/// Parses the remainder of `CREATE EXTERNAL TABLE` after `CREATE EXTERNAL`
/// has been consumed.
///
/// Clauses are read in this fixed order:
/// `TABLE [IF NOT EXISTS] <name> [(<columns>)] STORED AS <file type>
/// [WITH HEADER ROW] [DELIMITER '<char>'] [PARTITIONED BY (<names>)]
/// LOCATION '<path>'`.
///
/// Table constraints found in the column list are parsed and then dropped.
///
/// # Errors
/// Fails when a required keyword is missing or any clause fails to parse.
fn parse_create_external_table(&mut self) -> Result<Statement, ParserError> {
    self.parser.expect_keyword(Keyword::TABLE)?;
    let if_not_exists = self.parser.parse_keywords(&[
        Keyword::IF,
        Keyword::NOT,
        Keyword::EXISTS,
    ]);
    let name = self.parser.parse_object_name()?.to_string();
    let (columns, _) = self.parse_columns()?;

    self.parser
        .expect_keywords(&[Keyword::STORED, Keyword::AS])?;
    let file_type = self.parse_file_format()?;

    let has_header = self.parse_csv_has_header();

    // Comma is the delimiter unless a DELIMITER clause overrides it.
    let delimiter = if self.parse_has_delimiter() {
        self.parse_delimiter()?
    } else {
        ','
    };

    let mut table_partition_cols = Vec::new();
    if self.parse_has_partition() {
        table_partition_cols = self.parse_partitions()?;
    }

    self.parser.expect_keyword(Keyword::LOCATION)?;
    let location = self.parser.parse_literal_string()?;

    Ok(Statement::CreateExternalTable(CreateExternalTable {
        name,
        columns,
        file_type,
        has_header,
        delimiter,
        location,
        table_partition_cols,
        if_not_exists,
    }))
}
/// Consumes the next token and interprets it as a file-format name.
///
/// # Errors
/// Fails when the token is not a word, or when the word is not a format
/// accepted by `parse_file_type`.
fn parse_file_format(&mut self) -> Result<FileType, ParserError> {
    match self.parser.next_token() {
        Token::Word(w) => parse_file_type(&w.value),
        // The list mirrors the formats `parse_file_type` accepts, AVRO included.
        unexpected => {
            self.expected("one of PARQUET, AVRO, NDJSON, or CSV", unexpected)
        }
    }
}
/// Consumes the next token if it matches `expected`, and reports whether
/// it did.
///
/// The comparison is made against the upper-cased display text of the
/// peeked token, rebuilt with `Token::make_keyword`, so the peeked token's
/// letter case does not matter. Nothing is consumed on a mismatch.
fn consume_token(&mut self, expected: &Token) -> bool {
    let upper_text = self.parser.peek_token().to_string().to_uppercase();
    let matched = Token::make_keyword(&upper_text) == *expected;
    if matched {
        self.parser.next_token();
    }
    matched
}
/// Consumes the `WITH HEADER ROW` clause if it is present in full.
///
/// Returns `true` only when all three words appear consecutively (in any
/// letter case). On a partial match, every word consumed so far is put
/// back, so malformed input such as `WITH HEADER` or a stray `HEADER ROW`
/// is left for the caller to reject instead of being silently swallowed.
fn parse_csv_has_header(&mut self) -> bool {
    let clause = ["WITH", "HEADER", "ROW"];
    for (consumed, word) in clause.iter().copied().enumerate() {
        if !self.consume_token(&Token::make_keyword(word)) {
            // Each successful `consume_token` advanced by exactly one
            // non-whitespace token, so rewind the same number of steps.
            for _ in 0..consumed {
                self.parser.prev_token();
            }
            return false;
        }
    }
    true
}
/// Consumes the `DELIMITER` word if it is the next token and reports
/// whether it was there.
fn parse_has_delimiter(&mut self) -> bool {
    let delimiter_word = Token::make_keyword("DELIMITER");
    self.consume_token(&delimiter_word)
}
/// Parses the string literal that follows `DELIMITER` and returns its
/// single character.
///
/// # Errors
/// Propagates the literal-parsing error, or returns
/// `ParserError::TokenizerError` when the literal is not exactly one byte
/// long (so empty, multi-character and multi-byte literals are rejected).
fn parse_delimiter(&mut self) -> Result<char, ParserError> {
    let literal = self.parser.parse_literal_string()?;
    // `len()` counts bytes, so a length of 1 implies a single one-byte char.
    match (literal.len(), literal.chars().next()) {
        (1, Some(delimiter)) => Ok(delimiter),
        _ => Err(ParserError::TokenizerError(
            "Delimiter must be a single char".to_string(),
        )),
    }
}
/// Consumes the `PARTITIONED BY` words if both are present.
///
/// Returns `true` only when the two words appear consecutively (in any
/// letter case). On a partial match, anything consumed so far is put back,
/// so a lone `PARTITIONED` or a stray `BY` is left for the caller to reject
/// instead of being silently swallowed.
fn parse_has_partition(&mut self) -> bool {
    let clause = ["PARTITIONED", "BY"];
    for (consumed, word) in clause.iter().copied().enumerate() {
        if !self.consume_token(&Token::make_keyword(word)) {
            // Each successful `consume_token` advanced by exactly one
            // non-whitespace token, so rewind the same number of steps.
            for _ in 0..consumed {
                self.parser.prev_token();
            }
            return false;
        }
    }
    true
}
}
#[cfg(test)]
mod tests {
use super::*;
use sqlparser::ast::{DataType, Ident};
/// Parses `sql` and asserts that it yields exactly one statement equal to
/// `expected`. Parse errors are returned to the caller rather than panicking.
fn expect_parse_ok(sql: &str, expected: Statement) -> Result<(), ParserError> {
    DFParser::parse_sql(sql).map(|parsed| {
        assert_eq!(
            parsed.len(),
            1,
            "Expected to parse exactly one statement"
        );
        assert_eq!(parsed[0], expected);
    })
}
fn expect_parse_error(sql: &str, expected_error: &str) {
match DFParser::parse_sql(sql) {
Ok(statements) => {
panic!(
"Expected parse error for '{}', but was successful: {:?}",
sql, statements
);
}
Err(e) => {
let error_message = e.to_string();
assert!(
error_message.contains(expected_error),
"Expected error '{}' not found in actual error '{}'",
expected_error,
error_message
);
}
}
}
/// Builds an unquoted column definition with no collation and no options.
fn make_column_def(name: impl Into<String>, data_type: DataType) -> ColumnDef {
    let name = Ident {
        value: name.into(),
        quote_style: None,
    };
    ColumnDef {
        name,
        data_type,
        collation: None,
        options: Vec::new(),
    }
}
/// Exercises `CREATE EXTERNAL TABLE` parsing: the accepted clause
/// combinations first, then the inputs that must be rejected.
#[test]
fn create_external_table() -> Result<(), ParserError> {
    // Expected statement for table `t` with every optional clause left at
    // its default; individual cases override fields via struct update.
    fn external_table(
        columns: Vec<ColumnDef>,
        file_type: FileType,
        location: &str,
    ) -> CreateExternalTable {
        CreateExternalTable {
            name: "t".into(),
            columns,
            file_type,
            has_header: false,
            delimiter: ',',
            location: location.into(),
            table_partition_cols: vec![],
            if_not_exists: false,
        }
    }

    // The single `c1 int` column shared by the CSV cases.
    fn int_column() -> Vec<ColumnDef> {
        vec![make_column_def("c1", DataType::Int(None))]
    }

    let ok_cases: Vec<(&str, CreateExternalTable)> = vec![
        // Plain CSV with a column list.
        (
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'",
            external_table(int_column(), FileType::CSV, "foo.csv"),
        ),
        // Custom delimiter.
        (
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV DELIMITER '|' LOCATION 'foo.csv'",
            CreateExternalTable {
                delimiter: '|',
                ..external_table(int_column(), FileType::CSV, "foo.csv")
            },
        ),
        // Partition columns.
        (
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1, p2) LOCATION 'foo.csv'",
            CreateExternalTable {
                table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
                ..external_table(int_column(), FileType::CSV, "foo.csv")
            },
        ),
        // Header clause, upper case.
        (
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH HEADER ROW LOCATION 'foo.csv'",
            CreateExternalTable {
                has_header: true,
                ..external_table(int_column(), FileType::CSV, "foo.csv")
            },
        ),
        // Header clause, lower case.
        (
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV with header row LOCATION 'foo.csv'",
            CreateExternalTable {
                has_header: true,
                ..external_table(int_column(), FileType::CSV, "foo.csv")
            },
        ),
        // Parquet without a column list.
        (
            "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet'",
            external_table(vec![], FileType::Parquet, "foo.parquet"),
        ),
        // File type in mixed case.
        (
            "CREATE EXTERNAL TABLE t STORED AS parqueT LOCATION 'foo.parquet'",
            external_table(vec![], FileType::Parquet, "foo.parquet"),
        ),
        // Avro.
        (
            "CREATE EXTERNAL TABLE t STORED AS AVRO LOCATION 'foo.avro'",
            external_table(vec![], FileType::Avro, "foo.avro"),
        ),
        // IF NOT EXISTS.
        (
            "CREATE EXTERNAL TABLE IF NOT EXISTS t STORED AS PARQUET LOCATION 'foo.parquet'",
            CreateExternalTable {
                if_not_exists: true,
                ..external_table(vec![], FileType::Parquet, "foo.parquet")
            },
        ),
    ];
    for (sql, expected) in ok_cases {
        expect_parse_ok(sql, Statement::CreateExternalTable(expected))?;
    }

    let error_cases = [
        // Unknown file type.
        (
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'",
            "expect one of PARQUET, AVRO, NDJSON, or CSV",
        ),
        // Partition columns take a name only, not a type.
        (
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'",
            "sql parser error: Expected ',' or ')' after partition definition, found: int",
        ),
    ];
    for (sql, expected_error) in error_cases.iter() {
        expect_parse_error(sql, expected_error);
    }

    Ok(())
}
}