// datafusion-sql 37.0.0 - DataFusion SQL Query Planner (source listing as rendered on docs.rs)

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};

use arrow::datatypes::Schema;
use datafusion_common::{
    not_impl_err, plan_err, sql_err, Constraints, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{
    CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
    Operator,
};
use sqlparser::ast::{
    Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query, SetExpr, SetOperator,
    SetQuantifier, Value,
};

use sqlparser::parser::ParserError::ParserError;

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
    /// Generate a logical plan from an SQL query
    pub(crate) fn query_to_plan(
        &self,
        query: Query,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        self.query_to_plan_with_schema(query, planner_context)
    }

    /// Generate a logic plan from an SQL query.
    /// It's implementation of `subquery_to_plan` and `query_to_plan`.
    /// It shouldn't be invoked directly.
    fn query_to_plan_with_schema(
        &self,
        query: Query,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        let set_expr = query.body;
        if let Some(with) = query.with {
            // Process CTEs from top to bottom
            let is_recursive = with.recursive;

            for cte in with.cte_tables {
                // A `WITH` block can't use the same name more than once
                let cte_name = self.normalizer.normalize(cte.alias.name.clone());
                if planner_context.contains_cte(&cte_name) {
                    return sql_err!(ParserError(format!(
                        "WITH query name {cte_name:?} specified more than once"
                    )));
                }

                if is_recursive {
                    if !self
                        .context_provider
                        .options()
                        .execution
                        .enable_recursive_ctes
                    {
                        return not_impl_err!("Recursive CTEs are not enabled");
                    }

                    match *cte.query.body {
                        SetExpr::SetOperation {
                            op: SetOperator::Union,
                            left,
                            right,
                            set_quantifier,
                        } => {
                            let distinct = set_quantifier != SetQuantifier::All;

                            // Each recursive CTE consists from two parts in the logical plan:
                            //   1. A static term   (the left hand side on the SQL, where the
                            //                       referencing to the same CTE is not allowed)
                            //
                            //   2. A recursive term (the right hand side, and the recursive
                            //                       part)

                            // Since static term does not have any specific properties, it can
                            // be compiled as if it was a regular expression. This will
                            // allow us to infer the schema to be used in the recursive term.

                            // ---------- Step 1: Compile the static term ------------------
                            let static_plan = self
                                .set_expr_to_plan(*left, &mut planner_context.clone())?;

                            // Since the recursive CTEs include a component that references a
                            // table with its name, like the example below:
                            //
                            // WITH RECURSIVE values(n) AS (
                            //      SELECT 1 as n -- static term
                            //    UNION ALL
                            //      SELECT n + 1
                            //      FROM values -- self reference
                            //      WHERE n < 100
                            // )
                            //
                            // We need a temporary 'relation' to be referenced and used. PostgreSQL
                            // calls this a 'working table', but it is entirely an implementation
                            // detail and a 'real' table with that name might not even exist (as
                            // in the case of DataFusion).
                            //
                            // Since we can't simply register a table during planning stage (it is
                            // an execution problem), we'll use a relation object that preserves the
                            // schema of the input perfectly and also knows which recursive CTE it is
                            // bound to.

                            // ---------- Step 2: Create a temporary relation ------------------
                            // Step 2.1: Create a table source for the temporary relation
                            let work_table_source =
                                self.context_provider.create_cte_work_table(
                                    &cte_name,
                                    Arc::new(Schema::from(static_plan.schema().as_ref())),
                                )?;

                            // Step 2.2: Create a temporary relation logical plan that will be used
                            // as the input to the recursive term
                            let work_table_plan = LogicalPlanBuilder::scan(
                                cte_name.to_string(),
                                work_table_source,
                                None,
                            )?
                            .build()?;

                            let name = cte_name.clone();

                            // Step 2.3: Register the temporary relation in the planning context
                            // For all the self references in the variadic term, we'll replace it
                            // with the temporary relation we created above by temporarily registering
                            // it as a CTE. This temporary relation in the planning context will be
                            // replaced by the actual CTE plan once we're done with the planning.
                            planner_context.insert_cte(cte_name.clone(), work_table_plan);

                            // ---------- Step 3: Compile the recursive term ------------------
                            // this uses the named_relation we inserted above to resolve the
                            // relation. This ensures that the recursive term uses the named relation logical plan
                            // and thus the 'continuance' physical plan as its input and source
                            let recursive_plan = self
                                .set_expr_to_plan(*right, &mut planner_context.clone())?;

                            // ---------- Step 4: Create the final plan ------------------
                            // Step 4.1: Compile the final plan
                            let logical_plan = LogicalPlanBuilder::from(static_plan)
                                .to_recursive_query(name, recursive_plan, distinct)?
                                .build()?;

                            let final_plan =
                                self.apply_table_alias(logical_plan, cte.alias)?;

                            // Step 4.2: Remove the temporary relation from the planning context and replace it
                            // with the final plan.
                            planner_context.insert_cte(cte_name.clone(), final_plan);
                        }
                        _ => {
                            return Err(DataFusionError::SQL(
                                ParserError(format!("Unsupported CTE: {cte}")),
                                None,
                            ));
                        }
                    };
                } else {
                    // create logical plan & pass backreferencing CTEs
                    // CTE expr don't need extend outer_query_schema
                    let logical_plan =
                        self.query_to_plan(*cte.query, &mut planner_context.clone())?;

                    // Each `WITH` block can change the column names in the last
                    // projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2").
                    let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?;

                    planner_context.insert_cte(cte_name, logical_plan);
                }
            }
        }
        let plan = self.set_expr_to_plan(*(set_expr.clone()), planner_context)?;
        let plan = self.order_by(plan, query.order_by, planner_context)?;
        let plan = self.limit(plan, query.offset, query.limit)?;

        let plan = match *set_expr {
            SetExpr::Select(select) if select.into.is_some() => {
                let select_into = select.into.unwrap();
                LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                    name: self.object_name_to_table_reference(select_into.name)?,
                    constraints: Constraints::empty(),
                    input: Arc::new(plan),
                    if_not_exists: false,
                    or_replace: false,
                    column_defaults: vec![],
                }))
            }
            _ => plan,
        };

        Ok(plan)
    }

    /// Wrap a plan in a limit
    fn limit(
        &self,
        input: LogicalPlan,
        skip: Option<SQLOffset>,
        fetch: Option<SQLExpr>,
    ) -> Result<LogicalPlan> {
        if skip.is_none() && fetch.is_none() {
            return Ok(input);
        }

        let skip = match skip {
            Some(skip_expr) => {
                let expr = self.sql_to_expr(
                    skip_expr.value,
                    input.schema(),
                    &mut PlannerContext::new(),
                )?;
                let n = get_constant_result(&expr, "OFFSET")?;
                convert_usize_with_check(n, "OFFSET")
            }
            _ => Ok(0),
        }?;

        let fetch = match fetch {
            Some(limit_expr)
                if limit_expr != sqlparser::ast::Expr::Value(Value::Null) =>
            {
                let expr = self.sql_to_expr(
                    limit_expr,
                    input.schema(),
                    &mut PlannerContext::new(),
                )?;
                let n = get_constant_result(&expr, "LIMIT")?;
                Some(convert_usize_with_check(n, "LIMIT")?)
            }
            _ => None,
        };

        LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
    }

    /// Wrap the logical in a sort
    fn order_by(
        &self,
        plan: LogicalPlan,
        order_by: Vec<OrderByExpr>,
        planner_context: &mut PlannerContext,
    ) -> Result<LogicalPlan> {
        if order_by.is_empty() {
            return Ok(plan);
        }

        let order_by_rex =
            self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?;

        if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
            // In case of `DISTINCT ON` we must capture the sort expressions since during the plan
            // optimization we're effectively doing a `first_value` aggregation according to them.
            let distinct_on = distinct_on.clone().with_sort_expr(order_by_rex)?;
            Ok(LogicalPlan::Distinct(Distinct::On(distinct_on)))
        } else {
            LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
        }
    }
}

/// Retrieves the constant result of an expression, evaluating it if possible.
///
/// Only a small subset of expressions is understood: non-null `Int64` literals
/// and binary expressions combining such constants with `+`, `-` or `*`
/// (evaluated recursively).
///
/// # Arguments
///
/// * `expr` - An `Expr` representing the expression to evaluate.
/// * `arg_name` - The name of the clause (e.g. `"LIMIT"`), used in error messages.
///
/// # Returns
///
/// * `Result<i64>` - An `Ok` variant containing the constant result if evaluation is successful,
///   or an `Err` variant containing an error message if evaluation fails.
///
/// # Errors
///
/// Returns a plan error when the expression is of an unsupported kind, when a
/// binary expression uses an operator other than `+`, `-`, `*`, or when the
/// arithmetic overflows `i64`.
///
/// <https://github.com/apache/arrow-datafusion/issues/9821> tracks a more general solution
fn get_constant_result(expr: &Expr, arg_name: &str) -> Result<i64> {
    match expr {
        Expr::Literal(ScalarValue::Int64(Some(s))) => Ok(*s),
        Expr::BinaryExpr(binary_expr) => {
            let lhs = get_constant_result(&binary_expr.left, arg_name)?;
            let rhs = get_constant_result(&binary_expr.right, arg_name)?;
            // The operands come straight from user-written SQL, so use checked
            // arithmetic: plain `+`/`-`/`*` would panic on overflow in debug
            // builds and silently wrap in release builds.
            let res = match binary_expr.op {
                Operator::Plus => lhs.checked_add(rhs),
                Operator::Minus => lhs.checked_sub(rhs),
                Operator::Multiply => lhs.checked_mul(rhs),
                _ => return plan_err!("Unsupported operator for {arg_name} clause"),
            };
            match res {
                Some(value) => Ok(value),
                None => {
                    plan_err!("Integer overflow while evaluating {arg_name} clause")
                }
            }
        }
        _ => plan_err!("Unexpected expression in {arg_name} clause"),
    }
}

/// Converts an `i64` to `usize`, performing a boundary check.
///
/// * `n` - The value to convert.
/// * `arg_name` - The name of the clause (e.g. `"OFFSET"`), used in error messages.
///
/// # Errors
///
/// Returns a plan error when `n` is negative, or when `n` does not fit into
/// `usize` on the current target (possible where `usize` is narrower than 64
/// bits; a plain `as` cast would silently truncate there).
fn convert_usize_with_check(n: i64, arg_name: &str) -> Result<usize> {
    if n < 0 {
        return plan_err!("{arg_name} must be >= 0, '{n}' was provided.");
    }

    match usize::try_from(n) {
        Ok(value) => Ok(value),
        Err(_) => plan_err!("{arg_name} value '{n}' does not fit into usize."),
    }
}