use super::expressions::PhysicalSortExpr;
use super::{common, SendableRecordBatchStream, Statistics};
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::{
memory::MemoryStream, ColumnarValue, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning, PhysicalExpr,
};
use crate::scalar::ScalarValue;
use arrow::array::new_null_array;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use std::any::Any;
use std::sync::Arc;
/// Execution plan for a values-list relation: rows are materialized once at
/// construction time (see `try_new`) and replayed on `execute`.
#[derive(Debug)]
pub struct ValuesExec {
    // Schema describing the columns of the produced rows.
    schema: SchemaRef,
    // The values, pre-evaluated into a single RecordBatch by `try_new`.
    data: Vec<RecordBatch>,
}
impl ValuesExec {
    /// Create a new `ValuesExec` from a list of rows, where each row is a
    /// list of constant expressions (one per column of `schema`).
    ///
    /// All expressions are evaluated eagerly here and materialized into a
    /// single `RecordBatch`.
    ///
    /// # Errors
    /// Returns `DataFusionError::Plan` if `data` is empty, if any row's
    /// length does not match the schema's column count, or if any expression
    /// evaluates to a multi-row array; propagates evaluation errors.
    pub fn try_new(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Self> {
        if data.is_empty() {
            return Err(DataFusionError::Plan("Values list cannot be empty".into()));
        }
        let n_row = data.len();
        let n_col = schema.fields().len();
        // Reject ragged input up front: the `data[i][j]` indexing below would
        // otherwise panic on a row with fewer expressions than columns.
        for (i, row) in data.iter().enumerate() {
            if row.len() != n_col {
                return Err(DataFusionError::Plan(format!(
                    "Values row {} has {} expressions, expected {}",
                    i,
                    row.len(),
                    n_col
                )));
            }
        }
        // A placeholder single-row batch of nulls, used only as the input
        // against which the (constant) expressions are evaluated.
        let batch = RecordBatch::try_new(
            schema.clone(),
            schema
                .fields()
                .iter()
                .map(|field| new_null_array(field.data_type(), 1))
                .collect::<Vec<_>>(),
        )?;
        // Evaluate column-by-column, collecting each column's scalars into
        // one arrow array.
        let arr = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let r = data[i][j].evaluate(&batch);
                        match r {
                            Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
                            // A single-element array is equivalent to a scalar.
                            Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
                                ScalarValue::try_from_array(&a, 0)
                            }
                            Ok(ColumnarValue::Array(a)) => {
                                Err(DataFusionError::Plan(format!(
                                    "Cannot have array values {:?} in a values list",
                                    a
                                )))
                            }
                            Err(err) => Err(err),
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;
        let batch = RecordBatch::try_new(schema.clone(), arr)?;
        let data: Vec<RecordBatch> = vec![batch];
        Ok(Self { schema, data })
    }

    /// Return a clone of the materialized batches.
    fn data(&self) -> Vec<RecordBatch> {
        self.data.clone()
    }
}
impl ExecutionPlan for ValuesExec {
    /// Return this node as `Any` so callers can downcast to `ValuesExec`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema of the rows this node produces.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// A values node is a leaf: it has no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn required_child_distribution(&self) -> Distribution {
        Distribution::UnspecifiedDistribution
    }

    /// All values live in one partition.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    /// The materialized batch carries no ordering guarantee.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn relies_on_input_order(&self) -> bool {
        false
    }

    /// Rebuild the node; children are ignored since this is a leaf.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let cloned = ValuesExec {
            schema: self.schema.clone(),
            data: self.data.clone(),
        };
        Ok(Arc::new(cloned))
    }

    /// Stream the pre-materialized batch. Only partition 0 is valid.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        match partition {
            0 => {
                let stream =
                    MemoryStream::try_new(self.data(), self.schema.clone(), None)?;
                Ok(Box::pin(stream))
            }
            other => Err(DataFusionError::Internal(format!(
                "ValuesExec invalid partition {} (expected 0)",
                other
            ))),
        }
    }

    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => write!(f, "ValuesExec"),
        }
    }

    /// Exact statistics computed from the materialized batch.
    fn statistics(&self) -> Statistics {
        common::compute_record_batch_statistics(&[self.data()], &self.schema, None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util;

    /// Constructing a `ValuesExec` with no rows must be rejected.
    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = test_util::aggr_test_schema();
        let result = ValuesExec::try_new(schema, vec![]);
        assert!(result.is_err());
        Ok(())
    }
}