Crate datafusion
DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. DataFusion’s use cases include building very fast database and analytic systems, customized to particular workloads.
“Out of the box,” DataFusion quickly runs complex SQL and DataFrame queries using a sophisticated query planner, a columnar, multi-threaded, vectorized execution engine, and partitioned data sources (Parquet, CSV, JSON, and Avro).
DataFusion can also be easily customized to support additional data sources, query languages, functions, custom operators and more.
Examples
The main entry point for interacting with DataFusion is the SessionContext.
DataFrame
To execute a query against data stored in a CSV file using a DataFrame:
use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;

// runs inside an async function that returns datafusion::error::Result<()>
let ctx = SessionContext::new();
// create the dataframe
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// create a plan
let df = df.filter(col("a").lt_eq(col("b")))?
    .aggregate(vec![col("a")], vec![min(col("b"))])?
    .limit(0, Some(100))?;
// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;
// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
    .to_string();
let expected = vec![
    "+---+----------------+",
    "| a | MIN(?table?.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+"
];
assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
SQL
To execute a query against a CSV file using SQL:
use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;

// runs inside an async function that returns datafusion::error::Result<()>
let ctx = SessionContext::new();
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
// create a plan
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;
// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
    .to_string();
let expected = vec![
    "+---+----------------+",
    "| a | MIN(example.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+"
];
assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
More Examples
There are many additional annotated examples of using DataFusion in the datafusion-examples directory.
Customization and Extension
DataFusion supports extension at many points:
- read from any datasource (TableProvider)
- define your own catalogs, schemas, and table lists (CatalogProvider)
- build your own query language or plans with the LogicalPlanBuilder
- declare and use user-defined scalar functions (ScalarUDF)
- declare and use user-defined aggregate functions (AggregateUDF)
- add custom optimizer rewrite passes (OptimizerRule and PhysicalOptimizerRule)
- extend the planner to use user-defined logical and physical nodes (QueryPlanner)
You can find examples of each of them in the datafusion-examples directory.
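For example, a user-defined scalar function can be declared with create_udf and registered on a SessionContext. The following is a minimal sketch (the function name double is illustrative, and exact signatures vary slightly between DataFusion versions):

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::physical_plan::ColumnarValue;
use datafusion::prelude::*;

let ctx = SessionContext::new();

// The function body: double each value of a single Int64 argument
let double_fn = Arc::new(|args: &[ColumnarValue]| -> Result<ColumnarValue> {
    let array = match &args[0] {
        ColumnarValue::Array(a) => a.clone(),
        ColumnarValue::Scalar(s) => s.to_array(),
    };
    let ints = array.as_any().downcast_ref::<Int64Array>().expect("Int64 argument");
    let doubled: Int64Array = ints.iter().map(|v| v.map(|x| x * 2)).collect();
    Ok(ColumnarValue::Array(Arc::new(doubled) as ArrayRef))
});

// Declare the name, argument types, return type, and volatility, then register it
let udf = create_udf(
    "double",
    vec![DataType::Int64],
    Arc::new(DataType::Int64),
    Volatility::Immutable,
    double_fn,
);
ctx.register_udf(udf);
// The function is now callable from both SQL and the DataFrame API:
// SELECT double(a) FROM example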
Code Organization
Overview Presentations
The following presentations offer high level overviews of the different components and how they interact together.
- [Apr 2023]: The Apache Arrow DataFusion Architecture talks
- [July 2022]: DataFusion and Arrow: Supercharge Your Data Analytical Tool with a Rusty Query Engine: recording and slides
- [March 2021]: The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
- [February 2021]: How DataFusion is used within the Ballista Project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording
Architecture
DataFusion is a fully fledged query engine capable of performing complex operations. Specifically, when DataFusion receives an SQL query, there are different steps that it passes through until a result is obtained. Broadly, they are:
1. The string is parsed into an Abstract Syntax Tree (AST) using sqlparser.
2. The planner SqlToRel converts logical expressions on the AST into logical expressions (Exprs).
3. The planner SqlToRel converts logical nodes on the AST into a LogicalPlan.
4. OptimizerRules are applied to the LogicalPlan to optimize it.
5. The LogicalPlan is converted to an ExecutionPlan by a PhysicalPlanner.
6. The ExecutionPlan is executed against data through the SessionContext.

With the DataFrame API, steps 1-3 are not used, as the DataFrame builds the LogicalPlan directly.

Phases 1-5 are typically cheap when compared to phase 6, and thus DataFusion puts a lot of effort into ensuring that phase 6 runs efficiently and without errors.
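One convenient way to see what these phases produce is SQL’s EXPLAIN statement, which returns the optimized logical plan and the physical plan as query results rather than executing the query. A minimal sketch, assuming a table named example has been registered and the code runs in an async context:

// EXPLAIN returns the plans instead of the query results
let df = ctx.sql("EXPLAIN SELECT a, MIN(b) FROM example GROUP BY a").await?;
// Print the logical and physical plans as a formatted table
df.show().await?;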
DataFusion’s planning is divided into two main parts: logical planning and physical planning.
Logical planning
Logical planning yields LogicalPlans and logical Expr expressions, which are Schema-aware and represent statements whose result is independent of how it should physically be executed.
A LogicalPlan is a Directed Acyclic Graph (DAG) of other LogicalPlans, and each node contains Exprs. All of these are located in the datafusion_expr module.
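For example, a LogicalPlan can be built from SQL and inspected without executing anything. A minimal sketch, assuming a SessionContext named ctx with a table example registered, running in an async context:

// Plan the query, but do not execute it
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
// Borrow the underlying LogicalPlan and print its node tree, one node per line
println!("{}", df.logical_plan().display_indent());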
Physical planning
An ExecutionPlan (sometimes referred to as a “physical plan”) is a plan that can be executed against data. Compared to a logical plan, a physical plan has concrete information about how calculations should be performed (e.g. what Rust functions are used) and how data should be loaded into memory.
ExecutionPlans use the Apache Arrow format as their in-memory representation of data, through the arrow crate. The arrow crate documents how the memory is physically represented.
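As an illustration, the columnar data that plans operate on can be constructed directly through the re-exported arrow crate. A minimal, self-contained sketch:

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

// A batch with one non-nullable Int32 column named "a", stored in columnar form
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
let batch = RecordBatch::try_new(schema, vec![column])?;
assert_eq!(batch.num_rows(), 3);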
An ExecutionPlan is composed of nodes (each of which implements the ExecutionPlan trait). Each node can contain physical expressions (PhysicalExpr) or aggregate expressions (AggregateExpr). All of these are located in the physical_plan module.
Broadly speaking,
- an ExecutionPlan receives a partition number and asynchronously returns an iterator over RecordBatch (a node-specific struct that implements RecordBatchReader)
- a PhysicalExpr receives a RecordBatch and returns an Array
- an AggregateExpr receives a series of RecordBatches and returns a RecordBatch of a single row (*)
(*) Technically, it aggregates the results on each partition and then merges the results into a single partition.
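Putting this together, the plan behind a DataFrame can be lowered to an ExecutionPlan, displayed, and executed explicitly. A sketch, assuming a registered table named example and an async context (note that the plan display APIs have changed slightly across DataFusion versions):

use datafusion::physical_plan::{collect, displayable};

// Plan the query, then lower the optimized LogicalPlan to an ExecutionPlan
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
let plan = df.create_physical_plan().await?;

// Print the physical operator tree (e.g. an AggregateExec above a CsvExec)
println!("{}", displayable(plan.as_ref()).indent());

// Execute every partition and gather the resulting RecordBatches
let results = collect(plan, ctx.task_ctx()).await?;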
The following physical nodes are currently implemented:
- Projection: ProjectionExec
- Filter: FilterExec
- Grouped and non-grouped aggregations: AggregateExec
- Hash Join: HashJoinExec
- Cross Join: CrossJoinExec
- Sort Merge Join: SortMergeJoinExec
- Union: UnionExec
- Sort: SortExec
- Coalesce partitions: CoalescePartitionsExec
- Limit: LocalLimitExec and GlobalLimitExec
- Scan CSV: CsvExec
- Scan Parquet: ParquetExec
- Scan Avro: AvroExec
- Scan newline-delimited JSON: NdJsonExec
- Scan from memory: MemoryExec
- Explain the plan: ExplainExec
Future topics (coming soon):
- Analyzer Rules
- Resource management (memory and disk)
Re-exports
pub use arrow;
pub use parquet;
pub use datafusion_common as common;
pub use datafusion_expr as logical_expr;
pub use datafusion_optimizer as optimizer;
pub use datafusion_physical_expr as physical_expr;
pub use datafusion_row as row;
pub use datafusion_sql as sql;
Modules
- avro_to_arrow: This module contains utilities to manipulate avro metadata.
- catalog: This module contains interfaces and default implementations of table namespacing concepts, including catalogs and schemas.
- config: DataFusion Configuration Options
- dataframe: DataFrame API for building and executing query plans.
- datasource: DataFusion data sources
- error: DataFusion error types
- execution: This module contains the shared state available at different parts of query planning and execution
- from_slice: A trait to define from_slice functions for arrow types
- physical_optimizer: This module contains a query optimizer that operates against a physical plan and applies rules to it, such as “Repartition”.
- physical_plan: Traits for physical query plan, supporting parallel execution for partitioned relations.
- prelude: A “prelude” for users of the datafusion crate.
- scalar: ScalarValue re-exported from datafusion-common to ease migration when datafusion was split into several different crates
- test_util: Utility functions to make testing DataFusion based crates easier
- variable: Variable provider
Macros
- assert_batches_eq: Compares the pretty-formatted output of record batches with an expected vector of strings. This is a macro so errors appear on the correct line.
- assert_batches_sorted_eq: Compares the pretty-formatted output of record batches with an expected vector of strings in a way that order does not matter. This is a macro so errors appear on the correct line.
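For instance, the first of these (assert_batches_eq) is used in tests roughly as follows, a sketch assuming results is a Vec<RecordBatch> produced by one of the queries above:

use datafusion::assert_batches_eq;

let expected = vec![
    "+---+----------------+",
    "| a | MIN(example.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+",
];
// Panics with a readable diff, pointing at this line, if the batches differ
assert_batches_eq!(expected, &results);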
Constants
- DATAFUSION_VERSION: DataFusion crate version