use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, OwnedTableReference, Result, SchemaError};
use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
/// A reference to a column by name, optionally qualified by the relation
/// (table reference) it belongs to.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
    /// Relation qualifying the column; `None` for an unqualified column.
    pub relation: Option<OwnedTableReference>,
    /// Name of the column.
    pub name: String,
}
impl Column {
    /// Creates a column from an optional relation and a name.
    ///
    /// Both arguments are converted with `Into`; no parsing or normalization
    /// is applied to `name`.
    pub fn new(
        relation: Option<impl Into<OwnedTableReference>>,
        name: impl Into<String>,
    ) -> Self {
        Self {
            relation: relation.map(|r| r.into()),
            name: name.into(),
        }
    }

    /// Creates a column without a relation. `name` is stored as given.
    pub fn new_unqualified(name: impl Into<String>) -> Self {
        Self {
            relation: None,
            name: name.into(),
        }
    }

    /// Creates a column without a relation.
    ///
    /// Equivalent to [`Column::new_unqualified`]; `name` is stored as given.
    pub fn from_name(name: impl Into<String>) -> Self {
        Self::new_unqualified(name)
    }

    /// Builds a column from a list of already-parsed identifiers.
    ///
    /// The last identifier becomes the column name and the preceding ones
    /// (zero to three) become the relation:
    ///
    /// * 1 identifier: unqualified column
    /// * 2 identifiers: `table`, column
    /// * 3 identifiers: `schema`, `table`, column
    /// * 4 identifiers: `catalog`, `schema`, `table`, column
    ///
    /// Returns `None`, leaving `idents` untouched, for any other length. On
    /// success every identifier is moved out and `idents` is left empty.
    fn from_idents(idents: &mut Vec<String>) -> Option<Self> {
        if idents.len() > 4 {
            return None;
        }
        // `pop` yields `None` for an empty list, which also means "no column".
        let name = idents.pop()?;

        // Drain the qualifier parts front to back instead of shifting the
        // vector with repeated `remove(0)` calls.
        let mut qualifier = idents.drain(..);
        let relation = match (qualifier.next(), qualifier.next(), qualifier.next()) {
            (None, _, _) => None,
            (Some(table), None, _) => Some(OwnedTableReference::Bare {
                table: table.into(),
            }),
            (Some(schema), Some(table), None) => Some(OwnedTableReference::Partial {
                schema: schema.into(),
                table: table.into(),
            }),
            (Some(catalog), Some(schema), Some(table)) => {
                Some(OwnedTableReference::Full {
                    catalog: catalog.into(),
                    schema: schema.into(),
                    table: table.into(),
                })
            }
        };
        Some(Self { relation, name })
    }

    /// Shared implementation of the `from_qualified_name*` constructors.
    ///
    /// `ignore_case` is forwarded unchanged to `parse_identifiers_normalized`.
    /// When the parsed identifiers cannot form a column (see
    /// [`Column::from_idents`]) the whole input is used as an unqualified
    /// column name; the already-owned string is moved, not cloned.
    fn parse_qualified_name(flat_name: String, ignore_case: bool) -> Self {
        Self::from_idents(&mut parse_identifiers_normalized(&flat_name, ignore_case))
            .unwrap_or_else(|| Self {
                relation: None,
                name: flat_name,
            })
    }

    /// Parses a flat (possibly qualified) name into a column.
    ///
    /// The input is split with `parse_identifiers_normalized(_, false)`. If
    /// the result is not between one and four identifiers, the entire input
    /// becomes the name of an unqualified column.
    pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
        Self::parse_qualified_name(flat_name.into(), false)
    }

    /// Same as [`Column::from_qualified_name`], but calls
    /// `parse_identifiers_normalized(_, true)`.
    ///
    /// NOTE(review): going by this method's name, the flag makes the parser
    /// keep identifier case as written — confirm against the parser.
    pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
        Self::parse_qualified_name(flat_name.into(), true)
    }

    /// Returns `relation.name` when a relation is set, otherwise just the name.
    pub fn flat_name(&self) -> String {
        match &self.relation {
            Some(r) => format!("{}.{}", r, self.name),
            None => self.name.clone(),
        }
    }

    /// Like [`Column::flat_name`], but the relation is rendered with
    /// `to_quoted_string` and the name is passed through `quote_identifier`.
    pub fn quoted_flat_name(&self) -> String {
        let quoted_name = quote_identifier(&self.name);
        match &self.relation {
            Some(r) => format!("{}.{}", r.to_quoted_string(), quoted_name),
            None => quoted_name.to_string(),
        }
    }

    /// Returns `true` when at least one set in `using_columns` contains every
    /// column in `candidates`.
    fn shared_by_using_set(
        candidates: &[Column],
        using_columns: &[HashSet<Column>],
    ) -> bool {
        using_columns
            .iter()
            .any(|set| candidates.iter().all(|c| set.contains(c)))
    }

    /// Qualifies this column using `schemas`, searched in order.
    ///
    /// A column that already has a relation is returned unchanged. Otherwise,
    /// for each schema:
    ///
    /// * no field with this name: try the next schema;
    /// * exactly one field: return its qualified column;
    /// * several fields: return the first one if a single `using_columns` set
    ///   contains all of them, otherwise try the next schema (no ambiguity
    ///   error is raised by this method).
    ///
    /// # Errors
    ///
    /// Returns `SchemaError::FieldNotFound`, listing every field of every
    /// schema, when no schema resolves the column.
    #[deprecated(
        since = "20.0.0",
        note = "use normalize_with_schemas_and_ambiguity_check instead"
    )]
    pub fn normalize_with_schemas(
        self,
        schemas: &[&Arc<DFSchema>],
        using_columns: &[HashSet<Column>],
    ) -> Result<Self> {
        if self.relation.is_some() {
            return Ok(self);
        }

        for schema in schemas {
            // Qualify each match once rather than once per `using` set.
            let mut candidates: Vec<Column> = schema
                .fields_with_unqualified_name(&self.name)
                .into_iter()
                .map(|field| field.qualified_column())
                .collect();
            match candidates.len() {
                0 => {}
                1 => return Ok(candidates.swap_remove(0)),
                _ => {
                    if Self::shared_by_using_set(&candidates, using_columns) {
                        // Index 0 is the first matching field.
                        return Ok(candidates.swap_remove(0));
                    }
                }
            }
        }

        // `self` is unqualified here, so it is exactly the column to report.
        Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
            field: Box::new(self),
            valid_fields: schemas
                .iter()
                .flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
                .collect(),
        }))
    }

    /// Qualifies this column using `schemas`, searched level by level, and
    /// reports ambiguous matches.
    ///
    /// A column that already has a relation is returned unchanged. Each entry
    /// of `schemas` is one level; the matches of all schemas in a level are
    /// pooled:
    ///
    /// * no match: try the next level;
    /// * exactly one match: return its qualified column;
    /// * several matches: return the first one if a single `using_columns`
    ///   set contains all of them, otherwise fail as ambiguous.
    ///
    /// # Errors
    ///
    /// * `SchemaError::AmbiguousReference` when a level has several matches
    ///   that are not all contained in one `using_columns` set.
    /// * `SchemaError::FieldNotFound`, listing every field of every schema,
    ///   when no level contains the column.
    pub fn normalize_with_schemas_and_ambiguity_check(
        self,
        schemas: &[&[&DFSchema]],
        using_columns: &[HashSet<Column>],
    ) -> Result<Self> {
        if self.relation.is_some() {
            return Ok(self);
        }

        for schema_level in schemas {
            // Qualify each match once rather than once per `using` set.
            let mut candidates: Vec<Column> = schema_level
                .iter()
                .flat_map(|s| s.fields_with_unqualified_name(&self.name))
                .map(|field| field.qualified_column())
                .collect();
            match candidates.len() {
                0 => continue,
                1 => return Ok(candidates.swap_remove(0)),
                _ => {
                    if Self::shared_by_using_set(&candidates, using_columns) {
                        // Index 0 is the first matching field.
                        return Ok(candidates.swap_remove(0));
                    }
                    return Err(DataFusionError::SchemaError(
                        SchemaError::AmbiguousReference {
                            field: Column::new_unqualified(self.name),
                        },
                    ));
                }
            }
        }

        Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
            field: Box::new(self),
            valid_fields: schemas
                .iter()
                .flat_map(|s| s.iter())
                .flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
                .collect(),
        }))
    }
}
impl From<&str> for Column {
fn from(c: &str) -> Self {
Self::from_qualified_name(c)
}
}
impl From<&String> for Column {
fn from(c: &String) -> Self {
Self::from_qualified_name(c)
}
}
impl From<String> for Column {
fn from(c: String) -> Self {
Self::from_qualified_name(c)
}
}
/// Parsing a column from a string cannot fail: it goes through the
/// `From<&str>` conversion, so the error type is `Infallible`.
impl FromStr for Column {
    type Err = Infallible;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self::from(s))
    }
}
/// Displays the column as its flat name (see `Column::flat_name`).
impl fmt::Display for Column {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let rendered = self.flat_name();
        f.write_str(&rendered)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DFField;
    use arrow::datatypes::DataType;
    use std::collections::HashMap;

    /// Builds a schema with one nullable boolean field per
    /// `(qualifier, name)` pair and empty metadata.
    fn create_schema(names: &[(Option<&str>, &str)]) -> Result<DFSchema> {
        let mut fields = Vec::with_capacity(names.len());
        for &(qualifier, name) in names {
            fields.push(DFField::new(
                qualifier.map(|q| q.to_string()),
                name,
                DataType::Boolean,
                true,
            ));
        }
        DFSchema::new_with_metadata(fields, HashMap::new())
    }

    #[test]
    fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
        let schema1 = create_schema(&[(Some("t1"), "a"), (Some("t1"), "b")])?;
        let schema2 = create_schema(&[(Some("t2"), "c"), (Some("t2"), "d")])?;
        let schema3 = create_schema(&[
            (Some("t3"), "a"),
            (Some("t3"), "b"),
            (Some("t3"), "c"),
            (Some("t3"), "d"),
            (Some("t3"), "e"),
        ])?;

        // An already-qualified column is returned untouched, even with no schemas.
        let resolved = Column::new(Some("t1"), "a")
            .normalize_with_schemas_and_ambiguity_check(&[], &[])?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // A unique match in the first level wins over later levels.
        let resolved = Column::from_name("a").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema2], &[&schema3]],
            &[],
        )?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // No match in the first level: the second level is consulted.
        let resolved = Column::from_name("e").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema2], &[&schema3]],
            &[],
        )?;
        assert_eq!(resolved, Column::new(Some("t3"), "e"));

        // Several matches in one level, all contained in a single using set:
        // the first match is returned.
        let using_columns: HashSet<Column> =
            vec![Column::new(Some("t1"), "a"), Column::new(Some("t3"), "a")]
                .into_iter()
                .collect();
        let resolved = Column::from_name("a").normalize_with_schemas_and_ambiguity_check(
            &[&[&schema1, &schema3], &[&schema2]],
            &[using_columns],
        )?;
        assert_eq!(resolved, Column::new(Some("t1"), "a"));

        // Unknown name: the error lists every field of every schema.
        let not_found = Column::from_name("z")
            .normalize_with_schemas_and_ambiguity_check(
                &[&[&schema1, &schema2], &[&schema3]],
                &[],
            )
            .expect_err("should've failed to find field");
        let expected = r#"Schema error: No field named z. Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e."#;
        assert_eq!(not_found.to_string(), expected);

        // Several matches in one level and no using set: ambiguous.
        let ambiguous = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(
                &[&[&schema1, &schema3], &[&schema2]],
                &[],
            )
            .expect_err("should've found ambiguous field");
        let expected = "Schema error: Ambiguous reference to unqualified field a";
        assert_eq!(ambiguous.to_string(), expected);

        Ok(())
    }
}