Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
datafusion 0.15.0 - Docs.rs
[go: Go Back, main page]

datafusion 0.15.0

DataFusion is an in-memory query engine that uses Apache Arrow as the memory model
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the merge plan for executing partitions in parallel and then merging the results
//! into a single partition

use crate::error::Result;
use crate::execution::physical_plan::common::RecordBatchIterator;
use crate::execution::physical_plan::{common, ExecutionPlan};
use crate::execution::physical_plan::{BatchIterator, Partition};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;

/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
///
/// The plan itself only stores its inputs; the single output partition is built on
/// demand by `partitions()` from clones of the schema and input partition handles.
pub struct MergeExec {
    /// Schema handle returned by `schema()` and passed on to the merged partition
    schema: Arc<Schema>,
    /// Input partitions that are combined into the single output partition
    partitions: Vec<Arc<dyn Partition>>,
}

impl MergeExec {
    /// Construct a merge plan from a schema and the input partitions to be merged.
    ///
    /// Nothing is executed here; the arguments are only stored.
    pub fn new(schema: Arc<Schema>, partitions: Vec<Arc<dyn Partition>>) -> Self {
        Self { schema, partitions }
    }
}

impl ExecutionPlan for MergeExec {
    /// Returns another handle to the schema this plan was constructed with.
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    /// Returns exactly one partition, which wraps all of the input partitions.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        // The merged partition gets its own handles to the schema and inputs, so it
        // does not borrow from this plan.
        let merged: Arc<dyn Partition> = Arc::new(MergePartition {
            schema: Arc::clone(&self.schema),
            partitions: self.partitions.to_vec(),
        });
        Ok(vec![merged])
    }
}

/// The single output partition produced by `MergeExec`; executing it runs every input
/// partition and gathers all of the resulting batches.
struct MergePartition {
    /// Schema handed to the iterator over the combined batches
    schema: Arc<Schema>,
    /// Input partitions, each of which is executed on its own thread
    partitions: Vec<Arc<dyn Partition>>,
}

impl Partition for MergePartition {
    /// Execute every input partition on its own thread and return an iterator over all
    /// of the batches they produced.
    ///
    /// All input partitions are fully collected into memory before the iterator is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns the error of the first input partition (in join order) whose execution
    /// or collection failed. Threads that have not been joined at that point are left
    /// to finish on their own.
    ///
    /// # Panics
    ///
    /// Panics if one of the worker threads panicked.
    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
        // Spawn all workers before joining any of them so the inputs run concurrently.
        let workers: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
            .partitions
            .iter()
            .map(|partition| {
                let partition = Arc::clone(partition);
                thread::spawn(move || {
                    let batches = partition.execute()?;
                    common::collect(batches)
                })
            })
            .collect();

        // Combine the results from each thread.
        let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
        for worker in workers {
            let batches = worker.join().expect("Failed to join thread")?;
            // The vector is owned here, so move each batch into its `Arc` instead of
            // cloning it first.
            combined_results.extend(batches.into_iter().map(Arc::new));
        }

        Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
            self.schema.clone(),
            combined_results,
        ))))
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::execution::physical_plan::common;
    use crate::execution::physical_plan::csv::CsvExec;
    use crate::test;

    /// Merging a CSV scan that is split over four partitions must yield one output
    /// partition holding every row of the source file.
    #[test]
    fn merge() -> Result<()> {
        let schema = test::aggr_test_schema();

        let partition_count = 4;
        let path =
            test::create_partitioned_csv("aggregate_test_100.csv", partition_count)?;

        let scan = CsvExec::try_new(&path, schema.clone(), true, None, 1024)?;

        // The scan exposes one partition per CSV file.
        let inputs = scan.partitions()?;
        assert_eq!(inputs.len(), partition_count);

        // The merge plan collapses them into a single partition.
        let plan = MergeExec::new(schema.clone(), inputs);
        let outputs = plan.partitions()?;
        assert_eq!(outputs.len(), 1);

        // Each input partition contributes one batch.
        let batches = common::collect(outputs[0].execute()?)?;
        assert_eq!(batches.len(), partition_count);

        // No rows may be lost or duplicated: the source file has 100 rows.
        let total_rows = batches
            .iter()
            .fold(0usize, |acc, batch| acc + batch.num_rows());
        assert_eq!(total_rows, 100);

        Ok(())
    }
}