//! Defines the execution plan for the hash aggregate operation.

use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use crate::error::{ExecutionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr};
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use arrow::{
    array::{
        ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array,
        PrimitiveArrayOps, StringArray, UInt16Array, UInt32Array, UInt64Array,
        UInt8Array,
    },
    compute,
};
use fnv::FnvHashMap;
use super::{
common, expressions::Column, group_scalar::GroupByScalar, SendableRecordBatchReader,
};
use async_trait::async_trait;
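/// Hash aggregate mode: a plan typically runs a `Partial` aggregate on each
/// input partition, then a single `Final` aggregate that merges the partial
/// results (see `required_child_distribution`)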
#[derive(Debug, Copy, Clone)]
pub enum AggregateMode {
    /// Partial aggregate that emits intermediate accumulator state, suitable
    /// for merging by a later `Final` aggregate
    Partial,
    /// Final aggregate that merges the intermediate state produced by
    /// `Partial` aggregates into the final aggregate values
    Final,
}
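/// Hash-aggregate execution plan node: computes aggregate expressions,
/// optionally grouped by a set of group-by expressions, using a hash table
/// keyed on the group-by values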
#[derive(Debug)]
pub struct HashAggregateExec {
    /// Partial or Final aggregation (see [`AggregateMode`])
    mode: AggregateMode,
    /// Group-by expressions paired with their output column names
    group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// Aggregate expressions to compute
    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Output schema, derived from the mode and the expressions
    schema: SchemaRef,
}
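/// Create the output schema: one field per group expression, followed by the
/// aggregates' intermediate state fields in `Partial` mode or their final
/// output fields in `Final` mode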
fn create_schema(
    input_schema: &Schema,
    group_expr: &[(Arc<dyn PhysicalExpr>, String)],
    aggr_expr: &[Arc<dyn AggregateExpr>],
    mode: AggregateMode,
) -> Result<Schema> {
let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len());
for (expr, name) in group_expr {
fields.push(Field::new(
name,
expr.data_type(&input_schema)?,
expr.nullable(&input_schema)?,
))
}
    match mode {
        AggregateMode::Partial => {
            // in partial mode, the output carries each accumulator's
            // intermediate state fields
            for expr in aggr_expr {
                fields.extend(expr.state_fields()?.iter().cloned())
            }
        }
        AggregateMode::Final => {
            // in final mode, the output carries the finished aggregate values
            for expr in aggr_expr {
                fields.push(expr.field()?)
            }
        }
    }
Ok(Schema::new(fields))
}
impl HashAggregateExec {
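    /// Create a new hash aggregate execution plan.
    ///
    /// A minimal sketch of constructing a partial aggregate over an existing
    /// `input` plan, using the `col` and `Avg` expressions exercised by the
    /// tests below:
    ///
    /// ```ignore
    /// let partial = HashAggregateExec::try_new(
    ///     AggregateMode::Partial,
    ///     vec![(col("a"), "a".to_string())],
    ///     vec![Arc::new(Avg::new(col("b"), "AVG(b)".to_string(), DataType::Float64))],
    ///     input,
    /// )?;
    /// ```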
pub fn try_new(
mode: AggregateMode,
group_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let schema = create_schema(&input.schema(), &group_expr, &aggr_expr, mode)?;
let schema = Arc::new(schema);
Ok(HashAggregateExec {
mode,
group_expr,
aggr_expr,
input,
schema,
})
}
}
#[async_trait]
impl ExecutionPlan for HashAggregateExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
fn required_child_distribution(&self) -> Distribution {
match &self.mode {
AggregateMode::Partial => Distribution::UnspecifiedDistribution,
AggregateMode::Final => Distribution::SinglePartition,
}
}
fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
}
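    /// Execute one input partition, returning an iterator that yields a
    /// single aggregated output batch: grouped aggregation when group
    /// expressions are present, otherwise one accumulator set over all rows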
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchReader> {
let input = self.input.execute(partition).await?;
let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
if self.group_expr.is_empty() {
Ok(Box::new(HashAggregateIterator::new(
self.mode,
self.schema.clone(),
self.aggr_expr.clone(),
input,
)))
} else {
Ok(Box::new(GroupedHashAggregateIterator::new(
                self.mode,
self.schema.clone(),
group_expr,
self.aggr_expr.clone(),
input,
)))
}
}
fn with_new_children(
&self,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
1 => Ok(Arc::new(HashAggregateExec::try_new(
self.mode,
self.group_expr.clone(),
self.aggr_expr.clone(),
children[0].clone(),
)?)),
            _ => Err(ExecutionError::General(
                "HashAggregateExec requires exactly one child".to_string(),
            )),
}
}
}
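/// Iterator that aggregates one input partition with grouping, yielding a
/// single output batch with one row per group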
struct GroupedHashAggregateIterator {
mode: AggregateMode,
schema: SchemaRef,
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchReader,
    /// set to true once the single output batch has been produced
    finished: bool,
}
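/// Aggregate one input batch into the accumulator map: map each row to its
/// group key, bucket the row indices per group, then `take` each group's
/// rows and feed them to that group's accumulators in one call per column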
fn group_aggregate_batch(
    mode: &AggregateMode,
    group_expr: &[Arc<dyn PhysicalExpr>],
    aggr_expr: &[Arc<dyn AggregateExpr>],
    batch: &RecordBatch,
    accumulators: &mut FnvHashMap<Vec<GroupByScalar>, (AccumulatorSet, Box<Vec<u32>>)>,
    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
) -> Result<()> {
    // evaluate the grouping expressions against this batch
    let group_values = evaluate(group_expr, batch)?;
    // evaluate the aggregates' input expressions against this batch
    let aggr_input_values = evaluate_many(aggregate_expressions, batch)?;
    // reusable key buffer; the placeholder values are overwritten by
    // `create_key` for every row
    let mut key = vec![GroupByScalar::UInt32(0); group_values.len()];
    // map each row to its group, creating the accumulator set lazily the
    // first time a group is seen; the `Vec<u32>` collects the indices of the
    // rows in this batch that belong to the group
    for row in 0..batch.num_rows() {
        create_key(&group_values, row, &mut key)?;
        match accumulators.get_mut(&key) {
            None => {
                let accumulator_set = create_accumulators(aggr_expr)?;
                accumulators
                    .insert(key.clone(), (accumulator_set, Box::new(vec![row as u32])));
            }
            Some((_, v)) => v.push(row as u32),
        }
    }
    // for each group, gather that group's rows with `take` and feed them to
    // the group's accumulators in a single batched call
    accumulators
        .iter_mut()
        .map(|(_, (accumulator_set, indices))| {
            accumulator_set
                .iter()
                .zip(&aggr_input_values)
                .map(|(accumulator, aggr_array)| {
                    (
                        accumulator,
                        aggr_array
                            .iter()
                            .map(|array| {
                                // select the rows belonging to this group;
                                // the indices are in bounds by construction
                                compute::take(
                                    array,
                                    &UInt32Array::from(indices.as_slice().to_vec()),
                                    None,
                                )
                                .unwrap()
                            })
                            .collect::<Vec<ArrayRef>>(),
                    )
                })
                .map(|(accumulator, values)| match mode {
                    AggregateMode::Partial => {
                        accumulator.borrow_mut().update_batch(&values)
                    }
                    AggregateMode::Final => {
                        accumulator.borrow_mut().merge_batch(&values)
                    }
                })
                .collect::<Result<()>>()
                // this batch's rows have been consumed; reset the index list
                .map(|_| indices.clear())
        })
        .collect::<Result<()>>()
}
impl GroupedHashAggregateIterator {
pub fn new(
mode: AggregateMode,
schema: SchemaRef,
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchReader,
) -> Self {
GroupedHashAggregateIterator {
mode,
schema,
group_expr,
aggr_expr,
input,
finished: false,
}
}
}
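/// The accumulators for one group, one per aggregate expression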
type AccumulatorSet = Vec<Rc<RefCell<dyn Accumulator>>>;
impl Iterator for GroupedHashAggregateIterator {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
self.finished = true;
let mode = &self.mode;
let group_expr = &self.group_expr;
let aggr_expr = &self.aggr_expr;
let aggregate_expressions = match aggregate_expressions(&aggr_expr, &mode) {
Ok(e) => e,
Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
};
let mut accumulators: FnvHashMap<
Vec<GroupByScalar>,
(AccumulatorSet, Box<Vec<u32>>),
> = FnvHashMap::default();
        // consume the input, aggregating each batch into the hash map
        if let Err(e) = self
            .input
            .as_mut()
            .into_iter()
            .map(|batch| {
                group_aggregate_batch(
                    mode,
                    group_expr,
                    aggr_expr,
                    &batch?,
                    &mut accumulators,
                    &aggregate_expressions,
                )
                .map_err(ExecutionError::into_arrow_external_error)
            })
            .collect::<ArrowResult<()>>()
        {
            return Some(Err(e));
        }
Some(
create_batch_from_map(
&self.mode,
&accumulators,
self.group_expr.len(),
&self.schema,
)
.map_err(ExecutionError::into_arrow_external_error),
)
}
}
impl RecordBatchReader for GroupedHashAggregateIterator {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
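/// Evaluate a list of expressions against a batch, yielding one array per
/// expression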
fn evaluate(
    expr: &[Arc<dyn PhysicalExpr>],
    batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
    expr.iter()
        .map(|expr| expr.evaluate(batch))
.collect::<Result<Vec<_>>>()
}
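/// Evaluate each aggregate's input expressions against a batch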
fn evaluate_many(
    expr: &[Vec<Arc<dyn PhysicalExpr>>],
    batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
expr.iter()
.map(|expr| evaluate(expr, batch))
.collect::<Result<Vec<_>>>()
}
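/// Build column expressions that read an aggregate's intermediate state
/// fields by name, as consumed in `Final` mode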
fn merge_expressions(
expr: &Arc<dyn AggregateExpr>,
) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
Ok(expr
.state_fields()?
.iter()
.map(|f| Arc::new(Column::new(f.name())) as Arc<dyn PhysicalExpr>)
.collect::<Vec<_>>())
}
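/// The expressions each aggregate consumes: its raw input expressions in
/// `Partial` mode, or its intermediate state columns in `Final` mode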
fn aggregate_expressions(
aggr_expr: &[Arc<dyn AggregateExpr>],
mode: &AggregateMode,
) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
    match mode {
        // in partial mode, consume the aggregates' raw input expressions
        AggregateMode::Partial => {
            Ok(aggr_expr.iter().map(|agg| agg.expressions()).collect())
        }
        // in final mode, consume the intermediate state columns produced by
        // the partial aggregates
        AggregateMode::Final => aggr_expr
            .iter()
            .map(merge_expressions)
            .collect::<Result<Vec<_>>>(),
    }
}
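/// Iterator that aggregates one input partition without grouping, yielding a
/// single one-row output batch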
struct HashAggregateIterator {
mode: AggregateMode,
schema: SchemaRef,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchReader,
    /// set to true once the single output batch has been produced
    finished: bool,
}
impl HashAggregateIterator {
pub fn new(
mode: AggregateMode,
schema: SchemaRef,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: SendableRecordBatchReader,
) -> Self {
HashAggregateIterator {
mode,
schema,
aggr_expr,
input,
finished: false,
}
}
}
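/// Feed one batch to every accumulator, updating from raw inputs in
/// `Partial` mode or merging intermediate states in `Final` mode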
fn aggregate_batch(
    mode: &AggregateMode,
    batch: &RecordBatch,
    accumulators: &AccumulatorSet,
    expressions: &[Vec<Arc<dyn PhysicalExpr>>],
) -> Result<()> {
accumulators
.iter()
.zip(expressions)
.map(|(accum, expr)| {
let values = &expr
.iter()
.map(|e| e.evaluate(batch))
.collect::<Result<Vec<_>>>()?;
match mode {
AggregateMode::Partial => accum.borrow_mut().update_batch(values),
AggregateMode::Final => accum.borrow_mut().merge_batch(values),
}
})
.collect::<Result<()>>()
}
impl Iterator for HashAggregateIterator {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
self.finished = true;
let accumulators = match create_accumulators(&self.aggr_expr) {
Ok(e) => e,
Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
};
let expressions = match aggregate_expressions(&self.aggr_expr, &self.mode) {
Ok(e) => e,
Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
};
let mode = self.mode;
let schema = self.schema();
        // consume the input, updating the accumulators with each batch
        if let Err(e) = self
            .input
            .as_mut()
            .into_iter()
            .map(|batch| {
                aggregate_batch(&mode, &batch?, &accumulators, &expressions)
                    .map_err(ExecutionError::into_arrow_external_error)
            })
            .collect::<ArrowResult<()>>()
        {
            return Some(Err(e));
        }
Some(
finalize_aggregation(&accumulators, &mode)
.map_err(ExecutionError::into_arrow_external_error)
.and_then(|columns| RecordBatch::try_new(schema.clone(), columns)),
)
}
}
impl RecordBatchReader for HashAggregateIterator {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
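/// Concatenate the single-row columns produced per group into one column per
/// output field; callers must pass a non-empty `arrays`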
fn concatenate(arrays: Vec<Vec<ArrayRef>>) -> ArrowResult<Vec<ArrayRef>> {
(0..arrays[0].len())
.map(|column| {
let array_list = arrays.iter().map(|a| a[column].clone()).collect::<Vec<_>>();
compute::concat(&array_list)
})
.collect::<ArrowResult<Vec<_>>>()
}
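/// Build the output batch from the accumulator map: one row per group,
/// holding the group key values followed by the aggregate columns produced
/// by `finalize_aggregation`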
fn create_batch_from_map(
mode: &AggregateMode,
accumulators: &FnvHashMap<Vec<GroupByScalar>, (AccumulatorSet, Box<Vec<u32>>)>,
num_group_expr: usize,
output_schema: &Schema,
) -> Result<RecordBatch> {
let arrays = accumulators
.iter()
.map(|(k, (accumulator_set, _))| {
let mut groups = (0..num_group_expr)
.map(|i| match &k[i] {
GroupByScalar::Int8(n) => {
Arc::new(Int8Array::from(vec![*n])) as ArrayRef
}
GroupByScalar::Int16(n) => Arc::new(Int16Array::from(vec![*n])),
GroupByScalar::Int32(n) => Arc::new(Int32Array::from(vec![*n])),
GroupByScalar::Int64(n) => Arc::new(Int64Array::from(vec![*n])),
GroupByScalar::UInt8(n) => Arc::new(UInt8Array::from(vec![*n])),
GroupByScalar::UInt16(n) => Arc::new(UInt16Array::from(vec![*n])),
GroupByScalar::UInt32(n) => Arc::new(UInt32Array::from(vec![*n])),
GroupByScalar::UInt64(n) => Arc::new(UInt64Array::from(vec![*n])),
GroupByScalar::Utf8(str) => Arc::new(StringArray::from(vec![&**str])),
})
.collect::<Vec<ArrayRef>>();
groups.extend(
finalize_aggregation(accumulator_set, mode)
.map_err(ExecutionError::into_arrow_external_error)?,
);
Ok(groups)
})
.collect::<Result<Vec<Vec<ArrayRef>>>>()?;
    let batch = if !arrays.is_empty() {
let columns = concatenate(arrays)?;
RecordBatch::try_new(Arc::new(output_schema.to_owned()), columns)?
} else {
common::create_batch_empty(output_schema)?
};
Ok(batch)
}
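/// Create one accumulator per aggregate expression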
fn create_accumulators(
    aggr_expr: &[Arc<dyn AggregateExpr>],
) -> Result<AccumulatorSet> {
aggr_expr
.iter()
.map(|expr| expr.create_accumulator())
.collect::<Result<Vec<_>>>()
}
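/// Turn a set of accumulators into output columns: their intermediate state
/// values in `Partial` mode, or their final aggregate values in `Final` mode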
fn finalize_aggregation(
accumulators: &AccumulatorSet,
mode: &AggregateMode,
) -> Result<Vec<ArrayRef>> {
match mode {
        AggregateMode::Partial => {
            // each accumulator contributes one column per state value; the
            // per-accumulator columns are then flattened into a single list
            let a = accumulators
                .iter()
                .map(|accumulator| accumulator.borrow_mut().state())
                .map(|value| {
                    value.map(|e| {
                        e.iter().map(|v| v.to_array()).collect::<Vec<ArrayRef>>()
                    })
                })
                .collect::<Result<Vec<_>>>()?;
            Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
        }
        AggregateMode::Final => {
            // merge is complete; produce the final aggregate values
            accumulators
                .iter()
                .map(|accumulator| {
                    accumulator
                        .borrow_mut()
                        .evaluate()
                        .map(|v| v.to_array())
                })
                .collect::<Result<Vec<ArrayRef>>>()
        }
}
}
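/// Extract the group-by key for `row` into `vec`, which must already hold
/// one element per group column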
fn create_key(
group_by_keys: &[ArrayRef],
row: usize,
vec: &mut Vec<GroupByScalar>,
) -> Result<()> {
    for (i, col) in group_by_keys.iter().enumerate() {
match col.data_type() {
DataType::UInt8 => {
let array = col.as_any().downcast_ref::<UInt8Array>().unwrap();
vec[i] = GroupByScalar::UInt8(array.value(row))
}
DataType::UInt16 => {
let array = col.as_any().downcast_ref::<UInt16Array>().unwrap();
vec[i] = GroupByScalar::UInt16(array.value(row))
}
DataType::UInt32 => {
let array = col.as_any().downcast_ref::<UInt32Array>().unwrap();
vec[i] = GroupByScalar::UInt32(array.value(row))
}
DataType::UInt64 => {
let array = col.as_any().downcast_ref::<UInt64Array>().unwrap();
vec[i] = GroupByScalar::UInt64(array.value(row))
}
DataType::Int8 => {
let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
vec[i] = GroupByScalar::Int8(array.value(row))
}
DataType::Int16 => {
let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
vec[i] = GroupByScalar::Int16(array.value(row))
}
DataType::Int32 => {
let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
vec[i] = GroupByScalar::Int32(array.value(row))
}
DataType::Int64 => {
let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
vec[i] = GroupByScalar::Int64(array.value(row))
}
DataType::Utf8 => {
let array = col.as_any().downcast_ref::<StringArray>().unwrap();
vec[i] = GroupByScalar::Utf8(String::from(array.value(row)))
}
            other => {
                return Err(ExecutionError::ExecutionError(format!(
                    "Unsupported GROUP BY data type: {:?}",
                    other
                )))
            }
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use arrow::array::Float64Array;
use super::*;
use crate::physical_plan::expressions::{col, Avg};
use crate::physical_plan::merge::MergeExec;
use crate::physical_plan::{common, memory::MemoryExec};
fn some_data() -> ArrowResult<(Arc<Schema>, Vec<RecordBatch>)> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::UInt32, false),
Field::new("b", DataType::Float64, false),
]));
Ok((
schema.clone(),
vec![
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
],
)?,
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
],
)?,
],
))
}
#[tokio::test]
async fn aggregate() -> Result<()> {
let (schema, batches) = some_data().unwrap();
let input: Arc<dyn ExecutionPlan> = Arc::new(
MemoryExec::try_new(&vec![batches.clone(), batches], schema, None).unwrap(),
);
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("a"), "a".to_string())];
let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
col("b"),
"AVG(b)".to_string(),
DataType::Float64,
))];
let partial_aggregate = Arc::new(HashAggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
input,
)?);
        let result = common::collect(partial_aggregate.execute(0).await?)?;
        // column 0 of the partial output holds the distinct group keys
        let keys = result[0]
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        assert_eq!(*keys, UInt32Array::from(vec![2, 3, 4]));
        // AVG's intermediate state is (count, sum): column 1 holds the counts
        let ns = result[0]
            .column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(*ns, UInt64Array::from(vec![2, 3, 3]));
        // ... and column 2 holds the sums
        let sums = result[0]
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert_eq!(*sums, Float64Array::from(vec![2.0, 7.0, 11.0]));
        // merge the partial results into a single partition before the final
        // aggregate
        let merge = Arc::new(MergeExec::new(partial_aggregate, 2));
let final_group: Vec<Arc<dyn PhysicalExpr>> =
(0..groups.len()).map(|i| col(&groups[i].1)).collect();
let merged_aggregate = Arc::new(HashAggregateExec::try_new(
AggregateMode::Final,
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
.collect(),
aggregates,
merge,
)?);
let result = common::collect(merged_aggregate.execute(0).await?)?;
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 3);
let a = batch
.column(0)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap();
let b = batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert_eq!(*a, UInt32Array::from(vec![2, 3, 4]));
assert_eq!(
*b,
Float64Array::from(vec![
1.0,
(2.0 + 3.0 + 2.0) / 3.0,
(3.0 + 4.0 + 4.0) / 3.0
])
);
Ok(())
}
}