mod data_utils;
use arrow::util::pretty::pretty_format_batches;
use criterion::{criterion_group, criterion_main, Criterion};
use data_utils::make_data;
use datafusion::physical_plan::{collect, displayable, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion::{datasource::MemTable, error::Result};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;
use std::sync::Arc;
use tokio::runtime::Runtime;
/// Builds the physical plan and task context for one benchmark case.
///
/// * `limit` — the `LIMIT` applied to the top-k query (benchmarks use 10).
/// * `partition_cnt` / `sample_cnt` — size parameters forwarded to `make_data`.
/// * `asc` — whether the generated timestamps are ascending (worst case for top-k).
/// * `use_topk` — enables the optimizer's TopK aggregation rewrite.
/// * `use_view` — forwarded to `make_data`; presumably selects `Utf8View` string
///   columns (the benchmark names tag these cases `[Utf8View]`) — confirm in `data_utils`.
///
/// Returns the physical plan plus the `TaskContext` needed to execute it.
///
/// # Errors
/// Propagates any DataFusion error from table registration, SQL planning, or
/// physical-plan creation.
async fn create_context(
    limit: usize,
    partition_cnt: i32,
    sample_cnt: i32,
    asc: bool,
    use_topk: bool,
    use_view: bool,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<TaskContext>)> {
    // Generate synthetic trace data and expose it as an in-memory table.
    // Propagate errors with `?` rather than unwrapping — this fn returns Result.
    let (schema, parts) = make_data(partition_cnt, sample_cnt, asc, use_view)?;
    let mem_table = Arc::new(MemTable::try_new(schema, parts)?);

    // Toggle the TopK-aggregation optimization so optimized and unoptimized
    // plans can be benchmarked against each other.
    let mut cfg = SessionConfig::new();
    cfg.options_mut().optimizer.enable_topk_aggregation = use_topk;
    let ctx = SessionContext::new_with_config(cfg);
    // The returned Option (any previously registered table) is intentionally ignored.
    ctx.register_table("traces", mem_table)?;

    let sql = format!("select trace_id, max(timestamp_ms) from traces group by trace_id order by max(timestamp_ms) desc limit {limit};");
    let df = ctx.sql(sql.as_str()).await?;
    let physical_plan = df.create_physical_plan().await?;

    // Sanity check: the plan text mentions `lim=[..]` exactly when the TopK
    // optimization was requested and applied.
    let actual_phys_plan = displayable(physical_plan.as_ref()).indent(true).to_string();
    assert_eq!(
        actual_phys_plan.contains(&format!("lim=[{limit}]")),
        use_topk
    );

    Ok((physical_plan, ctx.task_ctx()))
}
/// Drives one benchmark iteration: executes the plan on the Tokio runtime and
/// panics (via `unwrap`) if the aggregation returned an error or failed its
/// internal result checks.
fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
    // `block_on` can poll the future directly; `black_box` keeps the optimizer
    // from discarding the result.
    let outcome = rt.block_on(aggregate(plan.clone(), ctx.clone(), asc));
    criterion::black_box(outcome).unwrap();
}
/// Executes `plan` to completion and sanity-checks the result: exactly one
/// batch of exactly 10 rows, and — when the data set was ascending — the
/// precise top-10 table below.
async fn aggregate(
    plan: Arc<dyn ExecutionPlan>,
    ctx: Arc<TaskContext>,
    asc: bool,
) -> Result<()> {
    let batches = collect(plan, ctx).await?;
    // All output is expected to arrive coalesced into a single batch.
    assert_eq!(batches.len(), 1);
    let batch = batches.first().unwrap();
    // The benchmarks run the query with `limit 10`, so exactly 10 rows.
    assert_eq!(batch.num_rows(), 10);
    // Lowercased so the comparison is insensitive to column-name casing.
    let actual = format!("{}", pretty_format_batches(&batches)?).to_lowercase();
    // Expected output for the ascending data set: the 10 largest timestamps,
    // descending. The literal's exact bytes matter; only leading/trailing
    // whitespace is stripped by `trim()`.
    let expected_asc = r#"
+----------------------------------+--------------------------+
| trace_id                         | max(traces.timestamp_ms) |
+----------------------------------+--------------------------+
| 5868861a23ed31355efc5200eb80fe74 | 16909009999999           |
| 4040e64656804c3d77320d7a0e7eb1f0 | 16909009999998           |
| 02801bbe533190a9f8713d75222f445d | 16909009999997           |
| 9e31b3b5a620de32b68fefa5aeea57f1 | 16909009999996           |
| 2d88a860e9bd1cfaa632d8e7caeaa934 | 16909009999995           |
| a47edcef8364ab6f191dd9103e51c171 | 16909009999994           |
| 36a3fa2ccfbf8e00337f0b1254384db6 | 16909009999993           |
| 0756be84f57369012e10de18b57d8a2f | 16909009999992           |
| d4d6bf9845fa5897710e3a8db81d5907 | 16909009999991           |
| 3c2cc1abe728a66b61e14880b53482a0 | 16909009999990           |
+----------------------------------+--------------------------+
"#
    .trim();
    // Content is only fully verified for the ascending case; descending runs
    // rely on the row-count assertions above.
    if asc {
        assert_eq!(actual.trim(), expected_asc);
    }
    Ok(())
}
/// Registers all six benchmark cases (plain vs. top-k aggregation, ascending
/// vs. time-series data, `Utf8` vs. `Utf8View` strings).
///
/// Each iteration rebuilds the plan via `create_context` and then executes it
/// with `run`, so both planning and execution are inside the measured loop —
/// exactly as in the original per-case copies, now deduplicated into one
/// closure.
fn criterion_benchmark(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let limit = 10;
    let partitions = 10;
    let samples = 1_000_000;
    let rows = partitions * samples;

    // One registration helper covers every case; only the benchmark name and
    // the (asc, use_topk, use_view) flags differ between cases.
    let mut bench = |name: String, asc: bool, use_topk: bool, use_view: bool| {
        c.bench_function(name.as_str(), |b| {
            b.iter(|| {
                let (plan, task_ctx) = rt.block_on(async {
                    create_context(limit, partitions, samples, asc, use_topk, use_view)
                        .await
                        .unwrap()
                });
                // Pass the Arcs by value — no need to clone them first.
                run(&rt, plan, task_ctx, asc)
            })
        });
    };

    bench(format!("aggregate {rows} time-series rows"), false, false, false);
    bench(format!("aggregate {rows} worst-case rows"), true, false, false);
    bench(
        format!("top k={limit} aggregate {rows} time-series rows"),
        false,
        true,
        false,
    );
    bench(
        format!("top k={limit} aggregate {rows} worst-case rows"),
        true,
        true,
        false,
    );
    bench(
        format!("top k={limit} aggregate {rows} time-series rows [Utf8View]"),
        false,
        true,
        true,
    );
    bench(
        format!("top k={limit} aggregate {rows} worst-case rows [Utf8View]"),
        true,
        true,
        true,
    );
}
// Wire the benchmark function into criterion's generated `main` entry point.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);