use std::{any::Any, borrow::Cow, sync::Arc};
use crate::Session;
use crate::TableProvider;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::error::Result;
use datafusion_common::Column;
use datafusion_expr::TableType;
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
use datafusion_physical_plan::ExecutionPlan;
#[derive(Debug)]
pub struct ViewTable {
logical_plan: LogicalPlan,
table_schema: SchemaRef,
definition: Option<String>,
}
impl ViewTable {
pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
let table_schema = logical_plan.schema().as_ref().to_owned().into();
Self {
logical_plan,
table_schema,
definition,
}
}
#[deprecated(
since = "47.0.0",
note = "Use `ViewTable::new` instead and apply TypeCoercion to the logical plan if needed"
)]
pub fn try_new(
logical_plan: LogicalPlan,
definition: Option<String>,
) -> Result<Self> {
Ok(Self::new(logical_plan, definition))
}
pub fn definition(&self) -> Option<&String> {
self.definition.as_ref()
}
pub fn logical_plan(&self) -> &LogicalPlan {
&self.logical_plan
}
}
#[async_trait]
impl TableProvider for ViewTable {
fn as_any(&self) -> &dyn Any {
self
}
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
Some(Cow::Borrowed(&self.logical_plan))
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
fn table_type(&self) -> TableType {
TableType::View
}
fn get_table_definition(&self) -> Option<&str> {
self.definition.as_deref()
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
let plan = self.logical_plan().clone();
let mut plan = LogicalPlanBuilder::from(plan);
if let Some(filter) = filter {
plan = plan.filter(filter)?;
}
let mut plan = if let Some(projection) = projection {
let current_projection =
(0..plan.schema().fields().len()).collect::<Vec<usize>>();
if projection == ¤t_projection {
plan
} else {
let fields: Vec<Expr> = projection
.iter()
.map(|i| {
Expr::Column(Column::from(
self.logical_plan.schema().qualified_field(*i),
))
})
.collect();
plan.project(fields)?
}
} else {
plan
};
if let Some(limit) = limit {
plan = plan.limit(0, Some(limit))?;
}
state.create_physical_plan(&plan.build()?).await
}
}