use std::rc::Rc;
use std::str;
use super::super::errors::*;
use super::super::types::*;
use arrow::datatypes::*;
/// Aggregate function that keeps track of the smallest value it has been fed
/// through `execute`.
pub struct MinFunction {
/// Data type of the single argument; also reported as the return type.
data_type: DataType,
/// Smallest value observed so far. Starts as `ScalarValue::Null`, meaning
/// no value has been seen yet.
value: ScalarValue,
}
impl MinFunction {
    /// Builds a fresh accumulator for values of `data_type`.
    ///
    /// The supplied type is cloned into the new instance, and the running
    /// minimum starts out as `ScalarValue::Null` (nothing seen yet).
    pub fn new(data_type: &DataType) -> Self {
        let data_type = data_type.clone();
        let value = ScalarValue::Null;
        Self { data_type, value }
    }
}
/// Folds every element of a primitive column buffer into `$SELF.value`,
/// keeping the smallest one.
///
/// * `$SELF`    - identifier of the accumulator; must expose a `value` field
///   holding a `ScalarValue`.
/// * `$BUF`     - identifier of the buffer; the macro calls `len()` (cast to
///   `usize`) and `get(i)`, and dereferences the result of `get`.
/// * `$VARIANT` - the `ScalarValue` variant that wraps the element type.
///
/// Elements are compared with `<`, so an element only replaces the current
/// minimum when it is strictly smaller. An empty buffer leaves the
/// accumulator untouched.
///
/// # Panics
///
/// Panics if the accumulator already holds a variant other than `Null` or
/// `$VARIANT`.
macro_rules! min_in_column {
    ($SELF:ident, $BUF:ident, $VARIANT:ident) => {{
        for i in 0..$BUF.len() as usize {
            let value = *$BUF.get(i);
            match $SELF.value {
                // First value ever seen becomes the minimum.
                ScalarValue::Null => $SELF.value = ScalarValue::$VARIANT(value),
                ScalarValue::$VARIANT(x) => {
                    if value < x {
                        $SELF.value = ScalarValue::$VARIANT(value)
                    }
                }
                ref other => panic!(
                    "Type mismatch in MIN() for datatype {} - {:?}",
                    stringify!($VARIANT),
                    other
                ),
            }
        }
    }};
}
/// Folds a single scalar into `$SELF.value`, keeping the smaller of the two,
/// and evaluates to `Ok(())`.
///
/// * `$SELF`    - identifier of the accumulator; must expose a `value` field
///   holding a `ScalarValue`.
/// * `$VALUE`   - identifier bound to a reference to the incoming value.
/// * `$VARIANT` - the `ScalarValue` variant that wraps the value type.
///
/// The incoming value replaces the stored one when nothing has been stored
/// yet (`Null`) or when it is strictly smaller (`<`).
///
/// # Panics
///
/// Panics if the accumulator already holds a variant other than `Null` or
/// `$VARIANT`.
macro_rules! min_in_scalar {
    ($SELF:ident, $VALUE:ident, $VARIANT:ident) => {{
        let candidate = *$VALUE;
        // Decide first, assign afterwards.
        let replace = match $SELF.value {
            ScalarValue::Null => true,
            ScalarValue::$VARIANT(current) => candidate < current,
            _ => panic!("type mismatch in MIN()"),
        };
        if replace {
            $SELF.value = ScalarValue::$VARIANT(candidate);
        }
        Ok(())
    }};
}
impl AggregateFunction for MinFunction {
    /// Returns the name of this aggregate function, `"MIN"`.
    fn name(&self) -> String {
        "MIN".to_string()
    }

    /// Describes the single argument, named `arg`, typed with the data type
    /// this function was created for.
    fn args(&self) -> Vec<Field> {
        vec![Field::new("arg", self.data_type.clone(), true)]
    }

    /// The result has the same data type as the argument.
    fn return_type(&self) -> DataType {
        self.data_type.clone()
    }

    /// Folds one batch (a column or a single scalar) into the running minimum.
    ///
    /// An empty Utf8 column leaves the accumulator unchanged.
    ///
    /// # Panics
    ///
    /// * if `args` does not contain exactly one value;
    /// * if the accumulator already holds a value of a different type than
    ///   the incoming data;
    /// * if a Utf8 column contains bytes that are not valid UTF-8;
    /// * if the column or scalar type is not one of the handled variants
    ///   (`unimplemented!`).
    fn execute(&mut self, args: &Vec<Value>) -> Result<()> {
        assert_eq!(1, args.len());
        match args[0] {
            Value::Column(ref array) => {
                match array.data() {
                    ArrayData::UInt8(ref buf) => min_in_column!(self, buf, UInt8),
                    ArrayData::UInt16(ref buf) => min_in_column!(self, buf, UInt16),
                    ArrayData::UInt32(ref buf) => min_in_column!(self, buf, UInt32),
                    ArrayData::UInt64(ref buf) => min_in_column!(self, buf, UInt64),
                    ArrayData::Int8(ref buf) => min_in_column!(self, buf, Int8),
                    ArrayData::Int16(ref buf) => min_in_column!(self, buf, Int16),
                    ArrayData::Int32(ref buf) => min_in_column!(self, buf, Int32),
                    ArrayData::Int64(ref buf) => min_in_column!(self, buf, Int64),
                    ArrayData::Float32(ref buf) => min_in_column!(self, buf, Float32),
                    ArrayData::Float64(ref buf) => min_in_column!(self, buf, Float64),
                    ArrayData::Utf8(ref list) => {
                        if list.len() > 0 {
                            // Compare borrowed slices and allocate only once,
                            // for the winner, instead of one String per row.
                            // Invalid UTF-8 in the column panics here.
                            let mut smallest = str::from_utf8(list.get(0)).unwrap();
                            for i in 1..list.len() {
                                let candidate = str::from_utf8(list.get(i)).unwrap();
                                if candidate < smallest {
                                    smallest = candidate;
                                }
                            }
                            let replace = match self.value {
                                ScalarValue::Null => true,
                                ScalarValue::Utf8(ref current) => smallest < current.as_str(),
                                ref other => panic!(
                                    "Type mismatch in MIN() for datatype Utf8 - {:?}",
                                    other
                                ),
                            };
                            if replace {
                                self.value = ScalarValue::Utf8(Rc::new(smallest.to_string()));
                            }
                        }
                    }
                    _ => unimplemented!("MIN() unsupported array datatype"),
                }
                Ok(())
            }
            Value::Scalar(ref v) => match v.as_ref() {
                ScalarValue::UInt8(ref value) => min_in_scalar!(self, value, UInt8),
                ScalarValue::UInt16(ref value) => min_in_scalar!(self, value, UInt16),
                ScalarValue::UInt32(ref value) => min_in_scalar!(self, value, UInt32),
                ScalarValue::UInt64(ref value) => min_in_scalar!(self, value, UInt64),
                ScalarValue::Int8(ref value) => min_in_scalar!(self, value, Int8),
                ScalarValue::Int16(ref value) => min_in_scalar!(self, value, Int16),
                ScalarValue::Int32(ref value) => min_in_scalar!(self, value, Int32),
                ScalarValue::Int64(ref value) => min_in_scalar!(self, value, Int64),
                ScalarValue::Float32(ref value) => min_in_scalar!(self, value, Float32),
                ScalarValue::Float64(ref value) => min_in_scalar!(self, value, Float64),
                ScalarValue::Utf8(ref value) => {
                    let replace = match self.value {
                        ScalarValue::Null => true,
                        ScalarValue::Utf8(ref current) => value < current,
                        ref other => panic!(
                            "Type mismatch in MIN() for datatype Utf8 - {:?}",
                            other
                        ),
                    };
                    if replace {
                        // Cheap: only bumps the reference count.
                        self.value = ScalarValue::Utf8(Rc::clone(value));
                    }
                    Ok(())
                }
                _ => unimplemented!("MIN() unsupported scalar datatype"),
            },
        }
    }

    /// Returns the minimum seen so far as a scalar value; this is
    /// `ScalarValue::Null` if no value was ever folded in.
    fn finish(&self) -> Result<Value> {
        Ok(Value::Scalar(Rc::new(self.value.clone())))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// MIN over a Float64 column reports Float64 as its return type and
    /// yields the smallest element of the column.
    #[test]
    fn test_min() {
        let mut min = MinFunction::new(&DataType::Float64);
        assert_eq!(DataType::Float64, min.return_type());

        let column = Value::Column(Rc::new(Array::from(vec![
            12.0_f64, 22.0, 32.0, 6.0, 58.1,
        ])));
        min.execute(&vec![column]).unwrap();

        match min.finish().unwrap() {
            Value::Scalar(ref v) => assert_eq!(v.get_f64().unwrap(), 6.0),
            _ => panic!(),
        }
    }
}