use std::{any::Any, sync::Arc};
use arrow::array::{ArrayData, Capacities, MutableArrayData};
use arrow_array::{
new_null_array, Array, ArrayRef, GenericListArray, NullArray, OffsetSizeTrait,
};
use arrow_buffer::OffsetBuffer;
use arrow_schema::DataType::{LargeList, List, Null};
use arrow_schema::{DataType, Field};
use datafusion_common::{plan_err, utils::array_into_list_array, Result};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use datafusion_expr::{
ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use crate::utils::make_scalar_function;
// Wires `MakeArray` into the crate via `make_udf_function!`, passing the UDF type,
// the expression name `make_array`, its description string, and the accessor
// identifier `make_array_udf`. NOTE(review): the exact items this expands to
// (presumably an `Expr` builder plus a shared UDF getter) are defined by the macro
// elsewhere in the crate — confirm there.
make_udf_function!(
    MakeArray,
    make_array,
    "Returns an Arrow array using the specified input expressions.",
    make_array_udf
);
/// Scalar UDF behind `make_array` / `make_list`: it collects its arguments into one
/// list value per row (see `make_array_inner`).
#[derive(Debug)]
pub struct MakeArray {
    /// Either `VariadicEqual` (any number of arguments of a common type) or `Any(0)`
    /// (no arguments at all); the function is `Immutable`.
    signature: Signature,
    /// Names reported by `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}

impl Default for MakeArray {
    /// Builds the UDF with its signature and alias list.
    fn default() -> Self {
        let accepted_signatures = vec![TypeSignature::VariadicEqual, TypeSignature::Any(0)];
        Self {
            signature: Signature::one_of(accepted_signatures, Volatility::Immutable),
            aliases: vec!["make_array".to_owned(), "make_list".to_owned()],
        }
    }
}

impl MakeArray {
    /// Creates a new `MakeArray`; identical to `MakeArray::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
impl ScalarUDFImpl for MakeArray {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"make_array"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match arg_types.len() {
0 => Ok(DataType::List(Arc::new(Field::new(
"item",
DataType::Null,
true,
)))),
_ => {
let mut expr_type = DataType::Null;
for arg_type in arg_types {
if !arg_type.equals_datatype(&DataType::Null) {
expr_type = arg_type.clone();
break;
}
}
Ok(List(Arc::new(Field::new("item", expr_type, true))))
}
}
}
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
make_scalar_function(make_array_inner)(args)
}
fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
make_scalar_function(make_array_inner)(&[])
}
fn aliases(&self) -> &[String] {
&self.aliases
}
}
pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
let mut data_type = Null;
for arg in arrays {
let arg_data_type = arg.data_type();
if !arg_data_type.equals_datatype(&Null) {
data_type = arg_data_type.clone();
break;
}
}
match data_type {
Null => {
let array = new_null_array(&Null, arrays.iter().map(|a| a.len()).sum());
Ok(Arc::new(array_into_list_array(array)))
}
LargeList(..) => array_array::<i64>(arrays, data_type),
_ => array_array::<i32>(arrays, data_type),
}
}
fn array_array<O: OffsetSizeTrait>(
args: &[ArrayRef],
data_type: DataType,
) -> Result<ArrayRef> {
if args.is_empty() {
return plan_err!("Array requires at least one argument");
}
let mut data = vec![];
let mut total_len = 0;
for arg in args {
let arg_data = if arg.as_any().is::<NullArray>() {
ArrayData::new_empty(&data_type)
} else {
arg.to_data()
};
total_len += arg_data.len();
data.push(arg_data);
}
let mut offsets: Vec<O> = Vec::with_capacity(total_len);
offsets.push(O::usize_as(0));
let capacity = Capacities::Array(total_len);
let data_ref = data.iter().collect::<Vec<_>>();
let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity);
let num_rows = args[0].len();
for row_idx in 0..num_rows {
for (arr_idx, arg) in args.iter().enumerate() {
if !arg.as_any().is::<NullArray>()
&& !arg.is_null(row_idx)
&& arg.is_valid(row_idx)
{
mutable.extend(arr_idx, row_idx, row_idx + 1);
} else {
mutable.extend_nulls(1);
}
}
offsets.push(O::usize_as(mutable.len()));
}
let data = mutable.freeze();
Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new("item", data_type, true)),
OffsetBuffer::new(offsets.into()),
arrow_array::make_array(data),
None,
)?))
}