Utilities for improved cooperative scheduling.
§Cooperative scheduling
A single call to poll_next on a top-level Stream can perform a lot of work before it returns Poll::Pending. Think, for instance, of calculating an aggregation over a large dataset.
If a Stream runs for a long time without yielding back to the Tokio executor, it can starve other tasks that are waiting to run on that executor. It also prevents the query execution from being cancelled until the Stream yields.
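The starvation effect can be illustrated with a toy, single-threaded round-robin scheduler. This is a minimal sketch under stated assumptions, not Tokio's scheduler: Task and run are hypothetical names, each task is only a count of remaining work units, and the optional per-poll budget models how many units a task may run before it has to hand control back.

```rust
/// Hypothetical toy task: a name plus the number of work units it still has to run.
struct Task {
    name: char,
    remaining: usize,
}

/// Toy round-robin scheduler (not Tokio's). Returns a trace with one character
/// per executed unit of work, in execution order. `budget` is how many units a
/// task may run per poll; `None` models a task that never yields until done.
fn run(tasks: &mut [Task], budget: Option<usize>) -> String {
    assert_ne!(budget, Some(0), "a zero budget would never make progress");
    let mut trace = String::new();
    while tasks.iter().any(|t| t.remaining > 0) {
        for task in tasks.iter_mut() {
            // Every poll starts with a fresh budget, like a new scheduler tick.
            let mut left = budget.unwrap_or(usize::MAX);
            while task.remaining > 0 && left > 0 {
                task.remaining -= 1;
                left -= 1;
                trace.push(task.name);
            }
        }
    }
    trace
}
```

With a budget of None, task B only starts after task A has run all of its units; with a budget of 2 the two tasks alternate, which is the behavior the yield points described in this module aim for.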
To ensure that Stream implementations yield regularly, operators can insert explicit yield points using the utilities in this module. For most operators this is not necessary. The Streams of built-in DataFusion operators that generate (rather than manipulate) RecordBatches, such as DataSourceExec, and of those that eagerly consume RecordBatches, such as RepartitionExec, contain yield points that make most query Streams yield periodically.
There are a couple of types of operators that should insert yield points:
- New source operators that do not make use of Tokio resources
- Exchange-like operators that do not use Tokio’s channel implementation to pass data between tasks
§Adding yield points
Yield points can be inserted manually using the facilities provided by the Tokio coop module, such as tokio::task::coop::consume_budget.
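In async code, a manual yield point amounts to awaiting tokio::task::coop::consume_budget() once per unit of work. The sketch below models the same idea at the poll level using only std, so it runs without Tokio. The thread-local BUDGET counter, poll_consume_budget, CountingSource, and poll_with_budget are all hypothetical stand-ins, not Tokio or DataFusion APIs. When the budget is exhausted, the source wakes its own task and returns Poll::Pending, handing control back to the executor.

```rust
use std::cell::Cell;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

thread_local! {
    // Hypothetical per-task budget; Tokio keeps its real budget internally.
    static BUDGET: Cell<u32> = Cell::new(0);
}

/// Poll-level analogue of a budget check (hypothetical, not a Tokio API):
/// spend one unit if any is left, otherwise wake the task and yield.
fn poll_consume_budget(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| {
        if b.get() == 0 {
            // Ask to be polled again, then hand control back to the executor.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            b.set(b.get() - 1);
            Poll::Ready(())
        }
    })
}

/// Hypothetical source operator: generates `count` numbered "batches" in memory
/// and never awaits a Tokio resource, so it has no natural yield point.
struct CountingSource {
    next: u32,
    count: u32,
}

impl CountingSource {
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next == self.count {
            return Poll::Ready(None);
        }
        // Explicit yield point before producing each batch.
        if poll_consume_budget(cx).is_pending() {
            return Poll::Pending;
        }
        let batch = self.next;
        self.next += 1;
        Poll::Ready(Some(batch))
    }
}

/// A waker that does nothing; enough to drive polls by hand in this sketch.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: the vtable functions never dereference the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Simulates one executor tick: refill the budget, then poll until the source
/// yields or finishes. Returns the batches produced and whether it finished.
fn poll_with_budget(src: &mut CountingSource, budget: u32) -> (Vec<u32>, bool) {
    BUDGET.with(|b| b.set(budget));
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut batches = Vec::new();
    loop {
        match Pin::new(&mut *src).poll_next(&mut cx) {
            Poll::Ready(Some(batch)) => batches.push(batch),
            Poll::Ready(None) => return (batches, true),
            Poll::Pending => return (batches, false),
        }
    }
}
```

With a budget of 2 per tick, a five-batch source needs three ticks, returning [0, 1], then [2, 3], then [4] followed by end of stream.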
Another option is to use the wrapper Stream implementation provided by this module, which consumes a unit of task budget every time a RecordBatch is produced.
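A std-only sketch of such a pass-through wrapper follows. BatchStream, Batch, VecStream, CoopWrapper, run_tick, and the thread-local budget are hypothetical stand-ins for RecordBatchStream, RecordBatch, and Tokio's internal budget. The wrapper charges one unit only when the inner stream actually returns a batch, which is one reasonable reading of "every time a RecordBatch is produced".

```rust
use std::cell::Cell;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical stand-in for a RecordBatch.
type Batch = Vec<i64>;

/// Hypothetical minimal stand-in for RecordBatchStream.
trait BatchStream {
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Batch>>;
}

thread_local! {
    // Toy per-task budget (assumption); Tokio tracks its real budget internally.
    static BUDGET: Cell<u32> = Cell::new(0);
}

/// Inner stream that is always ready, so on its own it would never yield.
struct VecStream(VecDeque<Batch>);

impl BatchStream for VecStream {
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Batch>> {
        Poll::Ready(self.0.pop_front())
    }
}

/// Pass-through wrapper: batches come out unchanged, each one costs one unit
/// of budget, and an exhausted budget forces a yield.
struct CoopWrapper<S> {
    inner: S,
}

impl<S: BatchStream + Unpin> BatchStream for CoopWrapper<S> {
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Batch>> {
        if BUDGET.with(|b| b.get()) == 0 {
            cx.waker().wake_by_ref(); // ask to be polled again later
            return Poll::Pending;
        }
        let item = Pin::new(&mut self.inner).poll_next(cx);
        if let Poll::Ready(Some(_)) = &item {
            BUDGET.with(|b| b.set(b.get() - 1)); // charge only for returned batches
        }
        item
    }
}

/// A waker that does nothing; enough to drive polls by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: the vtable functions never dereference the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// One simulated executor tick: refill the budget and poll until the stream
/// yields or ends. Returns the batches seen and whether the stream finished.
fn run_tick<S: BatchStream + Unpin>(stream: &mut S, budget: u32) -> (Vec<Batch>, bool) {
    BUDGET.with(|b| b.set(budget));
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut *stream).poll_next(&mut cx) {
            Poll::Ready(Some(batch)) => out.push(batch),
            Poll::Ready(None) => return (out, true),
            Poll::Pending => return (out, false),
        }
    }
}
```

Because the inner VecStream is always ready, the wrapper is the only thing that makes this stream yield.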
Wrapper Streams can be created using the cooperative and make_cooperative functions.
cooperative is a generic function that takes ownership of the wrapped RecordBatchStream. This function has the benefit of not requiring an additional heap allocation and can avoid dynamic dispatch.
make_cooperative is a non-generic function that wraps a SendableRecordBatchStream. It can be used to wrap dynamically typed, heap-allocated RecordBatchStreams.
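The trade-off between the two constructors can be sketched with a simplified, waker-free stream trait. toy_cooperative and toy_make_cooperative are hypothetical analogues of cooperative and make_cooperative; the explicit budget parameter and the per-wrapper budget are assumptions of this toy model (Tokio tracks budget per task). The generic version stores the concrete stream inline, so calls to it are statically dispatched after monomorphization; the boxed version holds a Box<dyn ...>, so every inner poll goes through a vtable.

```rust
use std::task::Poll;

/// Hypothetical stand-in for a RecordBatch.
type Batch = Vec<i64>;

/// Hypothetical, simplified (waker-free) stand-in for RecordBatchStream.
trait BatchStream {
    fn poll_next(&mut self) -> Poll<Option<Batch>>;
}

/// Analogue of SendableRecordBatchStream: a heap-allocated trait object.
type SendableStream = Box<dyn BatchStream + Send>;

// Forward through the box so a boxed stream is itself a BatchStream.
// `(**self)` is the trait object, so this call is dynamically dispatched.
impl BatchStream for SendableStream {
    fn poll_next(&mut self) -> Poll<Option<Batch>> {
        (**self).poll_next()
    }
}

/// Toy wrapper with its own budget (assumption: Tokio's budget is per task).
struct Coop<S> {
    inner: S,
    budget: u32,
}

impl<S: BatchStream> BatchStream for Coop<S> {
    fn poll_next(&mut self) -> Poll<Option<Batch>> {
        if self.budget == 0 {
            return Poll::Pending; // exhausted: the caller must yield
        }
        let item = self.inner.poll_next();
        if let Poll::Ready(Some(_)) = item {
            self.budget -= 1; // charge one unit per returned batch
        }
        item
    }
}

/// Generic analogue of `cooperative`: `S` is stored inline (no extra Box).
fn toy_cooperative<S: BatchStream>(inner: S, budget: u32) -> Coop<S> {
    Coop { inner, budget }
}

/// Non-generic analogue of `make_cooperative` for already-boxed streams.
fn toy_make_cooperative(inner: SendableStream, budget: u32) -> Coop<SendableStream> {
    Coop { inner, budget }
}

/// Hypothetical inner stream emitting one-element batches 0..end.
struct Counter {
    next: i64,
    end: i64,
}

impl BatchStream for Counter {
    fn poll_next(&mut self) -> Poll<Option<Batch>> {
        if self.next == self.end {
            return Poll::Ready(None);
        }
        self.next += 1;
        Poll::Ready(Some(vec![self.next - 1]))
    }
}
```

Both wrappers behave the same; they differ only in whether the inner stream's type is known at compile time.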
§Automatic cooperation
The EnsureCooperative physical optimizer rule, which is included in the default set of optimizer rules, inspects query plans for potential cooperative scheduling issues. It injects the CooperativeExec wrapper ExecutionPlan into the query plan where necessary. This ExecutionPlan uses make_cooperative to wrap the Stream of its input.
The optimizer rule currently checks the plan for exchange-like operators and leaf operators that report SchedulingType::NonCooperative in their plan properties.
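A toy version of this plan rewrite is sketched below. Plan, node, and ensure_cooperative are hypothetical and much simpler than DataFusion's ExecutionPlan and EnsureCooperative; the wrapping condition (a leaf or exchange-like node that reports NonCooperative) is one reading of the sentence above, not the rule's actual source.

```rust
/// Hypothetical mirror of the SchedulingType mentioned in the text.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SchedulingType {
    Cooperative,
    NonCooperative,
}

/// Hypothetical toy plan tree (not DataFusion's ExecutionPlan).
#[derive(Debug, PartialEq)]
enum Plan {
    Node {
        name: &'static str,
        exchange_like: bool,
        scheduling: SchedulingType,
        children: Vec<Plan>,
    },
    CooperativeExec(Box<Plan>),
}

fn node(name: &'static str, exchange_like: bool, scheduling: SchedulingType, children: Vec<Plan>) -> Plan {
    Plan::Node { name, exchange_like, scheduling, children }
}

/// Bottom-up rewrite: wrap leaf or exchange-like nodes that report
/// NonCooperative scheduling in a CooperativeExec node.
fn ensure_cooperative(plan: Plan) -> Plan {
    match plan {
        Plan::Node { name, exchange_like, scheduling, children } => {
            // Rewrite the children first.
            let children: Vec<Plan> = children.into_iter().map(ensure_cooperative).collect();
            let is_leaf = children.is_empty();
            let rewritten = node(name, exchange_like, scheduling, children);
            if (is_leaf || exchange_like) && scheduling == SchedulingType::NonCooperative {
                Plan::CooperativeExec(Box::new(rewritten))
            } else {
                rewritten
            }
        }
        // An existing wrapper is left untouched, so running the rule twice
        // does not double-wrap.
        wrapped @ Plan::CooperativeExec(_) => wrapped,
    }
}
```

A non-cooperative in-memory scan under a filter gets wrapped, while a cooperative leaf is left alone.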
Structs
- CooperativeExec - An execution plan decorator that enables cooperative multitasking. It wraps the streams produced by its input execution plan using the make_cooperative function, which makes the stream participate in Tokio cooperative scheduling.
- CooperativeStream - A stream that passes record batches through unchanged while cooperating with the Tokio runtime. It consumes cooperative scheduling budget for each returned RecordBatch, allowing other tasks to execute when the budget is exhausted.
Functions
- cooperative - Creates a CooperativeStream wrapper around the given RecordBatchStream. This wrapper collaborates with the Tokio cooperative scheduler by consuming a unit of scheduling budget for each returned record batch.
- make_cooperative - Wraps a SendableRecordBatchStream inside a CooperativeStream to enable cooperative multitasking. Since SendableRecordBatchStream is a boxed dyn RecordBatchStream, this requires dynamic method dispatch. When the stream type is statically known, consider using the generic cooperative function, which allows static method dispatch.