
Crate datafusion


DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. DataFusion’s use cases include building very fast database and analytic systems, customized to particular workloads.

“Out of the box,” DataFusion quickly runs complex SQL and DataFrame queries using a sophisticated query planner, a columnar, multi-threaded, vectorized execution engine, and partitioned data sources (Parquet, CSV, JSON, and Avro).

DataFusion can also be easily customized to support additional data sources, query languages, functions, custom operators and more.

Examples

The main entry point for interacting with DataFusion is the SessionContext.

DataFrame

To execute a query against data stored in a CSV file using a DataFrame:


use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main] // DataFusion uses the tokio async runtime
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // create the dataframe
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;

    // create a plan
    let df = df.filter(col("a").lt_eq(col("b")))?
        .aggregate(vec![col("a")], vec![min(col("b"))])?
        .limit(0, Some(100))?;

    // execute the plan
    let results: Vec<RecordBatch> = df.collect().await?;

    // format the results
    let pretty_results = datafusion::arrow::util::pretty::pretty_format_batches(&results)?
        .to_string();

    let expected = vec![
        "+---+----------------+",
        "| a | MIN(?table?.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ];

    assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
    Ok(())
}

SQL

To execute a query against a CSV file using SQL:


use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // register the CSV file as a table named "example"
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

    // create a plan
    let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

    // execute the plan
    let results: Vec<RecordBatch> = df.collect().await?;

    // format the results
    let pretty_results = datafusion::arrow::util::pretty::pretty_format_batches(&results)?
        .to_string();

    let expected = vec![
        "+---+----------------+",
        "| a | MIN(example.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ];

    assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
    Ok(())
}

More Examples

There are many additional annotated examples of using DataFusion in the datafusion-examples directory.

Customization and Extension

DataFusion supports extension at many points, including custom data sources (TableProvider), catalogs and schemas (CatalogProvider), user-defined scalar, aggregate, and window functions, custom optimizer rules, and user-defined logical and physical plan nodes.

You can find examples of each of them in the datafusion-examples directory.
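As one illustration, a user-defined scalar function can be registered with a SessionContext and then called from SQL or the DataFrame API. The following is a minimal sketch, not DataFusion's canonical example: the add_one function and its Int64-only signature are invented for illustration, and it assumes the create_udf function from the prelude and the make_scalar_function helper from physical_plan::functions (run inside a function returning datafusion::error::Result).

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::Volatility;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::*;

let ctx = SessionContext::new();

// wrap a plain Rust function over Arrow arrays as a scalar function
// (`add_one` is a hypothetical example function)
let add_one = make_scalar_function(|args: &[ArrayRef]| {
    let input = args[0]
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("expected an Int64 column");
    let result: Int64Array = input.iter().map(|v| v.map(|x| x + 1)).collect();
    Ok(Arc::new(result) as ArrayRef)
});

// declare the SQL signature and register the function with the context
let udf = create_udf(
    "add_one",                 // name usable from SQL and the DataFrame API
    vec![DataType::Int64],     // argument types
    Arc::new(DataType::Int64), // return type
    Volatility::Immutable,     // equal inputs always produce equal outputs
    add_one,
);
ctx.register_udf(udf);

// the function can now be called, e.g. `SELECT add_one(a) FROM example`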

Code Organization

Overview Presentations

The following presentations offer high-level overviews of the different components and how they interact.

  • [Apr 2023]: The Apache Arrow DataFusion Architecture talks
  • [July 2022]: DataFusion and Arrow: Supercharge Your Data Analytical Tool with a Rusty Query Engine: recording and slides
  • [March 2021]: The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
  • [February 2021]: How DataFusion is used within the Ballista Project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording

Architecture

DataFusion is a fully fledged query engine capable of performing complex operations. Specifically, when DataFusion receives an SQL query, it passes through a series of steps until a result is obtained. Broadly, they are:

  1. The string is parsed into an Abstract Syntax Tree (AST) using sqlparser.
  2. The planner SqlToRel converts SQL expressions in the AST into logical expressions (Exprs).
  3. The planner SqlToRel converts SQL statement nodes in the AST into a LogicalPlan.
  4. OptimizerRules are applied to the LogicalPlan to optimize it.
  5. The LogicalPlan is converted to an ExecutionPlan by a PhysicalPlanner.
  6. The ExecutionPlan is executed against data through the SessionContext.

With the DataFrame API, steps 1-3 are not used as the DataFrame builds the LogicalPlan directly.

Phases 1-5 are typically cheap compared to phase 6, and thus DataFusion puts a lot of effort into ensuring that phase 6 runs efficiently and without errors.
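These phases can be observed in code. A rough sketch (assuming the example table from the SQL section is registered, and using the DataFrame methods logical_plan and create_physical_plan, inside an async function returning datafusion::error::Result):

use datafusion::prelude::*;

let ctx = SessionContext::new();
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// phases 1-3: parse the SQL string and build a LogicalPlan (held by the DataFrame)
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
println!("{:?}", df.logical_plan());

// phases 4-5: optimize the LogicalPlan and convert it to an ExecutionPlan
let physical_plan = df.create_physical_plan().await?;
println!("{:?}", physical_plan);

// phase 6: the ExecutionPlan is executed against data, e.g. via DataFrame::collect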

DataFusion’s planning is divided into two main parts: logical planning and physical planning.

Logical planning

Logical planning yields LogicalPlans and logical Expr expressions, which are schema-aware and represent statements whose result is independent of how they are physically executed.

A LogicalPlan is a Directed Acyclic Graph (DAG) of other LogicalPlans, and each node contains Exprs. All of these are located in the datafusion_expr module.
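For example, a logical Expr can be built and inspected without any data at all. A minimal sketch using the expression builders from the prelude:

use datafusion::prelude::*;

// Exprs describe *what* to compute, independently of how it is executed;
// this builds the expression tree `a <= b AND a > 0`
let predicate = col("a").lt_eq(col("b")).and(col("a").gt(lit(0)));

// Expr implements Display, printing something like `a <= b AND a > Int32(0)`
println!("{predicate}");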

Physical planning

An ExecutionPlan (sometimes referred to as a “physical plan”) is a plan that can be executed against data. Compared to a logical plan, the physical plan has concrete information about how calculations should be performed (e.g. what Rust functions are used) and how data should be loaded into memory.

ExecutionPlans use the Apache Arrow format as their in-memory representation of data, through the arrow crate. The arrow crate documents how the memory is physically represented.

An ExecutionPlan is composed of nodes (each of which implements the ExecutionPlan trait). Each node can contain physical expressions (PhysicalExpr) or aggregate expressions (AggregateExpr). All of these are located in the physical_plan module.
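To make the contrast with logical Exprs concrete, here is a minimal sketch of building and evaluating a PhysicalExpr by hand; it assumes the col and binary helpers in physical_plan::expressions and runs inside a function returning datafusion::error::Result:

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::logical_expr::Operator;
use datafusion::physical_plan::expressions::{binary, col};
use datafusion::physical_plan::PhysicalExpr;

// unlike a logical Expr, a PhysicalExpr is bound to a concrete schema
let schema = Schema::new(vec![
    Field::new("a", DataType::Int32, false),
    Field::new("b", DataType::Int32, false),
]);
let a = col("a", &schema)?;
let b = col("b", &schema)?;
let a_lt_eq_b = binary(a, Operator::LtEq, b, &schema)?;

// evaluate the expression against an in-memory RecordBatch
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![
        Arc::new(Int32Array::from(vec![1, 3])) as ArrayRef,
        Arc::new(Int32Array::from(vec![2, 2])),
    ],
)?;
let result = a_lt_eq_b.evaluate(&batch)?; // ColumnarValue of [true, false]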

Broadly speaking, an ExecutionPlan is executed one partition at a time: calling its execute method with a partition index returns an asynchronous stream of RecordBatches for that partition. Nodes such as aggregations combine input from multiple partitions (*), as sketched below.

(*) Technically, it aggregates the results on each partition and then merges the results into a single partition.
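A minimal sketch of driving a single partition by hand, assuming futures::StreamExt for iterating the stream and reusing the example table from above (inside an async function returning datafusion::error::Result):

use datafusion::prelude::*;
use futures::StreamExt;

let ctx = SessionContext::new();
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a").await?;
let physical_plan = df.create_physical_plan().await?;

// execute partition 0; each partition yields an async stream of RecordBatches
let mut stream = physical_plan.execute(0, ctx.task_ctx())?;
while let Some(batch) = stream.next().await {
    println!("{} rows", batch?.num_rows());
}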

Physical nodes currently implemented include projection (ProjectionExec), filtering (FilterExec), aggregation (AggregateExec), joins such as HashJoinExec and CrossJoinExec, sorting (SortExec), limits (GlobalLimitExec and LocalLimitExec), and scans for the supported data sources (e.g. ParquetExec and CsvExec).

Future topics (coming soon):

  • Analyzer Rules
  • Resource management (memory and disk)

Re-exports

Modules

  • avro_to_arrow — Utilities to manipulate Avro metadata.
  • catalog — Interfaces and default implementations of table namespacing concepts, including catalogs and schemas.
  • config — DataFusion configuration options.
  • dataframe — DataFrame API for building and executing query plans.
  • datasource — DataFusion data sources.
  • error — DataFusion error types.
  • execution — The shared state available at different parts of query planning and execution.
  • from_slice — A trait to define from_slice functions for Arrow types.
  • physical_optimizer — A query optimizer that operates on physical plans, applying rules such as “Repartition”.
  • physical_plan — Traits for physical query plans, supporting parallel execution of partitioned relations.
  • prelude — A “prelude” for users of the datafusion crate.
  • scalar — ScalarValue, re-exported from datafusion-common to ease migration after datafusion was split into several crates.
  • test_util — Utility functions to make testing DataFusion-based crates easier.
  • variable — Variable provider.

Macros

  • assert_batches_eq — Compares the pretty-formatted output of record batches with an expected vector of strings. This is a macro so errors appear on the correct line.
  • assert_batches_sorted_eq — Compares the formatted output of record batches with an expected vector of strings in a way that order does not matter. This is a macro so errors appear on the correct line.
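For example, assert_batches_eq! can replace the manual assert_eq! comparison from the SQL example above; a minimal sketch, assuming results is the collected Vec<RecordBatch>:

use datafusion::assert_batches_eq;

// pretty-formats `results` and compares it line-by-line with the expected output
assert_batches_eq!(
    vec![
        "+---+----------------+",
        "| a | MIN(example.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ],
    &results
);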

Constants

  • DATAFUSION_VERSION — DataFusion crate version.