// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DataFrame`] API for building and executing query plans.

#[cfg(feature = "parquet")]
mod parquet;

use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::file_format::csv::CsvFormatFactory;
use crate::datasource::file_format::format_as_file_type;
use crate::datasource::file_format::json::JsonFormatFactory;
use crate::datasource::{
    provider_as_source, DefaultTableSource, MemTable, TableProvider,
};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::FunctionRegistry;
use crate::logical_expr::utils::find_window_exprs;
use crate::logical_expr::{
    col, ident, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
    LogicalPlanBuilderOptions, Partitioning, TableType,
};
use crate::physical_plan::{
    collect, collect_partitioned, execute_stream, execute_stream_partitioned,
    ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::SessionContext;
use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
    exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema,
    DataFusionError, ParamValues, ScalarValue, SchemaError, UnnestOptions,
};
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
    case,
    dml::InsertOp,
    expr::{Alias, ScalarFunction},
    is_null, lit,
    utils::COUNT_STAR_EXPANSION,
    SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE,
};
use datafusion_functions::core::coalesce;
use datafusion_functions_aggregate::expr_fn::{
    avg, count, max, median, min, stddev, sum,
};

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_sql::TableReference;

/// Contains options that control how data is
/// written out from a DataFrame
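///
/// # Example
///
/// A minimal sketch of configuring the builder (the `"year"` partition column
/// is hypothetical):
///
/// ```
/// # use datafusion::dataframe::DataFrameWriteOptions;
/// # use datafusion::logical_expr::dml::InsertOp;
/// let options = DataFrameWriteOptions::new()
///     .with_insert_operation(InsertOp::Overwrite)
///     .with_single_file_output(true)
///     .with_partition_by(vec!["year".to_string()]);
/// ```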
pub struct DataFrameWriteOptions {
    /// Controls how new data should be written to the table, determining whether
    /// to append, overwrite, or replace existing data.
    insert_op: InsertOp,
    /// Controls whether all partitions should be coalesced into a single output file.
    /// Writing is generally slower when set to true.
    single_file_output: bool,
    /// Sets which columns should be used for hive-style partitioned writes by name.
    /// Can be set to an empty `vec![]` for non-partitioned writes.
    partition_by: Vec<String>,
    /// Sets which columns should be used for sorting the output by name.
    /// Can be set to an empty `vec![]` for non-sorted writes.
    sort_by: Vec<SortExpr>,
}

impl DataFrameWriteOptions {
    /// Create a new DataFrameWriteOptions with default values
    pub fn new() -> Self {
        DataFrameWriteOptions {
            insert_op: InsertOp::Append,
            single_file_output: false,
            partition_by: vec![],
            sort_by: vec![],
        }
    }

    /// Set the insert operation
    pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
        self.insert_op = insert_op;
        self
    }

    /// Set the single_file_output value to true or false
    pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
        self.single_file_output = single_file_output;
        self
    }

    /// Sets the partition_by columns for output partitioning
    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
        self.partition_by = partition_by;
        self
    }

    /// Sets the sort_by columns for output sorting
    pub fn with_sort_by(mut self, sort_by: Vec<SortExpr>) -> Self {
        self.sort_by = sort_by;
        self
    }
}

impl Default for DataFrameWriteOptions {
    fn default() -> Self {
        Self::new()
    }
}

/// Represents a logical set of rows with the same named columns.
///
/// Similar to a [Pandas DataFrame] or [Spark DataFrame], a DataFusion DataFrame
/// represents a two-dimensional table of rows and columns.
///
/// The typical workflow using DataFrames looks like
///
/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
///    and [`read_parquet`].
///
/// 2. Build a desired calculation by calling methods such as [`filter`],
///    [`select`], [`aggregate`], and [`limit`]
///
/// 3. Execute into [`RecordBatch`]es by calling [`collect`]
///
/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
/// required for execution.
///
/// DataFrames are "lazy" in the sense that most methods do not actually compute
/// anything, they just build up a plan. Calling [`collect`] executes the plan
/// using the same DataFusion planning and execution process used to execute SQL
/// and other queries.
///
/// [Pandas DataFrame]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
/// [Spark DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html
/// [`read_csv`]: SessionContext::read_csv
/// [`read_parquet`]: SessionContext::read_parquet
/// [`filter`]: DataFrame::filter
/// [`select`]: DataFrame::select
/// [`aggregate`]: DataFrame::aggregate
/// [`limit`]: DataFrame::limit
/// [`collect`]: DataFrame::collect
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion::functions_aggregate::expr_fn::min;
/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// // Read the data from a csv file
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // create a new dataframe that computes the equivalent of
/// // `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
/// let df = df.filter(col("a").lt_eq(col("b")))?
///            .aggregate(vec![col("a")], vec![min(col("b"))])?
///            .limit(0, Some(100))?;
/// // Perform the actual computation
/// let results = df.collect().await?;
///
/// // Create a new dataframe with in-memory data
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, true),
///     Field::new("name", DataType::Utf8, true),
/// ]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema),
///     vec![
///         Arc::new(Int32Array::from(vec![1, 2, 3])),
///         Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
///     ],
/// )?;
/// let df = ctx.read_batch(batch)?;
/// df.show().await?;
///
/// // Create a new dataframe with in-memory data using macro
/// let df = dataframe!(
///     "id" => [1, 2, 3],
///     "name" => ["foo", "bar", "baz"]
/// )?;
/// df.show().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct DataFrame {
    // Box the (large) SessionState to reduce the size of DataFrame on the stack
    session_state: Box<SessionState>,
    plan: LogicalPlan,
    // Whether projection ops can skip validation or not. When this flag is
    // false, `with_column` and `with_column_renamed` can skip the recursive
    // work required to columnize and normalize expressions. Since these
    // function calls are often chained or called many times in dataframe
    // operations, this can result in a significant performance gain.
    //
    // This flag can be set to false when the dataframe function call results
    // in the last operation being a
    // `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
    // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
    // call. This requirement guarantees that the plan has had all columnization
    // and normalization applied to existing expressions and only new expressions
    // will require that work. Any operation that updates the plan in any way
    // via anything other than a `project` call should set this to true.
    projection_requires_validation: bool,
}

impl DataFrame {
    /// Create a new `DataFrame` based on an existing `LogicalPlan`
    ///
    /// This is a low-level method and is not typically used by end users. See
    /// [`SessionContext::read_csv`] and other methods for creating a
    /// `DataFrame` from an existing datasource.
    pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
        Self {
            session_state: Box::new(session_state),
            plan,
            projection_requires_validation: true,
        }
    }

    /// Creates a logical expression from SQL query text.
    /// The expression is parsed and resolved against the current schema.
    ///
    /// # Example: Parsing SQL queries
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion::prelude::*;
    /// # use datafusion_common::{DFSchema, Result};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// // datafusion will parse a number as i64 first.
    /// let sql = "a > 1 and b in (1, 10)";
    /// let expected = col("a").gt(lit(1 as i64))
    ///   .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let expr = df.parse_sql_expr(sql)?;
    /// assert_eq!(expected, expr);
    /// # Ok(())
    /// # }
    /// ```
    pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr> {
        let df_schema = self.schema();

        self.session_state.create_logical_expr(sql, df_schema)
    }

    /// Consume the DataFrame and produce a physical plan
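    ///
    /// # Example
    ///
    /// A minimal sketch of lowering a `DataFrame` to an [`ExecutionPlan`]:
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let physical_plan = df.create_physical_plan().await?;
    /// # Ok(())
    /// # }
    /// ```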
    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
        self.session_state.create_physical_plan(&self.plan).await
    }

    /// Select columns by name. Returns a new DataFrame containing only the
    /// specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.select_columns(&["a", "b"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields = columns
            .iter()
            .map(|name| {
                self.plan
                    .schema()
                    .qualified_field_with_unqualified_name(name)
            })
            .collect::<Result<Vec<_>>>()?;
        let expr: Vec<Expr> = fields
            .into_iter()
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Project an arbitrary list of expression strings into a new `DataFrame`.
    /// The strings are parsed into logical plan expressions.
    ///
    /// The output `DataFrame` has one column for each element in `exprs`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df: DataFrame = df.select_exprs(&["a * b", "c"])?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame> {
        let expr_list = exprs
            .iter()
            .map(|e| self.parse_sql_expr(e))
            .collect::<Result<Vec<_>>>()?;

        self.select(expr_list)
    }

    /// Project arbitrary expressions (like SQL SELECT expressions) into a new
    /// `DataFrame`.
    ///
    /// The output `DataFrame` has one column for each element in `expr_list`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.select(vec![col("a"), col("b") * col("c")])?;
    /// let expected = vec![
    ///     "+---+-----------------------+",
    ///     "| a | ?table?.b * ?table?.c |",
    ///     "+---+-----------------------+",
    ///     "| 1 | 6                     |",
    ///     "+---+-----------------------+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select(
        self,
        expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<DataFrame> {
        let expr_list: Vec<SelectExpr> =
            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();

        let expressions = expr_list
            .iter()
            .filter_map(|e| match e {
                SelectExpr::Expression(expr) => Some(expr.clone()),
                _ => None,
            })
            .collect::<Vec<_>>();

        let window_func_exprs = find_window_exprs(&expressions);
        let plan = if window_func_exprs.is_empty() {
            self.plan
        } else {
            LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
        };

        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan: project_plan,
            projection_requires_validation: false,
        })
    }

    /// Returns a new DataFrame containing all columns except the specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // +----+----+----+
    /// // | a  | b  | c  |
    /// // +----+----+----+
    /// // | 1  | 2  | 3  |
    /// // +----+----+----+
    /// let df = df.drop_columns(&["a"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| b | c |",
    ///     "+---+---+",
    ///     "| 2 | 3 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields_to_drop = columns
            .iter()
            .map(|name| {
                self.plan
                    .schema()
                    .qualified_field_with_unqualified_name(name)
            })
            .filter(|r| r.is_ok())
            .collect::<Result<Vec<_>>>()?;
        let expr: Vec<Expr> = self
            .plan
            .schema()
            .fields()
            .into_iter()
            .enumerate()
            .map(|(idx, _)| self.plan.schema().qualified_field(idx))
            .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Expand multiple list/struct columns into a set of rows and new columns.
    ///
    /// See also: [`UnnestOptions`] documentation for the behavior of `unnest`
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// // expand list columns into one row per element and flatten the field
    /// // names of nested structures into new columns
    /// let df = df.unnest_columns(&["b", "c", "d"])?;
    /// let expected = vec![
    ///     "+---+------+-------+-----+-----+",
    ///     "| a | b    | c     | d.e | d.f |",
    ///     "+---+------+-------+-----+-----+",
    ///     "| 1 | 2.0  | false | 1   | 2   |",
    ///     "| 1 | 1.3  | true  | 1   | 2   |",
    ///     "| 1 | -6.1 |       | 1   | 2   |",
    ///     "| 2 | 3.0  | false |     |     |",
    ///     "| 2 | 2.3  | true  |     |     |",
    ///     "| 2 | -7.1 |       |     |     |",
    ///     "+---+------+-------+-----+-----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame> {
        self.unnest_columns_with_options(columns, UnnestOptions::new())
    }

    /// Expand multiple list columns into a set of rows, with
    /// behavior controlled by [`UnnestOptions`].
    ///
    /// Please see the documentation on [`UnnestOptions`] for more
    /// details about the meaning of unnest.
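    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the `with_preserve_nulls` builder on
    /// [`UnnestOptions`]:
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::UnnestOptions;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// // keep a row (with null) when the unnested list is null or empty
    /// let options = UnnestOptions::new().with_preserve_nulls(true);
    /// let df = df.unnest_columns_with_options(&["b"], options)?;
    /// # Ok(())
    /// # }
    /// ```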
    pub fn unnest_columns_with_options(
        self,
        columns: &[&str],
        options: UnnestOptions,
    ) -> Result<DataFrame> {
        let columns = columns.iter().map(|c| Column::from(*c)).collect();
        let plan = LogicalPlanBuilder::from(self.plan)
            .unnest_columns_with_options(columns, options)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a DataFrame with only rows for which `predicate` evaluates to
    /// `true`.
    ///
    /// Rows for which `predicate` evaluates to `false` or `null`
    /// are filtered out.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.filter(col("a").lt_eq(col("b")))?;
    /// // all rows where a <= b are returned
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn filter(self, predicate: Expr) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .filter(predicate)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that aggregates the rows of the current
    /// `DataFrame`, first optionally grouping by the given expressions.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_aggregate::expr_fn::min;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    ///
    /// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
    /// let df1 = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
    /// let expected1 = vec![
    ///     "+---+----------------+",
    ///     "| a | min(?table?.b) |",
    ///     "+---+----------------+",
    ///     "| 1 | 2              |",
    ///     "| 4 | 5              |",
    ///     "| 7 | 8              |",
    ///     "+---+----------------+"
    /// ];
    /// assert_batches_sorted_eq!(expected1, &df1.collect().await?);
    /// // The following use is the equivalent of "SELECT MIN(b)"
    /// let df2 = df.aggregate(vec![], vec![min(col("b"))])?;
    /// let expected2 = vec![
    ///     "+----------------+",
    ///     "| min(?table?.b) |",
    ///     "+----------------+",
    ///     "| 2              |",
    ///     "+----------------+"
    /// ];
    /// # assert_batches_sorted_eq!(expected2, &df2.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn aggregate(
        self,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<DataFrame> {
        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
        let aggr_expr_len = aggr_expr.len();
        let options =
            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
        let plan = LogicalPlanBuilder::from(self.plan)
            .with_options(options)
            .aggregate(group_expr, aggr_expr)?
            .build()?;
        let plan = if is_grouping_set {
            let grouping_id_pos = plan.schema().fields().len() - 1 - aggr_expr_len;
            // For grouping sets we do a project to not expose the internal grouping id
            let exprs = plan
                .schema()
                .columns()
                .into_iter()
                .enumerate()
                .filter(|(idx, _)| *idx != grouping_id_pos)
                .map(|(_, column)| Expr::Column(column))
                .collect::<Vec<_>>();
            LogicalPlanBuilder::from(plan).project(exprs)?.build()?
        } else {
            plan
        };
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: !is_grouping_set,
        })
    }

    /// Return a new DataFrame that adds the result of evaluating one or more
    /// window functions ([`Expr::WindowFunction`]) to the existing columns
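    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the `row_number` window function from
    /// `datafusion::functions_window` and the `ExprFunctionExt` builder API
    /// from `datafusion_expr`:
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_window::expr_fn::row_number;
    /// # use datafusion_expr::ExprFunctionExt;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// // number the rows within each partition of `a`, ordered by `b`
    /// let window_expr = row_number()
    ///     .partition_by(vec![col("a")])
    ///     .order_by(vec![col("b").sort(true, true)])
    ///     .build()?;
    /// let df = df.window(vec![window_expr])?;
    /// # Ok(())
    /// # }
    /// ```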
    pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .window(window_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Returns a new `DataFrame` with a limited number of rows.
    ///
    /// # Arguments
    /// * `skip` - Number of rows to skip before fetching any rows
    /// * `fetch` - Maximum number of rows to return, after skipping `skip` rows
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.limit(1, Some(2))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .limit(skip, fetch)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone();
    /// let df = df.union(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?.with_column("d", lit("77"))?;
    /// let df = df.union_by_name(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+----+",
    ///     "| a | b | c | d  |",
    ///     "+---+---+---+----+",
    ///     "| 1 | 2 | 3 |    |",
    ///     "| 1 | 2 | 3 | 77 |",
    ///     "+---+---+---+----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the distinct union of two [`DataFrame`]s.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema. Any duplicate
    /// rows are discarded.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone();
    /// let df = df.union_distinct(d2)?;
    /// // d2 is a duplicate of df, so the duplicated rows are removed
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names with all duplicated rows removed.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?;
    /// let df = df.union_by_name_distinct(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` with all duplicated rows removed.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.distinct()?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct(self) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that keeps one row for each distinct value of
    /// the `DISTINCT ON` expressions (`on_expr`), choosing that row according
    /// to the optional sorting expressions and projecting `select_expr`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
    ///   // Return a single row (a, b) for each distinct value of a
    ///   .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct_on(
        self,
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .distinct_on(on_expr, select_expr, sort_expr)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that has statistics for a DataFrame.
    ///
    /// Only summarizes numeric datatypes at the moment and returns nulls for
    /// non-numeric datatypes. The output format is modeled after pandas.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use arrow::util::pretty;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
    /// let stat = df.describe().await?;
    /// # // some output columns are ignored
    /// let expected = vec![
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| describe   | c_custkey          | c_name             | c_address                          | c_nationkey        | c_phone         | c_acctbal          | c_mktsegment | c_comment                                                                                                |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| count      | 9.0                | 9                  | 9                                  | 9.0                | 9               | 9.0                | 9            | 9                                                                                                        |",
    ///     "| max        | 10.0               | Customer#000000010 | xKiAFTjUsCuxfeleNqefumTrjS         | 20.0               | 30-114-968-4951 | 9561.95            | MACHINERY    | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious |",
    ///     "| mean       | 6.0                | null               | null                               | 9.88888888888889   | null            | 5153.2155555555555 | null         | null                                                                                                     |",
    ///     "| median     | 6.0                | null               | null                               | 8.0                | null            | 6819.74            | null         | null                                                                                                     |",
    ///     "| min        | 2.0                | Customer#000000002 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 1.0                | 11-719-748-3364 | 121.65             | AUTOMOBILE   |  deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov   |",
    ///     "| null_count | 0.0                | 0                  | 0                                  | 0.0                | 0               | 0.0                | 0            | 0                                                                                                        |",
    ///     "| std        | 2.7386127875258306 | null               | null                               | 7.2188026092359046 | null            | 3522.169804254585  | null         | null                                                                                                     |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+"];
    /// assert_batches_sorted_eq!(expected, &stat.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn describe(self) -> Result<Self> {
        // the aggregate functions currently supported by `describe`
        let supported_describe_functions =
            vec!["count", "null_count", "mean", "std", "min", "max", "median"];

        let original_schema_fields = self.schema().fields().iter();

        // define the `describe` column
        let mut describe_schemas = vec![Field::new("describe", DataType::Utf8, false)];
        describe_schemas.extend(original_schema_fields.clone().map(|field| {
            if field.data_type().is_numeric() {
                Field::new(field.name(), DataType::Float64, true)
            } else {
                Field::new(field.name(), DataType::Utf8, true)
            }
        }));

        // build one aggregation per supported describe function
        let describe_record_batch = vec![
            // count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| count(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // null_count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| {
                        sum(case(is_null(ident(f.name())))
                            .when(lit(true), lit(1))
                            .otherwise(lit(0))
                            .unwrap())
                        .alias(f.name())
                    })
                    .collect::<Vec<_>>(),
            ),
            // mean aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| avg(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // std aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| stddev(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // min aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| min(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // max aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| max(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // median aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| median(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
        ];

        // first column with function names
        let mut array_ref_vec: Vec<ArrayRef> = vec![Arc::new(StringArray::from(
            supported_describe_functions.clone(),
        ))];
        for field in original_schema_fields {
            let mut array_datas = vec![];
            for result in describe_record_batch.iter() {
                let array_ref = match result {
                    Ok(df) => {
                        let batches = df.clone().collect().await;
                        match batches {
                            Ok(batches)
                                if batches.len() == 1
                                    && batches[0]
                                        .column_by_name(field.name())
                                        .is_some() =>
                            {
                                let column =
                                    batches[0].column_by_name(field.name()).unwrap();

                                if column.data_type().is_null() {
                                    Arc::new(StringArray::from(vec!["null"]))
                                } else if field.data_type().is_numeric() {
                                    cast(column, &DataType::Float64)?
                                } else {
                                    cast(column, &DataType::Utf8)?
                                }
                            }
                            _ => Arc::new(StringArray::from(vec!["null"])),
                        }
                    }
                    // handle the planning error raised when an aggregation has no
                    // expressions (e.g. only boolean/binary columns); propagate
                    // all other errors
                    Err(err)
                        if err.to_string().contains(
                            "Error during planning: \
                                            Aggregate requires at least one grouping \
                                            or aggregate expression",
                        ) =>
                    {
                        Arc::new(StringArray::from(vec!["null"]))
                    }
                    Err(e) => return exec_err!("{}", e),
                };
                array_datas.push(array_ref);
            }
            array_ref_vec.push(concat(
                array_datas
                    .iter()
                    .map(|af| af.as_ref())
                    .collect::<Vec<_>>()
                    .as_slice(),
            )?);
        }

        let describe_record_batch =
            RecordBatch::try_new(Arc::new(Schema::new(describe_schemas)), array_ref_vec)?;

        let provider = MemTable::try_new(
            describe_record_batch.schema(),
            vec![vec![describe_record_batch]],
        )?;

        let plan = LogicalPlanBuilder::scan(
            UNNAMED_TABLE,
            provider_as_source(Arc::new(provider)),
            None,
        )?
        .build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Apply a sort by provided expressions with the default direction
    /// (ascending, nulls last)
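    ///
    /// # Example
    ///
    /// A minimal sketch; equivalent to calling [`sort`](Self::sort) with
    /// `expr.sort(true, false)` for each expression:
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// // equivalent to `ORDER BY a, b`
    /// let df = df.sort_by(vec![col("a"), col("b")])?;
    /// # Ok(())
    /// # }
    /// ```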
    pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
        self.sort(
            expr.into_iter()
                .map(|e| e.sort(true, false))
                .collect::<Vec<SortExpr>>(),
        )
    }

    /// Sort the DataFrame by the specified sorting expressions.
    ///
    /// Note that any expression can be turned into
    /// a sort expression by calling its [sort](Expr::sort) method.
    ///
    /// # Example
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.sort(vec![
    ///     col("a").sort(false, true), // a DESC, nulls first
    ///     col("b").sort(true, false), // b ASC, nulls last
    /// ])?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using explicitly specified
    /// columns and an optional filter expression.
    ///
    /// See [`join_on`](Self::join_on) for a more concise way to specify the
    /// join condition. Since DataFusion will automatically identify and
    /// optimize equality predicates, there is no performance difference between
    /// this function and `join_on`.
    ///
    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
    /// example below), which are then combined with the optional `filter`
    /// expression. If `left_cols` and `right_cols` contain ambiguous column
    /// references, they will be disambiguated by prioritizing the left relation
    /// for `left_cols` and the right relation for `right_cols`.
    ///
    /// Note that in the case of an outer join, the `filter` is applied only to
    /// matched rows.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
    ///   .select(vec![
    ///     col("a").alias("a2"),
    ///     col("b").alias("b2"),
    ///     col("c").alias("c2")])?;
    /// // Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
    /// // finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
    /// let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "| 1 | 2 | 3 | 1  | 2  | 3  |",
    ///     "+---+---+---+----+----+----+"
    /// ];
    /// assert_batches_sorted_eq!(expected, &join.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join(
        self,
        right: DataFrame,
        join_type: JoinType,
        left_cols: &[&str],
        right_cols: &[&str],
        filter: Option<Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join(
                right.plan,
                join_type,
                (left_cols.to_vec(), right_cols.to_vec()),
                filter,
            )?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using the specified
    /// expressions.
    ///
    /// Note that DataFusion automatically optimizes joins, including
    /// identifying and optimizing equality predicates.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let right = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2"),
    ///     ])?;
    ///
    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
    /// // finding all pairs of rows from `left` and `right`
    /// // where `a != a2` and `b != b2`.
    /// let join_on = left.join_on(
    ///     right,
    ///     JoinType::Inner,
    ///     [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
    /// )?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "+---+---+---+----+----+----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &join_on.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join_on(
        self,
        right: DataFrame,
        join_type: JoinType,
        on_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join_on(right.plan, join_type, on_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Repartition a DataFrame based on a logical partitioning scheme.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df1.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .repartition(partitioning_scheme)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return the total number of rows in this `DataFrame`.
    ///
    /// Note that this method will actually run a plan to calculate the count,
    /// which may be slow for large or complicated DataFrames.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let count = df.count().await?; // 1
    /// # assert_eq!(count, 1);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn count(self) -> Result<usize> {
        let rows = self
            .aggregate(
                vec![],
                vec![count(Expr::Literal(COUNT_STAR_EXPANSION, None))],
            )?
            .collect()
            .await?;
        let len = *rows
            .first()
            .and_then(|r| r.columns().first())
            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
            .and_then(|a| a.values().first())
            .ok_or(DataFusionError::Internal(
                "Unexpected output when collecting for count()".to_string(),
            ))? as usize;
        Ok(len)
    }
1356
1357    /// Execute this `DataFrame` and buffer all resulting `RecordBatch`es into memory.
1358    ///
1359    /// Prior to calling `collect`, modifying a DataFrame simply updates a plan
1360    /// (no actual computation is performed). `collect` triggers the computation.
1361    ///
1362    /// See [`Self::execute_stream`] to execute a DataFrame without buffering.
1363    ///
1364    /// # Example
1365    /// ```
1366    /// # use datafusion::prelude::*;
1367    /// # use datafusion::error::Result;
1368    /// # #[tokio::main]
1369    /// # async fn main() -> Result<()> {
1370    /// let ctx = SessionContext::new();
1371    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1372    /// let batches = df.collect().await?;
1373    /// # Ok(())
1374    /// # }
1375    /// ```
1376    pub async fn collect(self) -> Result<Vec<RecordBatch>> {
1377        let task_ctx = Arc::new(self.task_ctx());
1378        let plan = self.create_physical_plan().await?;
1379        collect(plan, task_ctx).await
1380    }
1381
1382    /// Execute the `DataFrame` and print the results to the console.
1383    ///
1384    /// # Example
1385    /// ```
1386    /// # use datafusion::prelude::*;
1387    /// # use datafusion::error::Result;
1388    /// # #[tokio::main]
1389    /// # async fn main() -> Result<()> {
1390    /// let ctx = SessionContext::new();
1391    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1392    /// df.show().await?;
1393    /// # Ok(())
1394    /// # }
1395    /// ```
1396    pub async fn show(self) -> Result<()> {
1397        println!("{}", self.to_string().await?);
1398        Ok(())
1399    }
1400
1401    /// Execute the `DataFrame` and return a string representation of the results.
1402    ///
1403    /// # Example
1404    /// ```
1405    /// # use datafusion::prelude::*;
1406    /// # use datafusion::error::Result;
1407    /// # use datafusion::execution::SessionStateBuilder;
1408    ///
1409    /// # #[tokio::main]
1410    /// # async fn main() -> Result<()> {
1411    /// let cfg = SessionConfig::new()
1412    ///     .set_str("datafusion.format.null", "no-value");
1413    /// let session_state = SessionStateBuilder::new()
1414    ///     .with_config(cfg)
1415    ///     .with_default_features()
1416    ///     .build();
1417    /// let ctx = SessionContext::new_with_state(session_state);
1418    /// let df = ctx.sql("select null as 'null-column'").await?;
1419    /// let result = df.to_string().await?;
1420    /// assert_eq!(result,
1421    /// "+-------------+
1422    /// | null-column |
1423    /// +-------------+
1424    /// | no-value    |
1425    /// +-------------+"
1426    /// );
1427    /// # Ok(())
1428    /// # }
    /// ```
1429    pub async fn to_string(self) -> Result<String> {
1430        let options = self.session_state.config().options().format.clone();
1431        let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?;
1432
1433        let results = self.collect().await?;
1434        Ok(
1435            pretty::pretty_format_batches_with_options(&results, &arrow_options)?
1436                .to_string(),
1437        )
1438    }
1439
1440    /// Execute the `DataFrame` and print only the first `num` rows of the
1441    /// result to the console.
1442    ///
1443    /// # Example
1444    /// ```
1445    /// # use datafusion::prelude::*;
1446    /// # use datafusion::error::Result;
1447    /// # #[tokio::main]
1448    /// # async fn main() -> Result<()> {
1449    /// let ctx = SessionContext::new();
1450    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1451    /// df.show_limit(10).await?;
1452    /// # Ok(())
1453    /// # }
1454    /// ```
1455    pub async fn show_limit(self, num: usize) -> Result<()> {
1456        let results = self.limit(0, Some(num))?.collect().await?;
1457        Ok(pretty::print_batches(&results)?)
1458    }
1459
1460    /// Return a new [`TaskContext`] that can be used to execute this DataFrame
1461    pub fn task_ctx(&self) -> TaskContext {
1462        TaskContext::from(self.session_state.as_ref())
1463    }
1464
1465    /// Executes this DataFrame and returns a stream over a single partition
1466    ///
1467    /// See [`Self::collect`] to buffer the `RecordBatch`es in memory.
1468    ///
1469    /// # Example
1470    /// ```
1471    /// # use datafusion::prelude::*;
1472    /// # use datafusion::error::Result;
1473    /// # #[tokio::main]
1474    /// # async fn main() -> Result<()> {
1475    /// let ctx = SessionContext::new();
1476    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1477    /// let mut stream = df.execute_stream().await?;
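    /// // A sketch of consuming the stream incrementally; assumes the `futures`
    /// // crate (a DataFusion dependency) for `StreamExt::next`:
    /// # use futures::StreamExt;
    /// while let Some(batch) = stream.next().await {
    ///     let _batch = batch?;
    /// }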
1478    /// # Ok(())
1479    /// # }
1480    /// ```
1481    ///
1482    /// # Aborting Execution
1483    ///
1484    /// Dropping the stream will abort the execution of the query, and free up
1485    /// any allocated resources
1486    pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
1487        let task_ctx = Arc::new(self.task_ctx());
1488        let plan = self.create_physical_plan().await?;
1489        execute_stream(plan, task_ctx)
1490    }
1491
1492    /// Executes this DataFrame and collects all results into a vector of vectors of `RecordBatch`,
1493    /// maintaining the input partitioning.
1494    ///
1495    /// # Example
1496    /// ```
1497    /// # use datafusion::prelude::*;
1498    /// # use datafusion::error::Result;
1499    /// # #[tokio::main]
1500    /// # async fn main() -> Result<()> {
1501    /// let ctx = SessionContext::new();
1502    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1503    /// let batches = df.collect_partitioned().await?;
1504    /// # Ok(())
1505    /// # }
1506    /// ```
1507    pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>> {
1508        let task_ctx = Arc::new(self.task_ctx());
1509        let plan = self.create_physical_plan().await?;
1510        collect_partitioned(plan, task_ctx).await
1511    }
1512
1513    /// Executes this DataFrame and returns one stream per partition.
1514    ///
1515    /// # Example
1516    /// ```
1517    /// # use datafusion::prelude::*;
1518    /// # use datafusion::error::Result;
1519    /// # #[tokio::main]
1520    /// # async fn main() -> Result<()> {
1521    /// let ctx = SessionContext::new();
1522    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1523    /// let batches = df.execute_stream_partitioned().await?;
1524    /// # Ok(())
1525    /// # }
1526    /// ```
    ///
1527    /// # Aborting Execution
1528    ///
1529    /// Dropping the stream will abort the execution of the query, and free up
1530    /// any allocated resources
1531    pub async fn execute_stream_partitioned(
1532        self,
1533    ) -> Result<Vec<SendableRecordBatchStream>> {
1534        let task_ctx = Arc::new(self.task_ctx());
1535        let plan = self.create_physical_plan().await?;
1536        execute_stream_partitioned(plan, task_ctx)
1537    }
1538
1539    /// Returns the `DFSchema` describing the output of this DataFrame.
1540    ///
1541    /// The output `DFSchema` contains information on the name, data type, and
1542    /// nullability for each column.
1543    ///
1544    /// # Example
1545    /// ```
1546    /// # use datafusion::prelude::*;
1547    /// # use datafusion::error::Result;
1548    /// # #[tokio::main]
1549    /// # async fn main() -> Result<()> {
1550    /// let ctx = SessionContext::new();
1551    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1552    /// let schema = df.schema();
1553    /// # Ok(())
1554    /// # }
1555    /// ```
1556    pub fn schema(&self) -> &DFSchema {
1557        self.plan.schema()
1558    }
1559
1560    /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
1561    /// this DataFrame.
1562    ///
1563    /// See [`Self::into_unoptimized_plan`] for more details.
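    ///
    /// # Example
    /// A minimal sketch: print the unoptimized plan without executing it.
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // inspect the plan without triggering any computation
    /// println!("{}", df.logical_plan().display_indent());
    /// # Ok(())
    /// # }
    /// ```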
1564    pub fn logical_plan(&self) -> &LogicalPlan {
1565        &self.plan
1566    }
1567
1568    /// Returns both the [`SessionState`] and [`LogicalPlan`] that comprise this [`DataFrame`]
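    ///
    /// # Example
    /// A minimal sketch: split this `DataFrame` into its parts and reassemble
    /// an equivalent one with [`DataFrame::new`].
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let (state, plan) = df.into_parts();
    /// let _reassembled = DataFrame::new(state, plan);
    /// # Ok(())
    /// # }
    /// ```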
1569    pub fn into_parts(self) -> (SessionState, LogicalPlan) {
1570        (*self.session_state, self.plan)
1571    }
1572
1573    /// Return the [`LogicalPlan`] represented by this DataFrame without running
1574    /// any optimizers
1575    ///
1576    /// Note: This method should not be used outside testing, as it loses the
1577    /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
1578    /// consequently subsequent operations may take place against a different
1579    /// state (e.g. a different value of `now()`)
1580    ///
1581    /// See [`Self::into_parts`] to retrieve the owned [`LogicalPlan`] and
1582    /// corresponding [`SessionState`].
1583    pub fn into_unoptimized_plan(self) -> LogicalPlan {
1584        self.plan
1585    }
1586
1587    /// Return the optimized [`LogicalPlan`] represented by this DataFrame.
1588    ///
1589    /// Note: This method should not be used outside testing -- see
1590    /// [`Self::into_unoptimized_plan`] for more details.
1591    pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
1592        // Optimize the plan first for better UX
1593        self.session_state.optimize(&self.plan)
1594    }
1595
1596    /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
1597    /// as a table view using [`SessionContext::register_table`].
1598    ///
1599    /// Note: This discards the [`SessionState`] associated with this
1600    /// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
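    ///
    /// # Example
    /// A minimal sketch: register the view under an illustrative name (`"v"`)
    /// and query it with SQL.
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// ctx.register_table("v", df.into_view())?;
    /// let _batches = ctx.sql("SELECT a FROM v").await?.collect().await?;
    /// # Ok(())
    /// # }
    /// ```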
1601    pub fn into_view(self) -> Arc<dyn TableProvider> {
1602        Arc::new(DataFrameTableProvider { plan: self.plan })
1603    }
1604
1605    /// Return a DataFrame with the explanation of its plan so far.
1606    ///
1607    /// If `analyze` is specified, runs the plan and reports metrics.
1608    ///
1609    /// ```
1610    /// # use datafusion::prelude::*;
1611    /// # use datafusion::error::Result;
1612    /// # #[tokio::main]
1613    /// # async fn main() -> Result<()> {
1614    /// let ctx = SessionContext::new();
1615    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1616    /// let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
1617    /// # Ok(())
1618    /// # }
1619    /// ```
1620    pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
1621        if matches!(self.plan, LogicalPlan::Explain(_)) {
1622            return plan_err!("Nested EXPLAINs are not supported");
1623        }
1624        let plan = LogicalPlanBuilder::from(self.plan)
1625            .explain(verbose, analyze)?
1626            .build()?;
1627        Ok(DataFrame {
1628            session_state: self.session_state,
1629            plan,
1630            projection_requires_validation: self.projection_requires_validation,
1631        })
1632    }
1633
1634    /// Return a `FunctionRegistry` used to plan UDF calls
1635    ///
1636    /// # Example
1637    /// ```
1638    /// # use datafusion::prelude::*;
1639    /// # use datafusion::error::Result;
1640    /// # #[tokio::main]
1641    /// # async fn main() -> Result<()> {
1642    /// let ctx = SessionContext::new();
1643    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1644    /// let f = df.registry();
1645    /// // use f.udf("name")?.call(vec![...]) to invoke the UDF by name
1646    /// # Ok(())
1647    /// # }
1648    /// ```
1649    pub fn registry(&self) -> &dyn FunctionRegistry {
1650        self.session_state.as_ref()
1651    }
1652
1653    /// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema.
1654    ///
1655    /// ```
1656    /// # use datafusion::prelude::*;
1657    /// # use datafusion::error::Result;
1658    /// # use datafusion_common::assert_batches_sorted_eq;
1659    /// # #[tokio::main]
1660    /// # async fn main() -> Result<()> {
1661    /// let ctx = SessionContext::new();
1662    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1663    /// let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1664    /// let df = df.intersect(d2)?;
1665    /// let expected = vec![
1666    ///     "+---+---+---+",
1667    ///     "| a | b | c |",
1668    ///     "+---+---+---+",
1669    ///     "| 1 | 2 | 3 |",
1670    ///     "+---+---+---+"
1671    /// ];
1672    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1673    /// # Ok(())
1674    /// # }
1675    /// ```
1676    pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
1677        let left_plan = self.plan;
1678        let right_plan = dataframe.plan;
1679        let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
1680        Ok(DataFrame {
1681            session_state: self.session_state,
1682            plan,
1683            projection_requires_validation: true,
1684        })
1685    }
1686
1687    /// Calculate the set difference (`EXCEPT`) of two [`DataFrame`]s: the rows of `self` that do not appear in `dataframe`. The two [`DataFrame`]s must have exactly the same schema.
1688    ///
1689    /// ```
1690    /// # use datafusion::prelude::*;
1691    /// # use datafusion::error::Result;
1692    /// # use datafusion_common::assert_batches_sorted_eq;
1693    /// # #[tokio::main]
1694    /// # async fn main() -> Result<()> {
1695    /// let ctx = SessionContext::new();
1696    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1697    /// let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1698    /// let result = df.except(d2)?;
1699    /// // these rows appear in example_long.csv but not in example.csv
1700    /// let expected = vec![
1701    ///     "+---+---+---+",
1702    ///     "| a | b | c |",
1703    ///     "+---+---+---+",
1704    ///     "| 4 | 5 | 6 |",
1705    ///     "| 7 | 8 | 9 |",
1706    ///     "+---+---+---+"
1707    /// ];
1708    /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1709    /// # Ok(())
1710    /// # }
1711    /// ```
1712    pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
1713        let left_plan = self.plan;
1714        let right_plan = dataframe.plan;
1715        let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
1716        Ok(DataFrame {
1717            session_state: self.session_state,
1718            plan,
1719            projection_requires_validation: true,
1720        })
1721    }
1722
1723    /// Execute this `DataFrame` and write the results to `table_name`.
1724    ///
1725    /// Returns a single [`RecordBatch`] containing a single column and
1726    /// row with the total number of rows written.
1727    ///
1728    /// Unlike most other `DataFrame` methods, this method executes eagerly.
1729    /// Data is written to the table using the [`TableProvider::insert_into`]
1730    /// method. This is the same underlying implementation used by SQL `INSERT
1731    /// INTO` statements.
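    ///
    /// # Example
    /// A minimal sketch: insert this DataFrame into an in-memory target table.
    /// The table name `"t"` is illustrative, and the target is assumed to have
    /// the same schema as the DataFrame.
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::datasource::MemTable;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// use datafusion::dataframe::DataFrameWriteOptions;
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // register an empty target table with a matching schema
    /// let schema = Arc::new(df.schema().as_arrow().clone());
    /// ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![]])?))?;
    /// df.write_table("t", DataFrameWriteOptions::new()).await?;
    /// # Ok(())
    /// # }
    /// ```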
1732    pub async fn write_table(
1733        self,
1734        table_name: &str,
1735        write_options: DataFrameWriteOptions,
1736    ) -> Result<Vec<RecordBatch>, DataFusionError> {
1737        let plan = if write_options.sort_by.is_empty() {
1738            self.plan
1739        } else {
1740            LogicalPlanBuilder::from(self.plan)
1741                .sort(write_options.sort_by)?
1742                .build()?
1743        };
1744
1745        let table_ref: TableReference = table_name.into();
1746        let table_schema = self.session_state.schema_for_ref(table_ref.clone())?;
1747        let target = match table_schema.table(table_ref.table()).await? {
1748            Some(ref provider) => Ok(Arc::clone(provider)),
1749            _ => plan_err!("No table named '{table_name}'"),
1750        }?;
1751
1752        let target = Arc::new(DefaultTableSource::new(target));
1753
1754        let plan = LogicalPlanBuilder::insert_into(
1755            plan,
1756            table_ref,
1757            target,
1758            write_options.insert_op,
1759        )?
1760        .build()?;
1761
1762        DataFrame {
1763            session_state: self.session_state,
1764            plan,
1765            projection_requires_validation: self.projection_requires_validation,
1766        }
1767        .collect()
1768        .await
1769    }
1770
1771    /// Execute the `DataFrame` and write the results to CSV file(s).
1772    ///
1773    /// # Example
1774    /// ```
1775    /// # use datafusion::prelude::*;
1776    /// # use datafusion::error::Result;
1777    /// # use std::fs;
1778    /// # #[tokio::main]
1779    /// # async fn main() -> Result<()> {
1780    /// use datafusion::dataframe::DataFrameWriteOptions;
1781    /// let ctx = SessionContext::new();
1782    /// // Sort the data by column "b" and write it to a new location
1783    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
1784    ///   .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
1785    ///   .write_csv(
1786    ///     "output.csv",
1787    ///     DataFrameWriteOptions::new(),
1788    ///     None, // can also specify CSV writing options here
1789    /// ).await?;
1790    /// # fs::remove_file("output.csv")?;
1791    /// # Ok(())
1792    /// # }
1793    /// ```
1794    pub async fn write_csv(
1795        self,
1796        path: &str,
1797        options: DataFrameWriteOptions,
1798        writer_options: Option<CsvOptions>,
1799    ) -> Result<Vec<RecordBatch>, DataFusionError> {
1800        if options.insert_op != InsertOp::Append {
1801            return not_impl_err!(
1802                "{} is not implemented for DataFrame::write_csv.",
1803                options.insert_op
1804            );
1805        }
1806
1807        let format = if let Some(csv_opts) = writer_options {
1808            Arc::new(CsvFormatFactory::new_with_options(csv_opts))
1809        } else {
1810            Arc::new(CsvFormatFactory::new())
1811        };
1812
1813        let file_type = format_as_file_type(format);
1814
1815        let plan = if options.sort_by.is_empty() {
1816            self.plan
1817        } else {
1818            LogicalPlanBuilder::from(self.plan)
1819                .sort(options.sort_by)?
1820                .build()?
1821        };
1822
1823        let plan = LogicalPlanBuilder::copy_to(
1824            plan,
1825            path.into(),
1826            file_type,
1827            HashMap::new(),
1828            options.partition_by,
1829        )?
1830        .build()?;
1831
1832        DataFrame {
1833            session_state: self.session_state,
1834            plan,
1835            projection_requires_validation: self.projection_requires_validation,
1836        }
1837        .collect()
1838        .await
1839    }
1840
1841    /// Execute the `DataFrame` and write the results to JSON file(s).
1842    ///
1843    /// # Example
1844    /// ```
1845    /// # use datafusion::prelude::*;
1846    /// # use datafusion::error::Result;
1847    /// # use std::fs;
1848    /// # #[tokio::main]
1849    /// # async fn main() -> Result<()> {
1850    /// use datafusion::dataframe::DataFrameWriteOptions;
1851    /// let ctx = SessionContext::new();
1852    /// // Sort the data by column "b" and write it to a new location
1853    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
1854    ///   .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
1855    ///   .write_json(
1856    ///     "output.json",
1857    ///     DataFrameWriteOptions::new(),
1858    ///     None
1859    /// ).await?;
1860    /// # fs::remove_file("output.json")?;
1861    /// # Ok(())
1862    /// # }
1863    /// ```
1864    pub async fn write_json(
1865        self,
1866        path: &str,
1867        options: DataFrameWriteOptions,
1868        writer_options: Option<JsonOptions>,
1869    ) -> Result<Vec<RecordBatch>, DataFusionError> {
1870        if options.insert_op != InsertOp::Append {
1871            return not_impl_err!(
1872                "{} is not implemented for DataFrame::write_json.",
1873                options.insert_op
1874            );
1875        }
1876
1877        let format = if let Some(json_opts) = writer_options {
1878            Arc::new(JsonFormatFactory::new_with_options(json_opts))
1879        } else {
1880            Arc::new(JsonFormatFactory::new())
1881        };
1882
1883        let file_type = format_as_file_type(format);
1884
1885        let plan = if options.sort_by.is_empty() {
1886            self.plan
1887        } else {
1888            LogicalPlanBuilder::from(self.plan)
1889                .sort(options.sort_by)?
1890                .build()?
1891        };
1892
1893        let plan = LogicalPlanBuilder::copy_to(
1894            plan,
1895            path.into(),
1896            file_type,
1897            Default::default(),
1898            options.partition_by,
1899        )?
1900        .build()?;
1901
1902        DataFrame {
1903            session_state: self.session_state,
1904            plan,
1905            projection_requires_validation: self.projection_requires_validation,
1906        }
1907        .collect()
1908        .await
1909    }
1910
1911    /// Add or replace a column in the DataFrame.
1912    ///
1913    /// # Example
1914    /// ```
1915    /// # use datafusion::prelude::*;
1916    /// # use datafusion::error::Result;
1917    /// # #[tokio::main]
1918    /// # async fn main() -> Result<()> {
1919    /// let ctx = SessionContext::new();
1920    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1921    /// let df = df.with_column("ab_sum", col("a") + col("b"))?;
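    /// // passing an existing column name replaces that column instead
    /// let df = df.with_column("a", col("a") * lit(2))?;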
1922    /// # Ok(())
1923    /// # }
1924    /// ```
1925    pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
1926        let window_func_exprs = find_window_exprs(std::slice::from_ref(&expr));
1927
1928        let (window_fn_str, plan) = if window_func_exprs.is_empty() {
1929            (None, self.plan)
1930        } else {
1931            (
1932                Some(window_func_exprs[0].to_string()),
1933                LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?,
1934            )
1935        };
1936
1937        let mut col_exists = false;
1938        let new_column = expr.alias(name);
1939        let mut fields: Vec<(Expr, bool)> = plan
1940            .schema()
1941            .iter()
1942            .filter_map(|(qualifier, field)| {
1943                if field.name() == name {
1944                    col_exists = true;
1945                    Some((new_column.clone(), true))
1946                } else {
1947                    let e = col(Column::from((qualifier, field)));
1948                    window_fn_str
1949                        .as_ref()
1950                        .filter(|s| *s == &e.to_string())
1951                        .is_none()
1952                        .then_some((e, self.projection_requires_validation))
1953                }
1954            })
1955            .collect();
1956
1957        if !col_exists {
1958            fields.push((new_column, true));
1959        }
1960
1961        let project_plan = LogicalPlanBuilder::from(plan)
1962            .project_with_validation(fields)?
1963            .build()?;
1964
1965        Ok(DataFrame {
1966            session_state: self.session_state,
1967            plan: project_plan,
1968            projection_requires_validation: false,
1969        })
1970    }
1971
1972    /// Rename one column by applying a new projection. This is a no-op if the column to be
1973    /// renamed does not exist.
1974    ///
1975    /// The method supports case-sensitive renaming when the column name is wrapped in one of the quote characters ( " , ' , or ` ).
1976    ///
1977    /// Alternatively, setting the DataFusion configuration parameter `datafusion.sql_parser.enable_ident_normalization` to `false`
1978    /// enables case-sensitive renaming without the need to wrap the column name in quote characters.
1979    ///
1980    /// # Example
1981    /// ```
1982    /// # use datafusion::prelude::*;
1983    /// # use datafusion::error::Result;
1984    /// # #[tokio::main]
1985    /// # async fn main() -> Result<()> {
1986    /// let ctx = SessionContext::new();
1987    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1988    /// let df = df.with_column_renamed("ab_sum", "total")?;
1989    ///
1990    /// # Ok(())
1991    /// # }
1992    /// ```
1993    pub fn with_column_renamed(
1994        self,
1995        old_name: impl Into<String>,
1996        new_name: &str,
1997    ) -> Result<DataFrame> {
1998        let ident_opts = self
1999            .session_state
2000            .config_options()
2001            .sql_parser
2002            .enable_ident_normalization;
2003        let old_column: Column = if ident_opts {
2004            Column::from_qualified_name(old_name)
2005        } else {
2006            Column::from_qualified_name_ignore_case(old_name)
2007        };
2008
2009        let (qualifier_rename, field_rename) =
2010            match self.plan.schema().qualified_field_from_column(&old_column) {
2011                Ok(qualifier_and_field) => qualifier_and_field,
2012                // no-op if field not found
2013                Err(DataFusionError::SchemaError(
2014                    SchemaError::FieldNotFound { .. },
2015                    _,
2016                )) => return Ok(self),
2017                Err(err) => return Err(err),
2018            };
2019        let projection = self
2020            .plan
2021            .schema()
2022            .iter()
2023            .map(|(qualifier, field)| {
2024                if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
2025                    (
2026                        col(Column::from((qualifier, field)))
2027                            .alias_qualified(qualifier.cloned(), new_name),
2028                        false,
2029                    )
2030                } else {
2031                    (col(Column::from((qualifier, field))), false)
2032                }
2033            })
2034            .collect::<Vec<_>>();
2035        let project_plan = LogicalPlanBuilder::from(self.plan)
2036            .project_with_validation(projection)?
2037            .build()?;
2038        Ok(DataFrame {
2039            session_state: self.session_state,
2040            plan: project_plan,
2041            projection_requires_validation: false,
2042        })
2043    }
2044
2045    /// Replace all parameters in the logical plan with the specified
2046    /// values, in preparation for execution.
2047    ///
2048    /// # Example
2049    ///
2050    /// ```
2051    /// use datafusion::prelude::*;
2052    /// # use datafusion::{error::Result, assert_batches_eq};
2053    /// # #[tokio::main]
2054    /// # async fn main() -> Result<()> {
2055    /// # use datafusion_common::ScalarValue;
2056    /// let ctx = SessionContext::new();
2057    /// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
2058    /// let results = ctx
2059    ///   .sql("SELECT a FROM example WHERE b = $1")
2060    ///   .await?
2061    ///    // replace $1 with value 2
2062    ///   .with_param_values(vec![
2063    ///      // value at index 0 --> $1
2064    ///      ScalarValue::from(2i64)
2065    ///    ])?
2066    ///   .collect()
2067    ///   .await?;
2068    /// assert_batches_eq!(
2069    ///  &[
2070    ///    "+---+",
2071    ///    "| a |",
2072    ///    "+---+",
2073    ///    "| 1 |",
2074    ///    "+---+",
2075    ///  ],
2076    ///  &results
2077    /// );
2078    /// // Note you can also provide named parameters
2079    /// let results = ctx
2080    ///   .sql("SELECT a FROM example WHERE b = $my_param")
2081    ///   .await?
2082    ///    // replace $my_param with value 2
2083    ///    // Note you can also use a HashMap as well
2084    ///   .with_param_values(vec![
2085    ///       ("my_param", ScalarValue::from(2i64))
2086    ///    ])?
2087    ///   .collect()
2088    ///   .await?;
2089    /// assert_batches_eq!(
2090    ///  &[
2091    ///    "+---+",
2092    ///    "| a |",
2093    ///    "+---+",
2094    ///    "| 1 |",
2095    ///    "+---+",
2096    ///  ],
2097    ///  &results
2098    /// );
2099    /// # Ok(())
2100    /// # }
2101    /// ```
2102    pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
2103        let plan = self.plan.with_param_values(query_values)?;
2104        Ok(DataFrame {
2105            session_state: self.session_state,
2106            plan,
2107            projection_requires_validation: self.projection_requires_validation,
2108        })
2109    }
2110
2111    /// Cache this `DataFrame` as a memory table.
2112    ///
2113    /// ```
2114    /// # use datafusion::prelude::*;
2115    /// # use datafusion::error::Result;
2116    /// # #[tokio::main]
2117    /// # async fn main() -> Result<()> {
2118    /// let ctx = SessionContext::new();
2119    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
2120    /// let df = df.cache().await?;
2121    /// # Ok(())
2122    /// # }
2123    /// ```
2124    pub async fn cache(self) -> Result<DataFrame> {
2125        let context = SessionContext::new_with_state((*self.session_state).clone());
2126        // The physical plan's schema is consistent with this DataFrame's output
2127        let plan = self.clone().create_physical_plan().await?;
2128        let schema = plan.schema();
2129        let task_ctx = Arc::new(self.task_ctx());
2130        let partitions = collect_partitioned(plan, task_ctx).await?;
2131        let mem_table = MemTable::try_new(schema, partitions)?;
2132        context.read_table(Arc::new(mem_table))
2133    }
2134
2135    /// Apply an alias to the DataFrame.
2136    ///
2137    /// This method replaces the qualifiers of output columns with the given alias.
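    ///
    /// # Example
    /// A minimal sketch: qualify the output columns with the alias `"t"` so
    /// they can be referenced as `t.a`, `t.b`, and so on.
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.alias("t")?;
    /// let _df = df.select(vec![col("t.a")])?;
    /// # Ok(())
    /// # }
    /// ```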
2138    pub fn alias(self, alias: &str) -> Result<DataFrame> {
2139        let plan = LogicalPlanBuilder::from(self.plan).alias(alias)?.build()?;
2140        Ok(DataFrame {
2141            session_state: self.session_state,
2142            plan,
2143            projection_requires_validation: self.projection_requires_validation,
2144        })
2145    }
2146
2147    /// Fill null values in the specified columns with the given value.
2148    /// If no columns are specified (empty vector), the fill applies to all columns.
2149    /// A column is only filled if the value can be cast to its type.
2150    ///
2151    /// # Arguments
2152    /// * `value` - Value to fill nulls with
2153    /// * `columns` - List of column names to fill. If empty, fills all columns.
2154    ///
2155    /// # Example
2156    /// ```
2157    /// # use datafusion::prelude::*;
2158    /// # use datafusion::error::Result;
2159    /// # use datafusion_common::ScalarValue;
2160    /// # #[tokio::main]
2161    /// # async fn main() -> Result<()> {
2162    /// let ctx = SessionContext::new();
2163    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
2164    /// // Fill nulls in only columns "a" and "c":
2165    /// let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?;
2166    /// // Fill nulls across all columns:
2167    /// let df = df.fill_null(ScalarValue::from(0), vec![])?;
2168    /// # Ok(())
2169    /// # }
2170    /// ```
2171    pub fn fill_null(
2172        &self,
2173        value: ScalarValue,
2174        columns: Vec<String>,
2175    ) -> Result<DataFrame> {
2176        let cols = if columns.is_empty() {
2177            self.logical_plan()
2178                .schema()
2179                .fields()
2180                .iter()
2181                .map(|f| f.as_ref().clone())
2182                .collect()
2183        } else {
2184            self.find_columns(&columns)?
2185        };
2186
2187        // Create projections for each column
2188        let projections = self
2189            .logical_plan()
2190            .schema()
2191            .fields()
2192            .iter()
2193            .map(|field| {
2194                if cols.contains(field) {
2195                // Try to cast the fill value to the column type. If the cast fails, fall back to the original column.
2196                    match value.clone().cast_to(field.data_type()) {
2197                        Ok(fill_value) => Expr::Alias(Alias {
2198                            expr: Box::new(Expr::ScalarFunction(ScalarFunction {
2199                                func: coalesce(),
2200                                args: vec![col(field.name()), lit(fill_value)],
2201                            })),
2202                            relation: None,
2203                            name: field.name().to_string(),
2204                            metadata: None,
2205                        }),
2206                        Err(_) => col(field.name()),
2207                    }
2208                } else {
2209                    col(field.name())
2210                }
2211            })
2212            .collect::<Vec<_>>();
2213
2214        self.clone().select(projections)
2215    }
2216
2217    // Helper to find columns from names
2218    fn find_columns(&self, names: &[String]) -> Result<Vec<Field>> {
2219        let schema = self.logical_plan().schema();
2220        names
2221            .iter()
2222            .map(|name| {
2223                schema
2224                    .field_with_name(None, name)
2225                    .cloned()
2226                    .map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
2227            })
2228            .collect()
2229    }
2230
2231    /// Helper for creating a DataFrame from a list of named columns.
2232    /// # Example
2233    /// ```
2234    /// use std::sync::Arc;
2235    /// use arrow::array::{ArrayRef, Int32Array, StringArray};
2236    /// use datafusion::prelude::DataFrame;
2237    /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
2238    /// let name: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
2239    /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap();
2240    /// // +----+------+
2241    /// // | id | name |
2242    /// // +----+------+
2243    /// // | 1  | foo  |
2244    /// // | 2  | bar  |
2245    /// // | 3  | baz  |
2246    /// // +----+------+
2247    /// ```
2248    pub fn from_columns(columns: Vec<(&str, ArrayRef)>) -> Result<Self> {
2249        let fields = columns
2250            .iter()
2251            .map(|(name, array)| Field::new(*name, array.data_type().clone(), true))
2252            .collect::<Vec<_>>();
2253
2254        let arrays = columns
2255            .into_iter()
2256            .map(|(_, array)| array)
2257            .collect::<Vec<_>>();
2258
2259        let schema = Arc::new(Schema::new(fields));
2260        let batch = RecordBatch::try_new(schema, arrays)?;
2261        let ctx = SessionContext::new();
2262        let df = ctx.read_batch(batch)?;
2263        Ok(df)
2264    }
2265}
2266
2267/// Macro for creating a DataFrame.
2268/// # Example
2269/// ```
2270/// use datafusion::prelude::dataframe;
2271/// # use datafusion::error::Result;
2272/// # #[tokio::main]
2273/// # async fn main() -> Result<()> {
2274/// let df = dataframe!(
2275///    "id" => [1, 2, 3],
2276///    "name" => ["foo", "bar", "baz"]
2277///  )?;
2278/// df.show().await?;
2279/// // +----+------+
2280/// // | id | name |
2281/// // +----+------+
2282/// // | 1  | foo  |
2283/// // | 2  | bar  |
2284/// // | 3  | baz  |
2285/// // +----+------+
2286/// let df_empty = dataframe!()?; // empty DataFrame
2287/// assert_eq!(df_empty.schema().fields().len(), 0);
2288/// assert_eq!(df_empty.count().await?, 0);
2289/// # Ok(())
2290/// # }
2291/// ```
2292#[macro_export]
2293macro_rules! dataframe {
2294    () => {{
2295        use std::sync::Arc;
2296
2297        use datafusion::prelude::SessionContext;
2298        use datafusion::arrow::array::RecordBatch;
2299        use datafusion::arrow::datatypes::Schema;
2300
2301        let ctx = SessionContext::new();
2302        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
2303        ctx.read_batch(batch)
2304    }};
2305
2306    ($($name:expr => $data:expr),+ $(,)?) => {{
2307        use datafusion::prelude::DataFrame;
2308        use datafusion::common::test_util::IntoArrayRef;
2309
2310        let columns = vec![
2311            $(
2312                ($name, $data.into_array_ref()),
2313            )+
2314        ];
2315
2316        DataFrame::from_columns(columns)
2317    }};
2318}
2319
2320#[derive(Debug)]
2321struct DataFrameTableProvider {
2322    plan: LogicalPlan,
2323}
2324
2325#[async_trait]
2326impl TableProvider for DataFrameTableProvider {
2327    fn as_any(&self) -> &dyn Any {
2328        self
2329    }
2330
2331    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
2332        Some(Cow::Borrowed(&self.plan))
2333    }
2334
2335    fn supports_filters_pushdown(
2336        &self,
2337        filters: &[&Expr],
2338    ) -> Result<Vec<TableProviderFilterPushDown>> {
2339        // All filters are pushed down exactly: they are applied in `scan` by adding a `Filter` node to the plan
2340        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2341    }
2342
2343    fn schema(&self) -> SchemaRef {
2344        let schema: Schema = self.plan.schema().as_ref().into();
2345        Arc::new(schema)
2346    }
2347
2348    fn table_type(&self) -> TableType {
2349        TableType::View
2350    }
2351
2352    async fn scan(
2353        &self,
2354        state: &dyn Session,
2355        projection: Option<&Vec<usize>>,
2356        filters: &[Expr],
2357        limit: Option<usize>,
2358    ) -> Result<Arc<dyn ExecutionPlan>> {
2359        let mut expr = LogicalPlanBuilder::from(self.plan.clone());
2360        // Add filter when given
2361        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
2362        if let Some(filter) = filter {
2363            expr = expr.filter(filter)?
2364        }
2365
2366        if let Some(p) = projection {
2367            expr = expr.select(p.iter().copied())?
2368        }
2369
2370        // add a limit if given
2371        if let Some(l) = limit {
2372            expr = expr.limit(0, Some(l))?
2373        }
2374        let plan = expr.build()?;
2375        state.create_physical_plan(&plan).await
2376    }
2377}
2378
2379// see tests in datafusion/core/tests/dataframe/mod.rs:2816