#[cfg(feature = "backtrace")]
use std::backtrace::{Backtrace, BacktraceStatus};
use std::borrow::Cow;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::io;
use std::result;
use std::sync::Arc;
use crate::utils::quote_identifier;
use crate::{Column, DFSchema, TableReference};
#[cfg(feature = "avro")]
use apache_avro::Error as AvroError;
use arrow::error::ArrowError;
#[cfg(feature = "parquet")]
use parquet::errors::ParquetError;
use sqlparser::parser::ParserError;
/// Result type whose error parameter defaults to [`DataFusionError`].
pub type Result<T, E = DataFusionError> = result::Result<T, E>;
/// Result whose error is a reference-counted (`Arc`) [`DataFusionError`].
pub type SharedResult<T> = result::Result<T, Arc<DataFusionError>>;
/// Boxed error trait object that is `Send + Sync`.
pub type GenericError = Box<dyn Error + Send + Sync>;
/// Error type of this crate.
///
/// Each variant is rendered by `Display` as a fixed prefix (see
/// `error_prefix`) followed by a variant-specific message (see `message`).
#[derive(Debug)]
pub enum DataFusionError {
/// Error from the `arrow` crate, with an optional backtrace string that is
/// appended to the rendered message.
ArrowError(ArrowError, Option<String>),
/// Error from the `parquet` crate (only with the `parquet` feature).
#[cfg(feature = "parquet")]
ParquetError(ParquetError),
/// Error from the `apache_avro` crate (only with the `avro` feature).
#[cfg(feature = "avro")]
AvroError(AvroError),
/// Error from the `object_store` crate (only with the `object_store` feature).
#[cfg(feature = "object_store")]
ObjectStore(object_store::Error),
/// Wrapped `std::io::Error`.
IoError(io::Error),
/// Error from the SQL parser, with an optional backtrace string that is
/// appended to the rendered message.
SQL(ParserError, Option<String>),
/// Rendered with the prefix "This feature is not implemented: ".
NotImplemented(String),
/// Rendered with the prefix "Internal error: "; the message additionally
/// carries a note asking the user to file a bug report.
Internal(String),
/// Rendered with the prefix "Error during planning: ".
Plan(String),
/// Rendered with the prefix "Invalid or Unsupported Configuration: ".
Configuration(String),
/// A [`SchemaError`] with an optional (boxed) backtrace string that is
/// appended to the rendered message.
SchemaError(SchemaError, Box<Option<String>>),
/// Rendered with the prefix "Execution error: ".
Execution(String),
/// Rendered with the prefix "Resources exhausted: ".
ResourcesExhausted(String),
/// Arbitrary boxed error; exposed as this error's `source`.
External(GenericError),
/// Another [`DataFusionError`] wrapped with a description; rendered as the
/// description, then "caused by", then the wrapped error.
Context(String, Box<DataFusionError>),
/// Rendered with the prefix "Substrait error: ".
Substrait(String),
}
/// Calls `.context(..)` on the given error expression, passing a message made
/// of the description followed by the file and line of the macro invocation.
///
/// Expands to `error.context(format!("{description} at {file}:{line}"))`.
#[macro_export]
macro_rules! context {
    ($description:expr, $error:expr) => {
        $error.context(format!("{} at {}:{}", $description, file!(), line!()))
    };
}
/// Schema-related errors; see the `Display` impl for the rendered messages.
#[derive(Debug)]
pub enum SchemaError {
/// A reference to `field` is ambiguous. The rendered message differs
/// depending on whether `field` has a relation (qualifier) or not.
AmbiguousReference { field: Column },
/// The schema contains a duplicate qualified field `qualifier.name`.
DuplicateQualifiedField {
qualifier: Box<TableReference>,
name: String,
},
/// The schema contains a duplicate unqualified field `name`.
DuplicateUnqualifiedField { name: String },
/// No field named `field` was found; `valid_fields` (when non-empty) is
/// listed in the rendered message.
FieldNotFound {
field: Box<Column>,
valid_fields: Vec<Column>,
},
}
impl Display for SchemaError {
    /// Renders a human-readable description of the schema error, quoting
    /// identifiers via `quoted_flat_name` / `quote_identifier`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::FieldNotFound {
                field: missing,
                valid_fields: candidates,
            } => {
                write!(f, "No field named {}", missing.quoted_flat_name())?;
                // Only list alternatives when there is at least one.
                if !candidates.is_empty() {
                    let rendered: Vec<String> = candidates
                        .iter()
                        .map(|column| column.quoted_flat_name())
                        .collect();
                    write!(f, ". Valid fields are {}", rendered.join(", "))?;
                }
                f.write_str(".")
            }
            Self::DuplicateQualifiedField { qualifier, name } => {
                let table = qualifier.to_quoted_string();
                let column = quote_identifier(name);
                write!(
                    f,
                    "Schema contains duplicate qualified field name {}.{}",
                    table, column
                )
            }
            Self::DuplicateUnqualifiedField { name } => {
                let column = quote_identifier(name);
                write!(
                    f,
                    "Schema contains duplicate unqualified field name {}",
                    column
                )
            }
            Self::AmbiguousReference { field } => match &field.relation {
                // Qualified reference clashing with an unqualified one.
                Some(_) => write!(
                    f,
                    "Schema contains qualified field name {} and unqualified field name {} which would be ambiguous",
                    field.quoted_flat_name(),
                    quote_identifier(&field.name)
                ),
                None => write!(
                    f,
                    "Ambiguous reference to unqualified field {}",
                    field.quoted_flat_name()
                ),
            },
        }
    }
}
// Marks `SchemaError` as a standard error type; all trait methods keep their
// default implementations.
impl Error for SchemaError {}
impl From<std::fmt::Error> for DataFusionError {
fn from(_e: std::fmt::Error) -> Self {
DataFusionError::Execution("Fail to format".to_string())
}
}
impl From<io::Error> for DataFusionError {
fn from(e: io::Error) -> Self {
DataFusionError::IoError(e)
}
}
impl From<ArrowError> for DataFusionError {
fn from(e: ArrowError) -> Self {
DataFusionError::ArrowError(e, None)
}
}
impl From<DataFusionError> for ArrowError {
fn from(e: DataFusionError) -> Self {
match e {
DataFusionError::ArrowError(e, _) => e,
DataFusionError::External(e) => ArrowError::ExternalError(e),
other => ArrowError::ExternalError(Box::new(other)),
}
}
}
#[cfg(feature = "parquet")]
impl From<ParquetError> for DataFusionError {
    /// Wraps a Parquet error in the `ParquetError` variant.
    fn from(parquet_error: ParquetError) -> Self {
        Self::ParquetError(parquet_error)
    }
}
#[cfg(feature = "avro")]
impl From<AvroError> for DataFusionError {
    /// Wraps an Avro error in the `AvroError` variant.
    fn from(avro_error: AvroError) -> Self {
        Self::AvroError(avro_error)
    }
}
#[cfg(feature = "object_store")]
impl From<object_store::Error> for DataFusionError {
    /// Wraps an object store error in the `ObjectStore` variant.
    fn from(store_error: object_store::Error) -> Self {
        Self::ObjectStore(store_error)
    }
}
#[cfg(feature = "object_store")]
impl From<object_store::path::Error> for DataFusionError {
    /// Converts an object store path error into `object_store::Error` and
    /// wraps the result in the `ObjectStore` variant.
    fn from(path_error: object_store::path::Error) -> Self {
        Self::ObjectStore(path_error.into())
    }
}
impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e, None)
}
}
impl From<GenericError> for DataFusionError {
fn from(err: GenericError) -> Self {
DataFusionError::External(err)
}
}
impl Display for DataFusionError {
    /// Writes the variant's fixed prefix immediately followed by its message.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "{}{}", self.error_prefix(), self.message())
    }
}
impl Error for DataFusionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
DataFusionError::ArrowError(e, _) => Some(e),
#[cfg(feature = "parquet")]
DataFusionError::ParquetError(e) => Some(e),
#[cfg(feature = "avro")]
DataFusionError::AvroError(e) => Some(e),
#[cfg(feature = "object_store")]
DataFusionError::ObjectStore(e) => Some(e),
DataFusionError::IoError(e) => Some(e),
DataFusionError::SQL(e, _) => Some(e),
DataFusionError::NotImplemented(_) => None,
DataFusionError::Internal(_) => None,
DataFusionError::Configuration(_) => None,
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e, _) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
DataFusionError::Context(_, e) => Some(e.as_ref()),
DataFusionError::Substrait(_) => None,
}
}
}
impl From<DataFusionError> for io::Error {
fn from(e: DataFusionError) -> Self {
io::Error::new(io::ErrorKind::Other, e)
}
}
impl DataFusionError {
    /// Separator placed between an error message and the backtrace appended
    /// to it by [`Self::get_back_trace`].
    const BACK_TRACE_SEP: &'static str = "\n\nbacktrace: ";

    /// Walks the [`Error::source`] chain starting at `self` and returns the
    /// last [`DataFusionError`] found along it.
    ///
    /// Sources that are `DataFusionError` or `Arc<DataFusionError>` are
    /// recognised; other error types are stepped through. Returns `self` if
    /// no nested `DataFusionError` is found.
    pub fn find_root(&self) -> &Self {
        let mut last_datafusion_error = self;
        let mut root_error: &dyn Error = self;
        while let Some(source) = root_error.source() {
            root_error = source;
            if let Some(e) = root_error.downcast_ref::<DataFusionError>() {
                last_datafusion_error = e;
            } else if let Some(e) = root_error.downcast_ref::<Arc<DataFusionError>>() {
                last_datafusion_error = e.as_ref();
            }
        }
        last_datafusion_error
    }

    /// Wraps `self` in a `Context` variant carrying `description`.
    pub fn context(self, description: impl Into<String>) -> Self {
        Self::Context(description.into(), Box::new(self))
    }

    /// Renders the error and drops everything from the first backtrace
    /// separator onwards.
    pub fn strip_backtrace(&self) -> String {
        let mut rendered = self.to_string();
        // Truncate in place instead of splitting into a Vec of pieces and
        // copying the first one.
        if let Some(cut) = rendered.find(Self::BACK_TRACE_SEP) {
            rendered.truncate(cut);
        }
        rendered
    }

    /// Returns the backtrace separator followed by a captured backtrace, or
    /// an empty string.
    ///
    /// The result is empty when the `backtrace` feature is disabled or when
    /// the capture status is not `Captured`.
    #[inline(always)]
    pub fn get_back_trace() -> String {
        #[cfg(feature = "backtrace")]
        {
            let back_trace = Backtrace::capture();
            if back_trace.status() == BacktraceStatus::Captured {
                return format!("{}{}", Self::BACK_TRACE_SEP, back_trace);
            }
            "".to_owned()
        }
        #[cfg(not(feature = "backtrace"))]
        "".to_owned()
    }

    /// Fixed, variant-specific prefix used by the `Display` implementation.
    fn error_prefix(&self) -> &'static str {
        match self {
            DataFusionError::ArrowError(_, _) => "Arrow error: ",
            #[cfg(feature = "parquet")]
            DataFusionError::ParquetError(_) => "Parquet error: ",
            #[cfg(feature = "avro")]
            DataFusionError::AvroError(_) => "Avro error: ",
            #[cfg(feature = "object_store")]
            DataFusionError::ObjectStore(_) => "Object Store error: ",
            DataFusionError::IoError(_) => "IO error: ",
            DataFusionError::SQL(_, _) => "SQL error: ",
            DataFusionError::NotImplemented(_) => "This feature is not implemented: ",
            DataFusionError::Internal(_) => "Internal error: ",
            DataFusionError::Plan(_) => "Error during planning: ",
            DataFusionError::Configuration(_) => "Invalid or Unsupported Configuration: ",
            DataFusionError::SchemaError(_, _) => "Schema error: ",
            DataFusionError::Execution(_) => "Execution error: ",
            DataFusionError::ResourcesExhausted(_) => "Resources exhausted: ",
            DataFusionError::External(_) => "External error: ",
            DataFusionError::Context(_, _) => "",
            DataFusionError::Substrait(_) => "Substrait error: ",
        }
    }

    /// Returns the variant-specific message, without the prefix.
    ///
    /// Variants that only hold a message string return it borrowed; the
    /// others build an owned string (appending the optional backtrace where
    /// the variant carries one).
    pub fn message(&self) -> Cow<'_, str> {
        match self {
            Self::ArrowError(desc, backtrace) => {
                // Borrow the optional backtrace rather than cloning it.
                let backtrace = backtrace.as_deref().unwrap_or("");
                Cow::Owned(format!("{desc}{backtrace}"))
            }
            #[cfg(feature = "parquet")]
            Self::ParquetError(desc) => Cow::Owned(desc.to_string()),
            #[cfg(feature = "avro")]
            Self::AvroError(desc) => Cow::Owned(desc.to_string()),
            Self::IoError(desc) => Cow::Owned(desc.to_string()),
            Self::SQL(desc, backtrace) => {
                let backtrace = backtrace.as_deref().unwrap_or("");
                // The parser error is rendered with its Debug representation.
                Cow::Owned(format!("{desc:?}{backtrace}"))
            }
            Self::Internal(desc) => Cow::Owned(format!(
                "{desc}.\nThis was likely caused by a bug in DataFusion's \
                 code and we would welcome that you file an bug report in our issue tracker"
            )),
            Self::SchemaError(desc, backtrace) => {
                // `backtrace` is `&Box<Option<String>>`; look through the box.
                let backtrace = (**backtrace).as_deref().unwrap_or("");
                Cow::Owned(format!("{desc}{backtrace}"))
            }
            Self::External(desc) => Cow::Owned(desc.to_string()),
            #[cfg(feature = "object_store")]
            Self::ObjectStore(desc) => Cow::Owned(desc.to_string()),
            Self::Context(desc, err) => Cow::Owned(format!("{desc}\ncaused by\n{err}")),
            // Plain message variants: no allocation needed.
            Self::Configuration(desc)
            | Self::NotImplemented(desc)
            | Self::Plan(desc)
            | Self::Execution(desc)
            | Self::ResourcesExhausted(desc)
            | Self::Substrait(desc) => Cow::Borrowed(desc.as_str()),
        }
    }
}
/// Unwraps the `Option` held in the given identifier, or returns early (via
/// `?`) with a `DataFusionError::Internal` whose message is
/// "`<identifier>` should not be None".
///
/// The error type is named through `$crate`, so callers do not need
/// `DataFusionError` in scope.
#[macro_export]
macro_rules! unwrap_or_internal_err {
    ($Value: ident) => {
        $Value.ok_or_else(|| {
            $crate::DataFusionError::Internal(format!(
                "{} should not be None",
                stringify!($Value)
            ))
        })?
    };
}
// Helper for writing macros that generate other macros.
//
// It defines a throwaway macro `__with_dollar_sign` whose rules are the tokens
// passed in, then immediately invokes it with a literal `$` token. The passed
// rules can therefore bind that `$` to a metavariable (`make_error!` binds it
// as `$d`) and use it to emit `$`-prefixed metavariables and repetitions in
// the generated macro.
macro_rules! with_dollar_sign {
($($body:tt)*) => {
macro_rules! __with_dollar_sign { $($body)* }
__with_dollar_sign!($);
}
}
// Generates a pair of exported error macros for one `DataFusionError` variant.
//
// * `$NAME_DF_ERR` — expands to a `DataFusionError::$ERR` value whose message
//   is the `format!`-ed arguments followed by `DataFusionError::get_back_trace()`.
// * `$NAME_ERR` — the same value wrapped in `Err(..)`.
//
// `$d` is a literal `$` token supplied by `with_dollar_sign!`, so
// `$d($d args:expr),*` becomes `$($args:expr),*` in the generated macros.
macro_rules! make_error {
($NAME_ERR:ident, $NAME_DF_ERR: ident, $ERR:ident) => {
with_dollar_sign! {
($d:tt) => {
// Macro producing the bare error value.
#[macro_export]
macro_rules! $NAME_DF_ERR {
($d($d args:expr),*) => {
$crate::DataFusionError::$ERR(
format!(
"{}{}",
format!($d($d args),*),
$crate::DataFusionError::get_back_trace(),
).into()
)
}
}
// Macro producing the error wrapped in `Err`.
#[macro_export]
macro_rules! $NAME_ERR {
($d($d args:expr),*) => {
Err($crate::DataFusionError::$ERR(
format!(
"{}{}",
format!($d($d args),*),
$crate::DataFusionError::get_back_trace(),
).into()
))
}
}
}
}
};
}
// Each line defines `<name>_err!` (yields `Err(..)`) and
// `<name>_datafusion_err!` (yields the bare error) for the named variant.
make_error!(plan_err, plan_datafusion_err, Plan);
make_error!(internal_err, internal_datafusion_err, Internal);
make_error!(not_impl_err, not_impl_datafusion_err, NotImplemented);
make_error!(exec_err, exec_datafusion_err, Execution);
make_error!(config_err, config_datafusion_err, Configuration);
make_error!(substrait_err, substrait_datafusion_err, Substrait);
/// Builds a `DataFusionError::SQL` from a parser error expression, attaching
/// the result of `DataFusionError::get_back_trace()`.
///
/// Paths go through `$crate`, so callers do not need `DataFusionError` in
/// scope.
#[macro_export]
macro_rules! sql_datafusion_err {
    ($ERR:expr) => {
        $crate::DataFusionError::SQL($ERR, Some($crate::DataFusionError::get_back_trace()))
    };
}
/// Like `sql_datafusion_err!`, but wraps the error in `Err(..)`.
///
/// The inner macro is reached through `$crate` rather than a hard-coded crate
/// name, so this also works from inside the defining crate and when the crate
/// is imported under a different name.
#[macro_export]
macro_rules! sql_err {
    ($ERR:expr) => {
        Err($crate::sql_datafusion_err!($ERR))
    };
}
/// Builds a `DataFusionError::ArrowError` from an Arrow error expression,
/// attaching the result of `DataFusionError::get_back_trace()`.
///
/// Paths go through `$crate`, so callers do not need `DataFusionError` in
/// scope.
#[macro_export]
macro_rules! arrow_datafusion_err {
    ($ERR:expr) => {
        $crate::DataFusionError::ArrowError(
            $ERR,
            Some($crate::DataFusionError::get_back_trace()),
        )
    };
}
/// Like `arrow_datafusion_err!`, but wraps the error in `Err(..)`.
///
/// The inner macro is reached through `$crate` rather than a hard-coded crate
/// name, so this also works from inside the defining crate and when the crate
/// is imported under a different name.
#[macro_export]
macro_rules! arrow_err {
    ($ERR:expr) => {
        Err($crate::arrow_datafusion_err!($ERR))
    };
}
/// Builds a `DataFusionError::SchemaError` from a schema error expression,
/// attaching the (boxed) result of `DataFusionError::get_back_trace()`.
///
/// Paths go through `$crate`, so callers do not need `DataFusionError` in
/// scope.
#[macro_export]
macro_rules! schema_datafusion_err {
    ($ERR:expr) => {
        $crate::DataFusionError::SchemaError(
            $ERR,
            Box::new(Some($crate::DataFusionError::get_back_trace())),
        )
    };
}
/// Builds `Err(DataFusionError::SchemaError(..))` from a schema error
/// expression, attaching the (boxed) result of
/// `DataFusionError::get_back_trace()`.
///
/// Paths go through `$crate`, so callers do not need `DataFusionError` in
/// scope.
#[macro_export]
macro_rules! schema_err {
    ($ERR:expr) => {
        Err($crate::DataFusionError::SchemaError(
            $ERR,
            Box::new(Some($crate::DataFusionError::get_back_trace())),
        ))
    };
}
// Re-export selected error macros under underscore-prefixed aliases.
// NOTE(review): presumably this makes the macros generated by `make_error!`
// importable by path from this module — confirm against the callers.
pub use config_err as _config_err;
pub use internal_datafusion_err as _internal_datafusion_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
pub use plan_datafusion_err as _plan_datafusion_err;
pub use plan_err as _plan_err;
pub use schema_err as _schema_err;
pub fn field_not_found<R: Into<TableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema.columns().to_vec(),
})
}
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new_unqualified(name)),
valid_fields: schema.columns().to_vec(),
})
}
// Unit tests for error conversion, backtrace handling, root-cause lookup and
// the generated error macros.
#[cfg(test)]
mod test {
use std::sync::Arc;
use crate::error::DataFusionError;
use arrow::error::ArrowError;
// A non-Arrow DataFusionError converted to ArrowError is boxed as an external
// error, so the rendered text starts with the external-error prefix.
#[test]
fn datafusion_error_to_arrow() {
let res = return_arrow_error().unwrap_err();
assert!(res
.to_string()
.starts_with("External error: Error during planning: foo"));
}
// An ArrowError converted to DataFusionError is rendered with the
// "Arrow error: " prefix.
#[test]
fn arrow_error_to_datafusion() {
let res = return_datafusion_error().unwrap_err();
assert_eq!(res.strip_backtrace(), "Arrow error: Schema error: bar");
}
// With the `backtrace` feature, the message is followed by the separator and
// a non-empty backtrace section.
#[cfg(feature = "backtrace")]
#[test]
#[allow(clippy::unnecessary_literal_unwrap)]
fn test_enabled_backtrace() {
let res: Result<(), DataFusionError> = plan_err!("Err");
let err = res.unwrap_err().to_string();
assert!(err.contains(DataFusionError::BACK_TRACE_SEP));
assert_eq!(
err.split(DataFusionError::BACK_TRACE_SEP)
.collect::<Vec<&str>>()
.first()
.unwrap(),
&"Error during planning: Err"
);
assert!(!err
.split(DataFusionError::BACK_TRACE_SEP)
.collect::<Vec<&str>>()
.get(1)
.unwrap()
.is_empty());
}
// Without the `backtrace` feature, no separator is added to the message.
#[cfg(not(feature = "backtrace"))]
#[test]
#[allow(clippy::unnecessary_literal_unwrap)]
fn test_disabled_backtrace() {
let res: Result<(), DataFusionError> = plan_err!("Err");
let res = res.unwrap_err().to_string();
assert!(!res.contains(DataFusionError::BACK_TRACE_SEP));
assert_eq!(res, "Error during planning: Err");
}
// `find_root` must reach the innermost DataFusionError through various
// wrappers: Context, ArrowError::ExternalError, External, and Arc.
#[test]
fn test_find_root_error() {
do_root_test(
DataFusionError::Context(
"it happened!".to_string(),
Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
do_root_test(
DataFusionError::ArrowError(
ArrowError::ExternalError(Box::new(DataFusionError::ResourcesExhausted(
"foo".to_string(),
))),
None,
),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
do_root_test(
DataFusionError::External(Box::new(DataFusionError::ResourcesExhausted(
"foo".to_string(),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
do_root_test(
DataFusionError::External(Box::new(ArrowError::ExternalError(Box::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
)))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
do_root_test(
DataFusionError::ArrowError(
ArrowError::ExternalError(Box::new(ArrowError::ExternalError(Box::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
)))),
None,
),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
do_root_test(
DataFusionError::External(Box::new(Arc::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
do_root_test(
DataFusionError::External(Box::new(Arc::new(ArrowError::ExternalError(
Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
)))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
}
// The generated macros accept the same argument forms as `format!`:
// a plain literal, positional arguments, and inline captured identifiers.
#[test]
#[allow(clippy::unnecessary_literal_unwrap)]
fn test_make_error_parse_input() {
let res: Result<(), DataFusionError> = plan_err!("Err");
let res = res.unwrap_err();
assert_eq!(res.strip_backtrace(), "Error during planning: Err");
let extra1 = "extra1";
let extra2 = "extra2";
let res: Result<(), DataFusionError> = plan_err!("Err {} {}", extra1, extra2);
let res = res.unwrap_err();
assert_eq!(
res.strip_backtrace(),
"Error during planning: Err extra1 extra2"
);
let res: Result<(), DataFusionError> =
plan_err!("Err {:?} {:#?}", extra1, extra2);
let res = res.unwrap_err();
assert_eq!(
res.strip_backtrace(),
"Error during planning: Err \"extra1\" \"extra2\""
);
let res: Result<(), DataFusionError> = plan_err!("Err {extra1} {extra2}");
let res = res.unwrap_err();
assert_eq!(
res.strip_backtrace(),
"Error during planning: Err extra1 extra2"
);
let res: Result<(), DataFusionError> = plan_err!("Err {extra1:?} {extra2:#?}");
let res = res.unwrap_err();
assert_eq!(
res.strip_backtrace(),
"Error during planning: Err \"extra1\" \"extra2\""
);
}
// Returns a DataFusion plan error converted into an Arrow error via `into()`.
fn return_arrow_error() -> arrow::error::Result<()> {
Err(DataFusionError::Plan("foo".to_string()).into())
}
// Returns an Arrow schema error converted into a DataFusionError via `into()`.
fn return_datafusion_error() -> crate::error::Result<()> {
Err(ArrowError::SchemaError("bar".to_string()).into())
}
// Asserts that the root of `e` has the same message (ignoring any backtrace)
// and the same enum variant as `exp`.
fn do_root_test(e: DataFusionError, exp: DataFusionError) {
let e = e.find_root();
assert_eq!(e.strip_backtrace(), exp.strip_backtrace());
assert_eq!(std::mem::discriminant(e), std::mem::discriminant(&exp),)
}
}