DataFrame in datafusion::dataframe - Rust

Struct DataFrame

pub struct DataFrame { /* private fields */ }

Represents a logical set of rows with the same named columns.

Similar to a Pandas DataFrame or Spark DataFrame, a DataFusion DataFrame represents a two-dimensional table of rows and columns.

The typical workflow using DataFrames looks like:

  1. Create a DataFrame via methods on SessionContext, such as read_csv and read_parquet.

  2. Build a desired calculation by calling methods such as filter, select, aggregate, and limit.

  3. Execute into RecordBatches by calling collect.

A DataFrame is a wrapper around a LogicalPlan and the SessionState required for execution.

DataFrames are “lazy” in the sense that most methods do not actually compute anything; they just build up a plan. Calling collect executes the plan using the same DataFusion planning and execution process used to execute SQL and other queries.

§Example

let ctx = SessionContext::new();
// Read the data from a csv file
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// create a new dataframe that computes the equivalent of
// `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
let df = df.filter(col("a").lt_eq(col("b")))?
           .aggregate(vec![col("a")], vec![min(col("b"))])?
           .limit(0, Some(100))?;
// Perform the actual computation
let results = df.collect().await?;
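
As a rough illustration of the "lazy" behaviour described above, the following is a toy model in plain Rust. It does not use DataFusion: ToyFrame, Plan, and execute are hypothetical names invented for this sketch, and rows are simple (a, b) integer pairs. The builder methods only wrap the plan, and rows are produced only when collect is called.

```rust
// Toy model of lazy plan building. These types are illustrative only
// and are not part of DataFusion.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(Vec<(i64, i64)>),                  // in-memory rows of (a, b)
    Filter(Box<Plan>),                      // keep rows where a <= b
    Limit(Box<Plan>, usize, Option<usize>), // skip, fetch
}

#[derive(Debug, Clone)]
struct ToyFrame {
    plan: Plan,
}

impl ToyFrame {
    fn scan(rows: Vec<(i64, i64)>) -> Self {
        ToyFrame { plan: Plan::Scan(rows) }
    }

    // Builder methods only wrap the existing plan; no rows are touched here.
    fn filter_a_le_b(self) -> Self {
        ToyFrame { plan: Plan::Filter(Box::new(self.plan)) }
    }

    fn limit(self, skip: usize, fetch: Option<usize>) -> Self {
        ToyFrame { plan: Plan::Limit(Box::new(self.plan), skip, fetch) }
    }

    // Only `collect` walks the plan and produces rows.
    fn collect(self) -> Vec<(i64, i64)> {
        execute(self.plan)
    }
}

fn execute(plan: Plan) -> Vec<(i64, i64)> {
    match plan {
        Plan::Scan(rows) => rows,
        Plan::Filter(input) => execute(*input)
            .into_iter()
            .filter(|(a, b)| a <= b)
            .collect(),
        Plan::Limit(input, skip, fetch) => {
            let remaining = execute(*input).into_iter().skip(skip);
            match fetch {
                Some(n) => remaining.take(n).collect(),
                None => remaining.collect(),
            }
        }
    }
}
```

Chaining ToyFrame::scan(rows).filter_a_le_b().limit(0, Some(100)) yields a value whose plan field is a nested Limit(Filter(Scan(..))) tree; nothing is evaluated until collect consumes it.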

Implementations§

Source§

impl DataFrame

Source

pub async fn write_parquet( self, path: &str, options: DataFrameWriteOptions, writer_options: Option<TableParquetOptions>, ) -> Result<Vec<RecordBatch>, DataFusionError>

Available on crate feature parquet only.

Execute the DataFrame and write the results to Parquet file(s).

§Example
use datafusion::dataframe::DataFrameWriteOptions;
let ctx = SessionContext::new();
// Sort the data by column "b" and write it to a new location
ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
  .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
  .write_parquet(
    "output.parquet",
    DataFrameWriteOptions::new(),
    None, // can also specify parquet writing options here
).await?;
Source§

impl DataFrame

Source

pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self

Create a new DataFrame based on an existing LogicalPlan

This is a low-level method and is not typically used by end users. See SessionContext::read_csv and other methods for creating a DataFrame from an existing datasource.

Source

pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr>

Creates a logical expression from SQL expression text. The expression is parsed and resolved against the current schema.

§Example: Parsing SQL queries
// DataFusion parses integer literals as i64 first.
let sql = "a > 1 and b in (1, 10)";
let expected = col("a").gt(lit(1 as i64))
  .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let expr = df.parse_sql_expr(sql)?;
assert_eq!(expected, expr);
Source

pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>>

Consume the DataFrame and produce a physical plan

Source

pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame>

Select columns by name. Returns a new DataFrame containing only the specified columns.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select_columns(&["a", "b"])?;
let expected = vec![
    "+---+---+",
    "| a | b |",
    "+---+---+",
    "| 1 | 2 |",
    "+---+---+"
];
Source

pub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame>

Project an arbitrary list of expression strings into a new DataFrame. The method parses each string into a logical plan expression.

The output DataFrame has one column for each element in exprs.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df : DataFrame = df.select_exprs(&["a * b", "c"])?;
Source

pub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame>

Project arbitrary expressions (like SQL SELECT expressions) into a new DataFrame.

The output DataFrame has one column for each element in expr_list.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select(vec![col("a"), col("b") * col("c")])?;
let expected = vec![
    "+---+-----------------------+",
    "| a | ?table?.b * ?table?.c |",
    "+---+-----------------------+",
    "| 1 | 6                     |",
    "+---+-----------------------+"
];
Source

pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame>

Returns a new DataFrame containing all columns except the specified columns.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// +----+----+----+
// | a  | b  | c  |
// +----+----+----+
// | 1  | 2  | 3  |
// +----+----+----+
let df = df.drop_columns(&["a"])?;
let expected = vec![
    "+---+---+",
    "| b | c |",
    "+---+---+",
    "| 2 | 3 |",
    "+---+---+"
];
Source

pub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame>

Expand multiple list/struct columns into a set of rows and new columns.

See also: UnnestOptions documentation for the behavior of unnest

§Example
let ctx = SessionContext::new();
let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
// JSON arrays are expanded into multiple rows; nested structs are flattened into columns such as d.e and d.f
let df = df.unnest_columns(&["b","c","d"])?;
let expected = vec![
    "+---+------+-------+-----+-----+",
    "| a | b    | c     | d.e | d.f |",
    "+---+------+-------+-----+-----+",
    "| 1 | 2.0  | false | 1   | 2   |",
    "| 1 | 1.3  | true  | 1   | 2   |",
    "| 1 | -6.1 |       | 1   | 2   |",
    "| 2 | 3.0  | false |     |     |",
    "| 2 | 2.3  | true  |     |     |",
    "| 2 | -7.1 |       |     |     |",
    "+---+------+-------+-----+-----+"
];
Source

pub fn unnest_columns_with_options( self, columns: &[&str], options: UnnestOptions, ) -> Result<DataFrame>

Expand multiple list columns into a set of rows, with behavior controlled by UnnestOptions.

Please see the documentation on UnnestOptions for more details about the meaning of unnest.

Source

pub fn filter(self, predicate: Expr) -> Result<DataFrame>

Return a DataFrame with only rows for which predicate evaluates to true.

Rows for which predicate evaluates to false or null are filtered out.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?;
// all rows where a <= b are returned
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 1 | 2 | 3 |",
    "| 4 | 5 | 6 |",
    "| 7 | 8 | 9 |",
    "+---+---+---+"
];
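
The rule that rows evaluating to false or null are both dropped can be sketched in plain Rust. This is a toy model rather than DataFusion code: le_nullable and filter_rows are hypothetical helpers, and Option<i64> stands in for a nullable column value.

```rust
// Toy model: a predicate over nullable values yields Some(true),
// Some(false), or None (null). Only Some(true) keeps the row.
fn le_nullable(a: Option<i64>, b: Option<i64>) -> Option<bool> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x <= y),
        _ => None, // a comparison involving null evaluates to null
    }
}

fn filter_rows(rows: &[(Option<i64>, Option<i64>)]) -> Vec<(Option<i64>, Option<i64>)> {
    rows.iter()
        .copied()
        .filter(|&(a, b)| le_nullable(a, b) == Some(true))
        .collect()
}
```

A row such as (None, Some(9)) is removed even though the comparison is not false, because the predicate result is null rather than true.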
Source

pub fn aggregate( self, group_expr: Vec<Expr>, aggr_expr: Vec<Expr>, ) -> Result<DataFrame>

Return a new DataFrame that aggregates the rows of the current DataFrame, first optionally grouping by the given expressions.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;

// The following use is the equivalent of "SELECT a, MIN(b) FROM df GROUP BY a"
let df1 = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
let expected1 = vec![
    "+---+----------------+",
    "| a | min(?table?.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "| 4 | 5              |",
    "| 7 | 8              |",
    "+---+----------------+"
];
assert_batches_sorted_eq!(expected1, &df1.collect().await?);
// The following use is the equivalent of "SELECT MIN(b) FROM df"
let df2 = df.aggregate(vec![], vec![min(col("b"))])?;
let expected2 = vec![
    "+----------------+",
    "| min(?table?.b) |",
    "+----------------+",
    "| 2              |",
    "+----------------+"
];
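
For readers who want to see the grouping logic spelled out, here is a small std-only sketch of what the two calls above compute. It is a toy model over (a, b) pairs, not DataFusion's implementation; min_b_by_a and min_b are hypothetical names.

```rust
use std::collections::BTreeMap;

// Toy equivalent of grouping (a, b) rows by `a` and taking MIN(b) per group.
fn min_b_by_a(rows: &[(i64, i64)]) -> BTreeMap<i64, i64> {
    let mut groups: BTreeMap<i64, i64> = BTreeMap::new();
    for &(a, b) in rows {
        groups
            .entry(a)
            .and_modify(|m| *m = (*m).min(b))
            .or_insert(b);
    }
    groups
}

// With no grouping expressions, the whole input forms a single group.
fn min_b(rows: &[(i64, i64)]) -> Option<i64> {
    rows.iter().map(|&(_, b)| b).min()
}
```

An empty group_expr vector corresponds to min_b: every row lands in the same group, so the result has a single value.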
Source

pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame>

Return a new DataFrame that adds the result of evaluating one or more window functions (Expr::WindowFunction) to the existing columns

Source

pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame>

Returns a new DataFrame with a limited number of rows.

§Arguments

  • skip - Number of rows to skip before fetching any rows.
  • fetch - Maximum number of rows to return, after skipping skip rows.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df = df.limit(1, Some(2))?;
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 4 | 5 | 6 |",
    "| 7 | 8 | 9 |",
    "+---+---+---+"
];
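
The interaction of skip and fetch can be modelled with iterator adapters. The function below is a hypothetical std-only helper over an in-memory slice, intended only to illustrate the semantics of the two arguments.

```rust
// Toy model of `limit(skip, fetch)` over an in-memory list of rows.
fn limit<T: Clone>(rows: &[T], skip: usize, fetch: Option<usize>) -> Vec<T> {
    let remaining = rows.iter().skip(skip).cloned();
    match fetch {
        Some(n) => remaining.take(n).collect(),
        None => remaining.collect(), // no upper bound on the rows returned
    }
}
```

Skipping more rows than exist simply produces an empty result in this sketch.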
Source

pub fn union(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the union of two DataFrames, preserving duplicate rows.

The two DataFrames must have exactly the same schema

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.union(d2)?;
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 1 | 2 | 3 |",
    "| 1 | 2 | 3 |",
    "+---+---+---+"
];
Source

pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the distinct union of two DataFrames.

The two DataFrames must have exactly the same schema. Any duplicate rows are discarded.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.union_distinct(d2)?;
// d2 is a duplicate of df, so its rows are discarded
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 1 | 2 | 3 |",
    "+---+---+---+"
];
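
The difference between union and union_distinct can be sketched over plain tuples. This is a toy model, assuming three integer columns; Row, union_all, and union_distinct are illustrative names, and the order of rows returned by DataFusion is not implied by this sketch.

```rust
use std::collections::HashSet;

type Row = (i64, i64, i64);

// UNION ALL style: concatenate the inputs, keeping duplicates.
fn union_all(left: &[Row], right: &[Row]) -> Vec<Row> {
    left.iter().chain(right.iter()).copied().collect()
}

// Distinct union: concatenate, then keep the first occurrence of each row.
fn union_distinct(left: &[Row], right: &[Row]) -> Vec<Row> {
    let mut seen = HashSet::new();
    union_all(left, right)
        .into_iter()
        .filter(|row| seen.insert(*row)) // `insert` returns false for repeats
        .collect()
}
```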
Source

pub fn distinct(self) -> Result<DataFrame>

Return a new DataFrame with all duplicated rows removed.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.distinct()?;
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 1 | 2 | 3 |",
    "+---+---+---+"
];
Source

pub fn distinct_on( self, on_expr: Vec<Expr>, select_expr: Vec<Expr>, sort_expr: Option<Vec<SortExpr>>, ) -> Result<DataFrame>

Return a new DataFrame that keeps one row for each distinct combination of the on_expr values (the DISTINCT ON clause expressions). select_expr gives the output columns, and the optional sort_expr determines which row within each group is kept.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
  // Return a single row (a, b) for each distinct value of a
  .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
let expected = vec![
    "+---+---+",
    "| a | b |",
    "+---+---+",
    "| 1 | 2 |",
    "+---+---+"
];
Source

pub async fn describe(self) -> Result<Self>

Return a new DataFrame that has statistics for a DataFrame.

Only summarizes numeric datatypes at the moment and returns nulls for non-numeric datatypes. The output format is modeled after pandas.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
let stat = df.describe().await?;
let expected = vec![
    "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    "| describe   | c_custkey          | c_name             | c_address                          | c_nationkey        | c_phone         | c_acctbal          | c_mktsegment | c_comment                                                                                                |",
    "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    "| count      | 9.0                | 9                  | 9                                  | 9.0                | 9               | 9.0                | 9            | 9                                                                                                        |",
    "| max        | 10.0               | Customer#000000010 | xKiAFTjUsCuxfeleNqefumTrjS         | 20.0               | 30-114-968-4951 | 9561.95            | MACHINERY    | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious |",
    "| mean       | 6.0                | null               | null                               | 9.88888888888889   | null            | 5153.2155555555555 | null         | null                                                                                                     |",
    "| median     | 6.0                | null               | null                               | 8.0                | null            | 6819.74            | null         | null                                                                                                     |",
    "| min        | 2.0                | Customer#000000002 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 1.0                | 11-719-748-3364 | 121.65             | AUTOMOBILE   |  deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov   |",
    "| null_count | 0.0                | 0                  | 0                                  | 0.0                | 0               | 0.0                | 0            | 0                                                                                                        |",
    "| std        | 2.7386127875258306 | null               | null                               | 7.2188026092359046 | null            | 3522.169804254585  | null         | null                                                                                                     |",
    "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+"];
assert_batches_sorted_eq!(expected, &stat.collect().await?);
Source

pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame>

Apply a sort by provided expressions with default direction

Source

pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame>

Sort the DataFrame by the specified sorting expressions.

Note that any expression can be turned into a sort expression by calling its sort method.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df = df.sort(vec![
  col("a").sort(false, true), // a DESC, nulls first
  col("b").sort(true, false), // b ASC, nulls last
])?;
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 7 | 8 | 9 |",
    "| 4 | 5 | 6 |",
    "| 1 | 2 | 3 |",
    "+---+---+---+",
];
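
The two boolean arguments of sort(asc, nulls_first) can be illustrated with a std-only comparator. This is a sketch under the assumption that null placement is independent of direction; compare and sort_column are hypothetical helpers, with Option<i64> standing in for a nullable column.

```rust
use std::cmp::Ordering;

// Toy comparator mirroring the two flags passed to `sort(asc, nulls_first)`.
fn compare(a: Option<i64>, b: Option<i64>, asc: bool, nulls_first: bool) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        // Null placement is decided by `nulls_first`, independent of direction.
        (None, Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
        (Some(_), None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
        // Non-null values follow the requested direction.
        (Some(x), Some(y)) => if asc { x.cmp(&y) } else { y.cmp(&x) },
    }
}

fn sort_column(mut values: Vec<Option<i64>>, asc: bool, nulls_first: bool) -> Vec<Option<i64>> {
    values.sort_by(|a, b| compare(*a, *b, asc, nulls_first));
    values
}
```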
Source

pub fn join( self, right: DataFrame, join_type: JoinType, left_cols: &[&str], right_cols: &[&str], filter: Option<Expr>, ) -> Result<DataFrame>

Join this DataFrame with another DataFrame using explicitly specified columns and an optional filter expression.

See join_on for a more concise way to specify the join condition. Since DataFusion automatically identifies and optimizes equality predicates, there is no performance difference between this function and join_on.

left_cols and right_cols are used to form “equijoin” predicates (see example below), which are then combined with the optional filter expression. If left_cols and right_cols contain ambiguous column references, they will be disambiguated by prioritizing the left relation for left_cols and the right relation for right_cols.

Note that in case of outer join, the filter is applied to only matched rows.

§Example
let ctx = SessionContext::new();
let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
  .select(vec![
    col("a").alias("a2"),
    col("b").alias("b2"),
    col("c").alias("c2")])?;
// Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
// finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
let expected = vec![
    "+---+---+---+----+----+----+",
    "| a | b | c | a2 | b2 | c2 |",
    "+---+---+---+----+----+----+",
    "| 1 | 2 | 3 | 1  | 2  | 3  |",
    "+---+---+---+----+----+----+"
];
assert_batches_sorted_eq!(expected, &join.collect().await?);
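
To make the "equijoin" predicate concrete, the following std-only sketch pairs rows whose first two columns match, which corresponds to a = a2 AND b = b2 in the example above. It is a naive nested loop written for illustration; Row and inner_join are hypothetical names and this is not how DataFusion executes joins.

```rust
type Row = (i64, i64, i64);

// Toy inner equijoin on (a, b) = (a2, b2) using a nested loop.
fn inner_join(left: &[Row], right: &[Row]) -> Vec<(Row, Row)> {
    let mut out = Vec::new();
    for &l in left {
        for &r in right {
            // Each (left_col, right_col) pair contributes one equality test.
            if l.0 == r.0 && l.1 == r.1 {
                out.push((l, r));
            }
        }
    }
    out
}
```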
Source

pub fn join_on( self, right: DataFrame, join_type: JoinType, on_exprs: impl IntoIterator<Item = Expr>, ) -> Result<DataFrame>

Join this DataFrame with another DataFrame using the specified expressions.

Note that DataFusion automatically optimizes joins, including identifying and optimizing equality predicates.

§Example
let ctx = SessionContext::new();
let left = ctx
    .read_csv("tests/data/example.csv", CsvReadOptions::new())
    .await?;
let right = ctx
    .read_csv("tests/data/example.csv", CsvReadOptions::new())
    .await?
    .select(vec![
        col("a").alias("a2"),
        col("b").alias("b2"),
        col("c").alias("c2"),
    ])?;

// Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`,
// finding all pairs of rows from `left` and `right`
// where `a != a2` and `b != b2`.
let join_on = left.join_on(
    right,
    JoinType::Inner,
    [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
)?;
let expected = vec![
    "+---+---+---+----+----+----+",
    "| a | b | c | a2 | b2 | c2 |",
    "+---+---+---+----+----+----+",
    "+---+---+---+----+----+----+"
];
Source

pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame>

Repartition a DataFrame based on a logical partitioning scheme.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 1 | 2 | 3 |",
    "| 4 | 5 | 6 |",
    "| 7 | 8 | 9 |",
    "+---+---+---+"
];
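
As the expected output suggests, repartitioning changes how rows are distributed, not which rows exist. A round-robin scheme can be sketched as follows; round_robin is a hypothetical std-only helper, and the exact order in which DataFusion assigns batches to partitions is an implementation detail this sketch does not claim to reproduce.

```rust
// Toy round-robin distribution of batches across `n` partitions.
fn round_robin<T>(batches: Vec<T>, n: usize) -> Vec<Vec<T>> {
    assert!(n > 0, "need at least one partition");
    let mut partitions: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, batch) in batches.into_iter().enumerate() {
        // Batch i goes to partition i mod n.
        partitions[i % n].push(batch);
    }
    partitions
}
```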
Source

pub async fn count(self) -> Result<usize>

Return the total number of rows in this DataFrame.

Note that this method will actually run a plan to calculate the count, which may be slow for large or complicated DataFrames.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let count = df.count().await?; // 1
Source

pub async fn collect(self) -> Result<Vec<RecordBatch>>

Execute this DataFrame and buffer all resulting RecordBatches into memory.

Prior to calling collect, modifying a DataFrame simply updates a plan (no actual computation is performed). collect triggers the computation.

See Self::execute_stream to execute a DataFrame without buffering.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;
Source

pub async fn show(self) -> Result<()>

Execute the DataFrame and print the results to the console.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show().await?;
Source

pub async fn show_limit(self, num: usize) -> Result<()>

Execute the DataFrame and print only the first num rows of the result to the console.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show_limit(10).await?;
Source

pub fn task_ctx(&self) -> TaskContext

Return a new TaskContext which would be used to execute this DataFrame

Source

pub async fn execute_stream(self) -> Result<SendableRecordBatchStream>

Executes this DataFrame and returns a stream over a single partition

See Self::collect to buffer the RecordBatches in memory.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let stream = df.execute_stream().await?;
§Aborting Execution

Dropping the stream will abort the execution of the query and free up any allocated resources.

Source

pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>>

Executes this DataFrame and collects all results into a vector of vector of RecordBatch maintaining the input partitioning.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect_partitioned().await?;
Source

pub async fn execute_stream_partitioned( self, ) -> Result<Vec<SendableRecordBatchStream>>

Executes this DataFrame and returns one stream per partition.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let streams = df.execute_stream_partitioned().await?;
§Aborting Execution

Dropping the streams will abort the execution of the query and free up any allocated resources.

Source

pub fn schema(&self) -> &DFSchema

Returns the DFSchema describing the output of this DataFrame.

The output DFSchema contains information on the name, data type, and nullability for each column.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let schema = df.schema();
Source

pub fn logical_plan(&self) -> &LogicalPlan

Return a reference to the unoptimized LogicalPlan that comprises this DataFrame.

See Self::into_unoptimized_plan for more details.

Source

pub fn into_parts(self) -> (SessionState, LogicalPlan)

Returns both the LogicalPlan and SessionState that comprise this DataFrame

Source

pub fn into_unoptimized_plan(self) -> LogicalPlan

Return the LogicalPlan represented by this DataFrame without running any optimizers

Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame and consequently subsequent operations may take place against a different state (e.g. a different value of now())

See Self::into_parts to retrieve the owned LogicalPlan and corresponding SessionState.

Source

pub fn into_optimized_plan(self) -> Result<LogicalPlan>

Return the optimized LogicalPlan represented by this DataFrame.

Note: This method should not be used outside testing – see Self::into_unoptimized_plan for more details.

Source

pub fn into_view(self) -> Arc<dyn TableProvider>

Converts this DataFrame into a TableProvider that can be registered as a table view using SessionContext::register_table.

Note: This discards the SessionState associated with this DataFrame in favour of the one passed to TableProvider::scan

Source

pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame>

Return a DataFrame with the explanation of its plan so far.

If analyze is true, the plan is run and its metrics are reported.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
Source

pub fn registry(&self) -> &dyn FunctionRegistry

Return a FunctionRegistry used to plan UDF calls.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let f = df.registry();
// use f.udf("name")? to look up a registered scalar UDF by name
Source

pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the intersection of two DataFrames. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df = df.intersect(d2)?;
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 1 | 2 | 3 |",
    "+---+---+---+"
];
Source

pub fn except(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the set difference (SQL EXCEPT) of two DataFrames: the rows of this DataFrame that do not appear in the other. The two DataFrames must have exactly the same schema.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let result = df.except(d2)?;
// these rows are in example_long.csv but not in example.csv
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 4 | 5 | 6 |",
    "| 7 | 8 | 9 |",
    "+---+---+---+"
];
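
The intersect and except examples can be mirrored with a std-only sketch. This is a toy model: Row, intersect, and except are illustrative names, each left row is simply kept or dropped depending on whether it appears in the right input, and DataFusion's handling of duplicate rows may differ.

```rust
use std::collections::HashSet;

type Row = (i64, i64, i64);

// Keep each left row that also appears in `right`.
fn intersect(left: &[Row], right: &[Row]) -> Vec<Row> {
    let right: HashSet<Row> = right.iter().copied().collect();
    left.iter().copied().filter(|row| right.contains(row)).collect()
}

// Keep each left row that does not appear in `right`.
fn except(left: &[Row], right: &[Row]) -> Vec<Row> {
    let right: HashSet<Row> = right.iter().copied().collect();
    left.iter().copied().filter(|row| !right.contains(row)).collect()
}
```

Note that except is not symmetric: swapping the two inputs generally gives a different result.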
Source

pub async fn write_table( self, table_name: &str, write_options: DataFrameWriteOptions, ) -> Result<Vec<RecordBatch>, DataFusionError>

Execute this DataFrame and write the results to table_name.

Returns a single RecordBatch containing a single column and row representing the count of total rows written.

Unlike most other DataFrame methods, this method executes eagerly. Data is written to the table using the TableProvider::insert_into method. This is the same underlying implementation used by SQL INSERT INTO statements.

Source

pub async fn write_csv( self, path: &str, options: DataFrameWriteOptions, writer_options: Option<CsvOptions>, ) -> Result<Vec<RecordBatch>, DataFusionError>

Execute the DataFrame and write the results to CSV file(s).

§Example
use datafusion::dataframe::DataFrameWriteOptions;
let ctx = SessionContext::new();
// Sort the data by column "b" and write it to a new location
ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
  .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
  .write_csv(
    "output.csv",
    DataFrameWriteOptions::new(),
    None, // can also specify CSV writing options here
).await?;
Source

pub async fn write_json( self, path: &str, options: DataFrameWriteOptions, writer_options: Option<JsonOptions>, ) -> Result<Vec<RecordBatch>, DataFusionError>

Execute the DataFrame and write the results to JSON file(s).

§Example
use datafusion::dataframe::DataFrameWriteOptions;
let ctx = SessionContext::new();
// Sort the data by column "b" and write it to a new location
ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
  .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
  .write_json(
    "output.json",
    DataFrameWriteOptions::new(),
    None
).await?;
Source

pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame>

Add or replace a column in the DataFrame.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column("ab_sum", col("a") + col("b"))?;
Source

pub fn with_column_renamed( self, old_name: impl Into<String>, new_name: &str, ) -> Result<DataFrame>

Rename one column by applying a new projection. This is a no-op if the column to be renamed does not exist.

To rename case-sensitively, wrap the column name in one of the following quote characters: " or ' or `.

Alternatively, setting the DataFusion configuration option datafusion.sql_parser.enable_ident_normalization to false enables case-sensitive renaming without wrapping the column name in quotes.

§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// example.csv has no column named "ab_sum", so this particular call is a no-op
let df = df.with_column_renamed("ab_sum", "total")?;
Source

pub fn with_param_values( self, query_values: impl Into<ParamValues>, ) -> Result<Self>

Replace all parameters in logical plan with the specified values, in preparation for execution.

§Example
use datafusion::prelude::*;
let ctx = SessionContext::new();
let results = ctx
  .sql("SELECT a FROM example WHERE b = $1")
  .await?
   // replace $1 with value 2
  .with_param_values(vec![
     // value at index 0 --> $1
     ScalarValue::from(2i64)
   ])?
  .collect()
  .await?;
assert_batches_eq!(
 &[
   "+---+",
   "| a |",
   "+---+",
   "| 1 |",
   "+---+",
 ],
 &results
);
// Note you can also provide named parameters
let results = ctx
  .sql("SELECT a FROM example WHERE b = $my_param")
  .await?
   // replace $my_param with value 2
   // Note you can also use a HashMap as well
  .with_param_values(vec![
      ("my_param", ScalarValue::from(2i64))
   ])?
  .collect()
  .await?;
assert_batches_eq!(
 &[
   "+---+",
   "| a |",
   "+---+",
   "| 1 |",
   "+---+",
 ],
 &results
);
Source

pub async fn cache(self) -> Result<DataFrame>

Cache DataFrame as a memory table.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.cache().await?;
Source

pub fn alias(self, alias: &str) -> Result<DataFrame>

Apply an alias to the DataFrame.

This method replaces the qualifiers of output columns with the given alias.

Source

pub fn fill_null( &self, value: ScalarValue, columns: Vec<String>, ) -> Result<DataFrame>

Fill null values in the specified columns with a given value. If no columns are specified (empty vector), the fill applies to all columns. A column is only filled if the value can be cast to that column’s type.

§Arguments
  • value - Value to fill nulls with
  • columns - List of column names to fill. If empty, fills all columns.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// Fill nulls in only columns "a" and "c":
let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?;
// Fill nulls across all columns:
let df = df.fill_null(ScalarValue::from(0), vec![])?;
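
The column-selection rule (an empty list means all columns) can be sketched without DataFusion. In this toy model, Table and fill_null are hypothetical stand-ins, every column holds nullable i64 values, and the type-cast check mentioned above is not modelled.

```rust
use std::collections::BTreeMap;

// Toy table: column name -> nullable integer values.
type Table = BTreeMap<String, Vec<Option<i64>>>;

// Replace nulls with `value` in the named columns; an empty `columns`
// slice means every column is filled.
fn fill_null(table: &Table, value: i64, columns: &[&str]) -> Table {
    table
        .iter()
        .map(|(name, values)| {
            let selected = columns.is_empty() || columns.contains(&name.as_str());
            let filled: Vec<Option<i64>> = if selected {
                values.iter().map(|v| v.or(Some(value))).collect()
            } else {
                values.clone() // unselected columns pass through unchanged
            };
            (name.clone(), filled)
        })
        .collect()
}
```

Like the method it imitates, this sketch borrows its input and returns a new table rather than modifying the original.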

Trait Implementations§

Source§

impl Clone for DataFrame

Source§

fn clone(&self) -> DataFrame

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for DataFrame

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<T> MaybeSendSync for T

Source§

impl<T> Ungil for T
where T: Send,