Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
ExecutionPlan in datafusion::physical_plan - Rust
[go: Go Back, main page]

pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
Show 16 methods // Required methods fn as_any(&self) -> &(dyn Any + 'static); fn schema(&self) -> Arc<Schema>; fn output_partitioning(&self) -> Partitioning; fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>; fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>> ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>; fn execute( &self, partition: usize, context: Arc<TaskContext> ) -> Result<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>>, DataFusionError>; // Provided methods fn unbounded_output( &self, _children: &[bool] ) -> Result<bool, DataFusionError> { ... } fn required_input_distribution(&self) -> Vec<Distribution> { ... } fn required_input_ordering( &self ) -> Vec<Option<Vec<PhysicalSortRequirement>>> { ... } fn maintains_input_order(&self) -> Vec<bool> { ... } fn benefits_from_input_partitioning(&self) -> Vec<bool> { ... } fn equivalence_properties(&self) -> EquivalenceProperties { ... } fn repartitioned( &self, _target_partitions: usize, _config: &ConfigOptions ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> { ... } fn metrics(&self) -> Option<MetricsSet> { ... } fn statistics(&self) -> Result<Statistics, DataFusionError> { ... }
}
Expand description

Represent nodes in the DataFusion Physical Plan.

Calling execute produces an async SendableRecordBatchStream of RecordBatch that incrementally computes a partition of the ExecutionPlan’s output from its input. See Partitioning for more details on partitioning.

Methods such as schema and output_partitioning communicate properties of this output to the DataFusion optimizer, and methods such as required_input_distribution and required_input_ordering express requirements of the ExecutionPlan from its input.

ExecutionPlan can be displayed in a simplified form using the return value from displayable in addition to the (normally quite verbose) Debug output.

Required Methods§

source

fn as_any(&self) -> &(dyn Any + 'static)

Returns the execution plan as Any so that it can be downcast to a specific implementation.

source

fn schema(&self) -> Arc<Schema>

Get the schema for this execution plan

source

fn output_partitioning(&self) -> Partitioning

Specifies how the output of this ExecutionPlan is split into partitions.

source

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>

If the output of this ExecutionPlan within each partition is sorted, returns Some(keys) with the description of how it was sorted.

For example, Sort, (obviously) produces sorted output as does SortPreservingMergeStream. Less obviously Projection produces sorted output if its input was sorted as it does not reorder the input rows,

It is safe to return None here if your ExecutionPlan does not have any particular output order here

source

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>

Get a list of children ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).

source

fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>> ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>

Returns a new ExecutionPlan where all existing children were replaced by the children, oi order

source

fn execute( &self, partition: usize, context: Arc<TaskContext> ) -> Result<Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch, DataFusionError>> + Send>>, DataFusionError>

Begin execution of partition, returning a Stream of RecordBatches.

Notes

The execute method itself is not async but it returns an async [futures::stream::Stream]. This Stream should incrementally compute the output, RecordBatch by RecordBatch (in a streaming fashion). Most ExecutionPlans should not do any work before the first RecordBatch is requested from the stream.

RecordBatchStreamAdapter can be used to convert an async Stream into a SendableRecordBatchStream.

Using async Streams allows for network I/O during execution and takes advantage of Rust’s built in support for async continuations and crate ecosystem.

Implementation Examples

While async Streams have a non trivial learning curve, the futures crate provides StreamExt and TryStreamExt which help simplify many common operations.

Here are some common patterns:

Return Precomputed RecordBatch

We can return a precomputed RecordBatch as a Stream:

struct MyPlan {
    batch: RecordBatch,
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>
    ) -> Result<SendableRecordBatchStream> {
        // use functions from futures crate convert the batch into a stream
        let fut = futures::future::ready(Ok(self.batch.clone()));
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
    }
}
Lazily (async) Compute RecordBatch

We can also lazily compute a RecordBatch when the returned Stream is polled

struct MyPlan {
    schema: SchemaRef,
}

/// Returns a single batch when the returned stream is polled
async fn get_batch() -> Result<RecordBatch> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>
    ) -> Result<SendableRecordBatchStream> {
        let fut = get_batch();
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}
Lazily (async) create a Stream

If you need to to create the return Stream using an async function, you can do so by flattening the result:

struct MyPlan {
    schema: SchemaRef,
}

/// async function that returns a stream
async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>
    ) -> Result<SendableRecordBatchStream> {
        // A future that yields a stream
        let fut = get_batch_stream();
        // Use TryStreamExt::try_flatten to flatten the stream of streams
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}

Provided Methods§

source

fn unbounded_output(&self, _children: &[bool]) -> Result<bool, DataFusionError>

Specifies whether this plan generates an infinite stream of records. If the plan does not support pipelining, but its input(s) are infinite, returns an error to indicate this.

source

fn required_input_distribution(&self) -> Vec<Distribution>

Specifies the data distribution requirements for all the children for this ExecutionPlan, By default it’s [Distribution::UnspecifiedDistribution] for each child,

source

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>>

Specifies the ordering required for all of the children of this ExecutionPlan.

For each child, it’s the local ordering requirement within each partition rather than the global ordering

NOTE that checking !is_empty() does not check for a required input ordering. Instead, the correct check is that at least one entry must be Some

source

fn maintains_input_order(&self) -> Vec<bool>

Returns false if this ExecutionPlan’s implementation may reorder rows within or between partitions.

For example, Projection, Filter, and Limit maintain the order of inputs – they may transform values (Projection) or not produce the same number of rows that went in (Filter and Limit), but the rows that are produced go in the same way.

DataFusion uses this metadata to apply certain optimizations such as automatically repartitioning correctly.

The default implementation returns false

WARNING: if you override this default, you MUST ensure that the ExecutionPlan’s maintains the ordering invariant or else DataFusion may produce incorrect results.

source

fn benefits_from_input_partitioning(&self) -> Vec<bool>

Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child.

If returns true, the ExecutionPlan would benefit from partitioning its corresponding child (and thus from more parallelism). For ExecutionPlan that do very little work the overhead of extra parallelism may outweigh any benefits

The default implementation returns true unless this ExecutionPlan has signalled it requires a single child input partition.

source

fn equivalence_properties(&self) -> EquivalenceProperties

Get the EquivalenceProperties within the plan

source

fn repartitioned( &self, _target_partitions: usize, _config: &ConfigOptions ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError>

If supported, attempt to increase the partitioning of this ExecutionPlan to produce target_partitions partitions.

If the ExecutionPlan does not support changing its partitioning, returns Ok(None) (the default).

It is the ExecutionPlan can increase its partitioning, but not to the target_partitions, it may return an ExecutionPlan with fewer partitions. This might happen, for example, if each new partition would be too small to be efficiently processed individually.

The DataFusion optimizer attempts to use as many threads as possible by repartitioning its inputs to match the target number of threads available (target_partitions). Some data sources, such as the built in CSV and Parquet readers, implement this method as they are able to read from their input files in parallel, regardless of how the source data is split amongst files.

source

fn metrics(&self) -> Option<MetricsSet>

Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.

While the values of the metrics in the returned MetricsSets may change as execution progresses, the specific metrics will not.

Once self.execute() has returned (technically the future is resolved) for all available partitions, the set of metrics should be complete. If this function is called prior to execute() new metrics may appear in subsequent calls.

source

fn statistics(&self) -> Result<Statistics, DataFusionError>

Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.

Trait Implementations§

source§

impl DynTreeNode for dyn ExecutionPlan

source§

fn arc_children(&self) -> Vec<Arc<dyn ExecutionPlan>>

Returns all children of the specified TreeNode
source§

fn with_new_arc_children( &self, arc_self: Arc<dyn ExecutionPlan>, new_children: Vec<Arc<dyn ExecutionPlan>> ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>

construct a new self with the specified children

Implementors§