use std::sync::Arc;
use arrow::array::{
Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
StringArrayType, StringViewArray,
};
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow::datatypes::DataType;
use chrono::format::{parse, Parsed, StrftimeItems};
use chrono::LocalResult::Single;
use chrono::{DateTime, TimeZone, Utc};
use datafusion_common::cast::as_generic_string_array;
use datafusion_common::{
exec_err, unwrap_or_internal_err, DataFusionError, Result, ScalarType, ScalarValue,
};
use datafusion_expr::ColumnarValue;
/// Error text returned when a parsed timestamp cannot be expressed as an
/// `i64` count of nanoseconds (see `string_to_timestamp_nanos_formatted`).
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";
/// Parses `s` with arrow's `string_to_timestamp_nanos` and converts any
/// resulting error into this crate's `Result` error type.
pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
    // `?` performs the same `From`/`Into` conversion on the error value.
    let nanos = string_to_timestamp_nanos(s)?;
    Ok(nanos)
}
/// Verifies that every argument after the first has a string data type
/// (`Utf8View`, `LargeUtf8` or `Utf8`).
///
/// # Errors
///
/// Returns an execution error naming `name`, the position of the offending
/// argument within `args`, and its data type.
pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
    // The first argument is intentionally not inspected here.
    for (position, arg) in args.iter().enumerate().skip(1) {
        let data_type = arg.data_type();
        let is_string = matches!(
            data_type,
            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8
        );
        if !is_string {
            return exec_err!(
                "{name} function unsupported data type at index {}: {}",
                position,
                data_type
            );
        }
    }

    Ok(())
}
/// Parses `s` according to the chrono strftime-style `format` and returns the
/// result as a `DateTime` in `timezone`.
///
/// If the parsed fields produce a datetime with an offset, that instant is
/// converted to `timezone`. Otherwise the fields are read as a naive datetime
/// (or, failing that, a naive date converted to a datetime) and interpreted as
/// local time in `timezone`.
///
/// # Errors
///
/// Returns an execution error mentioning `s` and `format` when the string does
/// not match the format, when no naive datetime or date can be built from the
/// parsed fields, or when the local time does not map to exactly one instant
/// in `timezone`.
pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
    timezone: &T,
    s: &str,
    format: &str,
) -> Result<DateTime<T>, DataFusionError> {
    let parse_failure = |err_ctx: &str| {
        DataFusionError::Execution(format!(
            "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
        ))
    };

    let mut fields = Parsed::new();
    parse(&mut fields, s, StrftimeItems::new(format))
        .map_err(|e| parse_failure(&e.to_string()))?;

    match fields.to_datetime() {
        Ok(with_offset) => Ok(with_offset.with_timezone(timezone)),
        Err(offset_err) => {
            // No usable offset (or another failure): retry without a timezone.
            let naive = fields
                .to_naive_datetime_with_offset(0)
                .or_else(|_| fields.to_naive_date().map(|date| date.into()))
                .map_err(|e| parse_failure(&e.to_string()))?;

            match timezone.from_local_datetime(&naive) {
                Single(resolved) => Ok(resolved),
                // Ambiguous or nonexistent local time: report the original
                // offset-parsing error.
                _ => Err(parse_failure(&offset_err.to_string())),
            }
        }
    }
}
/// Parses `s` with `format` (interpreting zone-less input as UTC) and returns
/// the timestamp as nanoseconds.
///
/// # Errors
///
/// Propagates parse errors from `string_to_datetime_formatted`, and returns an
/// execution error carrying `ERR_NANOSECONDS_NOT_SUPPORTED` when the value
/// cannot be represented as `i64` nanoseconds.
#[inline]
pub(crate) fn string_to_timestamp_nanos_formatted(
    s: &str,
    format: &str,
) -> Result<i64, DataFusionError> {
    let utc_datetime = string_to_datetime_formatted(&Utc, s, format)?
        .naive_utc()
        .and_utc();

    match utc_datetime.timestamp_nanos_opt() {
        Some(nanos) => Ok(nanos),
        None => Err(DataFusionError::Execution(
            ERR_NANOSECONDS_NOT_SUPPORTED.to_string(),
        )),
    }
}
/// Parses `s` with `format` (interpreting zone-less input as UTC) and returns
/// the timestamp as milliseconds.
///
/// # Errors
///
/// Propagates parse errors from `string_to_datetime_formatted`.
#[inline]
pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
    let parsed = string_to_datetime_formatted(&Utc, s, format)?;
    let millis = parsed.naive_utc().and_utc().timestamp_millis();
    Ok(millis)
}
/// Applies the single-string conversion `op` to the first argument.
///
/// * For an array argument of type `Utf8View`, `LargeUtf8` or `Utf8`, `op` is
///   applied to every non-null element and a primitive array of type `O` is
///   returned.
/// * For a string scalar, `op` is applied to the value (a null scalar stays
///   null) and the result is wrapped via `S::scalar`.
///
/// # Errors
///
/// Returns an execution error mentioning `name` when the first argument is not
/// a supported string type, and propagates any error produced by `op`.
pub(crate) fn handle<O, F, S>(
    args: &[ColumnarValue],
    op: F,
    name: &str,
) -> Result<ColumnarValue>
where
    O: ArrowPrimitiveType,
    S: ScalarType<O::Native>,
    F: Fn(&str) -> Result<O::Native>,
{
    let scalar = match &args[0] {
        ColumnarValue::Array(a) => {
            let converted = match a.data_type() {
                DataType::Utf8View => {
                    unary_string_to_primitive_function::<&StringViewArray, O, _>(
                        a.as_string_view(),
                        op,
                    )?
                }
                DataType::LargeUtf8 => {
                    unary_string_to_primitive_function::<&GenericStringArray<i64>, O, _>(
                        a.as_string::<i64>(),
                        op,
                    )?
                }
                DataType::Utf8 => {
                    unary_string_to_primitive_function::<&GenericStringArray<i32>, O, _>(
                        a.as_string::<i32>(),
                        op,
                    )?
                }
                other => {
                    return exec_err!("Unsupported data type {other:?} for function {name}")
                }
            };
            return Ok(ColumnarValue::Array(Arc::new(converted)));
        }
        ColumnarValue::Scalar(scalar) => scalar,
    };

    // Scalar path: only string-like scalars expose a `&str` view.
    let Some(maybe_text) = scalar.try_as_str() else {
        return exec_err!("Unsupported data type {scalar:?} for function {name}");
    };

    let result = match maybe_text {
        Some(text) => Some(op(text)?),
        None => None,
    };
    Ok(ColumnarValue::Scalar(S::scalar(result)))
}
/// Applies the two-string conversion `op` to the first argument paired with
/// each of the remaining arguments in turn, then post-processes the first
/// successful result with `op2`.
///
/// * When the first argument is a string array, every argument is first
///   checked to be a string type and the work is delegated to
///   `strings_to_primitive_function`.
/// * When the first argument is a string scalar, every remaining argument must
///   be a string scalar. Null ones are skipped; the first pairing for which
///   `op` succeeds yields the result. If none succeeds, the error from the
///   last attempted pairing is returned.
///
/// # Errors
///
/// * Execution error mentioning `name` for unsupported argument types.
/// * Internal error when the first scalar is null, or when no non-null second
///   string was available in the scalar path.
/// * Any error produced by `op` once all candidates are exhausted.
pub(crate) fn handle_multiple<O, F, S, M>(
    args: &[ColumnarValue],
    op: F,
    op2: M,
    name: &str,
) -> Result<ColumnarValue>
where
    O: ArrowPrimitiveType,
    S: ScalarType<O::Native>,
    F: Fn(&str, &str) -> Result<O::Native>,
    M: Fn(O::Native) -> O::Native,
{
    match &args[0] {
        ColumnarValue::Array(a) => match a.data_type() {
            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
                // Arrays and scalars are validated the same way, so ask the
                // `ColumnarValue` for its data type instead of matching twice.
                for (pos, arg) in args.iter().enumerate() {
                    match arg.data_type() {
                        DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {}
                        other => {
                            return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}")
                        }
                    }
                }

                Ok(ColumnarValue::Array(Arc::new(
                    strings_to_primitive_function::<O, _, _>(args, op, op2, name)?,
                )))
            }
            other => {
                exec_err!("Unsupported data type {other:?} for function {name}")
            }
        },
        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
            Some(a) => {
                // NOTE(review): a null first scalar is reported as an internal
                // error rather than producing a null result — confirm intent.
                // The binding must stay named `a`: the macro embeds the
                // identifier in its error message.
                let a = unwrap_or_internal_err!(a);

                let mut ret = None;

                for (pos, v) in args.iter().enumerate().skip(1) {
                    let ColumnarValue::Scalar(
                        ScalarValue::Utf8View(x)
                        | ScalarValue::LargeUtf8(x)
                        | ScalarValue::Utf8(x),
                    ) = v
                    else {
                        return exec_err!("Unsupported data type {v:?} for function {name}, arg # {pos}");
                    };

                    // Null second strings are skipped.
                    if let Some(s) = x {
                        match op(a, s.as_str()) {
                            Ok(r) => {
                                ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
                                    op2(r),
                                )))));
                                break;
                            }
                            // Remember the failure but keep trying the rest.
                            Err(e) => ret = Some(Err(e)),
                        }
                    }
                }

                // As above, the identifier `ret` is part of the error text.
                unwrap_or_internal_err!(ret)
            }
            // `try_as_str` returned `None`, so report the scalar itself; the
            // match result would only ever print as `None`.
            None => {
                exec_err!("Unsupported data type {scalar:?} for function {name}")
            }
        },
    }
}
/// Builds a primitive array of type `O` from a string array (the first
/// argument) combined with one or more further string arguments.
///
/// The first argument is downcast according to its data type and handed to
/// `handle_array_op` together with the remaining arguments, `op` and `op2`.
///
/// # Errors
///
/// Returns an execution error when fewer than two arguments are supplied, when
/// the first argument is not an array, or when it is an array of a type other
/// than `Utf8View`, `LargeUtf8` or `Utf8`. Errors from `handle_array_op` are
/// propagated.
pub(crate) fn strings_to_primitive_function<O, F, F2>(
    args: &[ColumnarValue],
    op: F,
    op2: F2,
    name: &str,
) -> Result<PrimitiveArray<O>>
where
    O: ArrowPrimitiveType,
    F: Fn(&str, &str) -> Result<O::Native>,
    F2: Fn(O::Native) -> O::Native,
{
    if args.len() < 2 {
        return exec_err!(
            "{:?} args were supplied but {} takes 2 or more arguments",
            args.len(),
            name
        );
    }

    match &args[0] {
        ColumnarValue::Array(a) => match a.data_type() {
            DataType::Utf8View => {
                let string_array = a.as_string_view();
                handle_array_op::<O, &StringViewArray, F, F2>(
                    &string_array,
                    &args[1..],
                    op,
                    op2,
                )
            }
            DataType::LargeUtf8 => {
                let string_array = as_generic_string_array::<i64>(&a)?;
                handle_array_op::<O, &GenericStringArray<i64>, F, F2>(
                    &string_array,
                    &args[1..],
                    op,
                    op2,
                )
            }
            DataType::Utf8 => {
                let string_array = as_generic_string_array::<i32>(&a)?;
                handle_array_op::<O, &GenericStringArray<i32>, F, F2>(
                    &string_array,
                    &args[1..],
                    op,
                    op2,
                )
            }
            // Report the calling function's own name rather than a hard-coded one.
            other => exec_err!(
                "Unsupported data type {other:?} for function {name}, expected Utf8View, Utf8 or LargeUtf8."
            ),
        },
        other => exec_err!(
            "Received {} data type, expected only array",
            other.data_type()
        ),
    }
}
/// For each element of `first`, tries `op(element, candidate)` against the
/// value at the same row of every entry in `args` (or the scalar value, for
/// scalar entries) until one succeeds, then stores `op2(result)`.
///
/// * A null element in `first` yields a null output slot.
/// * Null candidates — null scalars and null slots in array arguments — are
///   skipped.
/// * If no candidate succeeds, the error from the last attempted candidate is
///   returned; if every candidate was null the output slot is null.
///
/// # Errors
///
/// Returns an execution error when an argument is not a string array or
/// string scalar, and propagates the last `op` failure for a row where no
/// candidate succeeded.
fn handle_array_op<'a, O, V, F, F2>(
    first: &V,
    args: &[ColumnarValue],
    op: F,
    op2: F2,
) -> Result<PrimitiveArray<O>>
where
    V: StringArrayType<'a>,
    O: ArrowPrimitiveType,
    F: Fn(&str, &str) -> Result<O::Native>,
    F2: Fn(O::Native) -> O::Native,
{
    first
        .iter()
        .enumerate()
        .map(|(pos, x)| {
            let mut val = None;
            if let Some(x) = x {
                for arg in args {
                    let v = match arg {
                        ColumnarValue::Array(a) => match a.data_type() {
                            // A null slot holds no meaningful string, so skip
                            // it just like a null scalar below.
                            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8
                                if a.is_null(pos) =>
                            {
                                continue
                            }
                            DataType::Utf8View => Ok(a.as_string_view().value(pos)),
                            DataType::LargeUtf8 => Ok(a.as_string::<i64>().value(pos)),
                            DataType::Utf8 => Ok(a.as_string::<i32>().value(pos)),
                            other => exec_err!("Unexpected type encountered '{other}'"),
                        },
                        ColumnarValue::Scalar(s) => match s.try_as_str() {
                            Some(Some(v)) => Ok(v),
                            Some(None) => continue, // null string
                            None => exec_err!("Unexpected scalar type encountered '{s}'"),
                        },
                    }?;

                    match op(x, v) {
                        Ok(parsed) => {
                            val = Some(Ok(op2(parsed)));
                            break;
                        }
                        // Keep the failure in case no later candidate succeeds.
                        Err(e) => val = Some(Err(e)),
                    }
                }
            };

            val.transpose()
        })
        .collect()
}
/// Maps every non-null string of `array` through `op`, producing a primitive
/// array of type `O`; null inputs stay null.
///
/// # Errors
///
/// Returns the first error produced by `op`.
fn unary_string_to_primitive_function<'a, StringArrType, O, F>(
    array: StringArrType,
    op: F,
) -> Result<PrimitiveArray<O>>
where
    StringArrType: StringArrayType<'a>,
    O: ArrowPrimitiveType,
    F: Fn(&'a str) -> Result<O::Native>,
{
    array
        .iter()
        .map(|maybe_text| match maybe_text {
            Some(text) => op(text).map(Some),
            None => Ok(None),
        })
        .collect()
}