use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{collect, ExecutionPlan};
use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_datasource::memory::MemorySourceConfig;
/// Builds the [`SortPreservingMergeExec`] plan measured by this benchmark.
///
/// A single record batch with columns `a` (Int32), `b` (Utf8) and `c` (Int64)
/// is created, cloned `batch_count` times to form one partition, and that
/// partition is in turn cloned `partition_count` times. The partitions are
/// exposed through an in-memory source and merged on `(b, c)` using the
/// default sort options.
///
/// * `has_same_value` - when `true`, every row is identical (`a = 1`,
///   `b = "a"`, `c = 0`); when `false`, `a` and `c` hold the row index and
///   `b` holds a two-character string derived from the row index.
/// * `enable_round_robin_repartition` - forwarded to
///   `SortPreservingMergeExec::with_round_robin_repartition`.
/// * `batch_count` - number of batches per partition.
/// * `partition_count` - number of input partitions.
///
/// # Panics
///
/// Panics if the record batch, the sort expressions, or the in-memory source
/// cannot be constructed.
fn generate_spm_for_round_robin_tie_breaker(
    has_same_value: bool,
    enable_round_robin_repartition: bool,
    batch_count: usize,
    partition_count: usize,
) -> SortPreservingMergeExec {
    // Number of rows in each generated batch; every column below must have
    // exactly this many entries.
    let row_size: usize = 256;
    let rb = if has_same_value {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size]));
        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size]));
        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    } else {
        let v = (0i32..row_size as i32).collect::<Vec<_>>();
        let a: ArrayRef = Arc::new(Int32Array::from(v));

        let charset: Vec<char> =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
                .chars()
                .collect();
        // Driven by `row_size` (not a literal) so the string column always has
        // the same length as the numeric columns.
        let strings = (0..row_size)
            .map(|i| {
                let mut s = String::with_capacity(2);
                s.push(charset[i % charset.len()]);
                s.push(charset[(i / charset.len()) % charset.len()]);
                Some(s)
            })
            .collect::<Vec<_>>();
        let b: ArrayRef = Arc::new(StringArray::from_iter(strings));

        let v = (0i64..row_size as i64).collect::<Vec<_>>();
        let c: ArrayRef = Arc::new(Int64Array::from_iter(v));
        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    };

    // Every partition contains the same `batch_count` copies of the batch.
    let rbs = (0..batch_count).map(|_| rb.clone()).collect::<Vec<_>>();
    let partitions = vec![rbs; partition_count];

    let schema = rb.schema();
    let sort = LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: col("b", &schema).unwrap(),
            options: Default::default(),
        },
        PhysicalSortExpr {
            expr: col("c", &schema).unwrap(),
            options: Default::default(),
        },
    ]);

    let exec = MemorySourceConfig::try_new_exec(&partitions, schema, None).unwrap();
    SortPreservingMergeExec::new(sort, exec)
        .with_round_robin_repartition(enable_round_robin_repartition)
}
/// Registers a single Criterion benchmark named `description`.
///
/// The plan is built once, outside the measured closure, from the given
/// parameters. Each measured iteration clones the plan and task-context
/// handles and passes them to `collect`, driven by `FuturesExecutor`.
fn run_bench(
    c: &mut Criterion,
    has_same_value: bool,
    enable_round_robin_repartition: bool,
    batch_count: usize,
    partition_count: usize,
    description: &str,
) {
    let task_ctx = Arc::new(TaskContext::default());

    let merge_exec = generate_spm_for_round_robin_tie_breaker(
        has_same_value,
        enable_round_robin_repartition,
        batch_count,
        partition_count,
    );
    let spm: Arc<dyn ExecutionPlan> = Arc::new(merge_exec);

    c.bench_function(description, |bencher| {
        let async_bencher = bencher.to_async(FuturesExecutor);
        async_bencher.iter(|| {
            let plan = Arc::clone(&spm);
            let ctx = Arc::clone(&task_ctx);
            black_box(collect(plan, ctx))
        })
    });
}
/// Registers one benchmark for every combination of data shape, tie-breaker
/// setting, batch count and partition count.
///
/// Benchmark names have the form
/// `<label>_batch_count_<batch_count>_partition_count_<partition_count>`.
fn criterion_benchmark(c: &mut Criterion) {
    // (has_same_value, enable_round_robin_repartition, benchmark name prefix)
    let params = [
        (true, false, "low_card_without_tiebreaker"),
        (true, true, "low_card_with_tiebreaker"),
        (false, false, "high_card_without_tiebreaker"),
        (false, true, "high_card_with_tiebreaker"),
    ];
    let batch_counts = [1, 25, 625];
    let partition_counts = [2, 8, 32];

    for &(has_same_value, enable_round_robin_repartition, cardinality_label) in &params {
        for &batch_count in &batch_counts {
            for &partition_count in &partition_counts {
                let description = format!(
                    "{}_batch_count_{}_partition_count_{}",
                    cardinality_label, batch_count, partition_count
                );
                run_bench(
                    c,
                    has_same_value,
                    enable_round_robin_repartition,
                    batch_count,
                    partition_count,
                    &description,
                );
            }
        }
    }
}
// Declares the Criterion benchmark group `benches` with `criterion_benchmark`
// as its only target.
criterion_group!(benches, criterion_benchmark);
// Generates the benchmark binary's entry point, which runs the `benches` group.
criterion_main!(benches);