datafusion::physical_plan::coop - Rust

Module coop


Utilities for improved cooperative scheduling.

§Cooperative scheduling

A single call to poll_next on a top-level Stream may perform a lot of work before it returns Poll::Pending. Think, for instance, of calculating an aggregation over a large dataset. If a Stream runs for a long time without yielding back to the Tokio executor, it can starve other tasks waiting to run on that executor. It also prevents the query execution from being cancelled.

To ensure that Stream implementations yield regularly, operators can insert explicit yield points using the utilities in this module. For most operators this is not necessary. The Streams of the built-in DataFusion operators that generate (rather than manipulate) RecordBatches, such as DataSourceExec, and of those that eagerly consume RecordBatches (for instance, RepartitionExec) contain yield points that will make most query Streams yield periodically.

There are a couple of types of operators that should insert yield points:

  • New source operators that do not make use of Tokio resources
  • Exchange-like operators that do not use Tokio’s Channel implementation to pass data between tasks

§Adding yield points

Yield points can be inserted manually using the facilities provided by the Tokio coop module such as tokio::task::coop::consume_budget.
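
As a rough illustration of what a yield point does, the following std-only sketch models a per-poll budget by hand. It does not use Tokio: `BudgetedSum`, `BUDGET_PER_POLL`, `NoopWaker` and `run_counting_yields` are hypothetical names, and the budget of 128 units is an arbitrary assumption made for the example. In real operator code, tokio::task::coop::consume_budget would take the place of the manual counter.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical per-poll budget, chosen arbitrarily for this sketch.
const BUDGET_PER_POLL: usize = 128;

/// Toy future that sums `next..end` and yields whenever its budget runs out.
struct BudgetedSum {
    next: u64,
    end: u64,
    total: u64,
}

impl Future for BudgetedSum {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = self.get_mut();
        let mut budget = BUDGET_PER_POLL;
        while this.next < this.end {
            if budget == 0 {
                // Yield point: request another poll, then hand control back.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            budget -= 1;
            this.total += this.next;
            this.next += 1;
        }
        Poll::Ready(this.total)
    }
}

/// Waker that does nothing; the driver below simply polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future to completion and reports how often it yielded.
fn run_counting_yields(n: u64) -> (u64, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = BudgetedSum { next: 0, end: n, total: 0 };
    let mut fut = Pin::new(&mut fut);
    let mut yields = 0;
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(total) => return (total, yields),
            Poll::Pending => yields += 1,
        }
    }
}
```

Each `Poll::Pending` returned here is a point at which an executor could run another task or observe a cancellation. Without the budget check, the whole sum would be computed inside a single poll.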

Another option is to use the wrapper Stream implementation provided by this module which will consume a unit of task budget every time a RecordBatch is produced. Wrapper Streams can be created using the cooperative and make_cooperative functions.

cooperative is a generic function that takes ownership of the wrapped RecordBatchStream. This function has the benefit of not requiring an additional heap allocation and can avoid dynamic dispatch.

make_cooperative is a non-generic function that wraps a SendableRecordBatchStream. This can be used to wrap dynamically typed, heap allocated RecordBatchStreams.
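
The difference between the generic and the non-generic entry point can be sketched with a std-only toy model. Everything below is hypothetical and is not the DataFusion API: `BatchSource` stands in for a record batch stream, `Vec<i32>` stands in for a RecordBatch, and `toy_cooperative` / `toy_make_cooperative` only mirror the shape of the two functions described above. The wrapper merely counts one budget unit per batch rather than interacting with a real scheduler.

```rust
/// Hypothetical stand-in for a record batch stream.
trait BatchSource {
    fn next_batch(&mut self) -> Option<Vec<i32>>;
}

/// Lets a boxed, dynamically typed source be used wherever a source is expected.
impl BatchSource for Box<dyn BatchSource> {
    fn next_batch(&mut self) -> Option<Vec<i32>> {
        (**self).next_batch()
    }
}

/// Toy wrapper: passes batches through unchanged, counting one unit per batch.
struct Cooperative<S> {
    inner: S,
    units_consumed: usize,
}

impl<S: BatchSource> BatchSource for Cooperative<S> {
    fn next_batch(&mut self) -> Option<Vec<i32>> {
        let batch = self.inner.next_batch()?;
        self.units_consumed += 1;
        Some(batch)
    }
}

/// Generic variant: `S` is known statically, so no extra allocation is needed
/// and calls into `inner` can be dispatched statically.
fn toy_cooperative<S: BatchSource>(inner: S) -> Cooperative<S> {
    Cooperative { inner, units_consumed: 0 }
}

/// Non-generic variant: boxed in, boxed out, with dynamic dispatch.
fn toy_make_cooperative(inner: Box<dyn BatchSource>) -> Box<dyn BatchSource> {
    Box::new(toy_cooperative(inner))
}

/// Simple source that emits one single-element batch per integer in `next..end`.
struct RangeSource {
    next: i32,
    end: i32,
}

impl BatchSource for RangeSource {
    fn next_batch(&mut self) -> Option<Vec<i32>> {
        if self.next >= self.end {
            return None;
        }
        let batch = vec![self.next];
        self.next += 1;
        Some(batch)
    }
}

/// Collects every batch a source produces.
fn drain(source: &mut impl BatchSource) -> Vec<Vec<i32>> {
    let mut out = Vec::new();
    while let Some(batch) = source.next_batch() {
        out.push(batch);
    }
    out
}
```

In this model the generic wrapper keeps the concrete source type visible to the compiler, while the boxed variant pays for one additional allocation and a virtual call per batch in exchange for working with any dynamically typed source.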

§Automatic cooperation

The EnsureCooperative physical optimizer rule, which is included in the default set of optimizer rules, inspects query plans for potential cooperative scheduling issues. It injects the CooperativeExec wrapper ExecutionPlan into the query plan where necessary. This ExecutionPlan uses make_cooperative to wrap the Stream of its input.

The optimizer rule currently checks the plan for exchange-like operators and leaf operators that report SchedulingType::NonCooperative in their plan properties.
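
The general idea of such a rule can be sketched as a tree rewrite over a toy plan type. This is a simplified model under stated assumptions, not the actual EnsureCooperative implementation: the `Plan` and `Scheduling` types and the `ensure_cooperative_sketch` function are hypothetical, and only leaf operators are handled (exchange-like operators are left out for brevity).

```rust
/// Hypothetical stand-in for the scheduling type reported by an operator.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Scheduling {
    Cooperative,
    NonCooperative,
}

/// Toy plan tree; real plans carry far more information.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Leaf { name: &'static str, scheduling: Scheduling },
    Node { name: &'static str, children: Vec<Plan> },
    /// Plays the role of the cooperative wrapper node.
    CooperativeWrapper(Box<Plan>),
}

/// Wraps every leaf that reports `NonCooperative`; leaves the rest untouched.
fn ensure_cooperative_sketch(plan: Plan) -> Plan {
    match plan {
        // Recurse into inner nodes and rebuild them with rewritten children.
        Plan::Node { name, children } => Plan::Node {
            name,
            children: children
                .into_iter()
                .map(ensure_cooperative_sketch)
                .collect(),
        },
        // A non-cooperative leaf gets a wrapper inserted above it.
        Plan::Leaf { scheduling: Scheduling::NonCooperative, .. } => {
            Plan::CooperativeWrapper(Box::new(plan))
        }
        // Cooperative leaves and already wrapped subtrees stay as they are.
        other => other,
    }
}
```

Because already wrapped subtrees are returned unchanged, applying the sketch twice yields the same plan as applying it once.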

Structs§

CooperativeExec
An execution plan decorator that enables cooperative multitasking. It wraps the streams produced by its input execution plan using the make_cooperative function, which makes the stream participate in Tokio cooperative scheduling.
CooperativeStream
A stream that passes record batches through unchanged while cooperating with the Tokio runtime. It consumes cooperative scheduling budget for each returned RecordBatch, allowing other tasks to execute when the budget is exhausted.

Functions§

cooperative
Creates a CooperativeStream wrapper around the given RecordBatchStream. This wrapper collaborates with the Tokio cooperative scheduler by consuming a unit of scheduling budget for each returned record batch.
make_cooperative
Wraps a SendableRecordBatchStream inside a CooperativeStream to enable cooperative multitasking. Since SendableRecordBatchStream is a dyn RecordBatchStream, this requires the use of dynamic method dispatch. When the stream type is statically known, consider using the generic cooperative function to allow static method dispatch.