use crate::{DFSchema, DataFusionError, Result, SchemaError};
use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
/// A reference to a column by name, optionally qualified by a relation.
///
/// The derived comparison and hashing traits operate on `relation` first and
/// `name` second, following the field declaration order below.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
/// Optional relation qualifier (presumably a table name or alias — confirm
/// against callers). `None` means the column is unqualified.
pub relation: Option<String>,
/// The column's own name, without any qualifier.
pub name: String,
}
impl Column {
    /// Creates a column from an optional relation qualifier and a name.
    ///
    /// Both arguments accept any type convertible into `String`.
    pub fn new(relation: Option<impl Into<String>>, name: impl Into<String>) -> Self {
        let relation: Option<String> = relation.map(Into::into);
        let name = name.into();
        Self { relation, name }
    }

    /// Creates an unqualified column (no relation) with the given name.
    pub fn from_name(name: impl Into<String>) -> Self {
        Self::new(None::<String>, name)
    }

    /// Builds a column from a flat name such as `relation.name`.
    ///
    /// The input is tokenized with sqlparser's generic dialect. Only when the
    /// token stream is exactly `word . word` is the input split into a
    /// relation and a name; in every other case (including a tokenizer error)
    /// the whole input string becomes the name of an unqualified column.
    pub fn from_qualified_name(flat_name: &str) -> Self {
        use sqlparser::dialect::GenericDialect;
        use sqlparser::tokenizer::{Token, Tokenizer};

        let dialect = GenericDialect {};
        let mut lexer = Tokenizer::new(&dialect, flat_name);
        // A tokenizer failure yields an empty token list, which cannot match
        // the three-token pattern and therefore falls through to the
        // unqualified case.
        let tokens = lexer.tokenize().unwrap_or_default();

        match &tokens[..] {
            [Token::Word(qualifier), Token::Period, Token::Word(field)] => Self {
                relation: Some(qualifier.value.clone()),
                name: field.value.clone(),
            },
            _ => Self::from_name(flat_name),
        }
    }

    /// Renders the column as `relation.name`, or just `name` when there is no
    /// relation.
    pub fn flat_name(&self) -> String {
        if let Some(qualifier) = &self.relation {
            format!("{}.{}", qualifier, self.name)
        } else {
            self.name.clone()
        }
    }

    /// Qualifies this column by looking its name up in `schemas`.
    ///
    /// * A column that already has a relation is returned unchanged.
    /// * Otherwise the schemas are searched in order for fields whose
    ///   unqualified name equals `self.name`:
    ///   * no match in a schema: the next schema is tried;
    ///   * exactly one match: that field's qualified column is returned;
    ///   * several matches: the first match's qualified column is returned,
    ///     but only if a single set in `using_columns` contains every matched
    ///     field; otherwise the search moves on to the next schema.
    ///
    /// # Errors
    ///
    /// Returns `SchemaError::FieldNotFound` (wrapped in
    /// `DataFusionError::SchemaError`) when no schema resolves the name. The
    /// error carries this column and the qualified columns of every field in
    /// `schemas`.
    pub fn normalize_with_schemas(
        self,
        schemas: &[&Arc<DFSchema>],
        using_columns: &[HashSet<Column>],
    ) -> Result<Self> {
        // Already qualified: nothing to resolve.
        if self.relation.is_some() {
            return Ok(self);
        }

        for schema in schemas {
            let candidates = schema.fields_with_unqualified_name(&self.name);
            match &candidates[..] {
                [] => {}
                [only] => return Ok(only.qualified_column()),
                [first, ..] => {
                    // Several fields share the name; accept the first one only
                    // if one USING set accounts for all of them.
                    let covered_by_one_set = using_columns.iter().any(|using_set| {
                        candidates
                            .iter()
                            .all(|field| using_set.contains(&field.qualified_column()))
                    });
                    if covered_by_one_set {
                        return Ok(first.qualified_column());
                    }
                }
            }
        }

        let valid_fields = schemas
            .iter()
            .flat_map(|schema| schema.fields().iter().map(|field| field.qualified_column()))
            .collect();

        // `self.relation` is known to be `None` here, so `self` is exactly the
        // unqualified column that failed to resolve.
        Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
            field: self,
            valid_fields: Some(valid_fields),
        }))
    }
}
impl From<&str> for Column {
fn from(c: &str) -> Self {
Self::from_qualified_name(c)
}
}
/// Parsing a [`Column`] from a string never fails: the conversion is the same
/// one performed by `From<&str>`, wrapped in `Ok`.
impl FromStr for Column {
    type Err = Infallible;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        Ok(Self::from(s))
    }
}
/// Displays the column using its flat name (`relation.name`, or `name` alone
/// when there is no relation).
impl fmt::Display for Column {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let rendered = self.flat_name();
        f.write_str(&rendered)
    }
}