```rust
pub trait ExecutionPlan:
    Debug
    + DisplayAs
    + Send
    + Sync
{
    // Required methods
    fn name(&self) -> &str;
    fn as_any(&self) -> &(dyn Any + 'static);
    fn properties(&self) -> &PlanProperties;
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>;
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>>, DataFusionError>;

    // Provided methods
    fn static_name() -> &'static str
       where Self: Sized { ... }
    fn schema(&self) -> Arc<Schema> { ... }
    fn check_invariants(
        &self,
        _check: InvariantLevel,
    ) -> Result<(), DataFusionError> { ... }
    fn required_input_distribution(&self) -> Vec<Distribution> { ... }
    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> { ... }
    fn maintains_input_order(&self) -> Vec<bool> { ... }
    fn benefits_from_input_partitioning(&self) -> Vec<bool> { ... }
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> { ... }
    fn metrics(&self) -> Option<MetricsSet> { ... }
    fn statistics(&self) -> Result<Statistics, DataFusionError> { ... }
    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> Result<Statistics, DataFusionError> { ... }
    fn supports_limit_pushdown(&self) -> bool { ... }
    fn with_fetch(
        &self,
        _limit: Option<usize>,
    ) -> Option<Arc<dyn ExecutionPlan>> { ... }
    fn fetch(&self) -> Option<usize> { ... }
    fn cardinality_effect(&self) -> CardinalityEffect { ... }
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> { ... }
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription, DataFusionError> { ... }
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>, DataFusionError> { ... }
    fn with_new_state(
        &self,
        _state: Arc<dyn Any + Sync + Send>,
    ) -> Option<Arc<dyn ExecutionPlan>> { ... }
}
```
Represents a node in the DataFusion physical plan.
Calling `execute` produces an async `SendableRecordBatchStream` of `RecordBatch` that incrementally computes a partition of the `ExecutionPlan`’s output from its input. See `Partitioning` for more details on partitioning.
Methods such as `Self::schema` and `Self::properties` communicate properties of the output to the DataFusion optimizer, and methods such as `required_input_distribution` and `required_input_ordering` express requirements of the `ExecutionPlan` from its input.
`ExecutionPlan` can be displayed in a simplified form using the return value from `displayable` in addition to the (normally quite verbose) `Debug` output.
Required Methods§
fn name(&self) -> &str
Short name for the `ExecutionPlan`, such as ‘DataSourceExec’.
Implementation note: this method can just proxy to `static_name` if no special action is needed. It doesn’t provide a default implementation like that because this method doesn’t require the `Sized` constraint, which allows a wider range of use cases.
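The implementation note above can be illustrated with a std-only sketch that mirrors the shape of `name`/`static_name` on a hypothetical `PlanNode` trait (not DataFusion's API). Deriving the name from `std::any::type_name` in `static_name` is an assumption made for illustration. Because `static_name` carries `where Self: Sized`, it is excluded from the trait object, while `name` stays callable on `&dyn PlanNode`.

```rust
/// Hypothetical, std-only stand-in for the naming methods of `ExecutionPlan`.
pub trait PlanNode {
    /// Object-safe: no `Sized` bound, so it works through `&dyn PlanNode`.
    fn name(&self) -> &str;

    /// Requires `Self: Sized`: callable as `T::static_name()`, but not
    /// through a trait object. The `type_name` default is an assumption.
    fn static_name() -> &'static str
    where
        Self: Sized,
    {
        let full = std::any::type_name::<Self>();
        // Keep only the last path segment, e.g. "my_crate::MyExec" -> "MyExec".
        full.rsplit("::").next().unwrap_or(full)
    }
}

/// Hypothetical node that needs no special naming behavior.
pub struct MyExec;

impl PlanNode for MyExec {
    // Proxy to `static_name`, as the implementation note suggests.
    fn name(&self) -> &str {
        Self::static_name()
    }
}
```

Calling `MyExec::static_name()` needs no instance, while `node.name()` also works on a `&dyn PlanNode`.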
fn as_any(&self) -> &(dyn Any + 'static)
Returns the execution plan as `Any` so that it can be downcast to a specific implementation.
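A minimal std-only sketch of the `as_any` downcasting pattern follows. The `PlanNode` trait and the `FilterNode`/`ScanNode` types are hypothetical stand-ins for `ExecutionPlan` and concrete plan nodes.

```rust
use std::any::Any;

/// Hypothetical trait exposing only the `as_any` hook.
pub trait PlanNode {
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical concrete nodes.
pub struct FilterNode {
    pub predicate: String,
}
pub struct ScanNode;

impl PlanNode for FilterNode {
    // Returning `self` erases the concrete type behind `dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl PlanNode for ScanNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Returns the predicate if `node` is a `FilterNode`, otherwise `None`.
pub fn predicate_of(node: &dyn PlanNode) -> Option<&str> {
    node.as_any()
        .downcast_ref::<FilterNode>()
        .map(|f| f.predicate.as_str())
}
```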
fn properties(&self) -> &PlanProperties
Returns properties of the output of the `ExecutionPlan`, such as output ordering(s), partitioning information, etc.
This information is available via methods on the `ExecutionPlanProperties` trait, which is implemented for all `ExecutionPlan`s.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Get a list of children `ExecutionPlan`s that act as inputs to this plan.
The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).
fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>
Returns a new `ExecutionPlan` where all existing children were replaced by the `children`, in order.
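The std-only sketch below shows how `children` and `with_new_children` typically fit together for a leaf node and a unary node. The `Node` trait, `Scan`, `Limit`, and the `String` error type are hypothetical stand-ins for `ExecutionPlan` and `DataFusionError`. Reporting a wrong child count as an error is a design choice of this sketch.

```rust
use std::sync::Arc;

/// Hypothetical, std-only stand-in for the `ExecutionPlan` tree methods.
pub trait Node {
    /// Inputs of this node, in order.
    fn children(&self) -> Vec<&Arc<dyn Node>>;
    /// Rebuild this node with `children` replacing the existing ones, in order.
    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn Node>>)
        -> Result<Arc<dyn Node>, String>;
    /// Human-readable description, used to inspect the result.
    fn label(&self) -> String;
}

/// Leaf node: no children.
pub struct Scan {
    pub table: String,
}

/// Unary node: exactly one child.
pub struct Limit {
    pub n: usize,
    pub input: Arc<dyn Node>,
}

impl Node for Scan {
    fn children(&self) -> Vec<&Arc<dyn Node>> {
        vec![]
    }

    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn Node>>)
        -> Result<Arc<dyn Node>, String> {
        if !children.is_empty() {
            return Err(format!("Scan expects 0 children, got {}", children.len()));
        }
        // Nothing to replace: reuse the existing node.
        let node: Arc<dyn Node> = self;
        Ok(node)
    }

    fn label(&self) -> String {
        format!("Scan({})", self.table)
    }
}

impl Node for Limit {
    fn children(&self) -> Vec<&Arc<dyn Node>> {
        vec![&self.input]
    }

    fn with_new_children(self: Arc<Self>, mut children: Vec<Arc<dyn Node>>)
        -> Result<Arc<dyn Node>, String> {
        if children.len() != 1 {
            return Err(format!("Limit expects 1 child, got {}", children.len()));
        }
        // Keep this node's own parameters and swap in the new input.
        Ok(Arc::new(Limit { n: self.n, input: children.remove(0) }))
    }

    fn label(&self) -> String {
        format!("Limit({}) <- {}", self.n, self.input.label())
    }
}
```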
fn execute( &self, partition: usize, context: Arc<TaskContext>, ) -> Result<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>>, DataFusionError>
Begin execution of `partition`, returning a `Stream` of `RecordBatch`es.
§Notes
The `execute` method itself is not `async`, but it returns an `async` `futures::stream::Stream`. This `Stream` should incrementally compute the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
Most `ExecutionPlan`s should not do any work before the first `RecordBatch` is requested from the stream.
`RecordBatchStreamAdapter` can be used to convert an `async` `Stream` into a `SendableRecordBatchStream`.
Using `async` `Streams` allows for network I/O during execution and takes advantage of Rust’s built-in support for `async` continuations and its crate ecosystem.
§Error handling
Any error that occurs during execution is sent as an `Err` in the output stream.
`ExecutionPlan` implementations in DataFusion cancel additional work immediately once an error occurs. The rationale is that if the overall query will return an error, any additional work such as continued polling of inputs will be wasted as it will be thrown away.
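DataFusion operators implement `Stream`, but the same stop-on-first-error behavior can be sketched with a plain `Iterator` to stay dependency-free. `StopOnError` is a hypothetical adapter: it forwards items until it sees an `Err`, passes that error on, and then never polls its input again.

```rust
/// Hypothetical adapter: yields items until the first `Err`, then stops
/// without pulling anything more from `inner`.
pub struct StopOnError<I> {
    inner: I,
    failed: bool,
}

impl<I> StopOnError<I> {
    pub fn new(inner: I) -> Self {
        Self { inner, failed: false }
    }
}

impl<I, T, E> Iterator for StopOnError<I>
where
    I: Iterator<Item = Result<T, E>>,
{
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        // After an error, do no further work on the input.
        if self.failed {
            return None;
        }
        let item = self.inner.next()?;
        if item.is_err() {
            self.failed = true;
        }
        Some(item)
    }
}
```

The error itself is still delivered to the consumer, mirroring how an `Err` is sent in the output stream before work stops.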
§Cancellation / Aborting Execution
The `Stream` that is returned must ensure that any allocated resources are freed when the stream itself is dropped. This is particularly important for `spawn`ed tasks or threads. Unless care is taken to “abort” such tasks, they may continue to consume resources even after the plan is dropped, generating intermediate results that are never used.
Thus, `spawn` is disallowed; use `SpawnedTask` instead.
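The sketch below illustrates the abort-on-drop idea with std threads rather than tokio tasks. It is an analogy to `SpawnedTask`, not an implementation of it. `AbortOnDrop` is hypothetical. An OS thread cannot be forcibly killed, so the sketch uses a cooperative stop flag and joins the worker in `Drop`, ensuring no work continues after the owner is dropped.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical guard: owns a background worker and stops it on drop.
pub struct AbortOnDrop {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl AbortOnDrop {
    /// Starts a worker that keeps "producing" (incrementing `work_done`)
    /// until it is asked to stop.
    pub fn spawn(work_done: Arc<AtomicUsize>) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let stop_flag = Arc::clone(&stop);
        let handle = thread::spawn(move || {
            while !stop_flag.load(Ordering::Relaxed) {
                work_done.fetch_add(1, Ordering::Relaxed);
                thread::sleep(Duration::from_millis(1));
            }
        });
        Self { stop, handle: Some(handle) }
    }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // Signal the worker, then wait so no work outlives the guard.
        self.stop.store(true, Ordering::Relaxed);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}
```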
To enable timely cancellation, the `Stream` that is returned must not block the CPU indefinitely and must yield back to the tokio runtime regularly. In a typical `ExecutionPlan`, this automatically happens unless there are special circumstances; e.g. when the computational complexity of processing a batch is superlinear. See this general guideline for more context on this point, which explains why one should avoid spending a long time without reaching an `await`/yield point in asynchronous runtimes.
This can be achieved by using the utilities from the `coop` module, by manually returning `Poll::Pending` and setting up wakers appropriately, or by calling `tokio::task::yield_now()` when appropriate.
In special cases that warrant manual yielding, how often to yield (“regularly”) may be determined using the Tokio task budget, a timer (being careful of the overhead of the system call needed to read the time), or by counting rows or batches.
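The following sketch shows the “counting batches” approach. It assumes a hypothetical producer with a `Stream::poll_next`-shaped method, written against `std::task` only (no `futures` crate). After every `budget` items, the producer wakes itself and returns `Poll::Pending`. This gives the runtime a chance to run other tasks or observe cancellation before polling again. In real operators the `coop` utilities or the Tokio budget are the documented options; this only models the counting variant.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

/// Hypothetical producer of `0..end` that yields control every `budget` items.
pub struct YieldingCounter {
    next: u64,
    end: u64,
    budget: u32,
    since_yield: u32,
}

impl YieldingCounter {
    pub fn new(end: u64, budget: u32) -> Self {
        Self { next: 0, end, budget, since_yield: 0 }
    }

    /// Same shape as `Stream::poll_next`: `Ready(Some(x))` for an item,
    /// `Ready(None)` when exhausted, `Pending` when the budget is used up.
    pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u64>> {
        if self.next >= self.end {
            return Poll::Ready(None);
        }
        if self.since_yield >= self.budget {
            self.since_yield = 0;
            // Ask to be polled again, then hand control back to the runtime.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.since_yield += 1;
        let value = self.next;
        self.next += 1;
        Poll::Ready(Some(value))
    }
}
```

Because the producer wakes itself before returning `Pending`, the runtime reschedules it, and no item is lost.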
The cancellation benchmark tracks some cases of how quickly queries can be cancelled.
For more details, see `SpawnedTask`, `JoinSet`, and `RecordBatchReceiverStreamBuilder`, structures that help ensure all background tasks are cancelled.
§Implementation Examples
While `async` `Stream`s have a non-trivial learning curve, the `futures` crate provides `StreamExt` and `TryStreamExt`, which help simplify many common operations.
Here are some common patterns:
§Return Precomputed RecordBatch
We can return a precomputed `RecordBatch` as a `Stream`:
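```rust
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;

struct MyPlan {
    batch: RecordBatch,
}

impl MyPlan {
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Use functions from the futures crate to convert the batch into a stream
        let fut = futures::future::ready(Ok(self.batch.clone()));
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
    }
}
```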
§Lazily (async) Compute RecordBatch
We can also lazily compute a `RecordBatch` when the returned `Stream` is polled:
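```rust
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;

struct MyPlan {
    schema: SchemaRef,
}

/// Returns a single batch when the returned stream is polled
async fn get_batch() -> Result<RecordBatch> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // The future does no work until the stream is first polled
        let fut = get_batch();
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}
```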
§Lazily (async) create a Stream
If you need to create the returned `Stream` using an `async` function, you can do so by flattening the result:
```rust
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::TryStreamExt;

struct MyPlan {
    schema: SchemaRef,
}

/// async function that returns a stream
async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // A future that yields a stream
        let fut = get_batch_stream();
        // Use TryStreamExt::try_flatten to flatten the stream of streams
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}
```
Provided Methods§
fn static_name() -> &'static str where Self: Sized
Short name for the `ExecutionPlan`, such as ‘DataSourceExec’.
Like `name` but can be called without an instance.
fn check_invariants( &self, _check: InvariantLevel, ) -> Result<(), DataFusionError>
Returns an error if this individual node does not conform to its invariants. These invariants are typically only checked in debug mode.
A default set of invariants is provided in the default implementation. Extension nodes can provide their own invariants.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this `ExecutionPlan`. By default it is `Distribution::UnspecifiedDistribution` for each child.
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>>
Specifies the ordering required for all of the children of this `ExecutionPlan`.
For each child, it is the local ordering requirement within each partition rather than the global ordering.
NOTE that checking `!is_empty()` does not check for a required input ordering. Instead, the correct check is that at least one entry must be `Some`.
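The note above can be illustrated with a small std-only example. The hypothetical `OrderingRequirement` type stands in for `OrderingRequirements`. A node with two children and no ordering requirements still returns a non-empty vector.

```rust
/// Hypothetical stand-in for one child's ordering requirement.
#[derive(Debug)]
pub struct OrderingRequirement(pub Vec<String>);

/// Incorrect check: true whenever the node has any children at all.
pub fn has_requirement_wrong(reqs: &[Option<OrderingRequirement>]) -> bool {
    !reqs.is_empty()
}

/// Correct check: true only if at least one child has a `Some` requirement.
pub fn has_requirement(reqs: &[Option<OrderingRequirement>]) -> bool {
    reqs.iter().any(Option::is_some)
}
```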
fn maintains_input_order(&self) -> Vec<bool>
Returns `false` if this `ExecutionPlan`’s implementation may reorder rows within or between partitions.
For example, Projection, Filter, and Limit maintain the order of inputs: they may transform values (Projection) or not produce the same number of rows that went in (Filter and Limit), but the rows that are produced appear in the same relative order.
DataFusion uses this metadata to apply certain optimizations, such as repartitioning automatically without breaking correctness.
The default implementation returns `false`.
WARNING: if you override this default, you MUST ensure that the `ExecutionPlan` maintains the ordering invariant, or else DataFusion may produce incorrect results.
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether the `ExecutionPlan` benefits from increased parallelization at its input for each child.
If this returns `true`, the `ExecutionPlan` would benefit from partitioning its corresponding child (and thus from more parallelism). For `ExecutionPlan`s that do very little work, the overhead of extra parallelism may outweigh any benefits.
The default implementation returns `true` unless this `ExecutionPlan` has signalled it requires a single child input partition.
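The default rule described above can be sketched as follows. The sketch uses a simplified, hypothetical copy of the `Distribution` enum, in which hash partitioning is keyed by column name instead of by `PhysicalExpr`. It illustrates the stated behavior and is not DataFusion's source.

```rust
/// Hypothetical, simplified mirror of a child distribution requirement.
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// For each child: benefits from more input partitions unless the node
/// requires that child's input to arrive in a single partition.
pub fn benefits_from_input_partitioning(required: &[Distribution]) -> Vec<bool> {
    required
        .iter()
        .map(|d| !matches!(d, Distribution::SinglePartition))
        .collect()
}
```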
fn repartitioned( &self, _target_partitions: usize, _config: &ConfigOptions, ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError>
If supported, attempt to increase the partitioning of this `ExecutionPlan` to produce `target_partitions` partitions.
If the `ExecutionPlan` does not support changing its partitioning, returns `Ok(None)` (the default).
If the `ExecutionPlan` can increase its partitioning, but not to `target_partitions`, it may return an `ExecutionPlan` with fewer partitions. This might happen, for example, if each new partition would be too small to be efficiently processed individually.
The DataFusion optimizer attempts to use as many threads as possible by repartitioning its inputs to match the target number of threads available (`target_partitions`). Some data sources, such as the built-in CSV and Parquet readers, implement this method because they are able to read from their input files in parallel, regardless of how the source data is split amongst files.
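The hypothetical `split_byte_ranges` helper below is a rough sketch of how a file-based source might honor `target_partitions` while returning fewer partitions when they would be too small. It treats all input as one contiguous byte range. Real readers must also respect file and record boundaries, which this sketch ignores.

```rust
use std::ops::Range;

/// Hypothetical helper: split `total_bytes` of input into at most
/// `target_partitions` contiguous byte ranges. It returns fewer ranges when
/// splitting further would make the average range smaller than `min_bytes`.
pub fn split_byte_ranges(total_bytes: u64, target_partitions: usize, min_bytes: u64) -> Vec<Range<u64>> {
    if total_bytes == 0 || target_partitions == 0 {
        return Vec::new();
    }
    // Largest partition count that keeps the average range >= `min_bytes`
    // (at least 1, so small inputs still get a single partition).
    let max_by_size = (total_bytes / min_bytes.max(1)).max(1);
    let n = (target_partitions as u64).min(max_by_size);
    // Ceiling division so the ranges cover every byte.
    let chunk = (total_bytes + n - 1) / n;

    let mut ranges = Vec::new();
    let mut start = 0;
    while start < total_bytes {
        let end = (start + chunk).min(total_bytes);
        ranges.push(start..end);
        start = end;
    }
    ranges
}
```

The second call in the test asks for 8 partitions but gets 3, because 8 ranges of about 12 bytes would fall below the 30-byte minimum.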
fn metrics(&self) -> Option<MetricsSet>
Return a snapshot of the set of `Metric`s for this `ExecutionPlan`. If no `Metric`s are available, return `None`.
While the values of the metrics in the returned `MetricsSet`s may change as execution progresses, the specific metrics will not.
Once `self.execute()` has returned (technically, once the future is resolved) for all available partitions, the set of metrics should be complete. If this function is called prior to `execute()`, new metrics may appear in subsequent calls.
fn statistics(&self) -> Result<Statistics, DataFusionError>
👎Deprecated since 48.0.0: Use `partition_statistics` method instead.
Returns statistics for this `ExecutionPlan` node. If statistics are not available, should return `Statistics::new_unknown` (the default), not an error.
For `TableScan` executors, which support filter pushdown, special attention needs to be paid to whether the stats returned by this method are exact or not.
fn partition_statistics( &self, partition: Option<usize>, ) -> Result<Statistics, DataFusionError>
Returns statistics for a specific partition of this `ExecutionPlan` node.
If statistics are not available, should return `Statistics::new_unknown` (the default), not an error.
If `partition` is `None`, it returns statistics for the entire plan.
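The following is a hedged sketch of the `None` versus `Some(partition)` contract. It uses a hypothetical `RowCount` type loosely modeled on DataFusion's `Precision` (exact, inexact, or absent). Whole-plan statistics are obtained by combining the per-partition values. An unknown partition makes the total unknown, and an inexact partition makes it inexact. Returning an error for an out-of-range index is a choice of this sketch.

```rust
/// Hypothetical row-count statistic, loosely modeled on `Precision<usize>`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RowCount {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Hypothetical node that knows a row count per output partition.
pub struct ScanStats {
    pub per_partition: Vec<RowCount>,
}

impl ScanStats {
    /// `Some(i)`: statistics for partition `i`; `None`: for the whole plan.
    pub fn partition_statistics(&self, partition: Option<usize>) -> Result<RowCount, String> {
        match partition {
            Some(i) => self
                .per_partition
                .get(i)
                .copied()
                .ok_or_else(|| format!("invalid partition {i}")),
            None => Ok(self
                .per_partition
                .iter()
                .fold(RowCount::Exact(0), |acc, p| combine(acc, *p))),
        }
    }
}

/// Sum two counts, degrading to `Inexact` or `Absent` as needed.
fn combine(a: RowCount, b: RowCount) -> RowCount {
    match (a, b) {
        (RowCount::Absent, _) | (_, RowCount::Absent) => RowCount::Absent,
        (RowCount::Exact(x), RowCount::Exact(y)) => RowCount::Exact(x + y),
        (RowCount::Exact(x) | RowCount::Inexact(x), RowCount::Exact(y) | RowCount::Inexact(y)) => {
            RowCount::Inexact(x + y)
        }
    }
}
```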
fn supports_limit_pushdown(&self) -> bool
Returns `true` if a limit can be safely pushed down through this `ExecutionPlan` node.
If this method returns `true`, and the query plan contains a limit at the output of this node, DataFusion will push the limit to the input of this node.
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetching variant of this `ExecutionPlan` node, if it supports fetch limits. Returns `None` otherwise.
fn fetch(&self) -> Option<usize>
Gets the fetch count for the operator; `None` means there is no fetch.
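The std-only sketch below shows the `with_fetch`/`fetch` pair on two hypothetical nodes. The sort-like node can absorb a limit and keeps only the first `fetch` rows after sorting. The union-like node cannot absorb a limit, so it returns `None`. Returning concrete `Arc<T>` instead of `Arc<dyn ExecutionPlan>` is a simplification.

```rust
use std::sync::Arc;

/// Hypothetical sort-like node that can absorb a fetch limit (top-k).
#[derive(Debug, Clone, PartialEq)]
pub struct SortNode {
    pub fetch: Option<usize>,
}

impl SortNode {
    /// Mirrors `with_fetch`: a copy of this node carrying the new limit.
    pub fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<SortNode>> {
        Some(Arc::new(SortNode { fetch: limit }))
    }

    /// Mirrors `fetch`: `None` means no limit.
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Sorts `rows` and, if a fetch is set, keeps only the first `fetch` rows.
    pub fn run(&self, mut rows: Vec<i64>) -> Vec<i64> {
        rows.sort_unstable();
        if let Some(n) = self.fetch {
            rows.truncate(n);
        }
        rows
    }
}

/// Hypothetical node that cannot absorb a limit.
pub struct UnionNode;

impl UnionNode {
    pub fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<UnionNode>> {
        None
    }
}
```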
fn cardinality_effect(&self) -> CardinalityEffect
Gets the effect on cardinality, if known.
fn try_swapping_with_projection( &self, _projection: &ProjectionExec, ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError>
Attempts to push down the given projection into the input of this `ExecutionPlan`.
If the operator supports this optimization, the resulting plan will be `self_new <- projection <- source`, starting from `projection <- self <- source`. Otherwise, the current `ExecutionPlan` is left as-is.
Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported or not possible, or `Err` on failure.
fn gather_filters_for_pushdown( &self, _phase: FilterPushdownPhase, parent_filters: Vec<Arc<dyn PhysicalExpr>>, _config: &ConfigOptions, ) -> Result<FilterDescription, DataFusionError>
Collect filters that this node can push down to its children.
Filters that are being pushed down from parents are passed in, and the node may generate additional filters to push down.
For example, given the plan `FilterExec -> HashJoinExec -> DataSourceExec`, what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
- `FilterExec::gather_filters_for_pushdown` is called with no parent filters, so it only returns that `FilterExec` wants to push down its own predicate.
- `HashJoinExec::gather_filters_for_pushdown` is called with the filter from `FilterExec`, which it only allows to push down to one side of the join (unless it’s on the join key), but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
- `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec` and `FilterExec`; however, `DataSourceExec::gather_filters_for_pushdown` doesn’t actually do anything since it has no children and no additional filters to push down. It’s only once `ExecutionPlan::handle_child_pushdown_result` is called on `DataSourceExec` as we recurse up the plan that `DataSourceExec` can actually bind the filters.
The default implementation bars all parent filters from being pushed down and adds no new filters. This is the safest option, making filter pushdown opt-in on a per-node basis.
There are two different phases in filter pushdown, which some operators may handle the same and some differently. Depending on the phase, the operator may or may not be allowed to modify the plan.
See `FilterPushdownPhase` for more details.
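To make the `HashJoinExec` step concrete, here is a simplified, hypothetical routing rule for an inner join. A parent filter can go to a side only if every column it references exists on that side. A filter on a column present on both sides (standing in for the join key) can go to both sides. Real joins also have to consider the join type (outer joins restrict pushdown) and column equivalences, which this sketch ignores.

```rust
use std::collections::HashSet;

/// Hypothetical filter, reduced to the column names it references.
pub struct Filter {
    pub columns: Vec<String>,
}

/// Where a parent filter may be pushed below an inner join.
#[derive(Debug, PartialEq)]
pub enum Route {
    Left,
    Right,
    Both,
    Neither,
}

/// Route a filter by checking which side's schema covers all its columns.
pub fn route_filter(filter: &Filter, left_cols: &HashSet<String>, right_cols: &HashSet<String>) -> Route {
    let on_left = filter.columns.iter().all(|c| left_cols.contains(c));
    let on_right = filter.columns.iter().all(|c| right_cols.contains(c));
    match (on_left, on_right) {
        (true, true) => Route::Both,
        (true, false) => Route::Left,
        (false, true) => Route::Right,
        (false, false) => Route::Neither,
    }
}
```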
fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>, DataFusionError>
Handle the result of a child pushdown.
This method is called as we recurse back up the plan tree after pushing filters down to child nodes via `ExecutionPlan::gather_filters_for_pushdown`.
It allows the current node to process the results of filter pushdown from its children, deciding whether to absorb filters, modify the plan, or pass filters back up to its parent.
Purpose and Context: Filter pushdown is a critical optimization in DataFusion that aims to reduce the amount of data processed by applying filters as early as possible in the query plan. This method is part of the second phase of filter pushdown, where results are propagated back up the tree after being pushed down. Each node can inspect the pushdown results from its children and decide how to handle any unapplied filters, potentially optimizing the plan structure or filter application.
Behavior in Different Nodes:
- For a `DataSourceExec`, this often means absorbing the filters to apply them during the scan phase (late materialization), reducing the data read from the source.
- A `FilterExec` may absorb any filters its children could not handle, combining them with its own predicate. If no filters remain (i.e., the predicate becomes trivially true), it may remove itself from the plan altogether. It typically marks parent filters as supported, indicating they have been handled.
- A `HashJoinExec` might ignore the pushdown result if filters need to be applied during the join operation. It passes the parent filters back up wrapped in `FilterPushdownPropagation::if_any`, discarding any self-filters from children.
Example Walkthrough:
Consider a query plan: `FilterExec (f1) -> HashJoinExec -> DataSourceExec`.
- Downward Phase (`gather_filters_for_pushdown`): Starting at `FilterExec`, the filter `f1` is gathered and pushed down to `HashJoinExec`. `HashJoinExec` may allow `f1` to pass to one side of the join or add its own filters (e.g., a min-max filter from the build side), then pushes filters to `DataSourceExec`. `DataSourceExec`, being a leaf node, has no children to push to, so it prepares to handle filters in the upward phase.
- Upward Phase (`handle_child_pushdown_result`): Starting at `DataSourceExec`, it absorbs applicable filters from `HashJoinExec` for late materialization during scanning, marking them as supported. `HashJoinExec` receives the result, decides whether to apply any remaining filters during the join, and passes unhandled filters back up to `FilterExec`. `FilterExec` absorbs any unhandled filters, updates its predicate if necessary, or removes itself if the predicate becomes trivial (e.g., `lit(true)`), and marks filters as supported for its parent.
The default implementation is a no-op that passes the result of pushdown from the children to its parent transparently, ensuring no filters are lost if a node does not override this behavior.
Notes for Implementation:
When returning filters via `FilterPushdownPropagation`, the order of filters need not match the order they were passed in via `child_pushdown_result`. However, preserving the order is recommended for debugging and ease of reasoning about the resulting plans.
Helper Methods for Customization: There are various helper methods to simplify implementing this method:
- `FilterPushdownPropagation::if_any`: Marks all parent filters as supported as long as at least one child supports them.
- `FilterPushdownPropagation::if_all`: Marks all parent filters as supported as long as all children support them.
- `FilterPushdownPropagation::with_parent_pushdown_result`: Allows adding filters to the propagation result, indicating which filters are supported by the current node.
- `FilterPushdownPropagation::with_updated_node`: Allows updating the current node in the propagation result, used if the node has modified its plan based on the pushdown results.
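The `if_any` and `if_all` descriptions above can be read as a per-filter decision. The sketch below models that reading with plain booleans, where `support[c][f]` records whether child `c` accepted parent filter `f`. Both helpers are hypothetical and model only this decision; see `FilterPushdownPropagation` for the actual API and exact semantics.

```rust
/// Hypothetical model of `if_any`: a parent filter is supported if at least
/// one child supports it. Each inner Vec must have `num_filters` entries.
pub fn supported_if_any(support: &[Vec<bool>], num_filters: usize) -> Vec<bool> {
    (0..num_filters)
        .map(|f| support.iter().any(|child| child[f]))
        .collect()
}

/// Hypothetical model of `if_all`: a parent filter is supported only if
/// every child supports it.
pub fn supported_if_all(support: &[Vec<bool>], num_filters: usize) -> Vec<bool> {
    (0..num_filters)
        .map(|f| support.iter().all(|child| child[f]))
        .collect()
}
```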
Filter Pushdown Phases:
There are two different phases in filter pushdown (`Pre` and others), which some operators may handle differently. Depending on the phase, the operator may or may not be allowed to modify the plan. See `FilterPushdownPhase` for more details on phase-specific behavior.
fn with_new_state( &self, _state: Arc<dyn Any + Sync + Send>, ) -> Option<Arc<dyn ExecutionPlan>>
Injects arbitrary run-time state into this execution plan, returning a new plan instance that incorporates that state if it is relevant to the concrete node implementation.
This is a generic entry point: the `state` can be any type wrapped in `Arc<dyn Any + Send + Sync>`. A node that cares about the state should down-cast it to the concrete type it expects and, if successful, return a modified copy of itself that captures the provided value. If the state is not applicable, the default behaviour is to return `None` so that parent nodes can continue propagating the attempt further down the plan tree.
For example, `WorkTableExec` down-casts the supplied state to an `Arc<WorkTable>` in order to wire up the working table used during recursive-CTE execution. Similar patterns can be followed by custom nodes that need late-bound dependencies or shared state.
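A std-only sketch of the down-cast pattern described above follows. `WorkTableStub` and `WorkTableNode` are hypothetical stand-ins for `WorkTable` and `WorkTableExec`. `Arc::downcast` on an `Arc<dyn Any + Send + Sync>` yields the concrete `Arc` only when the types match. When they don't, the sketch returns `None` so callers can keep propagating.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical shared state a node needs at run time.
#[derive(Debug)]
pub struct WorkTableStub {
    pub name: String,
}

/// Hypothetical node that is wired up with its work table late.
#[derive(Debug, Clone)]
pub struct WorkTableNode {
    pub table: Option<Arc<WorkTableStub>>,
}

impl WorkTableNode {
    /// Mirrors `with_new_state`: `Some(new node)` if `state` is a
    /// `WorkTableStub`, otherwise `None`.
    pub fn with_new_state(&self, state: Arc<dyn Any + Send + Sync>) -> Option<Arc<WorkTableNode>> {
        // `downcast` returns `Err(original)` on a type mismatch.
        let table = state.downcast::<WorkTableStub>().ok()?;
        Some(Arc::new(WorkTableNode { table: Some(table) }))
    }
}
```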
Trait Implementations§
impl DynTreeNode for dyn ExecutionPlan
fn arc_children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Returns all children of the specified `TreeNode`.
fn with_new_arc_children( &self, arc_self: Arc<dyn ExecutionPlan>, new_children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>
impl ExecutionPlanProperties for &dyn ExecutionPlan
fn output_partitioning(&self) -> &Partitioning
Specifies how the output of this `ExecutionPlan` is split into partitions.
fn output_ordering(&self) -> Option<&LexOrdering>
If the output of this `ExecutionPlan` within each partition is sorted, returns `Some(keys)` describing the ordering. A `None` return value indicates no assumptions should be made on the output ordering.
fn boundedness(&self) -> Boundedness
Boundedness information of the stream corresponding to this `ExecutionPlan`. For more details, see `Boundedness`.
fn pipeline_behavior(&self) -> EmissionType
Indicates how the stream of this `ExecutionPlan` emits its results. For more details, see `EmissionType`.
fn equivalence_properties(&self) -> &EquivalenceProperties
Get the `EquivalenceProperties` within the plan.