Trait datafusion::physical_plan::ExecutionPlan
pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
    // Required methods
    fn as_any(&self) -> &(dyn Any + 'static);
    fn schema(&self) -> Arc<Schema>;
    fn output_partitioning(&self) -> Partitioning;
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>;
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>
    ) -> Result<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>>, DataFusionError>;

    // Provided methods
    fn unbounded_output(
        &self,
        _children: &[bool]
    ) -> Result<bool, DataFusionError> { ... }
    fn required_input_distribution(&self) -> Vec<Distribution> { ... }
    fn required_input_ordering(
        &self
    ) -> Vec<Option<Vec<PhysicalSortRequirement>>> { ... }
    fn maintains_input_order(&self) -> Vec<bool> { ... }
    fn benefits_from_input_partitioning(&self) -> Vec<bool> { ... }
    fn equivalence_properties(&self) -> EquivalenceProperties { ... }
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions
    ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> { ... }
    fn metrics(&self) -> Option<MetricsSet> { ... }
    fn statistics(&self) -> Result<Statistics, DataFusionError> { ... }
}
Represent nodes in the DataFusion Physical Plan.

Calling execute produces an async SendableRecordBatchStream of RecordBatch that incrementally computes a partition of the ExecutionPlan's output from its input. See Partitioning for more details on partitioning.

Methods such as schema and output_partitioning communicate properties of this output to the DataFusion optimizer, and methods such as required_input_distribution and required_input_ordering express requirements of the ExecutionPlan from its input.

ExecutionPlan can be displayed in a simplified form using the return value from displayable, in addition to the (normally quite verbose) Debug output.
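For example, a physical plan can be printed in that simplified form roughly as follows (a minimal sketch: the table name and CSV path are hypothetical, and it assumes a DataFusion version whose DisplayableExecutionPlan::indent takes a verbose flag):

// Sketch: build a physical plan and print its simplified, indented form.
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical data file registered as a table
    ctx.register_csv("example", "data/example.csv", CsvReadOptions::new()).await?;
    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
    let physical_plan = df.create_physical_plan().await?;
    // One line per operator, much more compact than the Debug output
    println!("{}", displayable(physical_plan.as_ref()).indent(false));
    Ok(())
}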
Required Methods
fn as_any(&self) -> &(dyn Any + 'static)
Returns the execution plan as Any so that it can be downcast to a specific implementation.
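For instance, an optimizer rule might check whether a node is a FilterExec (a minimal sketch):

use std::sync::Arc;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;

fn is_filter(plan: &Arc<dyn ExecutionPlan>) -> bool {
    // `as_any` exposes the concrete type behind the trait object for downcasting
    plan.as_any().downcast_ref::<FilterExec>().is_some()
}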
fn output_partitioning(&self) -> Partitioning
Specifies how the output of this ExecutionPlan is split into partitions.
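For instance, a node that always produces a single partition of unknown distribution could implement this as follows (a sketch, inside an impl ExecutionPlan block for a hypothetical MyPlan):

fn output_partitioning(&self) -> Partitioning {
    // A single output partition whose row distribution is not known to the optimizer
    Partitioning::UnknownPartitioning(1)
}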
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>
If the output of this ExecutionPlan within each partition is sorted, returns Some(keys) with a description of how it was sorted.

For example, Sort obviously produces sorted output, as does SortPreservingMergeStream. Less obviously, Projection produces sorted output if its input was sorted, as it does not reorder the input rows.

It is safe to return None here if your ExecutionPlan does not have any particular output order.
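A projection-like node that never reorders rows could simply forward its input's ordering (a sketch; the input field holding the child plan is hypothetical):

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
    // This node does not reorder rows, so its output ordering is its input's ordering
    self.input.output_ordering()
}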
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>
Get a list of children ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>
Returns a new ExecutionPlan where all existing children were replaced by the given children, in order.
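For a unary node wrapping a single child, a typical pair of children / with_new_children implementations looks like this (a sketch; MyPlan and its input and schema fields are hypothetical):

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
    // Exactly one input: this is a unary node
    vec![self.input.clone()]
}

fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    // Rebuild the node around the replacement child, keeping the rest of its state
    Ok(Arc::new(MyPlan {
        input: children[0].clone(),
        schema: self.schema.clone(),
    }))
}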
fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>
) -> Result<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>>, DataFusionError>
Begin execution of partition, returning a Stream of RecordBatches.

Notes

The execute method itself is not async, but it returns an async futures::stream::Stream. This Stream should incrementally compute the output, RecordBatch by RecordBatch (in a streaming fashion). Most ExecutionPlans should not do any work before the first RecordBatch is requested from the stream.

RecordBatchStreamAdapter can be used to convert an async Stream into a SendableRecordBatchStream.

Using async Streams allows for network I/O during execution and takes advantage of Rust's built-in support for async continuations and its crate ecosystem.
Implementation Examples

While async Streams have a non-trivial learning curve, the futures crate provides StreamExt and TryStreamExt, which help simplify many common operations.

Here are some common patterns:

Return Precomputed RecordBatch

We can return a precomputed RecordBatch as a Stream:
use std::sync::Arc;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;

struct MyPlan {
    batch: RecordBatch,
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // use functions from the futures crate to convert the batch into a stream
        let fut = futures::future::ready(Ok(self.batch.clone()));
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
    }
}
Lazily (async) Compute RecordBatch

We can also lazily compute a RecordBatch when the returned Stream is polled:
use datafusion::arrow::datatypes::SchemaRef;
// other imports as in the previous example

struct MyPlan {
    schema: SchemaRef,
}

/// Returns a single batch when the returned stream is polled
async fn get_batch() -> Result<RecordBatch> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let fut = get_batch();
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}
Lazily (async) create a Stream

If you need to create the returned Stream using an async function, you can do so by flattening the result:
use futures::TryStreamExt;
// other imports as in the examples above

struct MyPlan {
    schema: SchemaRef,
}

/// async function that returns a stream
async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // A future that yields a stream
        let fut = get_batch_stream();
        // Use TryStreamExt::try_flatten to flatten the stream of streams
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}
Provided Methods
fn unbounded_output(&self, _children: &[bool]) -> Result<bool, DataFusionError>
Specifies whether this plan generates an infinite stream of records. If the plan does not support pipelining, but its input(s) are infinite, returns an error to indicate this.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this ExecutionPlan. By default it is Distribution::UnspecifiedDistribution for each child.
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>>
Specifies the ordering required for all of the children of this ExecutionPlan.

For each child, this is the local ordering requirement within each partition rather than the global ordering.

NOTE that checking !is_empty() does not check for a required input ordering. Instead, the correct check is that at least one entry must be Some.
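For example, a check along these lines correctly detects whether a node imposes any input ordering requirement (a sketch; plan is an Arc<dyn ExecutionPlan>):

let requires_ordering = plan
    .required_input_ordering()
    .iter()
    .any(|requirement| requirement.is_some());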
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan's implementation may reorder rows within or between partitions.

For example, Projection, Filter, and Limit maintain the order of their inputs: they may transform values (Projection) or produce fewer rows than went in (Filter and Limit), but the rows that are produced come out in the same order as the input.

DataFusion uses this metadata to apply certain optimizations such as automatically repartitioning correctly.

The default implementation returns false.

WARNING: if you override this default, you MUST ensure that the ExecutionPlan maintains the ordering invariant or else DataFusion may produce incorrect results.
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child.

If this returns true, the ExecutionPlan would benefit from partitioning its corresponding child (and thus from more parallelism). For ExecutionPlans that do very little work, the overhead of extra parallelism may outweigh any benefits.

The default implementation returns true unless this ExecutionPlan has signalled that it requires a single child input partition.
fn equivalence_properties(&self) -> EquivalenceProperties
Get the EquivalenceProperties within the plan.

Equivalence properties tell DataFusion which columns are known to be equal and are used during various optimization passes. By default, this returns "no known equivalences", which is always correct but may cause DataFusion to unnecessarily re-sort data.

If this ExecutionPlan makes no changes to the schema of the rows flowing through it or to how columns within each row relate to each other, it should return the equivalence properties of its input. For example, since FilterExec may remove rows from its input but does not otherwise modify them, it preserves its input equivalence properties. However, since ProjectionExec may calculate derived expressions, it needs special handling.

See also Self::maintains_input_order and Self::output_ordering for related concepts.
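A pass-through node that preserves both rows and columns could simply forward its input's properties (a sketch; the input field holding the child plan is hypothetical):

fn equivalence_properties(&self) -> EquivalenceProperties {
    // Rows and columns are unchanged, so the input's equivalences still hold
    self.input.equivalence_properties()
}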
fn repartitioned(
    &self,
    _target_partitions: usize,
    _config: &ConfigOptions
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError>
If supported, attempt to increase the partitioning of this ExecutionPlan to produce target_partitions partitions.

If the ExecutionPlan does not support changing its partitioning, returns Ok(None) (the default).

If the ExecutionPlan can increase its partitioning, but not to the target_partitions, it may return an ExecutionPlan with fewer partitions. This might happen, for example, if each new partition would be too small to be efficiently processed individually.

The DataFusion optimizer attempts to use as many threads as possible by repartitioning its inputs to match the target number of threads available (target_partitions). Some data sources, such as the built-in CSV and Parquet readers, implement this method as they are able to read from their input files in parallel, regardless of how the source data is split amongst files.
fn metrics(&self) -> Option<MetricsSet>
Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.

While the values of the metrics in the returned MetricsSets may change as execution progresses, the specific metrics will not.

Once self.execute() has returned (technically, the future is resolved) for all available partitions, the set of metrics should be complete. If this function is called prior to execute(), new metrics may appear in subsequent calls.
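For example, after a plan has run, the accumulated counters can be inspected roughly like this (a sketch; plan is an Arc<dyn ExecutionPlan>, and output_rows and elapsed_compute are aggregate accessors on MetricsSet):

if let Some(metrics) = plan.metrics() {
    // Total rows produced and total CPU time, aggregated across partitions
    println!("output rows: {:?}", metrics.output_rows());
    println!("elapsed compute: {:?}", metrics.elapsed_compute());
}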
fn statistics(&self) -> Result<Statistics, DataFusionError>
Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.