use std::sync::Arc;
use arrow::array::{
Array, ArrayRef, ArrowPrimitiveType, AsArray, ListArray, NullBufferBuilder,
PrimitiveArray,
};
use arrow::datatypes::{Field, Int64Type};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate::array_agg::ArrayAggAccumulator;
use arrow::buffer::OffsetBuffer;
use rand::distr::{Distribution, StandardUniform};
use rand::prelude::StdRng;
use rand::Rng;
use rand::SeedableRng;
/// Returns a [`StdRng`] initialised from a fixed seed, so every caller
/// observes the same pseudo-random sequence from run to run.
pub fn seedable_rng() -> StdRng {
    const SEED: u64 = 42;
    StdRng::seed_from_u64(SEED)
}
fn merge_batch_bench(c: &mut Criterion, name: &str, values: ArrayRef) {
let list_item_data_type = values.as_list::<i32>().values().data_type().clone();
c.bench_function(name, |b| {
b.iter(|| {
#[allow(clippy::unit_arg)]
black_box(
ArrayAggAccumulator::try_new(&list_item_data_type, false)
.unwrap()
.merge_batch(&[values.clone()])
.unwrap(),
)
})
});
}
/// Builds a `PrimitiveArray<T>` holding `size` pseudo-random entries.
///
/// Each slot is null when a uniform `f32` draw is below `null_density`,
/// otherwise it holds a random `T::Native`. The generator comes from
/// `seedable_rng()`, so identical arguments always give identical arrays.
pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
where
    T: ArrowPrimitiveType,
    StandardUniform: Distribution<T::Native>,
{
    let mut generator = seedable_rng();

    // Produces one slot per call: first the null decision, then (for
    // non-null slots only) the value itself.
    let next_slot = move || -> Option<T::Native> {
        let is_null = generator.random::<f32>() < null_density;
        if is_null {
            return None;
        }
        Some(generator.random())
    };

    std::iter::repeat_with(next_slot).take(size).collect()
}
pub fn create_list_array<T>(
size: usize,
null_density: f32,
zero_length_lists_probability: f32,
) -> ListArray
where
T: ArrowPrimitiveType,
StandardUniform: Distribution<T::Native>,
{
let mut nulls_builder = NullBufferBuilder::new(size);
let mut rng = StdRng::seed_from_u64(42);
let offsets = OffsetBuffer::from_lengths((0..size).map(|_| {
let is_null = rng.random::<f32>() < null_density;
let mut length = rng.random_range(1..10);
if is_null {
nulls_builder.append_null();
if rng.random::<f32>() <= zero_length_lists_probability {
length = 0;
}
} else {
nulls_builder.append_non_null();
}
length
}));
let length = *offsets.last().unwrap() as usize;
let values = create_primitive_array::<T>(length, 0.0);
let field = Field::new_list_field(T::DATA_TYPE, true);
ListArray::new(
Arc::new(field),
offsets,
Arc::new(values),
nulls_builder.finish(),
)
}
fn array_agg_benchmark(c: &mut Criterion) {
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.0, 1.0)) as ArrayRef;
merge_batch_bench(c, "array_agg i64 merge_batch no nulls", values);
let values = Arc::new(create_list_array::<Int64Type>(8192, 1.0, 1.0)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch all nulls, 100% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 1.0, 0.9)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch all nulls, 90% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.3, 1.0)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 30% nulls, 100% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.7, 1.0)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 70% nulls, 100% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.3, 0.99)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 30% nulls, 99% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.7, 0.99)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 70% nulls, 99% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.3, 0.9)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 30% nulls, 90% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.7, 0.9)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 70% nulls, 90% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.3, 0.50)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 30% nulls, 50% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.7, 0.50)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 70% nulls, 50% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.3, 0.0)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 30% nulls, 0% of nulls point to a zero length array",
values,
);
let values = Arc::new(create_list_array::<Int64Type>(8192, 0.7, 0.0)) as ArrayRef;
merge_batch_bench(
c,
"array_agg i64 merge_batch 70% nulls, 0% of nulls point to a zero length array",
values,
);
}
// Collect `array_agg_benchmark` into a Criterion group named `benches` and
// hand that group to `criterion_main!`, which supplies the benchmark
// binary's entry point.
criterion_group!(benches, array_agg_benchmark);
criterion_main!(benches);