use crate::{OptimizerConfig, OptimizerRule};
use arrow::datatypes::{Field, Schema};
use arrow::error::Result as ArrowResult;
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ToDFSchema,
};
use datafusion_expr::utils::grouping_set_to_exprlist;
use datafusion_expr::{
logical_plan::{
builder::{build_join_schema, LogicalPlanBuilder},
Aggregate, Analyze, Join, LogicalPlan, Projection, SubqueryAlias, TableScan,
Union, Window,
},
utils::{expr_to_columns, exprlist_to_columns, find_sort_exprs, from_plan},
Expr,
};
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
};
#[derive(Default)]
pub struct ProjectionPushDown {}
impl OptimizerRule for ProjectionPushDown {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
let required_columns = plan
.schema()
.fields()
.iter()
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();
optimize_plan(self, plan, &required_columns, false, optimizer_config)
}
fn name(&self) -> &str {
"projection_push_down"
}
}
impl ProjectionPushDown {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
fn get_projected_schema(
table_name: Option<&String>,
schema: &Schema,
required_columns: &HashSet<Column>,
has_projection: bool,
) -> Result<(Vec<usize>, DFSchemaRef)> {
let mut projection: BTreeSet<usize> = required_columns
.iter()
.filter(|c| c.relation.is_none() || c.relation.as_ref() == table_name)
.map(|c| schema.index_of(&c.name))
.filter_map(ArrowResult::ok)
.collect();
if projection.is_empty() {
if has_projection && !schema.fields().is_empty() {
projection.insert(0);
} else {
projection = schema
.fields()
.iter()
.enumerate()
.map(|(i, _)| i)
.collect::<BTreeSet<usize>>();
}
}
let projected_fields: Vec<DFField> = match table_name {
Some(qualifer) => projection
.iter()
.map(|i| DFField::from_qualified(qualifer, schema.fields()[*i].clone()))
.collect(),
None => projection
.iter()
.map(|i| DFField::from(schema.fields()[*i].clone()))
.collect(),
};
let projection = projection.into_iter().collect::<Vec<_>>();
Ok((projection, projected_fields.to_dfschema_ref()?))
}
fn optimize_plan(
_optimizer: &ProjectionPushDown,
plan: &LogicalPlan,
required_columns: &HashSet<Column>, has_projection: bool,
_optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
let mut new_required_columns = required_columns.clone();
match plan {
LogicalPlan::Projection(Projection {
input,
expr,
schema,
alias,
}) => {
let mut new_expr = Vec::new();
let mut new_fields = Vec::new();
schema
.fields()
.iter()
.enumerate()
.try_for_each(|(i, field)| {
if required_columns.contains(&field.qualified_column()) {
new_expr.push(expr[i].clone());
new_fields.push(field.clone());
expr_to_columns(&expr[i], &mut new_required_columns)
} else {
Ok(())
}
})?;
let new_input = optimize_plan(
_optimizer,
input,
&new_required_columns,
true,
_optimizer_config,
)?;
let new_required_columns_optimized = new_input
.schema()
.fields()
.iter()
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();
let all_column_exprs = new_expr.iter().all(|e| matches!(e, Expr::Column(_)));
if new_fields.is_empty()
|| (has_projection
&& all_column_exprs
&& &new_required_columns_optimized == required_columns)
{
Ok(new_input)
} else {
let metadata = new_input.schema().metadata().clone();
Ok(LogicalPlan::Projection(Projection {
expr: new_expr,
input: Arc::new(new_input),
schema: DFSchemaRef::new(DFSchema::new_with_metadata(
new_fields, metadata,
)?),
alias: alias.clone(),
}))
}
}
LogicalPlan::Join(Join {
left,
right,
on,
filter,
join_type,
join_constraint,
null_equals_null,
..
}) => {
for (l, r) in on {
new_required_columns.insert(l.clone());
new_required_columns.insert(r.clone());
}
if let Some(expr) = filter {
expr_to_columns(expr, &mut new_required_columns)?;
}
let optimized_left = Arc::new(optimize_plan(
_optimizer,
left,
&new_required_columns,
true,
_optimizer_config,
)?);
let optimized_right = Arc::new(optimize_plan(
_optimizer,
right,
&new_required_columns,
true,
_optimizer_config,
)?);
let schema = build_join_schema(
optimized_left.schema(),
optimized_right.schema(),
join_type,
)?;
Ok(LogicalPlan::Join(Join {
left: optimized_left,
right: optimized_right,
join_type: *join_type,
join_constraint: *join_constraint,
on: on.clone(),
filter: filter.clone(),
schema: DFSchemaRef::new(schema),
null_equals_null: *null_equals_null,
}))
}
LogicalPlan::Window(Window {
schema,
window_expr,
input,
..
}) => {
let mut new_window_expr = Vec::new();
{
window_expr.iter().try_for_each(|expr| {
let name = &expr.name(schema)?;
let column = Column::from_name(name);
if required_columns.contains(&column) {
new_window_expr.push(expr.clone());
new_required_columns.insert(column);
expr_to_columns(expr, &mut new_required_columns)
} else {
Ok(())
}
})?;
}
if new_window_expr.is_empty() {
return LogicalPlanBuilder::from(optimize_plan(
_optimizer,
input,
required_columns,
true,
_optimizer_config,
)?)
.build();
};
exprlist_to_columns(
&find_sort_exprs(&new_window_expr),
&mut new_required_columns,
)?;
LogicalPlanBuilder::from(optimize_plan(
_optimizer,
input,
&new_required_columns,
true,
_optimizer_config,
)?)
.window(new_window_expr)?
.build()
}
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
schema,
input,
}) => {
let all_group_expr: Vec<Expr> = grouping_set_to_exprlist(group_expr)?;
exprlist_to_columns(&all_group_expr, &mut new_required_columns)?;
let mut new_aggr_expr = Vec::new();
aggr_expr.iter().try_for_each(|expr| {
let name = &expr.name(schema)?;
let column = Column::from_name(name);
if required_columns.contains(&column) {
new_aggr_expr.push(expr.clone());
new_required_columns.insert(column);
expr_to_columns(expr, &mut new_required_columns)
} else {
Ok(())
}
})?;
let new_schema = DFSchema::new_with_metadata(
schema
.fields()
.iter()
.filter(|x| new_required_columns.contains(&x.qualified_column()))
.cloned()
.collect(),
schema.metadata().clone(),
)?;
Ok(LogicalPlan::Aggregate(Aggregate {
group_expr: group_expr.clone(),
aggr_expr: new_aggr_expr,
input: Arc::new(optimize_plan(
_optimizer,
input,
&new_required_columns,
true,
_optimizer_config,
)?),
schema: DFSchemaRef::new(new_schema),
}))
}
LogicalPlan::TableScan(TableScan {
table_name,
source,
filters,
fetch: limit,
..
}) => {
let (projection, projected_schema) = get_projected_schema(
Some(table_name),
&source.schema(),
required_columns,
has_projection,
)?;
Ok(LogicalPlan::TableScan(TableScan {
table_name: table_name.clone(),
source: source.clone(),
projection: Some(projection),
projected_schema,
filters: filters.clone(),
fetch: *limit,
}))
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Analyze(a) => {
let required_columns = a
.input
.schema()
.fields()
.iter()
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();
Ok(LogicalPlan::Analyze(Analyze {
input: Arc::new(optimize_plan(
_optimizer,
&a.input,
&required_columns,
false,
_optimizer_config,
)?),
verbose: a.verbose,
schema: a.schema.clone(),
}))
}
LogicalPlan::Union(Union {
inputs,
schema,
alias,
}) => {
let union_required_fields = schema
.fields()
.iter()
.filter(|f| new_required_columns.contains(&f.qualified_column()))
.map(|f| f.field())
.collect::<HashSet<&Field>>();
let new_inputs = inputs
.iter()
.map(|input_plan| {
input_plan
.schema()
.fields()
.iter()
.filter(|f| union_required_fields.contains(f.field()))
.for_each(|f| {
new_required_columns.insert(f.qualified_column());
});
optimize_plan(
_optimizer,
input_plan,
&new_required_columns,
has_projection,
_optimizer_config,
)
})
.collect::<Result<Vec<_>>>()?;
let new_schema = DFSchema::new_with_metadata(
schema
.fields()
.iter()
.filter(|f| union_required_fields.contains(f.field()))
.cloned()
.collect(),
schema.metadata().clone(),
)?;
Ok(LogicalPlan::Union(Union {
inputs: new_inputs.iter().cloned().map(Arc::new).collect(),
schema: Arc::new(new_schema),
alias: alias.clone(),
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
match input.as_ref() {
LogicalPlan::TableScan(TableScan { table_name, .. }) => {
let new_required_columns = new_required_columns
.iter()
.map(|c| match &c.relation {
Some(q) if q == alias => Column {
relation: Some(table_name.clone()),
name: c.name.clone(),
},
_ => c.clone(),
})
.collect();
let new_inputs = vec![optimize_plan(
_optimizer,
input,
&new_required_columns,
has_projection,
_optimizer_config,
)?];
let expr = vec![];
from_plan(plan, &expr, &new_inputs)
}
_ => Err(DataFusionError::Plan(
"SubqueryAlias should only wrap TableScan".to_string(),
)),
}
}
LogicalPlan::Limit(_)
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition(_)
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::Values(_)
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateView(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
exprlist_to_columns(&expr, &mut new_required_columns)?;
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|input_plan| {
optimize_plan(
_optimizer,
input_plan,
&new_required_columns,
has_projection,
_optimizer_config,
)
})
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &expr, &new_inputs)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::*;
use arrow::datatypes::DataType;
use datafusion_expr::{
col, lit,
logical_plan::{builder::LogicalPlanBuilder, JoinType},
max, min,
utils::exprlist_to_fields,
Expr,
};
use std::collections::HashMap;
#[test]
fn aggregate_no_group_by() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
.build()?;
let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(#test.b)]]\
\n TableScan: test projection=[b]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn aggregate_group_by() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("c")], vec![max(col("b"))])?
.build()?;
let expected = "Aggregate: groupBy=[[#test.c]], aggr=[[MAX(#test.b)]]\
\n TableScan: test projection=[b, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn aggregate_group_by_with_table_alias() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.alias("a")?
.aggregate(vec![col("c")], vec![max(col("b"))])?
.build()?;
let expected = "Aggregate: groupBy=[[#a.c]], aggr=[[MAX(#a.b)]]\
\n SubqueryAlias: a\
\n TableScan: test projection=[b, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn aggregate_no_group_by_with_filter() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(col("c"))?
.aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
.build()?;
let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(#test.b)]]\
\n Filter: #test.c\
\n TableScan: test projection=[b, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn redundunt_project() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a"), col("b"), col("c")])?
.project(vec![col("a"), col("c"), col("b")])?
.build()?;
let expected = "Projection: #test.a, #test.c, #test.b\
\n TableScan: test projection=[a, b, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn reorder_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("c"), col("b"), col("a")])?
.build()?;
let expected = "Projection: #test.c, #test.b, #test.a\
\n TableScan: test projection=[a, b, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn noncontiguous_redundunt_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("c"), col("b"), col("a")])?
.filter(col("c").gt(lit(1)))?
.project(vec![col("c"), col("a"), col("b")])?
.filter(col("b").gt(lit(1)))?
.filter(col("a").gt(lit(1)))?
.project(vec![col("a"), col("c"), col("b")])?
.build()?;
let expected = "Projection: #test.a, #test.c, #test.b\
\n Filter: #test.a > Int32(1)\
\n Filter: #test.b > Int32(1)\
\n Filter: #test.c > Int32(1)\
\n TableScan: test projection=[a, b, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn join_schema_trim_full_join_column_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
let plan = LogicalPlanBuilder::from(table_scan)
.join(&table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
.project(vec![col("a"), col("b"), col("c1")])?
.build()?;
let expected = "Projection: #test.a, #test.b, #test2.c1\
\n Left Join: #test.a = #test2.c1\
\n TableScan: test projection=[a, b]\
\n TableScan: test2 projection=[c1]";
let optimized_plan = optimize(&plan)?;
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
let optimized_join = optimized_plan.inputs()[0];
assert_eq!(
**optimized_join.schema(),
DFSchema::new_with_metadata(
vec![
DFField::new(Some("test"), "a", DataType::UInt32, false),
DFField::new(Some("test"), "b", DataType::UInt32, false),
DFField::new(Some("test2"), "c1", DataType::UInt32, false),
],
HashMap::new()
)?,
);
Ok(())
}
#[test]
fn join_schema_trim_partial_join_column_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
let plan = LogicalPlanBuilder::from(table_scan)
.join(&table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
.project(vec![col("a"), col("b")])?
.build()?;
let expected = "Projection: #test.a, #test.b\
\n Left Join: #test.a = #test2.c1\
\n TableScan: test projection=[a, b]\
\n TableScan: test2 projection=[c1]";
let optimized_plan = optimize(&plan)?;
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
let optimized_join = optimized_plan.inputs()[0];
assert_eq!(
**optimized_join.schema(),
DFSchema::new_with_metadata(
vec![
DFField::new(Some("test"), "a", DataType::UInt32, false),
DFField::new(Some("test"), "b", DataType::UInt32, false),
DFField::new(Some("test2"), "c1", DataType::UInt32, false),
],
HashMap::new()
)?,
);
Ok(())
}
#[test]
fn join_schema_trim_using_join() -> Result<()> {
let table_scan = test_table_scan()?;
let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
let plan = LogicalPlanBuilder::from(table_scan)
.join_using(&table2_scan, JoinType::Left, vec!["a"])?
.project(vec![col("a"), col("b")])?
.build()?;
let expected = "Projection: #test.a, #test.b\
\n Left Join: Using #test.a = #test2.a\
\n TableScan: test projection=[a, b]\
\n TableScan: test2 projection=[a]";
let optimized_plan = optimize(&plan)?;
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
let optimized_join = optimized_plan.inputs()[0];
assert_eq!(
**optimized_join.schema(),
DFSchema::new_with_metadata(
vec![
DFField::new(Some("test"), "a", DataType::UInt32, false),
DFField::new(Some("test"), "b", DataType::UInt32, false),
DFField::new(Some("test2"), "a", DataType::UInt32, false),
],
HashMap::new()
)?,
);
Ok(())
}
#[test]
fn cast() -> Result<()> {
let table_scan = test_table_scan()?;
let projection = LogicalPlanBuilder::from(table_scan)
.project(vec![Expr::Cast {
expr: Box::new(col("c")),
data_type: DataType::Float64,
}])?
.build()?;
let expected = "Projection: CAST(#test.c AS Float64)\
\n TableScan: test projection=[c]";
assert_optimized_plan_eq(&projection, expected);
Ok(())
}
#[test]
fn table_scan_projected_schema() -> Result<()> {
let table_scan = test_table_scan()?;
assert_eq!(3, table_scan.schema().fields().len());
assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a"), col("b")])?
.build()?;
assert_fields_eq(&plan, vec!["a", "b"]);
let expected = "Projection: #test.a, #test.b\
\n TableScan: test projection=[a, b]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
let table_scan = test_table_scan()?;
let input_schema = table_scan.schema();
assert_eq!(3, input_schema.fields().len());
assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
let expr = vec![col("a"), col("b")];
let projected_fields = exprlist_to_fields(&expr, &table_scan).unwrap();
let projected_schema = DFSchema::new_with_metadata(
projected_fields,
input_schema.metadata().clone(),
)
.unwrap();
let plan = LogicalPlan::Projection(Projection {
expr,
input: Arc::new(table_scan),
schema: Arc::new(projected_schema),
alias: None,
});
assert_fields_eq(&plan, vec!["a", "b"]);
let expected = "Projection: #a, #b\
\n TableScan: test projection=[a, b]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn table_limit() -> Result<()> {
let table_scan = test_table_scan()?;
assert_eq!(3, table_scan.schema().fields().len());
assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("c"), col("a")])?
.limit(None, Some(5))?
.build()?;
assert_fields_eq(&plan, vec!["c", "a"]);
let expected = "Limit: skip=None, fetch=5\
\n Projection: #test.c, #test.a\
\n TableScan: test projection=[a, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn table_scan_without_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan).build()?;
let expected = "TableScan: test projection=[a, b, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn table_scan_with_literal_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![lit(1_i64), lit(2_i64)])?
.build()?;
let expected = "Projection: Int64(1), Int64(2)\
\n TableScan: test projection=[a]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn table_unused_column() -> Result<()> {
let table_scan = test_table_scan()?;
assert_eq!(3, table_scan.schema().fields().len());
assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("c"), col("a"), col("b")])?
.filter(col("c").gt(lit(1)))?
.aggregate(vec![col("c")], vec![max(col("a"))])?
.build()?;
assert_fields_eq(&plan, vec!["c", "MAX(test.a)"]);
let expected = "\
Aggregate: groupBy=[[#test.c]], aggr=[[MAX(#test.a)]]\
\n Filter: #test.c > Int32(1)\
\n Projection: #test.c, #test.a\
\n TableScan: test projection=[a, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn table_unused_projection() -> Result<()> {
let table_scan = test_table_scan()?;
assert_eq!(3, table_scan.schema().fields().len());
assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("b")])?
.project(vec![lit(1).alias("a")])?
.build()?;
assert_fields_eq(&plan, vec!["a"]);
let expected = "\
Projection: Int32(1) AS a\
\n TableScan: test projection=[a]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
#[test]
fn test_double_optimization() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("b")])?
.project(vec![lit(1).alias("a")])?
.build()?;
let optimized_plan1 = optimize(&plan).expect("failed to optimize plan");
let optimized_plan2 =
optimize(&optimized_plan1).expect("failed to optimize plan");
let formatted_plan1 = format!("{:?}", optimized_plan1);
let formatted_plan2 = format!("{:?}", optimized_plan2);
assert_eq!(formatted_plan1, formatted_plan2);
Ok(())
}
#[test]
fn table_unused_aggregate() -> Result<()> {
let table_scan = test_table_scan()?;
assert_eq!(3, table_scan.schema().fields().len());
assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])?
.filter(col("c").gt(lit(1)))?
.project(vec![col("c"), col("a"), col("MAX(test.b)")])?
.build()?;
assert_fields_eq(&plan, vec!["c", "a", "MAX(test.b)"]);
let expected = "Projection: #test.c, #test.a, #MAX(test.b)\
\n Filter: #test.c > Int32(1)\
\n Aggregate: groupBy=[[#test.a, #test.c]], aggr=[[MAX(#test.b)]]\
\n TableScan: test projection=[a, b, c]";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimized_plan = optimize(plan).expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
}
fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
let rule = ProjectionPushDown::new();
rule.optimize(plan, &OptimizerConfig::new())
}
}