use std::fs;
use std::fs::metadata;
use std::sync::Arc;
use super::SendableRecordBatchReader;
use crate::error::{ExecutionError, Result};
use array::{
BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, LargeStringArray, StringArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use arrow::{
array::{self, ArrayRef},
datatypes::Schema,
};
pub struct RecordBatchIterator {
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
index: usize,
}
impl RecordBatchIterator {
pub fn new(schema: SchemaRef, batches: Vec<Arc<RecordBatch>>) -> Self {
RecordBatchIterator {
schema,
index: 0,
batches,
}
}
}
impl Iterator for RecordBatchIterator {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.batches.len() {
self.index += 1;
Some(Ok(self.batches[self.index - 1].as_ref().clone()))
} else {
None
}
}
}
impl RecordBatchReader for RecordBatchIterator {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
/// Drains `it`, gathering every batch it produces into a `Vec`.
///
/// # Errors
/// Stops at the first Arrow error the reader yields and returns it converted
/// into an [`ExecutionError`]; batches read before the failure are dropped.
pub fn collect(it: SendableRecordBatchReader) -> Result<Vec<RecordBatch>> {
    it.into_iter()
        .collect::<ArrowResult<Vec<_>>>()
        // Pass the conversion directly instead of wrapping it in a closure.
        .map_err(ExecutionError::from)
}
/// Recursively appends to `filenames` every file path under `dir` that ends with `ext`.
///
/// If `dir` names a regular file rather than a directory, that single path is
/// appended (again only when it ends with `ext`). Matching is a plain string
/// suffix test on the whole path, so callers typically pass the extension with
/// its leading dot (e.g. `".csv"`). Sub-directories are descended into and are
/// never appended themselves, even if their name ends with `ext`. Paths are
/// appended in the order `read_dir` returns them.
///
/// # Errors
/// Returns an error if `dir` (or any directory beneath it) cannot be stat'ed or
/// read, or if an entry's path is not valid UTF-8. Such a failure aborts the walk.
/// Paths already appended to `filenames` before the failure are left in place.
pub fn build_file_list(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
    if metadata(dir)?.is_file() {
        if dir.ends_with(ext) {
            filenames.push(dir.to_string());
        }
        return Ok(());
    }

    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        // Paths are handed around as `&str`, so a non-UTF-8 entry cannot be
        // represented. Report which one it was instead of a bare "Invalid path".
        let path_name = path.to_str().ok_or_else(|| {
            ExecutionError::General(format!("Invalid path: {}", path.display()))
        })?;
        if path.is_dir() {
            build_file_list(path_name, filenames, ext)?;
        } else if path_name.ends_with(ext) {
            filenames.push(path_name.to_string());
        }
    }
    Ok(())
}
pub fn create_batch_empty(schema: &Schema) -> ArrowResult<RecordBatch> {
let columns = schema
.fields()
.iter()
.map(|f| match f.data_type() {
DataType::Float32 => {
Ok(Arc::new(Float32Array::from(vec![] as Vec<f32>)) as ArrayRef)
}
DataType::Float64 => {
Ok(Arc::new(Float64Array::from(vec![] as Vec<f64>)) as ArrayRef)
}
DataType::Int64 => {
Ok(Arc::new(Int64Array::from(vec![] as Vec<i64>)) as ArrayRef)
}
DataType::Int32 => {
Ok(Arc::new(Int32Array::from(vec![] as Vec<i32>)) as ArrayRef)
}
DataType::Int16 => {
Ok(Arc::new(Int16Array::from(vec![] as Vec<i16>)) as ArrayRef)
}
DataType::Int8 => {
Ok(Arc::new(Int8Array::from(vec![] as Vec<i8>)) as ArrayRef)
}
DataType::UInt64 => {
Ok(Arc::new(UInt64Array::from(vec![] as Vec<u64>)) as ArrayRef)
}
DataType::UInt32 => {
Ok(Arc::new(UInt32Array::from(vec![] as Vec<u32>)) as ArrayRef)
}
DataType::UInt16 => {
Ok(Arc::new(UInt16Array::from(vec![] as Vec<u16>)) as ArrayRef)
}
DataType::UInt8 => {
Ok(Arc::new(UInt8Array::from(vec![] as Vec<u8>)) as ArrayRef)
}
DataType::Utf8 => {
Ok(Arc::new(StringArray::from(vec![] as Vec<&str>)) as ArrayRef)
}
DataType::LargeUtf8 => {
Ok(Arc::new(LargeStringArray::from(vec![] as Vec<&str>)) as ArrayRef)
}
DataType::Boolean => {
Ok(Arc::new(BooleanArray::from(vec![] as Vec<bool>)) as ArrayRef)
}
_ => Err(ExecutionError::NotImplemented(format!(
"Cannot convert datatype {:?} to array",
f.data_type()
))),
})
.collect::<Result<_>>()
.map_err(ExecutionError::into_arrow_external_error)?;
RecordBatch::try_new(Arc::new(schema.to_owned()), columns)
}