datafusion/lib.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![doc(
19 html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20 html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_auto_cfg))]
23// Make sure fast / cheap clones on Arc are explicit:
24// https://github.com/apache/datafusion/issues/11143
25//
26// Eliminate unnecessary function calls (some may not be cheap) due to `xxx_or`
27// for performance. Also avoid abusing `xxx_or_else` for readability:
28// https://github.com/apache/datafusion/issues/15802
29#![cfg_attr(
30 not(test),
31 deny(
32 clippy::clone_on_ref_ptr,
33 clippy::or_fun_call,
34 clippy::unnecessary_lazy_evaluations
35 )
36)]
37#![warn(missing_docs, clippy::needless_borrow)]
38
39//! [DataFusion] is an extensible query engine written in Rust that
40//! uses [Apache Arrow] as its in-memory format. DataFusion's target users are
41//! developers building fast and feature rich database and analytic systems,
42//! customized to particular workloads. See [use cases] for examples.
43//!
44//! "Out of the box," DataFusion offers [SQL] and [`DataFrame`] APIs,
45//! excellent [performance], built-in support for CSV, Parquet, JSON, and Avro,
46//! extensive customization, and a great community.
47//! [Python Bindings] are also available.
48//!
49//! DataFusion features a full query planner, a columnar, streaming, multi-threaded,
50//! vectorized execution engine, and partitioned data sources. You can
51//! customize DataFusion at almost all points including additional data sources,
52//! query languages, functions, custom operators and more.
53//! See the [Architecture] section below for more details.
54//!
55//! [DataFusion]: https://datafusion.apache.org/
56//! [Apache Arrow]: https://arrow.apache.org
57//! [use cases]: https://datafusion.apache.org/user-guide/introduction.html#use-cases
58//! [SQL]: https://datafusion.apache.org/user-guide/sql/index.html
59//! [`DataFrame`]: dataframe::DataFrame
60//! [performance]: https://benchmark.clickhouse.com/
61//! [Python Bindings]: https://github.com/apache/datafusion-python
62//! [Architecture]: #architecture
63//!
64//! # Examples
65//!
66//! The main entry point for interacting with DataFusion is the
67//! [`SessionContext`]. [`Expr`]s represent expressions such as `a + b`.
68//!
69//! [`SessionContext`]: execution::context::SessionContext
70//!
71//! ## DataFrame
72//!
73//! To execute a query against data stored
74//! in a CSV file using a [`DataFrame`]:
75//!
76//! ```rust
77//! # use datafusion::prelude::*;
78//! # use datafusion::error::Result;
79//! # use datafusion::functions_aggregate::expr_fn::min;
80//! # use datafusion::arrow::array::RecordBatch;
81//!
82//! # #[tokio::main]
83//! # async fn main() -> Result<()> {
84//! let ctx = SessionContext::new();
85//!
86//! // create the dataframe
87//! let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
88//!
89//! // create a plan
90//! let df = df.filter(col("a").lt_eq(col("b")))?
91//! .aggregate(vec![col("a")], vec![min(col("b"))])?
92//! .limit(0, Some(100))?;
93//!
94//! // execute the plan
95//! let results: Vec<RecordBatch> = df.collect().await?;
96//!
97//! // format the results
98//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
99//! .to_string();
100//!
101//! let expected = vec![
102//!     "+---+----------------+",
103//!     "| a | min(?table?.b) |",
104//!     "+---+----------------+",
105//!     "| 1 | 2              |",
106//!     "+---+----------------+"
107//! ];
108//!
109//! assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
110//! # Ok(())
111//! # }
112//! ```
113//!
114//! ## SQL
115//!
116//! To execute a query against a CSV file using [SQL]:
117//!
118//! ```
119//! # use datafusion::prelude::*;
120//! # use datafusion::error::Result;
121//! # use datafusion::arrow::array::RecordBatch;
122//!
123//! # #[tokio::main]
124//! # async fn main() -> Result<()> {
125//! let ctx = SessionContext::new();
126//!
127//! ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
128//!
129//! // create a plan
130//! let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
131//!
132//! // execute the plan
133//! let results: Vec<RecordBatch> = df.collect().await?;
134//!
135//! // format the results
136//! let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
137//! .to_string();
138//!
139//! let expected = vec![
140//!     "+---+----------------+",
141//!     "| a | min(example.b) |",
142//!     "+---+----------------+",
143//!     "| 1 | 2              |",
144//!     "+---+----------------+"
145//! ];
146//!
147//! assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
148//! # Ok(())
149//! # }
150//! ```
151//!
152//! ## More Examples
153//!
154//! There are many additional annotated examples of using DataFusion in the [datafusion-examples] directory.
155//!
156//! [datafusion-examples]: https://github.com/apache/datafusion/tree/main/datafusion-examples
157//!
158//! # Architecture
159//!
160//! <!-- NOTE: The goal of this section is to provide a high level
161//! overview of how DataFusion is organized and then link to other
162//! sections of the docs with more details -->
163//!
164//! You can find a formal description of DataFusion's architecture in our
165//! [SIGMOD 2024 Paper].
166//!
167//! [SIGMOD 2024 Paper]: https://dl.acm.org/doi/10.1145/3626246.3653368
168//!
169//! ## Design Goals
170//! DataFusion's Architecture Goals are:
171//!
172//! 1. Work “out of the box”: Provide a very fast, world class query engine with
173//! minimal setup or required configuration.
174//!
175//! 2. Customizable everything: All behavior should be customizable by
176//! implementing traits.
177//!
178//! 3. Architecturally boring 🥱: Follow industrial best practice rather than
179//! trying cutting edge, but unproven, techniques.
180//!
181//! With these principles, users start with a basic, high-performance engine
182//! and specialize it over time to suit their needs and available engineering
183//! capacity.
184//!
185//! ## Overview Presentations
186//!
187//! The following presentations offer high level overviews of the
188//! different components and how they interact together.
189//!
190//! - [Apr 2023]: The Apache DataFusion Architecture talks
191//! - _Query Engine_: [recording](https://youtu.be/NVKujPxwSBA) and [slides](https://docs.google.com/presentation/d/1D3GDVas-8y0sA4c8EOgdCvEjVND4s2E7I6zfs67Y4j8/edit#slide=id.p)
192//! - _Logical Plan and Expressions_: [recording](https://youtu.be/EzZTLiSJnhY) and [slides](https://docs.google.com/presentation/d/1ypylM3-w60kVDW7Q6S99AHzvlBgciTdjsAfqNP85K30)
193//! - _Physical Plan and Execution_: [recording](https://youtu.be/2jkWU3_w6z0) and [slides](https://docs.google.com/presentation/d/1cA2WQJ2qg6tx6y4Wf8FH2WVSm9JQ5UgmBWATHdik0hg)
194//! - [July 2022]: DataFusion and Arrow: Supercharge Your Data Analytical Tool with a Rusty Query Engine: [recording](https://www.youtube.com/watch?v=Rii1VTn3seQ) and [slides](https://docs.google.com/presentation/d/1q1bPibvu64k2b7LPi7Yyb0k3gA1BiUYiUbEklqW1Ckc/view#slide=id.g11054eeab4c_0_1165)
195//! - [March 2021]: The DataFusion architecture is described in _Query Engine Design and the Rust-Based DataFusion in Apache Arrow_: [recording](https://www.youtube.com/watch?v=K6eCAVEk4kU) (DataFusion content starts [~ 15 minutes in](https://www.youtube.com/watch?v=K6eCAVEk4kU&t=875s)) and [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-engine-design-and-the-rustbased-datafusion-in-apache-arrow-244161934)
196//! - [February 2021]: How DataFusion is used within the Ballista Project is described in _Ballista: Distributed Compute with Rust and Apache Arrow_: [recording](https://www.youtube.com/watch?v=ZZHQaOap9pQ)
197//!
198//! ## Customization and Extension
199//!
200//! DataFusion is designed to be highly extensible, so you can
201//! start with a working, full featured engine, and then
202//! specialize any behavior for your use case. For example,
203//! some projects may add custom [`ExecutionPlan`] operators, or create their own
204//! query language that directly creates [`LogicalPlan`] rather than using the
205//! built in SQL planner, [`SqlToRel`].
206//!
207//! In order to achieve this, DataFusion supports extension at many points:
208//!
209//! * read from any datasource ([`TableProvider`])
210//! * define your own catalogs, schemas, and table lists ([`catalog`] and [`CatalogProvider`])
211//! * build your own query language or plans ([`LogicalPlanBuilder`])
212//! * declare and use user-defined functions ([`ScalarUDF`], and [`AggregateUDF`], [`WindowUDF`])
213//! * add custom plan rewrite passes ([`AnalyzerRule`], [`OptimizerRule`] and [`PhysicalOptimizerRule`])
214//! * extend the planner to use user-defined logical and physical nodes ([`QueryPlanner`])
215//!
216//! You can find examples of each of them in the [datafusion-examples] directory.
217//!
218//! [`TableProvider`]: crate::datasource::TableProvider
219//! [`CatalogProvider`]: crate::catalog::CatalogProvider
220//! [`LogicalPlanBuilder`]: datafusion_expr::logical_plan::builder::LogicalPlanBuilder
221//! [`ScalarUDF`]: crate::logical_expr::ScalarUDF
222//! [`AggregateUDF`]: crate::logical_expr::AggregateUDF
223//! [`WindowUDF`]: crate::logical_expr::WindowUDF
224//! [`QueryPlanner`]: execution::context::QueryPlanner
225//! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule
226//! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule
227//! [`PhysicalOptimizerRule`]: datafusion_physical_optimizer::PhysicalOptimizerRule
228//!
229//! ## Query Planning and Execution Overview
230//!
231//! ### SQL
232//!
233//! ```text
234//!                 Parsed with            SqlToRel creates
235//!                 sqlparser              initial plan
236//! ┌───────────────┐           ┌─────────┐             ┌─────────────┐
237//! │   SELECT *    │           │Query {  │             │Project      │
238//! │   FROM ...    │──────────▶│..       │────────────▶│  TableScan  │
239//! │               │           │}        │             │    ...      │
240//! └───────────────┘           └─────────┘             └─────────────┘
241//!
242//!   SQL String                 sqlparser               LogicalPlan
243//!                              AST nodes
244//! ```
245//!
246//! 1. The query string is parsed to an Abstract Syntax Tree (AST)
247//! [`Statement`] using [sqlparser].
248//!
249//! 2. The AST is converted to a [`LogicalPlan`] and logical expressions
250//! [`Expr`]s to compute the desired result by [`SqlToRel`]. This phase
251//! also includes name and type resolution ("binding").
252//!
253//! [`Statement`]: https://docs.rs/sqlparser/latest/sqlparser/ast/enum.Statement.html
254//!
255//! ### DataFrame
256//!
257//! When executing plans using the [`DataFrame`] API, the process is
258//! the same as with SQL, except the DataFrame API builds the
259//! [`LogicalPlan`] directly using [`LogicalPlanBuilder`]. Systems
260//! that have their own custom query languages typically also build
261//! [`LogicalPlan`] directly.
262//!
263//! ### Planning
264//!
265//! ```text
266//!             AnalyzerRules and      PhysicalPlanner          PhysicalOptimizerRules
267//!             OptimizerRules         creates ExecutionPlan    improve performance
268//!             rewrite plan
269//! ┌─────────────┐        ┌─────────────┐      ┌─────────────────┐        ┌─────────────────┐
270//! │Project      │        │Project(x, y)│      │ProjectExec      │        │ProjectExec      │
271//! │  TableScan  │──...──▶│  TableScan  │─────▶│  ...            │──...──▶│  ...            │
272//! │    ...      │        │    ...      │      │   DataSourceExec│        │   DataSourceExec│
273//! └─────────────┘        └─────────────┘      └─────────────────┘        └─────────────────┘
274//!
275//!  LogicalPlan            LogicalPlan          ExecutionPlan             ExecutionPlan
276//! ```
277//!
278//! To process large datasets with many rows as efficiently as
279//! possible, significant effort is spent planning and
280//! optimizing, in the following manner:
281//!
282//! 1. The [`LogicalPlan`] is checked and rewritten to enforce
283//! semantic rules, such as type coercion, by [`AnalyzerRule`]s
284//!
285//! 2. The [`LogicalPlan`] is rewritten by [`OptimizerRule`]s, such as
286//! projection and filter pushdown, to improve its efficiency.
287//!
288//! 3. The [`LogicalPlan`] is converted to an [`ExecutionPlan`] by a
289//! [`PhysicalPlanner`]
290//!
291//! 4. The [`ExecutionPlan`] is rewritten by
292//! [`PhysicalOptimizerRule`]s, such as sort and join selection, to
293//! improve its efficiency.
294//!
295//! ## Data Sources
296//!
297//! ```text
298//! Planning       │
299//! requests       │            TableProvider::scan
300//! information    │            creates an
301//! such as schema │            ExecutionPlan
302//!                │
303//!                ▼
304//! ┌─────────────────────────┐         ┌───────────────┐
305//! │                         │         │               │
306//! │impl TableProvider       │────────▶│DataSourceExec │
307//! │                         │         │               │
308//! └─────────────────────────┘         └───────────────┘
309//!       TableProvider
310//! (built in or user provided)             ExecutionPlan
311//! ```
312//!
313//! A [`TableProvider`] provides information for planning and
314//! an [`ExecutionPlan`] for execution. DataFusion includes [`ListingTable`],
315//! a [`TableProvider`] which reads individual files or directories of files
316//! ("partitioned datasets") of the same file format. Users can add
317//! support for new file formats by implementing the [`TableProvider`]
318//! trait.
319//!
320//! See also:
321//!
322//! 1. [`ListingTable`]: Reads data from one or more Parquet, JSON, CSV, or AVRO
323//! files supporting HIVE style partitioning, optional compression, directly
324//! reading from remote object store and more.
325//!
326//! 2. [`MemTable`]: Reads data from in memory [`RecordBatch`]es.
327//!
328//! 3. [`StreamingTable`]: Reads data from potentially unbounded inputs.
329//!
330//! [`ListingTable`]: crate::datasource::listing::ListingTable
331//! [`MemTable`]: crate::datasource::memory::MemTable
332//! [`StreamingTable`]: crate::catalog::streaming::StreamingTable
333//!
334//! ## Plan Representations
335//!
336//! ### Logical Plans
337//! Logical planning yields [`LogicalPlan`] nodes and [`Expr`]
338//! representing expressions which are [`Schema`] aware and represent statements
339//! independent of how they are physically executed.
340//! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
341//! [`LogicalPlan`]s, each potentially containing embedded [`Expr`]s.
342//!
343//! [`LogicalPlan`]s can be rewritten with [`TreeNode`] API, see the
344//! [`tree_node module`] for more details.
345//!
346//! [`Expr`]s can also be rewritten with [`TreeNode`] API and simplified using
347//! [`ExprSimplifier`]. Examples of working with and executing [`Expr`]s can be
348//! found in the [`expr_api`.rs] example
349//!
350//! [`TreeNode`]: datafusion_common::tree_node::TreeNode
351//! [`tree_node module`]: datafusion_expr::logical_plan::tree_node
352//! [`ExprSimplifier`]: crate::optimizer::simplify_expressions::ExprSimplifier
353//! [`expr_api`.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
354//!
355//! ### Physical Plans
356//!
357//! An [`ExecutionPlan`] (sometimes referred to as a "physical plan")
358//! is a plan that can be executed against data. It is a DAG of other
359//! [`ExecutionPlan`]s each potentially containing expressions that implement the
360//! [`PhysicalExpr`] trait.
361//!
362//! Compared to a [`LogicalPlan`], an [`ExecutionPlan`] has additional concrete
363//! information about how to perform calculations (e.g. hash vs merge
364//! join), and how data flows during execution (e.g. partitioning and
365//! sortedness).
366//!
367//! [cp_solver] performs range propagation analysis on [`PhysicalExpr`]s and
368//! [`PruningPredicate`] can prove certain boolean [`PhysicalExpr`]s used for
369//! filtering can never be `true` using additional statistical information.
370//!
371//! [cp_solver]: crate::physical_expr::intervals::cp_solver
372//! [`PruningPredicate`]: datafusion_physical_optimizer::pruning::PruningPredicate
373//! [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
374//!
375//! ## Execution
376//!
377//! ```text
378//!            ExecutionPlan::execute             Calling next() on the
379//!            produces a stream                  stream produces the data
380//!
381//! ┌────────────────┐      ┌─────────────────────────┐         ┌────────────┐
382//! │ProjectExec     │      │impl                     │    ┌───▶│RecordBatch │
383//! │   ...          │─────▶│SendableRecordBatchStream│────┤    └────────────┘
384//! │  DataSourceExec│      │                         │    │    ┌────────────┐
385//! └────────────────┘      └─────────────────────────┘    ├───▶│RecordBatch │
386//!               ▲                                        │    └────────────┘
387//! ExecutionPlan │                                        │         ...
388//!               │                                        │
389//!               │                                        │    ┌────────────┐
390//!             PhysicalOptimizerRules                     ├───▶│RecordBatch │
391//!             request information                        │    └────────────┘
392//!             such as partitioning                       │    ┌ ─ ─ ─ ─ ─ ─
393//!                                                        └───▶ None        │
394//!                                                             └ ─ ─ ─ ─ ─ ─
395//! ```
396//!
397//! [`ExecutionPlan`]s process data using the [Apache Arrow] memory
398//! format, making heavy use of functions from the [arrow]
399//! crate. Values are represented with [`ColumnarValue`], which are either
400//! [`ScalarValue`] (single constant values) or [`ArrayRef`] (Arrow
401//! Arrays).
402//!
403//! Calling [`execute`] produces 1 or more partitions of data,
404//! as a [`SendableRecordBatchStream`], which implements a pull based execution
405//! API. Calling [`next()`]`.await` will incrementally compute and return the next
406//! [`RecordBatch`]. Balanced parallelism is achieved using [Volcano style]
407//! "Exchange" operations implemented by [`RepartitionExec`].
408//!
409//! While some recent research such as [Morsel-Driven Parallelism] describes challenges
410//! with the pull style Volcano execution model on NUMA architectures, in practice DataFusion achieves
411//! similar scalability as systems that use push driven schedulers [such as DuckDB].
412//! See the [DataFusion paper in SIGMOD 2024] for more details.
413//!
414//! [`execute`]: physical_plan::ExecutionPlan::execute
415//! [`SendableRecordBatchStream`]: crate::physical_plan::SendableRecordBatchStream
416//! [`ColumnarValue`]: datafusion_expr::ColumnarValue
417//! [`ScalarValue`]: crate::scalar::ScalarValue
418//! [`ArrayRef`]: arrow::array::ArrayRef
419//! [`Stream`]: futures::stream::Stream
420//!
421//! See the [implementors of `ExecutionPlan`] for a list of physical operators available.
422//!
423//! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
424//! [Volcano style]: https://doi.org/10.1145/93605.98720
425//! [Morsel-Driven Parallelism]: https://db.in.tum.de/~leis/papers/morsels.pdf
426//! [DataFusion paper in SIGMOD 2024]: https://github.com/apache/datafusion/files/15149988/DataFusion_Query_Engine___SIGMOD_2024-FINAL-mk4.pdf
427//! [such as DuckDB]: https://github.com/duckdb/duckdb/issues/1583
428//! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
429//!
430//! ## Streaming Execution
431//!
432//! DataFusion is a "streaming" query engine which means [`ExecutionPlan`]s incrementally
433//! read from their input(s) and compute output one [`RecordBatch`] at a time
434//! by continually polling [`SendableRecordBatchStream`]s. Output and
435//! intermediate [`RecordBatch`]s each have approximately `batch_size` rows,
436//! which amortizes per-batch overhead of execution.
437//!
438//! Note that certain operations, sometimes called "pipeline breakers",
439//! (for example full sorts or hash aggregations) are fundamentally non streaming and
440//! must read their input fully before producing **any** output. As much as possible,
441//! other operators read a single [`RecordBatch`] from their input to produce a
442//! single [`RecordBatch`] as output.
443//!
444//! For example, given this SQL query:
445//!
446//! ```sql
447//! SELECT date_trunc('month', time) FROM data WHERE id IN (10,20,30);
448//! ```
449//!
450//! The diagram below shows the call sequence when a consumer calls [`next()`] to
451//! get the next [`RecordBatch`] of output. While it is possible that some
452//! steps run on different threads, typically tokio will use the same thread
453//! that called [`next()`] to read from the input, apply the filter, and
454//! return the results without interleaving any other operations. This results
455//! in excellent cache locality as the same CPU core that produces the data often
456//! consumes it immediately as well.
457//!
458//! ```text
459//!
460//! Step 3: FilterExec calls next()       Step 2: ProjectionExec calls
461//!         on input Stream                  next() on input Stream
462//!         ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
463//!                              │                                                    Step 1: Consumer
464//!         ▼                          ▼                           │                    calls next()
465//! ┏━━━━━━━━━━━━━━━━┓     ┏━━━━━┻━━━━━━━━━━━━━┓      ┏━━━━━━━━━━━━━━━━━━━━━━━━━━┓
466//! ┃                ┃     ┃                   ┃      ┃                          ◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
467//! ┃  DataSource    ┃     ┃                   ┃      ┃                          ┃
468//! ┃    (e.g.       ┃     ┃    FilterExec     ┃      ┃      ProjectionExec      ┃
469//! ┃ ParquetSource) ┃     ┃id IN (10, 20, 30) ┃      ┃date_trunc('month', time) ┃
470//! ┃                ┃     ┃                   ┃      ┃                          ┣ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
471//! ┃                ┃     ┃                   ┃      ┃                          ┃
472//! ┗━━━━━━━━━━━━━━━━┛     ┗━━━━━━━━━━━┳━━━━━━━┛      ┗━━━━━━━━━━━━━━━━━━━━━━━━━━┛
473//!         │                    ▲                                 ▲                Step 6: ProjectionExec
474//!                              │     │                           │              computes date_trunc into a
475//!         └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─        ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                 new RecordBatch returned
476//!         ┌─────────────────────┐           ┌─────────────┐                            from client
477//!         │     RecordBatch     │           │ RecordBatch │
478//!         └─────────────────────┘           └─────────────┘
479//!
480//!  Step 4: DataSource returns a        Step 5: FilterExec returns a new
481//!         single RecordBatch            RecordBatch with only matching rows
482//! ```
483//!
484//! [`next()`]: futures::StreamExt::next
485//!
486//! ## Thread Scheduling, CPU / IO Thread Pools, and [Tokio] [`Runtime`]s
487//!
488//! DataFusion automatically runs each plan with multiple CPU cores using
489//! a [Tokio] [`Runtime`] as a thread pool. While tokio is most commonly used
490//! for asynchronous network I/O, the combination of an efficient, work-stealing
491//! scheduler, and first class compiler support for automatic continuation
492//! generation (`async`) also makes it a compelling choice for CPU intensive
493//! applications as explained in the [Using Rustlangβs Async Tokio
494//! Runtime for CPU-Bound Tasks] blog.
495//!
496//! The number of cores used is determined by the `target_partitions`
497//! configuration setting, which defaults to the number of CPU cores.
498//! While preparing for execution, DataFusion tries to create this many distinct
499//! `async` [`Stream`]s for each [`ExecutionPlan`].
500//! The [`Stream`]s for certain [`ExecutionPlan`]s, such as [`RepartitionExec`]
501//! and [`CoalescePartitionsExec`], spawn [Tokio] [`task`]s, that are run by
502//! threads managed by the [`Runtime`].
503//! Many DataFusion [`Stream`]s perform CPU intensive processing.
504//!
505//! Using `async` for CPU intensive tasks makes it easy for [`TableProvider`]s
506//! to perform network I/O using standard Rust `async` during execution.
507//! However, this design also makes it very easy to mix CPU intensive and latency
508//! sensitive I/O work on the same thread pool ([`Runtime`]).
509//! Using the same (default) [`Runtime`] is convenient, and often works well for
510//! initial development and processing local files, but it can lead to problems
511//! under load and/or when reading from network sources such as AWS S3.
512//!
513//! If your system does not fully utilize either the CPU or network bandwidth
514//! during execution, or you see significantly higher tail (e.g. p99) latencies
515//! responding to network requests, **it is likely you need to use a different
516//! [`Runtime`] for CPU intensive DataFusion plans**. This effect can be especially
517//! pronounced when running several queries concurrently.
518//!
519//! As shown in the following figure, using the same [`Runtime`] for both CPU
520//! intensive processing and network requests can introduce significant
521//! delays in responding to those network requests. Delays in processing network
522//! requests can and do lead network flow control to throttle the available
523//! bandwidth in response.
524//!
525//! ```text
526//!                                                                                                 Legend
527//!
528//!                                                                                           ┏━━━━━━┓
529//!                               Processing network request                                  ┃      ┃  CPU bound work
530//!                               is delayed due to processing                                ┗━━━━━━┛
531//!                               CPU bound work                                              ┌─┐
532//!                                                                                           │ │       Network request
533//!                                             ││                                            └─┘       processing
534//!
535//!                                             ││
536//!                               ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
537//!                               │                                          │
538//!
539//!                               ▼                                          ▼
540//! ┌─────────────┐           ┌─┐┌─┐┏━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━┓┌─┐
541//! │             │thread 1   │ ││ │┃     Decoding     ┃┃    Filtering     ┃│ │
542//! │             │           └─┘└─┘┗━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━┛└─┘
543//! │             │           ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
544//! │Tokio Runtime│thread 2   ┃   Decoding   ┃    Filtering     ┃   Decoding   ┃       ...
545//! │(thread pool)│           ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
546//! │             │     ...                         ...
547//! │             │           ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓┌─┐ ┏━━━━━━━━━━━━━━┓
548//! │             │thread N   ┃     Decoding      ┃     Filtering     ┃│ │ ┃   Decoding   ┃
549//! └─────────────┘           ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━┛└─┘ ┗━━━━━━━━━━━━━━┛
550//!                          ────────────────────────────────────────────────────────────▶
551//!                                                                                 time
552//! ```
553//!
554//! The bottleneck resulting from network throttling can be avoided
555//! by using separate [`Runtime`]s for the different types of work, as shown
556//! in the diagram below.
557//!
558//! ```text
559//!  A separate thread pool processes network                                                       Legend
560//!  requests, reducing the latency for
561//!  processing each request                                                                  ┏━━━━━━┓
562//!                                                                                           ┃      ┃  CPU bound work
563//!                                     │                                                     ┗━━━━━━┛
564//!                                     │                                                     ┌─┐
565//!                            ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                            │ │       Network request
566//!                            │                 │                                            └─┘       processing
567//!                            │                 │
568//!                            ▼                 ▼
569//! ┌─────────────┐           ┌─┐┌─┐┌─┐
570//! │             │thread 1   │ ││ ││ │
571//! │             │           └─┘└─┘└─┘
572//! │Tokio Runtime│                                 ...
573//! │(thread pool)│thread 2
574//! │             │
575//! │"IO Runtime" │     ...
576//! │             │                             ┌─┐
577//! │             │thread N                     │ │
578//! └─────────────┘                             └─┘
579//!                          ────────────────────────────────────────────────────────────▶
580//!                                                                                 time
581//!
582//! ┌─────────────┐           ┏━━━━━━━━━━━━━━━━━━┓┏━━━━━━━━━━━━━━━━━━┓
583//! │             │thread 1   ┃     Decoding     ┃┃    Filtering     ┃
584//! │             │           ┗━━━━━━━━━━━━━━━━━━┛┗━━━━━━━━━━━━━━━━━━┛
585//! │Tokio Runtime│           ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
586//! │(thread pool)│thread 2   ┃   Decoding   ┃    Filtering     ┃   Decoding   ┃       ...
587//! │             │           ┗━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
588//! │ CPU Runtime │     ...                         ...
589//! │             │           ┏━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
590//! │             │thread N   ┃     Decoding      ┃    Filtering     ┃   Decoding   ┃
591//! └─────────────┘           ┗━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━┛
592//!                          ────────────────────────────────────────────────────────────▶
593//!                                                                                 time
594//! ```
595//!
596//! Note that DataFusion does not use [`tokio::task::spawn_blocking`] for
597//! CPU-bounded work, because `spawn_blocking` is designed for blocking **IO**,
598//! not designed for CPU bound tasks. Among other challenges, spawned blocking
599//! tasks can't yield waiting for input (can't call `await`) so they
600//! can't be used to limit the number of concurrent CPU bound tasks or
601//! keep the processing pipeline to the same core.
602//!
603//! [Tokio]: https://tokio.rs
604//! [`Runtime`]: tokio::runtime::Runtime
605//! [`task`]: tokio::task
606//! [Using Rustlangβs Async Tokio Runtime for CPU-Bound Tasks]: https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
607//! [`RepartitionExec`]: physical_plan::repartition::RepartitionExec
608//! [`CoalescePartitionsExec`]: physical_plan::coalesce_partitions::CoalescePartitionsExec
609//!
610//! ## State Management and Configuration
611//!
612//! [`ConfigOptions`] contain options to control DataFusion's
613//! execution.
614//!
615//! [`ConfigOptions`]: datafusion_common::config::ConfigOptions
616//!
617//! The state required to execute queries is managed by the following
618//! structures:
619//!
620//! 1. [`SessionContext`]: State needed to create [`LogicalPlan`]s such
621//! as the table definitions and the function registries.
622//!
623//! 2. [`TaskContext`]: State needed for execution such as the
624//! [`MemoryPool`], [`DiskManager`], and [`ObjectStoreRegistry`].
625//!
626//! 3. [`ExecutionProps`]: Per-execution properties and data (such as
627//! starting timestamps, etc).
628//!
629//! [`SessionContext`]: crate::execution::context::SessionContext
630//! [`TaskContext`]: crate::execution::context::TaskContext
631//! [`ExecutionProps`]: crate::execution::context::ExecutionProps
632//!
633//! ### Resource Management
634//!
635//! The amount of memory and temporary local disk space used by
636//! DataFusion when running a plan can be controlled using the
637//! [`MemoryPool`] and [`DiskManager`]. Other runtime options can be
638//! found on [`RuntimeEnv`].
639//!
640//! [`DiskManager`]: crate::execution::DiskManager
641//! [`MemoryPool`]: crate::execution::memory_pool::MemoryPool
642//! [`RuntimeEnv`]: crate::execution::runtime_env::RuntimeEnv
643//! [`ObjectStoreRegistry`]: crate::datasource::object_store::ObjectStoreRegistry
644//!
645//! ## Crate Organization
646//!
647//! Most users interact with DataFusion via this crate (`datafusion`), which re-exports
648//! all functionality needed to build and execute queries.
649//!
650//! There are three other crates that provide additional functionality that
651//! must be used directly:
652//! * [`datafusion_proto`]: Plan serialization and deserialization
653//! * [`datafusion_substrait`]: Support for the substrait plan serialization format
654//! * [`datafusion_sqllogictest`] : The DataFusion SQL logic test runner
655//!
656//! [`datafusion_proto`]: https://crates.io/crates/datafusion-proto
657//! [`datafusion_substrait`]: https://crates.io/crates/datafusion-substrait
658//! [`datafusion_sqllogictest`]: https://crates.io/crates/datafusion-sqllogictest
659//!
660//! DataFusion is internally split into multiple sub crates to
661//! enforce modularity and improve compilation times. See the
662//! [list of modules](#modules) for all available sub-crates. Major ones are
663//!
664//! * [datafusion_common]: Common traits and types
665//! * [datafusion_catalog]: Catalog APIs such as [`SchemaProvider`] and [`CatalogProvider`]
666//! * [datafusion_datasource]: File and Data IO such as [`FileSource`] and [`DataSink`]
667//! * [datafusion_session]: [`Session`] and related structures
668//! * [datafusion_execution]: State and structures needed for execution
669//! * [datafusion_expr]: [`LogicalPlan`], [`Expr`] and related logical planning structure
670//! * [datafusion_functions]: Scalar function packages
671//! * [datafusion_functions_aggregate]: Aggregate functions such as `MIN`, `MAX`, `SUM`, etc
672//! * [datafusion_functions_nested]: Scalar function packages for `ARRAY`s, `MAP`s and `STRUCT`s
673//! * [datafusion_functions_table]: Table Functions such as `GENERATE_SERIES`
674//! * [datafusion_functions_window]: Window functions such as `ROW_NUMBER`, `RANK`, etc
675//! * [datafusion_optimizer]: [`OptimizerRule`]s and [`AnalyzerRule`]s
676//! * [datafusion_physical_expr]: [`PhysicalExpr`] and related expressions
677//! * [datafusion_physical_plan]: [`ExecutionPlan`] and related expressions
678//! * [datafusion_physical_optimizer]: [`PhysicalOptimizerRule`]s and related structures
679//! * [datafusion_sql]: SQL planner ([`SqlToRel`])
680//!
681//! [`SchemaProvider`]: datafusion_catalog::SchemaProvider
682//! [`CatalogProvider`]: datafusion_catalog::CatalogProvider
683//! [`Session`]: datafusion_session::Session
684//! [`FileSource`]: datafusion_datasource::file::FileSource
685//! [`DataSink`]: datafusion_datasource::sink::DataSink
686//!
687//! ## Citing DataFusion in Academic Papers
688//!
689//! You can use the following citation to reference DataFusion in academic papers:
690//!
691//! ```text
692//! @inproceedings{lamb2024apache,
693//! title={Apache Arrow DataFusion: A Fast, Embeddable, Modular Analytic Query Engine},
694//! author={Lamb, Andrew and Shen, Yijie and Heres, Dani{\"e}l and Chakraborty, Jayjeet and Kabak, Mehmet Ozan and Hsieh, Liang-Chi and Sun, Chao},
695//! booktitle={Companion of the 2024 International Conference on Management of Data},
696//! pages={5--17},
697//! year={2024}
698//! }
699//! ```
700//!
701//! [sqlparser]: https://docs.rs/sqlparser/latest/sqlparser
702//! [`SqlToRel`]: sql::planner::SqlToRel
703//! [`Expr`]: datafusion_expr::Expr
704//! [`LogicalPlan`]: datafusion_expr::LogicalPlan
705//! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule
706//! [`OptimizerRule`]: optimizer::optimizer::OptimizerRule
707//! [`ExecutionPlan`]: physical_plan::ExecutionPlan
708//! [`PhysicalPlanner`]: physical_planner::PhysicalPlanner
709//! [`PhysicalOptimizerRule`]: datafusion_physical_optimizer::PhysicalOptimizerRule
710//! [`Schema`]: arrow::datatypes::Schema
711//! [`PhysicalExpr`]: physical_plan::PhysicalExpr
712//! [`RecordBatch`]: arrow::array::RecordBatch
713//! [`RecordBatchReader`]: arrow::record_batch::RecordBatchReader
714//! [`Array`]: arrow::array::Array
715
716/// DataFusion crate version
717pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION");
718
719extern crate core;
720extern crate sqlparser;
721
722pub mod dataframe;
723pub mod datasource;
724pub mod error;
725pub mod execution;
726pub mod physical_planner;
727pub mod prelude;
728pub mod scalar;
729
730// re-export dependencies from arrow-rs to minimize version maintenance for crate users
731pub use arrow;
732#[cfg(feature = "parquet")]
733pub use parquet;
734
735// re-export DataFusion sub-crates at the top level. Use `pub use *`
736// so that the contents of the subcrates appear in rustdocs
737// for details, see https://github.com/apache/datafusion/issues/6648
738
739/// re-export of [`datafusion_common`] crate
740pub mod common {
741 pub use datafusion_common::*;
742
743 /// re-export of [`datafusion_common_runtime`] crate
744 pub mod runtime {
745 pub use datafusion_common_runtime::*;
746 }
747}
748
749// Backwards compatibility
750pub use common::config;
751
752// NB datafusion execution is re-exported in the `execution` module
753
754/// re-export of [`datafusion_catalog`] crate
755pub mod catalog {
756 pub use datafusion_catalog::*;
757}
758
759/// re-export of [`datafusion_expr`] crate
760pub mod logical_expr {
761 pub use datafusion_expr::*;
762}
763
764/// re-export of [`datafusion_expr_common`] crate
765pub mod logical_expr_common {
766 pub use datafusion_expr_common::*;
767}
768
769/// re-export of [`datafusion_optimizer`] crate
770pub mod optimizer {
771 pub use datafusion_optimizer::*;
772}
773
774/// re-export of [`datafusion_physical_optimizer`] crate
775pub mod physical_optimizer {
776 pub use datafusion_physical_optimizer::*;
777}
778
779/// re-export of [`datafusion_physical_expr_common`] crate
780pub mod physical_expr_common {
781 pub use datafusion_physical_expr_common::*;
782}
783
784/// re-export of [`datafusion_physical_expr`] crate
785pub mod physical_expr {
786 pub use datafusion_physical_expr::*;
787}
788
789/// re-export of [`datafusion_physical_plan`] crate
790pub mod physical_plan {
791 pub use datafusion_physical_plan::*;
792}
793
794// Reexport testing macros for compatibility
795pub use datafusion_common::assert_batches_eq;
796pub use datafusion_common::assert_batches_sorted_eq;
797
798/// re-export of [`datafusion_sql`] crate
799pub mod sql {
800 pub use datafusion_sql::*;
801}
802
803/// re-export of [`datafusion_functions`] crate
804pub mod functions {
805 pub use datafusion_functions::*;
806}
807
808/// re-export of [`datafusion_functions_nested`] crate, if "nested_expressions" feature is enabled
809pub mod functions_nested {
810 #[cfg(feature = "nested_expressions")]
811 pub use datafusion_functions_nested::*;
812}
813
814/// re-export of [`datafusion_functions_nested`] crate as [`functions_array`] for backward compatibility, if "nested_expressions" feature is enabled
815#[deprecated(since = "41.0.0", note = "use datafusion-functions-nested instead")]
816pub mod functions_array {
817 #[cfg(feature = "nested_expressions")]
818 pub use datafusion_functions_nested::*;
819}
820
821/// re-export of [`datafusion_functions_aggregate`] crate
822pub mod functions_aggregate {
823 pub use datafusion_functions_aggregate::*;
824}
825
826/// re-export of [`datafusion_functions_window`] crate
827pub mod functions_window {
828 pub use datafusion_functions_window::*;
829}
830
831/// re-export of [`datafusion_functions_table`] crate
832pub mod functions_table {
833 pub use datafusion_functions_table::*;
834}
835
836/// re-export of variable provider for `@name` and `@@name` style runtime values.
837pub mod variable {
838 pub use datafusion_expr::var_provider::{VarProvider, VarType};
839}
840
841#[cfg(not(target_arch = "wasm32"))]
842pub mod test;
843
844mod schema_equivalence;
845pub mod test_util;
846
847#[cfg(doctest)]
848doc_comment::doctest!("../../../README.md", readme_example_test);
849
850// Instructions for Documentation Examples
851//
852// The following commands test the examples from the user guide as part of
853// `cargo test --doc`
854//
855// # Adding new tests:
856//
857// Simply add code like this to your .md file and ensure your md file is
858// included in the lists below.
859//
860// ```rust
861// <code here will be tested>
862// ```
863//
864// Note that sometimes it helps to author the doctest as a standalone program
865// first, and then copy it into the user guide.
866//
867// # Debugging Test Failures
868//
869// Unfortunately, the line numbers reported by doctest do not correspond to the
870// line numbers in the .md files. Thus, if a doctest fails, use the name of
871// the test to find the relevant file in the list below, and then find the
872// example in that file to fix.
873//
874// For example, if `user_guide_expressions(line 123)` fails,
875// go to `docs/source/user-guide/expressions.md` to find the relevant problem.
876//
877#[cfg(doctest)]
878doc_comment::doctest!(
879 "../../../docs/source/user-guide/concepts-readings-events.md",
880 user_guide_concepts_readings_events
881);
882
883#[cfg(doctest)]
884doc_comment::doctest!(
885 "../../../docs/source/user-guide/configs.md",
886 user_guide_configs
887);
888
889#[cfg(doctest)]
890doc_comment::doctest!(
891 "../../../docs/source/user-guide/runtime_configs.md",
892 user_guide_runtime_configs
893);
894
895#[cfg(doctest)]
896doc_comment::doctest!(
897 "../../../docs/source/user-guide/crate-configuration.md",
898 user_guide_crate_configuration
899);
900
901#[cfg(doctest)]
902doc_comment::doctest!(
903 "../../../docs/source/user-guide/dataframe.md",
904 user_guide_dataframe
905);
906
907#[cfg(doctest)]
908doc_comment::doctest!(
909 "../../../docs/source/user-guide/example-usage.md",
910 user_guide_example_usage
911);
912
913#[cfg(doctest)]
914doc_comment::doctest!(
915 "../../../docs/source/user-guide/explain-usage.md",
916 user_guide_explain_usage
917);
918
919#[cfg(doctest)]
920doc_comment::doctest!(
921 "../../../docs/source/user-guide/expressions.md",
922 user_guide_expressions
923);
924
925#[cfg(doctest)]
926doc_comment::doctest!("../../../docs/source/user-guide/faq.md", user_guide_faq);
927
928#[cfg(doctest)]
929doc_comment::doctest!(
930 "../../../docs/source/user-guide/introduction.md",
931 user_guide_introduction
932);
933
934#[cfg(doctest)]
935doc_comment::doctest!(
936 "../../../docs/source/user-guide/cli/datasources.md",
937 user_guide_cli_datasources
938);
939
940#[cfg(doctest)]
941doc_comment::doctest!(
942 "../../../docs/source/user-guide/cli/installation.md",
943 user_guide_cli_installation
944);
945
946#[cfg(doctest)]
947doc_comment::doctest!(
948 "../../../docs/source/user-guide/cli/overview.md",
949 user_guide_cli_overview
950);
951
952#[cfg(doctest)]
953doc_comment::doctest!(
954 "../../../docs/source/user-guide/cli/usage.md",
955 user_guide_cli_usage
956);
957
958#[cfg(doctest)]
959doc_comment::doctest!(
960 "../../../docs/source/user-guide/features.md",
961 user_guide_features
962);
963
964#[cfg(doctest)]
965doc_comment::doctest!(
966 "../../../docs/source/user-guide/sql/aggregate_functions.md",
967 user_guide_sql_aggregate_functions
968);
969
970#[cfg(doctest)]
971doc_comment::doctest!(
972 "../../../docs/source/user-guide/sql/data_types.md",
973 user_guide_sql_data_types
974);
975
976#[cfg(doctest)]
977doc_comment::doctest!(
978 "../../../docs/source/user-guide/sql/ddl.md",
979 user_guide_sql_ddl
980);
981
982#[cfg(doctest)]
983doc_comment::doctest!(
984 "../../../docs/source/user-guide/sql/dml.md",
985 user_guide_sql_dml
986);
987
988#[cfg(doctest)]
989doc_comment::doctest!(
990 "../../../docs/source/user-guide/sql/explain.md",
991 user_guide_sql_explain
992);
993
994#[cfg(doctest)]
995doc_comment::doctest!(
996 "../../../docs/source/user-guide/sql/information_schema.md",
997 user_guide_sql_information_schema
998);
999
1000#[cfg(doctest)]
1001doc_comment::doctest!(
1002 "../../../docs/source/user-guide/sql/operators.md",
1003 user_guide_sql_operators
1004);
1005
1006#[cfg(doctest)]
1007doc_comment::doctest!(
1008 "../../../docs/source/user-guide/sql/prepared_statements.md",
1009 user_guide_sql_prepared_statements
1010);
1011
1012#[cfg(doctest)]
1013doc_comment::doctest!(
1014 "../../../docs/source/user-guide/sql/scalar_functions.md",
1015 user_guide_sql_scalar_functions
1016);
1017
1018#[cfg(doctest)]
1019doc_comment::doctest!(
1020 "../../../docs/source/user-guide/sql/select.md",
1021 user_guide_sql_select
1022);
1023
1024#[cfg(doctest)]
1025doc_comment::doctest!(
1026 "../../../docs/source/user-guide/sql/special_functions.md",
1027 user_guide_sql_special_functions
1028);
1029
1030#[cfg(doctest)]
1031doc_comment::doctest!(
1032 "../../../docs/source/user-guide/sql/subqueries.md",
1033 user_guide_sql_subqueries
1034);
1035
1036#[cfg(doctest)]
1037doc_comment::doctest!(
1038 "../../../docs/source/user-guide/sql/window_functions.md",
1039 user_guide_sql_window_functions
1040);
1041
1042#[cfg(doctest)]
1043doc_comment::doctest!(
1044 "../../../docs/source/user-guide/sql/format_options.md",
1045 user_guide_sql_format_options
1046);
1047
1048#[cfg(doctest)]
1049doc_comment::doctest!(
1050 "../../../docs/source/library-user-guide/adding-udfs.md",
1051 library_user_guide_adding_udfs
1052);
1053
1054#[cfg(doctest)]
1055doc_comment::doctest!(
1056 "../../../docs/source/library-user-guide/building-logical-plans.md",
1057 library_user_guide_building_logical_plans
1058);
1059
1060#[cfg(doctest)]
1061doc_comment::doctest!(
1062 "../../../docs/source/library-user-guide/catalogs.md",
1063 library_user_guide_catalogs
1064);
1065
1066#[cfg(doctest)]
1067doc_comment::doctest!(
1068 "../../../docs/source/library-user-guide/custom-table-providers.md",
1069 library_user_guide_custom_table_providers
1070);
1071
1072#[cfg(doctest)]
1073doc_comment::doctest!(
1074 "../../../docs/source/library-user-guide/extending-operators.md",
1075 library_user_guide_extending_operators
1076);
1077
1078#[cfg(doctest)]
1079doc_comment::doctest!(
1080 "../../../docs/source/library-user-guide/extensions.md",
1081 library_user_guide_extensions
1082);
1083
1084#[cfg(doctest)]
1085doc_comment::doctest!(
1086 "../../../docs/source/library-user-guide/index.md",
1087 library_user_guide_index
1088);
1089
1090#[cfg(doctest)]
1091doc_comment::doctest!(
1092 "../../../docs/source/library-user-guide/profiling.md",
1093 library_user_guide_profiling
1094);
1095
1096#[cfg(doctest)]
1097doc_comment::doctest!(
1098 "../../../docs/source/library-user-guide/query-optimizer.md",
1099 library_user_guide_query_optimizer
1100);
1101
1102#[cfg(doctest)]
1103doc_comment::doctest!(
1104 "../../../docs/source/library-user-guide/using-the-dataframe-api.md",
1105 library_user_guide_dataframe_api
1106);
1107
1108#[cfg(doctest)]
1109doc_comment::doctest!(
1110 "../../../docs/source/library-user-guide/using-the-sql-api.md",
1111 library_user_guide_sql_api
1112);
1113
1114#[cfg(doctest)]
1115doc_comment::doctest!(
1116 "../../../docs/source/library-user-guide/working-with-exprs.md",
1117 library_user_guide_working_with_exprs
1118);
1119
1120#[cfg(doctest)]
1121doc_comment::doctest!(
1122 "../../../docs/source/library-user-guide/upgrading.md",
1123 library_user_guide_upgrading
1124);
1125
1126#[cfg(doctest)]
1127doc_comment::doctest!(
1128 "../../../docs/source/contributor-guide/api-health.md",
1129 contributor_guide_api_health
1130);