use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Fields};
use arrow::array::{
Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait, Scalar, UInt32Array,
};
use arrow::buffer::OffsetBuffer;
use datafusion_common::cast::{
as_fixed_size_list_array, as_large_list_array, as_list_array,
};
use datafusion_common::{exec_err, internal_err, plan_err, Result, ScalarValue};
use datafusion_expr::ColumnarValue;
/// Checks that every array in `args` has the same data type as the first
/// array, treating arrays of type [`DataType::Null`] as compatible with any
/// reference type.
///
/// An empty `args` slice is trivially compatible and returns `Ok(())`.
///
/// # Errors
///
/// Returns a planning error mentioning `name` and listing every argument's
/// type when some argument neither equals the first argument's type nor is
/// `Null`.
pub(crate) fn check_datatypes(name: &str, args: &[&ArrayRef]) -> Result<()> {
    // The first argument is the reference type. With no arguments there is
    // nothing to compare, and indexing `args[0]` would panic.
    let Some(first) = args.first() else {
        return Ok(());
    };
    let data_type = first.data_type();
    let compatible = args.iter().all(|arg| {
        arg.data_type().equals_datatype(data_type)
            || arg.data_type().equals_datatype(&DataType::Null)
    });
    if !compatible {
        let types = args.iter().map(|arg| arg.data_type()).collect::<Vec<_>>();
        return plan_err!("{name} received incompatible types: '{types:?}'.");
    }
    Ok(())
}
/// Adapts an array kernel `inner` into a function over [`ColumnarValue`]s.
///
/// All arguments are converted to arrays with
/// [`ColumnarValue::values_to_arrays`] before `inner` runs. If none of the
/// arguments was an array, the call is treated as scalar. In that case,
/// element 0 of the kernel's output is returned as a
/// [`ColumnarValue::Scalar`]. Otherwise the output array is returned as-is.
pub(crate) fn make_scalar_function<F>(
    inner: F,
) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue>
where
    F: Fn(&[ArrayRef]) -> Result<ArrayRef>,
{
    move |args: &[ColumnarValue]| {
        // An empty argument list also counts as "all scalar".
        let all_scalar = args
            .iter()
            .all(|arg| matches!(arg, ColumnarValue::Scalar(_)));

        let arrays = ColumnarValue::values_to_arrays(args)?;
        let output = inner(&arrays)?;

        if all_scalar {
            // Collapse the kernel's output back to a single scalar value.
            ScalarValue::try_from_array(&output, 0).map(ColumnarValue::Scalar)
        } else {
            Ok(ColumnarValue::Array(output))
        }
    }
}
/// Wraps shallower list arrays in extra list levels so that every array in
/// `args` reaches the largest nesting depth (as reported by
/// `datafusion_common::utils::list_ndims`) found among them.
///
/// Each added level turns every row into a single-element list containing
/// that row. The row count is preserved, and the new list field is nullable.
/// Arrays that already have the maximum depth are returned unchanged.
///
/// # Errors
///
/// Propagates any error from building a `GenericListArray` for a new level.
pub(crate) fn align_array_dimensions<O: OffsetSizeTrait>(
    args: Vec<ArrayRef>,
) -> Result<Vec<ArrayRef>> {
    let ndims: Vec<_> = args
        .iter()
        .map(|arg| datafusion_common::utils::list_ndims(arg.data_type()))
        .collect();
    let target_ndim = ndims.iter().copied().max().unwrap_or(0);

    let mut aligned = Vec::with_capacity(args.len());
    for (array, ndim) in args.into_iter().zip(ndims) {
        let mut current = array;
        // One wrapping pass per missing nesting level; no passes when the
        // array is already at the target depth.
        for _ in ndim..target_ndim {
            let inner_type = current.data_type().to_owned();
            let offsets = OffsetBuffer::<O>::from_lengths(vec![1; current.len()]);
            current = Arc::new(GenericListArray::<O>::try_new(
                Arc::new(Field::new_list_field(inner_type, true)),
                offsets,
                current,
                None,
            )?);
        }
        aligned.push(current);
    }
    Ok(aligned)
}
pub(crate) fn compare_element_to_list(
list_array_row: &dyn Array,
element_array: &dyn Array,
row_index: usize,
eq: bool,
) -> Result<BooleanArray> {
if list_array_row.data_type() != element_array.data_type() {
return exec_err!(
"compare_element_to_list received incompatible types: '{:?}' and '{:?}'.",
list_array_row.data_type(),
element_array.data_type()
);
}
let indices = UInt32Array::from(vec![row_index as u32]);
let element_array_row = arrow::compute::take(element_array, &indices, None)?;
let res = match element_array_row.data_type() {
DataType::List(_) => {
let element_array_row_inner = as_list_array(&element_array_row)?.value(0);
let list_array_row_inner = as_list_array(list_array_row)?;
list_array_row_inner
.iter()
.map(|row| {
row.map(|row| {
if eq {
row.eq(&element_array_row_inner)
} else {
row.ne(&element_array_row_inner)
}
})
})
.collect::<BooleanArray>()
}
DataType::LargeList(_) => {
let element_array_row_inner =
as_large_list_array(&element_array_row)?.value(0);
let list_array_row_inner = as_large_list_array(list_array_row)?;
list_array_row_inner
.iter()
.map(|row| {
row.map(|row| {
if eq {
row.eq(&element_array_row_inner)
} else {
row.ne(&element_array_row_inner)
}
})
})
.collect::<BooleanArray>()
}
_ => {
let element_arr = Scalar::new(element_array_row);
if eq {
arrow_ord::cmp::not_distinct(&list_array_row, &element_arr)?
} else {
arrow_ord::cmp::distinct(&list_array_row, &element_arr)?
}
}
};
Ok(res)
}
/// Computes the length of `arr` and of each nested list level below it.
///
/// Each deeper level is reached by following the *first* element of the
/// current `List`, `LargeList` or `FixedSizeList` level. Descent stops at the
/// first non-list level, or at a list level that has no elements.
///
/// Returns `Ok(None)` when `arr` is `None` or the top-level array is empty.
///
/// # Errors
///
/// Propagates downcast errors from the list casting helpers.
pub(crate) fn compute_array_dims(
    arr: Option<ArrayRef>,
) -> Result<Option<Vec<Option<u64>>>> {
    let Some(mut value) = arr else {
        return Ok(None);
    };
    if value.is_empty() {
        return Ok(None);
    }
    let mut res = vec![Some(value.len() as u64)];
    loop {
        // An empty level has no first element to descend into. Calling
        // `value(0)` on an empty list array would panic (e.g. for `[[[]]]`),
        // so the dimensions collected so far are returned instead.
        if value.is_empty() {
            return Ok(Some(res));
        }
        value = match value.data_type() {
            DataType::List(_) => as_list_array(&value)?.value(0),
            DataType::LargeList(_) => as_large_list_array(&value)?.value(0),
            DataType::FixedSizeList(..) => as_fixed_size_list_array(&value)?.value(0),
            _ => return Ok(Some(res)),
        };
        res.push(Some(value.len() as u64));
    }
}
/// Returns the struct fields that make up the entries of a `Map` data type.
///
/// # Errors
///
/// Returns an internal error in these cases:
/// - `data_type` is not a `Map`;
/// - the map's entry field is not a `Struct`.
pub(crate) fn get_map_entry_field(data_type: &DataType) -> Result<&Fields> {
    let DataType::Map(field, _) = data_type else {
        return internal_err!("Expected a Map type, got {:?}", data_type);
    };
    let field_data_type = field.data_type();
    if let DataType::Struct(fields) = field_data_type {
        Ok(fields)
    } else {
        internal_err!("Expected a Struct type, got {:?}", field_data_type)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::ListArray;
use arrow::datatypes::Int64Type;
use datafusion_common::utils::SingleRowListArrayBuilder;
/// Verifies that `align_array_dimensions` raises the nesting depth of the
/// shallower argument until it matches the deepest argument.
#[test]
fn test_align_array_dimensions() {
// 1-D inputs: a two-row and a one-row `List<Int64>` array.
let array1d_1: ArrayRef =
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1), Some(2), Some(3)]),
Some(vec![Some(4), Some(5)]),
]));
let array1d_2: ArrayRef =
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(6), Some(7), Some(8)]),
]));
// 2-D inputs: each 1-D array nested one level deeper via
// `SingleRowListArrayBuilder` (presumably as a single outer row).
let array2d_1: ArrayRef = Arc::new(
SingleRowListArrayBuilder::new(Arc::clone(&array1d_1)).build_list_array(),
);
let array2d_2 = Arc::new(
SingleRowListArrayBuilder::new(Arc::clone(&array1d_2)).build_list_array(),
);
// Align a 1-D array with a 2-D array: the 1-D one must gain one level.
let res = align_array_dimensions::<i32>(vec![
array1d_1.to_owned(),
array2d_2.to_owned(),
])
.unwrap();
let expected = as_list_array(&array2d_1).unwrap();
let expected_dim = datafusion_common::utils::list_ndims(array2d_1.data_type());
// Alignment wraps each row of array1d_1 in its own single-element list, so
// the result keeps array1d_1's row count. It therefore differs from
// array2d_1 in content; only the nesting depth is expected to match.
assert_ne!(as_list_array(&res[0]).unwrap(), expected);
assert_eq!(
datafusion_common::utils::list_ndims(res[0].data_type()),
expected_dim
);
// Same check with a depth gap of two (1-D vs 3-D).
let array3d_1: ArrayRef =
Arc::new(SingleRowListArrayBuilder::new(array2d_1).build_list_array());
let array3d_2: ArrayRef =
Arc::new(SingleRowListArrayBuilder::new(array2d_2).build_list_array());
let res = align_array_dimensions::<i32>(vec![array1d_1, array3d_2]).unwrap();
let expected = as_list_array(&array3d_1).unwrap();
let expected_dim = datafusion_common::utils::list_ndims(array3d_1.data_type());
// As above: contents differ, nesting depth must match.
assert_ne!(as_list_array(&res[0]).unwrap(), expected);
assert_eq!(
datafusion_common::utils::list_ndims(res[0].data_type()),
expected_dim
);
}
}