use std::sync::Arc;
use arrow::array::{
Array, ArrowPrimitiveType, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
};
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::DataType;
use chrono::format::{parse, Parsed, StrftimeItems};
use chrono::LocalResult::Single;
use chrono::{DateTime, TimeZone, Utc};
use itertools::Either;
use datafusion_common::cast::as_generic_string_array;
use datafusion_common::{
exec_err, unwrap_or_internal_err, DataFusionError, Result, ScalarType, ScalarValue,
};
use datafusion_expr::ColumnarValue;
/// Error message returned when a parsed timestamp cannot be expressed as a
/// signed 64-bit count of nanoseconds (used by
/// `string_to_timestamp_nanos_formatted` when `timestamp_nanos_opt` yields `None`).
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";
/// Parses `s` into nanoseconds since the Unix epoch using arrow's
/// `string_to_timestamp_nanos`, converting any arrow error into a
/// `DataFusionError`.
pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
    let nanos = string_to_timestamp_nanos(s)?;
    Ok(nanos)
}
/// Checks that every argument after the first is a string (`Utf8` or
/// `LargeUtf8`).
///
/// The first argument is not inspected here. The reported index is the
/// argument's position within `args`.
///
/// # Errors
/// Returns an execution error naming `name` and the first offending argument.
pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
    let offending = args
        .iter()
        .enumerate()
        .skip(1)
        .find(|(_, arg)| !matches!(arg.data_type(), DataType::Utf8 | DataType::LargeUtf8));

    if let Some((position, arg)) = offending {
        return exec_err!(
            "{name} function unsupported data type at index {}: {}",
            position,
            arg.data_type()
        );
    }
    Ok(())
}
/// Parses `s` using the chrono strftime-style `format` and returns the result
/// in `timezone`.
///
/// When chrono can build an offset-aware datetime from the parsed fields, that
/// value is converted into `timezone`. Otherwise the fields are treated as a
/// local (naive) datetime in `timezone`. If no time of day was parsed, the date
/// alone is used, converted via `NaiveDate -> NaiveDateTime`.
///
/// # Errors
/// Returns `DataFusionError::Execution` when `s` does not match `format`, when
/// neither a datetime nor a date can be built from the parsed fields, or when
/// the local datetime does not resolve to exactly one instant in `timezone`.
pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
    timezone: &T,
    s: &str,
    format: &str,
) -> Result<DateTime<T>, DataFusionError> {
    let make_err = |detail: &str| {
        DataFusionError::Execution(format!(
            "Error parsing timestamp from '{s}' using format '{format}': {detail}"
        ))
    };

    let mut parsed = Parsed::new();
    parse(&mut parsed, s, StrftimeItems::new(format))
        .map_err(|e| make_err(&e.to_string()))?;

    // First attempt: an offset-aware datetime, converted into `timezone`.
    let aware_err = match parsed.to_datetime() {
        Ok(aware) => return Ok(aware.with_timezone(timezone)),
        Err(e) => e,
    };

    // Fallback: a naive datetime (or a bare date) interpreted in `timezone`.
    let naive = parsed
        .to_naive_datetime_with_offset(0)
        .or_else(|_| parsed.to_naive_date().map(|date| date.into()))
        .map_err(|e| make_err(&e.to_string()))?;

    match timezone.from_local_datetime(&naive) {
        Single(local) => Ok(local),
        // Ambiguous or non-existent local times are rejected. The reported
        // reason is the error from the offset-aware attempt above.
        _ => Err(make_err(&aware_err.to_string())),
    }
}
/// Parses `s` with `format` (interpreted in UTC) and returns nanoseconds since
/// the Unix epoch.
///
/// # Errors
/// Propagates parse errors from `string_to_datetime_formatted`. Returns
/// `ERR_NANOSECONDS_NOT_SUPPORTED` when the instant does not fit in an `i64`
/// nanosecond count.
#[inline]
pub(crate) fn string_to_timestamp_nanos_formatted(
    s: &str,
    format: &str,
) -> Result<i64, DataFusionError> {
    let utc = string_to_datetime_formatted(&Utc, s, format)?;
    match utc.timestamp_nanos_opt() {
        Some(nanos) => Ok(nanos),
        None => Err(DataFusionError::Execution(
            ERR_NANOSECONDS_NOT_SUPPORTED.to_string(),
        )),
    }
}
/// Parses `s` with `format` (interpreted in UTC) and returns milliseconds since
/// the Unix epoch.
///
/// # Errors
/// Propagates parse errors from `string_to_datetime_formatted`.
#[inline]
pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
    string_to_datetime_formatted(&Utc, s, format).map(|utc| utc.timestamp_millis())
}
/// Applies the single-argument string parser `op` to `args[0]`.
///
/// * Array input (`Utf8` / `LargeUtf8`): `op` is applied element-wise and null
///   elements stay null.
/// * Scalar input (`Utf8` / `LargeUtf8`): `op` is applied to the value and a
///   null scalar produces a null scalar of type `S`.
///
/// # Errors
/// Returns an execution error for an empty `args`, for unsupported input
/// types, and propagates any error returned by `op`.
pub(crate) fn handle<'a, O, F, S>(
    args: &'a [ColumnarValue],
    op: F,
    name: &str,
) -> Result<ColumnarValue>
where
    O: ArrowPrimitiveType,
    S: ScalarType<O::Native>,
    F: Fn(&'a str) -> Result<O::Native>,
{
    let Some(first) = args.first() else {
        return exec_err!("{name} function requires at least one argument");
    };

    match first {
        // The offset width must match the physical layout of the array:
        // `Utf8` uses i32 offsets and `LargeUtf8` uses i64 offsets. Using i32
        // for `LargeUtf8` makes the downcast inside the helper fail.
        ColumnarValue::Array(a) => match a.data_type() {
            DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
                unary_string_to_primitive_function::<i32, O, _>(&[a.as_ref()], op, name)?,
            ))),
            DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
                unary_string_to_primitive_function::<i64, O, _>(&[a.as_ref()], op, name)?,
            ))),
            other => exec_err!("Unsupported data type {other:?} for function {name}"),
        },
        ColumnarValue::Scalar(scalar) => match scalar {
            ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => {
                // `None` (a null scalar) maps straight through to a null result.
                let result = a.as_ref().map(|x| (op)(x)).transpose()?;
                Ok(ColumnarValue::Scalar(S::scalar(result)))
            }
            other => exec_err!("Unsupported data type {other:?} for function {name}"),
        },
    }
}
/// Parses `args[0]` with each of the format strings in `args[1..]` in turn.
///
/// For each input value, `op(value, format)` is tried with every non-null
/// format in order. The first success is passed through `op2` and becomes the
/// result. If every attempt fails, the last error is returned. A null input
/// value, or an input for which every format is null, produces null. This
/// matches `strings_to_primitive_function`, which implements the array case.
///
/// # Errors
/// Returns an execution error when fewer than two arguments are supplied, when
/// any argument is not a `Utf8`/`LargeUtf8` string, or when every non-null
/// format fails to parse the input (the last parse error is returned).
pub(crate) fn handle_multiple<'a, O, F, S, M>(
    args: &'a [ColumnarValue],
    op: F,
    op2: M,
    name: &str,
) -> Result<ColumnarValue>
where
    O: ArrowPrimitiveType,
    S: ScalarType<O::Native>,
    F: Fn(&'a str, &'a str) -> Result<O::Native>,
    M: Fn(O::Native) -> O::Native,
{
    if args.len() < 2 {
        return exec_err!(
            "{:?} args were supplied but {} takes 2 or more arguments",
            args.len(),
            name
        );
    }

    match &args[0] {
        ColumnarValue::Array(a) => {
            let first_type = a.data_type();
            if !matches!(first_type, DataType::Utf8 | DataType::LargeUtf8) {
                return exec_err!("Unsupported data type {first_type:?} for function {name}");
            }

            // Reject any non-string argument up front with a positional message.
            for (pos, arg) in args.iter().enumerate() {
                let arg_type = arg.data_type();
                if !matches!(arg_type, DataType::Utf8 | DataType::LargeUtf8) {
                    return exec_err!(
                        "Unsupported data type {arg_type:?} for function {name}, arg # {pos}"
                    );
                }
            }

            // Pick the offset width matching the first array's physical layout.
            // Using i32 for a `LargeUtf8` array makes the downcast fail.
            let array = match first_type {
                DataType::LargeUtf8 => {
                    strings_to_primitive_function::<i64, O, _, _>(args, op, op2, name)?
                }
                _ => strings_to_primitive_function::<i32, O, _, _>(args, op, op2, name)?,
            };
            Ok(ColumnarValue::Array(Arc::new(array)))
        }
        ColumnarValue::Scalar(scalar) => match scalar {
            ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => {
                // A null input has nothing to parse. Return null, as the array path does.
                let Some(a) = a else {
                    return Ok(ColumnarValue::Scalar(S::scalar(None)));
                };

                let mut last_err = None;
                for (pos, v) in args.iter().enumerate().skip(1) {
                    let ColumnarValue::Scalar(
                        ScalarValue::Utf8(x) | ScalarValue::LargeUtf8(x),
                    ) = v
                    else {
                        return exec_err!("Unsupported data type {v:?} for function {name}, arg # {pos}");
                    };
                    // Null formats are skipped rather than treated as failures.
                    let Some(format) = x else {
                        continue;
                    };
                    match op(a.as_str(), format.as_str()) {
                        Ok(r) => {
                            return Ok(ColumnarValue::Scalar(S::scalar(Some(op2(r)))));
                        }
                        Err(e) => last_err = Some(e),
                    }
                }

                match last_err {
                    Some(e) => Err(e),
                    // Every format was null: no attempt was made, so the result is null.
                    None => Ok(ColumnarValue::Scalar(S::scalar(None))),
                }
            }
            other => {
                exec_err!("Unsupported data type {other:?} for function {name}")
            }
        },
    }
}
/// Element-wise multi-format parse: `args[0]` must be a string array, and
/// `args[1..]` are format strings given either as arrays (read at the same row)
/// or as scalars (shared by every row).
///
/// For each non-null input row, `op(value, format)` is tried with each non-null
/// format in order. The first success is mapped through `op2`. If every attempt
/// fails, the last error is returned. Rows whose input is null, or whose
/// formats are all null, produce null.
///
/// `T` selects the string offset width (`i32` for `Utf8`, `i64` for
/// `LargeUtf8`). Every array argument must use that same width.
///
/// # Errors
/// Returns an execution error when fewer than two arguments are given, when the
/// first argument is not an array, when a scalar argument is not a string, when
/// an array cannot be downcast to `GenericStringArray<T>`, or when every
/// non-null format fails for some row.
pub(crate) fn strings_to_primitive_function<'a, T, O, F, F2>(
    args: &'a [ColumnarValue],
    op: F,
    op2: F2,
    name: &str,
) -> Result<PrimitiveArray<O>>
where
    O: ArrowPrimitiveType,
    T: OffsetSizeTrait,
    F: Fn(&'a str, &'a str) -> Result<O::Native>,
    F2: Fn(O::Native) -> O::Native,
{
    if args.len() < 2 {
        return exec_err!(
            "{:?} args were supplied but {} takes 2 or more arguments",
            args.len(),
            name
        );
    }

    let data = args
        .iter()
        .map(|a| match a {
            ColumnarValue::Array(a) => {
                Ok(Either::Left(as_generic_string_array::<T>(a.as_ref())?))
            }
            ColumnarValue::Scalar(s) => match s {
                ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => Ok(Either::Right(a)),
                other => exec_err!(
                    "Unexpected scalar type encountered '{other}' for function '{name}'"
                ),
            },
        })
        .collect::<Result<Vec<Either<&GenericStringArray<T>, &Option<String>>>>>()?;

    // The first argument drives the output length, so it must be an array.
    let (first_arg, formats) = match data.split_first() {
        Some((Either::Left(first), rest)) => (*first, rest),
        _ => {
            return exec_err!(
                "The first argument of function '{name}' must be a string array"
            )
        }
    };

    first_arg
        .iter()
        .enumerate()
        .map(|(pos, value)| -> Result<Option<O::Native>> {
            let Some(value) = value else {
                return Ok(None);
            };

            let mut last_err = None;
            for format in formats.iter().copied() {
                let format = match format {
                    // Null slots in a format array are skipped, like null scalar formats.
                    Either::Left(arr) => {
                        if arr.is_null(pos) {
                            continue;
                        }
                        arr.value(pos)
                    }
                    Either::Right(Some(s)) => s.as_str(),
                    Either::Right(None) => continue,
                };
                match op(value, format) {
                    Ok(parsed) => return Ok(Some(op2(parsed))),
                    Err(e) => last_err = Some(e),
                }
            }

            // No usable format means null. Otherwise report the last failure.
            last_err.map_or(Ok(None), Err)
        })
        .collect()
}
/// Applies `op` to every element of a single string array, producing a
/// primitive array. Null elements stay null.
///
/// `T` is the offset width of the string array (`i32` for `Utf8`, `i64` for
/// `LargeUtf8`).
///
/// # Errors
/// Returns an execution error unless exactly one array is supplied. Also
/// returns an error if the array cannot be downcast to
/// `GenericStringArray<T>`, and propagates the first error returned by `op`.
fn unary_string_to_primitive_function<'a, T, O, F>(
    args: &[&'a dyn Array],
    op: F,
    name: &str,
) -> Result<PrimitiveArray<O>>
where
    O: ArrowPrimitiveType,
    T: OffsetSizeTrait,
    F: Fn(&'a str) -> Result<O::Native>,
{
    let [input] = args else {
        return exec_err!(
            "{:?} args were supplied but {} takes exactly one argument",
            args.len(),
            name
        );
    };

    let strings = as_generic_string_array::<T>(*input)?;
    strings
        .iter()
        .map(|maybe_str| maybe_str.map(|s| op(s)).transpose())
        .collect()
}