use std::sync::Arc;
use arrow::array::StringViewArray;
use arrow::{
array::{DictionaryArray, Float64Array, Int64Array, StringArray},
compute::SortOptions,
datatypes::{Int32Type, Schema},
record_batch::RecordBatch,
};
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::{
execution::context::TaskContext,
physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
sorts::sort_preserving_merge::SortPreservingMergeExec, ExecutionPlan,
ExecutionPlanProperties,
},
prelude::SessionContext,
};
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::StreamExt;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tokio::runtime::Runtime;
const NUM_STREAMS: usize = 8;
const BATCH_SIZE: usize = 1024;
const INPUT_SIZE: u64 = 100000;
type PartitionedBatches = Vec<Vec<RecordBatch>>;
fn criterion_benchmark(c: &mut Criterion) {
let cases: Vec<(&str, &dyn Fn(bool) -> PartitionedBatches)> = vec![
("i64", &i64_streams),
("f64", &f64_streams),
("utf8 low cardinality", &utf8_low_cardinality_streams),
("utf8 high cardinality", &utf8_high_cardinality_streams),
(
"utf8 view low cardinality",
&utf8_view_low_cardinality_streams,
),
(
"utf8 view high cardinality",
&utf8_view_high_cardinality_streams,
),
("utf8 tuple", &utf8_tuple_streams),
("utf8 view tuple", &utf8_view_tuple_streams),
("utf8 dictionary", &dictionary_streams),
("utf8 dictionary tuple", &dictionary_tuple_streams),
("mixed dictionary tuple", &mixed_dictionary_tuple_streams),
("mixed tuple", &mixed_tuple_streams),
(
"mixed tuple with utf8 view",
&mixed_tuple_with_utf8_view_streams,
),
];
for (name, f) in cases {
c.bench_function(&format!("merge sorted {name}"), |b| {
let data = f(true);
let case = BenchCase::merge_sorted(&data);
b.iter(move || case.run())
});
c.bench_function(&format!("sort merge {name}"), |b| {
let data = f(false);
let case = BenchCase::sort_merge(&data);
b.iter(move || case.run())
});
c.bench_function(&format!("sort {name}"), |b| {
let data = f(false);
let case = BenchCase::sort(&data);
b.iter(move || case.run())
});
c.bench_function(&format!("sort partitioned {name}"), |b| {
let data = f(false);
let case = BenchCase::sort_partitioned(&data);
b.iter(move || case.run())
});
}
}
struct BenchCase {
runtime: Runtime,
task_ctx: Arc<TaskContext>,
plan: Arc<dyn ExecutionPlan>,
}
impl BenchCase {
fn merge_sorted(partitions: &[Vec<RecordBatch>]) -> Self {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let schema = partitions[0][0].schema();
let sort = make_sort_exprs(schema.as_ref());
let exec = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
let plan = Arc::new(SortPreservingMergeExec::new(sort, exec));
Self {
runtime,
task_ctx,
plan,
}
}
fn sort_merge(partitions: &[Vec<RecordBatch>]) -> Self {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let schema = partitions[0][0].schema();
let sort = make_sort_exprs(schema.as_ref());
let source = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
let exec = SortExec::new(sort.clone(), source).with_preserve_partitioning(true);
let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
Self {
runtime,
task_ctx,
plan,
}
}
fn sort(partitions: &[Vec<RecordBatch>]) -> Self {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let schema = partitions[0][0].schema();
let sort = make_sort_exprs(schema.as_ref());
let exec = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
let exec = Arc::new(CoalescePartitionsExec::new(exec));
let plan = Arc::new(SortExec::new(sort, exec));
Self {
runtime,
task_ctx,
plan,
}
}
fn sort_partitioned(partitions: &[Vec<RecordBatch>]) -> Self {
let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let schema = partitions[0][0].schema();
let sort = make_sort_exprs(schema.as_ref());
let source = MemorySourceConfig::try_new_exec(partitions, schema, None).unwrap();
let exec = SortExec::new(sort, source).with_preserve_partitioning(true);
let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
Self {
runtime,
task_ctx,
plan,
}
}
fn run(&self) {
let plan = Arc::clone(&self.plan);
let task_ctx = Arc::clone(&self.task_ctx);
assert_eq!(plan.output_partitioning().partition_count(), 1);
self.runtime.block_on(async move {
let mut stream = plan.execute(0, task_ctx).unwrap();
while let Some(b) = stream.next().await {
b.expect("unexpected execution error");
}
})
}
}
fn make_sort_exprs(schema: &Schema) -> LexOrdering {
schema
.fields()
.iter()
.map(|f| PhysicalSortExpr {
expr: col(f.name(), schema).unwrap(),
options: SortOptions::default(),
})
.collect()
}
fn i64_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().i64_values();
if sorted {
values.sort_unstable();
}
split_tuples(values, |v| {
let array = Int64Array::from(v);
RecordBatch::try_from_iter(vec![("i64", Arc::new(array) as _)]).unwrap()
})
}
fn f64_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().f64_values();
if sorted {
values.sort_unstable_by(|a, b| a.total_cmp(b));
}
split_tuples(values, |v| {
let array = Float64Array::from(v);
RecordBatch::try_from_iter(vec![("f64", Arc::new(array) as _)]).unwrap()
})
}
fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_low_cardinality_values();
if sorted {
values.sort_unstable();
}
split_tuples(values, |v| {
let array: StringArray = v.into_iter().collect();
RecordBatch::try_from_iter(vec![("utf_low", Arc::new(array) as _)]).unwrap()
})
}
fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_low_cardinality_values();
if sorted {
values.sort_unstable();
}
split_tuples(values, |v| {
let array: StringViewArray = v.into_iter().collect();
RecordBatch::try_from_iter(vec![("utf_view_low", Arc::new(array) as _)]).unwrap()
})
}
fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_high_cardinality_values();
if sorted {
values.sort_unstable();
}
split_tuples(values, |v| {
let array: StringViewArray = v.into_iter().collect();
RecordBatch::try_from_iter(vec![("utf_view_high", Arc::new(array) as _)]).unwrap()
})
}
fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
let mut values = DataGenerator::new().utf8_high_cardinality_values();
if sorted {
values.sort_unstable();
}
split_tuples(values, |v| {
let array: StringArray = v.into_iter().collect();
RecordBatch::try_from_iter(vec![("utf_high", Arc::new(array) as _)]).unwrap()
})
}
fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_high_cardinality_values())
.collect();
if sorted {
tuples.sort_unstable();
}
split_tuples(tuples, |tuples| {
let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let utf8_high: StringArray = utf8_high.into_iter().collect();
let utf8_low1: StringArray = utf8_low1.into_iter().collect();
let utf8_low2: StringArray = utf8_low2.into_iter().collect();
RecordBatch::try_from_iter(vec![
("utf_low1", Arc::new(utf8_low1) as _),
("utf_low2", Arc::new(utf8_low2) as _),
("utf_high", Arc::new(utf8_high) as _),
])
.unwrap()
})
}
fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_high_cardinality_values())
.collect();
if sorted {
tuples.sort_unstable();
}
split_tuples(tuples, |tuples| {
let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let utf8_view_high: StringViewArray = utf8_high.into_iter().collect();
let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
RecordBatch::try_from_iter(vec![
("utf_view_low1", Arc::new(utf8_view_low1) as _),
("utf_view_low2", Arc::new(utf8_view_low2) as _),
("utf_view_high", Arc::new(utf8_view_high) as _),
])
.unwrap()
})
}
fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
.i64_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.collect();
if sorted {
tuples.sort_unstable();
}
split_tuples(tuples, |tuples| {
let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect();
let utf8_low1: StringArray = utf8_low1.into_iter().collect();
let utf8_low2: StringArray = utf8_low2.into_iter().collect();
let i64_values: Int64Array = i64_values.into_iter().collect();
RecordBatch::try_from_iter(vec![
("f64", Arc::new(f64_values) as _),
("utf_low1", Arc::new(utf8_low1) as _),
("utf_low2", Arc::new(utf8_low2) as _),
("i64", Arc::new(i64_values) as _),
])
.unwrap()
})
}
fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
.i64_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.collect();
if sorted {
tuples.sort_unstable();
}
split_tuples(tuples, |tuples| {
let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect();
let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
let i64_values: Int64Array = i64_values.into_iter().collect();
RecordBatch::try_from_iter(vec![
("f64", Arc::new(f64_values) as _),
("utf_view_low1", Arc::new(utf8_view_low1) as _),
("utf_view_low2", Arc::new(utf8_view_low2) as _),
("i64", Arc::new(i64_values) as _),
])
.unwrap()
})
}
fn dictionary_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut values = gen.utf8_low_cardinality_values();
if sorted {
values.sort_unstable();
}
split_tuples(values, |v| {
let dictionary: DictionaryArray<Int32Type> =
v.iter().map(Option::as_deref).collect();
RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap()
})
}
fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.collect();
if sorted {
tuples.sort_unstable();
}
split_tuples(tuples, |tuples| {
let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let a: DictionaryArray<Int32Type> = a.iter().map(Option::as_deref).collect();
let b: DictionaryArray<Int32Type> = b.iter().map(Option::as_deref).collect();
let c: DictionaryArray<Int32Type> = c.iter().map(Option::as_deref).collect();
RecordBatch::try_from_iter(vec![
("a", Arc::new(a) as _),
("b", Arc::new(b) as _),
("c", Arc::new(c) as _),
])
.unwrap()
})
}
fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.collect();
if sorted {
tuples.sort_unstable();
}
split_tuples(tuples, |tuples| {
let (tuples, d): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
let a: DictionaryArray<Int32Type> = a.iter().map(Option::as_deref).collect();
let b: DictionaryArray<Int32Type> = b.iter().map(Option::as_deref).collect();
let c: DictionaryArray<Int32Type> = c.iter().map(Option::as_deref).collect();
let d: Int64Array = d.into_iter().collect();
RecordBatch::try_from_iter(vec![
("a", Arc::new(a) as _),
("b", Arc::new(b) as _),
("c", Arc::new(c) as _),
("d", Arc::new(d) as _),
])
.unwrap()
})
}
struct DataGenerator {
rng: StdRng,
}
impl DataGenerator {
fn new() -> Self {
Self {
rng: StdRng::seed_from_u64(42),
}
}
fn i64_values(&mut self) -> Vec<i64> {
let mut vec: Vec<_> = (0..INPUT_SIZE)
.map(|_| self.rng.random_range(0..INPUT_SIZE as i64))
.collect();
vec.sort_unstable();
vec
}
fn f64_values(&mut self) -> Vec<f64> {
self.i64_values().into_iter().map(|v| v as f64).collect()
}
fn utf8_low_cardinality_values(&mut self) -> Vec<Option<Arc<str>>> {
let strings = (0..100)
.map(|s| format!("value{s}").into())
.collect::<Vec<_>>();
let mut input = (0..INPUT_SIZE)
.map(|_| {
let idx = self.rng.random_range(0..strings.len());
let s = Arc::clone(&strings[idx]);
Some(s)
})
.collect::<Vec<_>>();
input.sort_unstable();
input
}
fn utf8_high_cardinality_values(&mut self) -> Vec<Option<String>> {
let mut input = (0..INPUT_SIZE)
.map(|_| Some(self.random_string()))
.collect::<Vec<_>>();
input.sort_unstable();
input
}
fn random_string(&mut self) -> String {
let rng = &mut self.rng;
rng.sample_iter(rand::distr::Alphanumeric)
.filter(|c| c.is_ascii_alphabetic())
.take(20)
.map(char::from)
.collect::<String>()
}
}
fn split_tuples<T, F>(input: Vec<T>, f: F) -> PartitionedBatches
where
F: Fn(Vec<T>) -> RecordBatch,
{
let mut rng = StdRng::seed_from_u64(1337);
let mut outputs: Vec<Vec<Vec<T>>> = (0..NUM_STREAMS).map(|_| Vec::new()).collect();
for i in input {
let stream_idx = rng.random_range(0..NUM_STREAMS);
let stream = &mut outputs[stream_idx];
match stream.last_mut() {
Some(x) if x.len() < BATCH_SIZE => x.push(i),
_ => {
let mut v = Vec::with_capacity(BATCH_SIZE);
v.push(i);
stream.push(v)
}
}
}
outputs
.into_iter()
.map(|stream| stream.into_iter().map(&f).collect())
.collect()
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);