extern crate criterion;
use arrow::array::{StringArray, StringViewArray};
use arrow::datatypes::DataType;
use arrow::util::bench_util::{
create_string_array_with_len, create_string_view_array_with_len,
};
use criterion::{black_box, criterion_group, criterion_main, Criterion, SamplingMode};
use datafusion_common::ScalarValue;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use rand::distributions::Alphanumeric;
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
use std::sync::Arc;
use std::time::Duration;
/// Builds one comma-separated "set" string made of `num_elements` tokens,
/// each `str_len_chars` characters long, pulling every character from
/// `next_char` in order.
fn gen_set_string(
    num_elements: usize,
    str_len_chars: usize,
    mut next_char: impl FnMut() -> char,
) -> String {
    // Reserve for all tokens plus the separating commas. This is exact for
    // single-byte characters and a lower bound for multi-byte ones.
    let capacity = num_elements
        .saturating_mul(str_len_chars)
        .saturating_add(num_elements.saturating_sub(1));
    let mut set = String::with_capacity(capacity);
    for i in 0..num_elements {
        if i > 0 {
            set.push(',');
        }
        set.extend((0..str_len_chars).map(|_| next_char()));
    }
    set
}

/// Generates the `[element, set]` argument pair used to benchmark the
/// array/array form of `find_in_set`.
///
/// For each of the `n_rows` rows a random `f32` is drawn from a generator
/// seeded with `42`:
///
/// * below `null_density`, both columns get a null;
/// * below `null_density + utf8_density`, the set is built from a mixed
///   ASCII / Cyrillic / CJK / emoji corpus;
/// * otherwise the set is built from random alphanumeric characters.
///
/// Every non-null set holds five comma-separated tokens of `str_len_chars`
/// characters each, and the matching element is one token of that set chosen
/// by `random_element_in_set`.
///
/// When `is_string_view` is `true` both columns are `StringViewArray`s,
/// otherwise they are `StringArray`s. The returned vector is ordered
/// `[element_column, set_column]`.
fn gen_args_array(
    n_rows: usize,
    str_len_chars: usize,
    null_density: f32,
    utf8_density: f32,
    is_string_view: bool,
) -> Vec<ColumnarValue> {
    let mut rng = StdRng::seed_from_u64(42);

    let num_elements = 5;
    let utf8 = "DataFusionДатаФусион数据融合📊🔥";
    // Decode the corpus once so that picking a random character is a plain
    // index instead of re-walking the UTF-8 string for every character.
    let corpus: Vec<char> = utf8.chars().collect();

    let mut output_set_vec: Vec<Option<String>> = Vec::with_capacity(n_rows);
    let mut output_element_vec: Vec<Option<String>> = Vec::with_capacity(n_rows);

    for _ in 0..n_rows {
        let rand_num = rng.gen::<f32>();
        if rand_num < null_density {
            output_element_vec.push(None);
            output_set_vec.push(None);
            continue;
        }

        // The order of RNG draws (one per generated character) is kept
        // exactly as before so the benchmark data stays reproducible.
        let generated_string = if rand_num < null_density + utf8_density {
            gen_set_string(num_elements, str_len_chars, || {
                corpus[rng.gen_range(0..corpus.len())]
            })
        } else {
            gen_set_string(num_elements, str_len_chars, || {
                char::from(rng.sample(Alphanumeric))
            })
        };

        output_element_vec.push(Some(random_element_in_set(&generated_string)));
        output_set_vec.push(Some(generated_string));
    }

    // Both vectors are consumed here; no clone is needed on either path.
    if is_string_view {
        let set_array: StringViewArray = output_set_vec.into_iter().collect();
        let element_array: StringViewArray = output_element_vec.into_iter().collect();
        vec![
            ColumnarValue::Array(Arc::new(element_array)),
            ColumnarValue::Array(Arc::new(set_array)),
        ]
    } else {
        let set_array: StringArray = output_set_vec.into_iter().collect();
        let element_array: StringArray = output_element_vec.into_iter().collect();
        vec![
            ColumnarValue::Array(Arc::new(element_array)),
            ColumnarValue::Array(Arc::new(set_array)),
        ]
    }
}
/// Returns one of the comma-separated tokens of `string` as an owned
/// `String`.
///
/// An empty input yields an empty string. The generator is rebuilt from the
/// constant seed `44` on every call, so for a given number of tokens the same
/// position is always selected.
fn random_element_in_set(string: &str) -> String {
    let candidates: Vec<&str> = string.split(',').collect();
    match candidates.as_slice() {
        // Nothing to choose from: no tokens at all, or a single empty token.
        [] | [""] => String::new(),
        tokens => {
            let pick = StdRng::seed_from_u64(44).gen_range(0..tokens.len());
            tokens[pick].to_string()
        }
    }
}
/// Generates the `[element, set]` argument pair used to benchmark the
/// array/scalar form of `find_in_set`.
///
/// The first argument is a string column of `n_rows` rows produced by arrow's
/// bench utilities with the given `null_density` and `str_len_chars`: a
/// `StringViewArray` when `is_string_view` is `true`, otherwise a
/// `StringArray` with `i32` offsets. The second argument is always the
/// `Utf8` scalar `"Apache,DataFusion,SQL,Query,Engine"`.
fn gen_args_scalar(
    n_rows: usize,
    str_len_chars: usize,
    null_density: f32,
    is_string_view: bool,
) -> Vec<ColumnarValue> {
    let set_scalar = ColumnarValue::Scalar(ScalarValue::Utf8(Some(String::from(
        "Apache,DataFusion,SQL,Query,Engine",
    ))));

    let element_column = if is_string_view {
        // NOTE(review): the trailing `false` is forwarded as-is; presumably it
        // disables mixed-length strings — confirm against arrow's bench_util.
        ColumnarValue::Array(Arc::new(create_string_view_array_with_len(
            n_rows,
            null_density,
            str_len_chars,
            false,
        )))
    } else {
        ColumnarValue::Array(Arc::new(create_string_array_with_len::<i32>(
            n_rows,
            null_density,
            str_len_chars,
        )))
    };

    vec![element_column, set_scalar]
}
/// Registers the `find_in_set` benchmarks.
///
/// For each string length in `[8, 32, 1024]` two groups are run over 8192
/// rows:
///
/// * `find_in_set` — both arguments are arrays (flat sampling, 50 samples,
///   10 second measurement time);
/// * `find_in_set_scalar` — the set argument is a scalar (default group
///   settings).
///
/// Each group benchmarks a `StringArray` input (`string_len_<len>`) followed
/// by a `StringViewArray` input (`string_view_len_<len>`).
fn criterion_benchmark(c: &mut Criterion) {
    let find_in_set = datafusion_functions::unicode::find_in_set();
    let n_rows = 8192;

    for str_len in [8, 32, 1024] {
        // Array element column against an array set column.
        let mut array_group = c.benchmark_group("find_in_set");
        array_group.sampling_mode(SamplingMode::Flat);
        array_group.sample_size(50);
        array_group.measurement_time(Duration::from_secs(10));

        for (label, is_string_view) in [("string_len", false), ("string_view_len", true)] {
            let args = gen_args_array(n_rows, str_len, 0.1, 0.5, is_string_view);
            array_group.bench_function(format!("{}_{}", label, str_len), |b| {
                b.iter(|| {
                    black_box(find_in_set.invoke_with_args(ScalarFunctionArgs {
                        args: args.clone(),
                        number_rows: n_rows,
                        return_type: &DataType::Int32,
                    }))
                })
            });
        }
        array_group.finish();

        // Array element column against a scalar set.
        let mut scalar_group = c.benchmark_group("find_in_set_scalar");

        for (label, is_string_view) in [("string_len", false), ("string_view_len", true)] {
            let args = gen_args_scalar(n_rows, str_len, 0.1, is_string_view);
            scalar_group.bench_function(format!("{}_{}", label, str_len), |b| {
                b.iter(|| {
                    black_box(find_in_set.invoke_with_args(ScalarFunctionArgs {
                        args: args.clone(),
                        number_rows: n_rows,
                        return_type: &DataType::Int32,
                    }))
                })
            });
        }
        scalar_group.finish();
    }
}
// Collect `criterion_benchmark` into the `benches` group and generate the
// benchmark binary's entry point for that group.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);