use arrow_schema::{Field, FieldRef};
use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, Result, SchemaError, TableReference};
use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt;
use std::str::FromStr;
/// A reference to a column by name, optionally qualified by a table reference.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
/// Table reference qualifying this column; `None` when the column is unqualified.
pub relation: Option<TableReference>,
/// Name of the column.
pub name: String,
}
impl Column {
pub fn new(
relation: Option<impl Into<TableReference>>,
name: impl Into<String>,
) -> Self {
Self {
relation: relation.map(|r| r.into()),
name: name.into(),
}
}
pub fn new_unqualified(name: impl Into<String>) -> Self {
Self {
relation: None,
name: name.into(),
}
}
pub fn from_name(name: impl Into<String>) -> Self {
Self {
relation: None,
name: name.into(),
}
}
fn from_idents(idents: &mut Vec<String>) -> Option<Self> {
let (relation, name) = match idents.len() {
1 => (None, idents.remove(0)),
2 => (
Some(TableReference::Bare {
table: idents.remove(0).into(),
}),
idents.remove(0),
),
3 => (
Some(TableReference::Partial {
schema: idents.remove(0).into(),
table: idents.remove(0).into(),
}),
idents.remove(0),
),
4 => (
Some(TableReference::Full {
catalog: idents.remove(0).into(),
schema: idents.remove(0).into(),
table: idents.remove(0).into(),
}),
idents.remove(0),
),
_ => return None,
};
Some(Self { relation, name })
}
pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
let flat_name = flat_name.into();
Self::from_idents(&mut parse_identifiers_normalized(&flat_name, false))
.unwrap_or_else(|| Self {
relation: None,
name: flat_name,
})
}
pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
let flat_name = flat_name.into();
Self::from_idents(&mut parse_identifiers_normalized(&flat_name, true))
.unwrap_or_else(|| Self {
relation: None,
name: flat_name,
})
}
pub fn name(&self) -> &str {
&self.name
}
pub fn flat_name(&self) -> String {
match &self.relation {
Some(r) => format!("{}.{}", r, self.name),
None => self.name.clone(),
}
}
pub fn quoted_flat_name(&self) -> String {
match &self.relation {
Some(r) => {
format!(
"{}.{}",
r.to_quoted_string(),
quote_identifier(self.name.as_str())
)
}
None => quote_identifier(&self.name).to_string(),
}
}
pub fn normalize_with_schemas_and_ambiguity_check(
self,
schemas: &[&[&DFSchema]],
using_columns: &[HashSet<Column>],
) -> Result<Self> {
if self.relation.is_some() {
return Ok(self);
}
for schema_level in schemas {
let qualified_fields = schema_level
.iter()
.flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
match qualified_fields.len() {
0 => continue,
1 => return Ok(Column::from(qualified_fields[0])),
_ => {
let columns = schema_level
.iter()
.flat_map(|s| s.columns_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
for using_col in using_columns {
let all_matched = columns.iter().all(|c| using_col.contains(c));
if all_matched {
return Ok(columns[0].clone());
}
}
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new_unqualified(self.name),
});
}
}
}
_schema_err!(SchemaError::FieldNotFound {
field: Box::new(self),
valid_fields: schemas
.iter()
.flat_map(|s| s.iter())
.flat_map(|s| s.columns())
.collect(),
})
}
}
impl From<&str> for Column {
fn from(c: &str) -> Self {
Self::from_qualified_name(c)
}
}
impl From<&String> for Column {
fn from(c: &String) -> Self {
Self::from_qualified_name(c)
}
}
impl From<String> for Column {
fn from(c: String) -> Self {
Self::from_qualified_name(c)
}
}
impl From<(Option<&TableReference>, &Field)> for Column {
fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
Self::new(relation.cloned(), field.name())
}
}
impl From<(Option<&TableReference>, &FieldRef)> for Column {
fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
Self::new(relation.cloned(), field.name())
}
}
impl FromStr for Column {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(s.into())
}
}
/// Displays the column as its [`Column::flat_name`].
impl fmt::Display for Column {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let rendered = self.flat_name();
        f.write_str(&rendered)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::DataType;
    use arrow_schema::SchemaBuilder;
    use std::sync::Arc;

    /// Builds a schema whose fields are nullable booleans named after `names`,
    /// all qualified with `qualifier`.
    fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
        let mut builder = SchemaBuilder::new();
        builder.extend(
            names
                .into_iter()
                .map(|name| Field::new(name, DataType::Boolean, true)),
        );
        let arrow_schema = Arc::new(builder.finish());
        DFSchema::try_from_qualified_schema(qualifier, &arrow_schema)
    }

    #[test]
    fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
        let schema1 = create_qualified_schema("t1", vec!["a", "b"])?;
        let schema2 = create_qualified_schema("t2", vec!["c", "d"])?;
        let schema3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;

        // The two level layouts used by the scenarios below.
        let t1_t2_then_t3: &[&[&DFSchema]] = &[&[&schema1, &schema2], &[&schema3]];
        let t1_t3_then_t2: &[&[&DFSchema]] = &[&[&schema1, &schema3], &[&schema2]];

        // An already-qualified column is returned untouched.
        let resolved = Column::new(Some("t1"), "a")
            .normalize_with_schemas_and_ambiguity_check(&[], &[])?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // Resolved at the first level; the second level is never consulted.
        let resolved = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // Absent from the first level, found at the second.
        let resolved = Column::from_name("e")
            .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])?;
        assert_eq!(resolved, Column::new(Some("t3"), "e"));

        // Two matches at one level, but both belong to the same USING set.
        let mut using_columns = HashSet::new();
        using_columns.insert(Column::new(Some("t1"), "a"));
        using_columns.insert(Column::new(Some("t3"), "a"));
        let resolved = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(t1_t3_then_t2, &[using_columns])?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // Unknown name: every valid field is listed in the error.
        let not_found = Column::from_name("z")
            .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])
            .expect_err("should've failed to find field");
        let expected = r#"Schema error: No field named z. Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e."#;
        assert_eq!(not_found.strip_backtrace(), expected);

        // Two matches at one level and no USING set: ambiguous.
        let ambiguous = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(t1_t3_then_t2, &[])
            .expect_err("should've found ambiguous field");
        let expected = "Schema error: Ambiguous reference to unqualified field a";
        assert_eq!(ambiguous.strip_backtrace(), expected);

        Ok(())
    }
}