Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
ExecutionPlan in datafusion::physical_plan - Rust
[go: Go Back, main page]

Trait ExecutionPlan

Source
pub trait ExecutionPlan:
    Debug
    + DisplayAs
    + Send
    + Sync {
Show 24 methods // Required methods fn name(&self) -> &str; fn as_any(&self) -> &(dyn Any + 'static); fn properties(&self) -> &PlanProperties; fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>; fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>; fn execute( &self, partition: usize, context: Arc<TaskContext>, ) -> Result<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>>, DataFusionError>; // Provided methods fn static_name() -> &'static str where Self: Sized { ... } fn schema(&self) -> Arc<Schema> { ... } fn check_invariants( &self, _check: InvariantLevel, ) -> Result<(), DataFusionError> { ... } fn required_input_distribution(&self) -> Vec<Distribution> { ... } fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> { ... } fn maintains_input_order(&self) -> Vec<bool> { ... } fn benefits_from_input_partitioning(&self) -> Vec<bool> { ... } fn repartitioned( &self, _target_partitions: usize, _config: &ConfigOptions, ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> { ... } fn metrics(&self) -> Option<MetricsSet> { ... } fn statistics(&self) -> Result<Statistics, DataFusionError> { ... } fn partition_statistics( &self, partition: Option<usize>, ) -> Result<Statistics, DataFusionError> { ... } fn supports_limit_pushdown(&self) -> bool { ... } fn with_fetch( &self, _limit: Option<usize>, ) -> Option<Arc<dyn ExecutionPlan>> { ... } fn fetch(&self) -> Option<usize> { ... } fn cardinality_effect(&self) -> CardinalityEffect { ... } fn try_swapping_with_projection( &self, _projection: &ProjectionExec, ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> { ... } fn gather_filters_for_pushdown( &self, parent_filters: Vec<Arc<dyn PhysicalExpr>>, _config: &ConfigOptions, ) -> Result<FilterDescription, DataFusionError> { ... } fn handle_child_pushdown_result( &self, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>, DataFusionError> { ... }
}
Expand description

Represent nodes in the DataFusion Physical Plan.

Calling execute produces an async SendableRecordBatchStream of RecordBatch that incrementally computes a partition of the ExecutionPlan’s output from its input. See Partitioning for more details on partitioning.

Methods such as Self::schema and Self::properties communicate properties of the output to the DataFusion optimizer, and methods such as required_input_distribution and required_input_ordering express requirements of the ExecutionPlan from its input.

ExecutionPlan can be displayed in a simplified form using the return value from displayable in addition to the (normally quite verbose) Debug output.

Required Methods§

Source

fn name(&self) -> &str

Short name for the ExecutionPlan, such as ‘DataSourceExec’.

Implementation note: this method can just proxy to static_name if no special action is needed. It doesn’t provide a default implementation like that because this method doesn’t require the Sized constrain to allow a wilder range of use cases.

Source

fn as_any(&self) -> &(dyn Any + 'static)

Returns the execution plan as Any so that it can be downcast to a specific implementation.

Source

fn properties(&self) -> &PlanProperties

Return properties of the output of the ExecutionPlan, such as output ordering(s), partitioning information etc.

This information is available via methods on ExecutionPlanProperties trait, which is implemented for all ExecutionPlans.

Source

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>

Get a list of children ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).

Source

fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>

Returns a new ExecutionPlan where all existing children were replaced by the children, in order

Source

fn execute( &self, partition: usize, context: Arc<TaskContext>, ) -> Result<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>>, DataFusionError>

Begin execution of partition, returning a Stream of RecordBatches.

§Notes

The execute method itself is not async but it returns an async futures::stream::Stream. This Stream should incrementally compute the output, RecordBatch by RecordBatch (in a streaming fashion). Most ExecutionPlans should not do any work before the first RecordBatch is requested from the stream.

RecordBatchStreamAdapter can be used to convert an async Stream into a SendableRecordBatchStream.

Using async Streams allows for network I/O during execution and takes advantage of Rust’s built in support for async continuations and crate ecosystem.

§Error handling

Any error that occurs during execution is sent as an Err in the output stream.

ExecutionPlan implementations in DataFusion cancel additional work immediately once an error occurs. The rationale is that if the overall query will return an error, any additional work such as continued polling of inputs will be wasted as it will be thrown away.

§Cancellation / Aborting Execution

The Stream that is returned must ensure that any allocated resources are freed when the stream itself is dropped. This is particularly important for spawned tasks or threads. Unless care is taken to “abort” such tasks, they may continue to consume resources even after the plan is dropped, generating intermediate results that are never used. Thus, spawn is disallowed, and instead use SpawnedTask.

To enable timely cancellation, the Stream that is returned must not block the CPU indefinitely and must yield back to the tokio runtime regularly. In a typical ExecutionPlan, this automatically happens unless there are special circumstances; e.g. when the computational complexity of processing a batch is superlinear. See this general guideline for more context on this point, which explains why one should avoid spending a long time without reaching an await/yield point in asynchronous runtimes. This can be achieved by manually returning Poll::Pending and setting up wakers appropriately, or the use of tokio::task::yield_now() when appropriate. In special cases that warrant manual yielding, determination for “regularly” may be made using a timer (being careful with the overhead-heavy system call needed to take the time), or by counting rows or batches.

The cancellation benchmark tracks some cases of how quickly queries can be cancelled.

For more details see SpawnedTask, JoinSet and RecordBatchReceiverStreamBuilder for structures to help ensure all background tasks are cancelled.

§Implementation Examples

While async Streams have a non trivial learning curve, the futures crate provides StreamExt and TryStreamExt which help simplify many common operations.

Here are some common patterns:

§Return Precomputed RecordBatch

We can return a precomputed RecordBatch as a Stream:

struct MyPlan {
    batch: RecordBatch,
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>
    ) -> Result<SendableRecordBatchStream> {
        // use functions from futures crate convert the batch into a stream
        let fut = futures::future::ready(Ok(self.batch.clone()));
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
    }
}
§Lazily (async) Compute RecordBatch

We can also lazily compute a RecordBatch when the returned Stream is polled

struct MyPlan {
    schema: SchemaRef,
}

/// Returns a single batch when the returned stream is polled
async fn get_batch() -> Result<RecordBatch> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>
    ) -> Result<SendableRecordBatchStream> {
        let fut = get_batch();
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}
§Lazily (async) create a Stream

If you need to create the return Stream using an async function, you can do so by flattening the result:

struct MyPlan {
    schema: SchemaRef,
}

/// async function that returns a stream
async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>
    ) -> Result<SendableRecordBatchStream> {
        // A future that yields a stream
        let fut = get_batch_stream();
        // Use TryStreamExt::try_flatten to flatten the stream of streams
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}

Provided Methods§

Source

fn static_name() -> &'static str
where Self: Sized,

Short name for the ExecutionPlan, such as ‘DataSourceExec’. Like name but can be called without an instance.

Source

fn schema(&self) -> Arc<Schema>

Get the schema for this execution plan

Source

fn check_invariants( &self, _check: InvariantLevel, ) -> Result<(), DataFusionError>

Returns an error if this individual node does not conform to its invariants. These invariants are typically only checked in debug mode.

A default set of invariants is provided in the default implementation. Extension nodes can provide their own invariants.

Source

fn required_input_distribution(&self) -> Vec<Distribution>

Specifies the data distribution requirements for all the children for this ExecutionPlan, By default it’s [Distribution::UnspecifiedDistribution] for each child,

Source

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>>

Specifies the ordering required for all of the children of this ExecutionPlan.

For each child, it’s the local ordering requirement within each partition rather than the global ordering

NOTE that checking !is_empty() does not check for a required input ordering. Instead, the correct check is that at least one entry must be Some

Source

fn maintains_input_order(&self) -> Vec<bool>

Returns false if this ExecutionPlan’s implementation may reorder rows within or between partitions.

For example, Projection, Filter, and Limit maintain the order of inputs – they may transform values (Projection) or not produce the same number of rows that went in (Filter and Limit), but the rows that are produced go in the same way.

DataFusion uses this metadata to apply certain optimizations such as automatically repartitioning correctly.

The default implementation returns false

WARNING: if you override this default, you MUST ensure that the ExecutionPlan’s maintains the ordering invariant or else DataFusion may produce incorrect results.

Source

fn benefits_from_input_partitioning(&self) -> Vec<bool>

Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child.

If returns true, the ExecutionPlan would benefit from partitioning its corresponding child (and thus from more parallelism). For ExecutionPlan that do very little work the overhead of extra parallelism may outweigh any benefits

The default implementation returns true unless this ExecutionPlan has signalled it requires a single child input partition.

Source

fn repartitioned( &self, _target_partitions: usize, _config: &ConfigOptions, ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError>

If supported, attempt to increase the partitioning of this ExecutionPlan to produce target_partitions partitions.

If the ExecutionPlan does not support changing its partitioning, returns Ok(None) (the default).

It is the ExecutionPlan can increase its partitioning, but not to the target_partitions, it may return an ExecutionPlan with fewer partitions. This might happen, for example, if each new partition would be too small to be efficiently processed individually.

The DataFusion optimizer attempts to use as many threads as possible by repartitioning its inputs to match the target number of threads available (target_partitions). Some data sources, such as the built in CSV and Parquet readers, implement this method as they are able to read from their input files in parallel, regardless of how the source data is split amongst files.

Source

fn metrics(&self) -> Option<MetricsSet>

Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.

While the values of the metrics in the returned MetricsSets may change as execution progresses, the specific metrics will not.

Once self.execute() has returned (technically the future is resolved) for all available partitions, the set of metrics should be complete. If this function is called prior to execute() new metrics may appear in subsequent calls.

Source

fn statistics(&self) -> Result<Statistics, DataFusionError>

👎Deprecated since 48.0.0: Use partition_statistics method instead

Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.

For TableScan executors, which supports filter pushdown, special attention needs to be paid to whether the stats returned by this method are exact or not

Source

fn partition_statistics( &self, partition: Option<usize>, ) -> Result<Statistics, DataFusionError>

Returns statistics for a specific partition of this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error. If partition is None, it returns statistics for the entire plan.

Source

fn supports_limit_pushdown(&self) -> bool

Returns true if a limit can be safely pushed down through this ExecutionPlan node.

If this method returns true, and the query plan contains a limit at the output of this node, DataFusion will push the limit to the input of this node.

Source

fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>

Returns a fetching variant of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.

Source

fn fetch(&self) -> Option<usize>

Gets the fetch count for the operator, None means there is no fetch.

Source

fn cardinality_effect(&self) -> CardinalityEffect

Gets the effect on cardinality, if known

Source

fn try_swapping_with_projection( &self, _projection: &ProjectionExec, ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError>

Attempts to push down the given projection into the input of this ExecutionPlan.

If the operator supports this optimization, the resulting plan will be: self_new <- projection <- source, starting from projection <- self <- source. Otherwise, it returns the current ExecutionPlan as-is.

Returns Ok(Some(...)) if pushdown is applied, Ok(None) if it is not supported or not possible, or Err on failure.

Source

fn gather_filters_for_pushdown( &self, parent_filters: Vec<Arc<dyn PhysicalExpr>>, _config: &ConfigOptions, ) -> Result<FilterDescription, DataFusionError>

Collect filters that this node can push down to its children. Filters that are being pushed down from parents are passed in, and the node may generate additional filters to push down. For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec, what will happen is that we recurse down the plan calling ExecutionPlan::gather_filters_for_pushdown:

  1. FilterExec::gather_filters_for_pushdown is called with no parent filters so it only returns that FilterExec wants to push down its own predicate.
  2. HashJoinExec::gather_filters_for_pushdown is called with the filter from FilterExec, which it only allows to push down to one side of the join (unless it’s on the join key) but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
  3. DataSourceExec::gather_filters_for_pushdown is called with both filters from HashJoinExec and FilterExec, however DataSourceExec::gather_filters_for_pushdown doesn’t actually do anything since it has no children and no additional filters to push down. It’s only once ExecutionPlan::handle_child_pushdown_result is called on DataSourceExec as we recurse up the plan that DataSourceExec can actually bind the filters.

The default implementation bars all parent filters from being pushed down and adds no new filters. This is the safest option, making filter pushdown opt-in on a per-node pasis.

Source

fn handle_child_pushdown_result( &self, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>, DataFusionError>

Handle the result of a child pushdown. This is called as we recurse back up the plan tree after recursing down and calling ExecutionPlan::gather_filters_for_pushdown. Once we know what the result of pushing down filters into children is we ask the current node what it wants to do with that result. For a DataSourceExec that may be absorbing the filters to apply them during the scan phase (also known as late materialization). A FilterExec may absorb any filters its children could not absorb, or if there are no filters left it may remove itself from the plan altogether. It combines both ChildPushdownResult::parent_filters and ChildPushdownResult::self_filters into a single predicate and replaces it’s own predicate. Then it passes PredicateSupport::Supported for each parent predicate to the parent. A HashJoinExec may ignore the pushdown result since it needs to apply the filters as part of the join anyhow. It passes ChildPushdownResult::parent_filters back up to it’s parents wrapped in FilterPushdownPropagation::transparent and ChildPushdownResult::self_filters is discarded.

The default implementation is a no-op that passes the result of pushdown from the children to its parent.

Trait Implementations§

Source§

impl DynTreeNode for dyn ExecutionPlan

Source§

fn arc_children(&self) -> Vec<&Arc<dyn ExecutionPlan>>

Returns all children of the specified TreeNode.
Source§

fn with_new_arc_children( &self, arc_self: Arc<dyn ExecutionPlan>, new_children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>

Constructs a new node with the specified children.
Source§

impl ExecutionPlanProperties for &dyn ExecutionPlan

Source§

fn output_partitioning(&self) -> &Partitioning

Specifies how the output of this ExecutionPlan is split into partitions.
Source§

fn output_ordering(&self) -> Option<&LexOrdering>

If the output of this ExecutionPlan within each partition is sorted, returns Some(keys) describing the ordering. A None return value indicates no assumptions should be made on the output ordering. Read more
Source§

fn boundedness(&self) -> Boundedness

Boundedness information of the stream corresponding to this ExecutionPlan. For more details, see Boundedness.
Source§

fn pipeline_behavior(&self) -> EmissionType

Indicates how the stream of this ExecutionPlan emits its results. For more details, see EmissionType.
Source§

fn equivalence_properties(&self) -> &EquivalenceProperties

Get the EquivalenceProperties within the plan. Read more

Implementors§

Source§

impl ExecutionPlan for DataSourceExec

Source§

impl ExecutionPlan for DataSinkExec

Source§

impl ExecutionPlan for OutputRequirementExec

Source§

impl ExecutionPlan for AggregateExec

Source§

impl ExecutionPlan for AnalyzeExec

Source§

impl ExecutionPlan for CoalesceBatchesExec

Source§

impl ExecutionPlan for CoalescePartitionsExec

Source§

impl ExecutionPlan for EmptyExec

Source§

impl ExecutionPlan for ExplainExec

Source§

impl ExecutionPlan for FilterExec

Source§

impl ExecutionPlan for CrossJoinExec

Source§

impl ExecutionPlan for HashJoinExec

Source§

impl ExecutionPlan for NestedLoopJoinExec

Source§

impl ExecutionPlan for SortMergeJoinExec

Source§

impl ExecutionPlan for SymmetricHashJoinExec

Source§

impl ExecutionPlan for GlobalLimitExec

Source§

impl ExecutionPlan for LocalLimitExec

Source§

impl ExecutionPlan for LazyMemoryExec

Source§

impl ExecutionPlan for PlaceholderRowExec

Source§

impl ExecutionPlan for ProjectionExec

Source§

impl ExecutionPlan for RecursiveQueryExec

Source§

impl ExecutionPlan for RepartitionExec

Source§

impl ExecutionPlan for PartialSortExec

Source§

impl ExecutionPlan for SortExec

Source§

impl ExecutionPlan for SortPreservingMergeExec

Source§

impl ExecutionPlan for StreamingTableExec

Source§

impl ExecutionPlan for InterleaveExec

Source§

impl ExecutionPlan for UnionExec

Source§

impl ExecutionPlan for UnnestExec

Source§

impl ExecutionPlan for ValuesExec

Source§

impl ExecutionPlan for BoundedWindowAggExec

Source§

impl ExecutionPlan for WindowAggExec

Source§

impl ExecutionPlan for WorkTableExec