use std::{error::Error, path::PathBuf};
/// Asserts that the pretty-printed form of `$CHUNKS` equals `$EXPECTED_LINES`,
/// line by line.
///
/// * `$EXPECTED_LINES` - anything with `.iter()` yielding `&&str`-like items
///   (e.g. an array or slice of string literals), one entry per output line.
/// * `$CHUNKS` - forwarded unchanged to
///   `arrow::util::pretty::pretty_format_batches_with_options`
///   (presumably a `&[RecordBatch]` - confirm against the arrow API).
///
/// The formatted output is trimmed before being split into lines, so leading
/// and trailing blank lines never take part in the comparison.
///
/// The expansion is a single block expression, so the macro can be used both
/// as a statement and in expression position (closure bodies, match arms).
///
/// # Panics
///
/// Panics if formatting fails or if the two line lists differ; in the latter
/// case both lists are printed in `{:#?}` form.
#[macro_export]
macro_rules! assert_batches_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr) => {{
        let expected_lines: Vec<String> =
            $EXPECTED_LINES.iter().map(|&s| s.into()).collect();

        let formatted = $crate::arrow::util::pretty::pretty_format_batches_with_options(
            $CHUNKS,
            &$crate::format::DEFAULT_FORMAT_OPTIONS,
        )
        .unwrap()
        .to_string();

        let actual_lines: Vec<&str> = formatted.trim().lines().collect();

        assert_eq!(
            expected_lines, actual_lines,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            expected_lines, actual_lines
        );
    }};
}
/// Like `assert_batches_eq!`, but insensitive to row order: the data lines of
/// both the expected and the actual output are sorted before comparison.
///
/// * `$EXPECTED_LINES` - anything with `.iter()` yielding `&&str`-like items,
///   one entry per output line.
/// * `$CHUNKS` - forwarded unchanged to
///   `arrow::util::pretty::pretty_format_batches_with_options`
///   (presumably a `&[RecordBatch]` - confirm against the arrow API).
///
/// Only the lines in `[2 .. len - 1]` are sorted, and only when there are more
/// than three lines. That leaves the first two lines and the last line in
/// place.
/// NOTE(review): this assumes the usual pretty-table layout (border, header,
/// border, rows..., border); the border at index 2 is inside the sorted range
/// and stays first only because `+` orders before `|` - confirm if the table
/// format ever changes.
///
/// The expansion is a single block expression, so the macro can be used both
/// as a statement and in expression position.
///
/// # Panics
///
/// Panics if formatting fails or if the sorted line lists differ.
#[macro_export]
macro_rules! assert_batches_sorted_eq {
    ($EXPECTED_LINES: expr, $CHUNKS: expr) => {{
        let mut expected_lines: Vec<String> =
            $EXPECTED_LINES.iter().map(|&s| s.into()).collect();

        // Sort everything between the header and the closing border.
        let num_lines = expected_lines.len();
        if num_lines > 3 {
            expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
        }

        let formatted = $crate::arrow::util::pretty::pretty_format_batches_with_options(
            $CHUNKS,
            &$crate::format::DEFAULT_FORMAT_OPTIONS,
        )
        .unwrap()
        .to_string();

        let mut actual_lines: Vec<&str> = formatted.trim().lines().collect();

        // Apply the identical sort to the actual output.
        let num_lines = actual_lines.len();
        if num_lines > 3 {
            actual_lines.as_mut_slice()[2..num_lines - 1].sort_unstable()
        }

        assert_eq!(
            expected_lines, actual_lines,
            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
            expected_lines, actual_lines
        );
    }};
}
/// Asserts that `$ACTUAL` contains `$EXPECTED` as a substring.
///
/// Both arguments are converted with `.into()` to `String`, so `&str`,
/// `String`, and any other `Into<String>` value are accepted.
///
/// The expansion is a single block expression, so the macro can be used both
/// as a statement and in expression position (closure bodies, match arms).
///
/// # Panics
///
/// Panics with a message showing both strings when `$EXPECTED` is not found
/// in `$ACTUAL`.
#[macro_export]
macro_rules! assert_contains {
    ($ACTUAL: expr, $EXPECTED: expr) => {{
        let actual_value: String = $ACTUAL.into();
        let expected_value: String = $EXPECTED.into();
        assert!(
            actual_value.contains(&expected_value),
            "Can not find expected in actual.\n\nExpected:\n{}\n\nActual:\n{}",
            expected_value,
            actual_value
        );
    }};
}
/// Asserts that `$ACTUAL` does NOT contain `$UNEXPECTED` as a substring.
///
/// Both arguments are converted with `.into()` to `String`, so `&str`,
/// `String`, and any other `Into<String>` value are accepted.
///
/// The expansion is a single block expression, so the macro can be used both
/// as a statement and in expression position (closure bodies, match arms).
///
/// # Panics
///
/// Panics with a message showing both strings when `$UNEXPECTED` is found in
/// `$ACTUAL`.
#[macro_export]
macro_rules! assert_not_contains {
    ($ACTUAL: expr, $UNEXPECTED: expr) => {{
        let actual_value: String = $ACTUAL.into();
        let unexpected_value: String = $UNEXPECTED.into();
        assert!(
            !actual_value.contains(&unexpected_value),
            "Found unexpected in actual.\n\nUnexpected:\n{}\n\nActual:\n{}",
            unexpected_value,
            actual_value
        );
    }};
}
/// Returns the DataFusion test data directory as a `String`.
///
/// The directory is taken from the `DATAFUSION_TEST_DATA` environment
/// variable when it is set to a non-blank value, and otherwise from
/// `../../datafusion/core/tests/data` relative to this crate's manifest
/// directory (see [`get_data_dir`]).
///
/// # Panics
///
/// Panics when neither location resolves to an existing directory.
pub fn datafusion_test_data() -> String {
    get_data_dir("DATAFUSION_TEST_DATA", "../../datafusion/core/tests/data")
        .map(|pb| pb.display().to_string())
        // The message names the DataFusion dir; it used to say "arrow".
        .unwrap_or_else(|err| panic!("failed to get datafusion data dir: {err}"))
}
/// Returns the Arrow test data directory as a `String`.
///
/// The directory is taken from the `ARROW_TEST_DATA` environment variable
/// when it is set to a non-blank value, and otherwise from
/// `../../testing/data` relative to this crate's manifest directory
/// (see [`get_data_dir`]).
///
/// # Panics
///
/// Panics when neither location resolves to an existing directory.
pub fn arrow_test_data() -> String {
    get_data_dir("ARROW_TEST_DATA", "../../testing/data")
        .map(|data_dir| data_dir.display().to_string())
        .unwrap_or_else(|err| panic!("failed to get arrow data dir: {err}"))
}
/// Returns the Parquet test data directory as a `String`.
///
/// The directory is taken from the `PARQUET_TEST_DATA` environment variable
/// when it is set to a non-blank value, and otherwise from
/// `../../parquet-testing/data` relative to this crate's manifest directory
/// (see [`get_data_dir`]).
///
/// Only compiled when the `parquet` feature is enabled.
///
/// # Panics
///
/// Panics when neither location resolves to an existing directory.
#[cfg(feature = "parquet")]
pub fn parquet_test_data() -> String {
    get_data_dir("PARQUET_TEST_DATA", "../../parquet-testing/data")
        .map(|data_dir| data_dir.display().to_string())
        .unwrap_or_else(|err| panic!("failed to get parquet data dir: {err}"))
}
/// Resolves a test data directory from an environment variable, falling back
/// to a path relative to this crate's manifest directory.
///
/// * `udf_env` - name of the environment variable that may hold a
///   user-defined data directory.
/// * `submodule_data` - fallback path, joined onto `CARGO_MANIFEST_DIR`.
///
/// Resolution order:
/// 1. If `udf_env` is readable and non-blank after trimming, it is the only
///    candidate: an existing directory is returned, anything else is an error
///    (the fallback is NOT consulted).
/// 2. Otherwise the fallback path is returned if it is an existing directory.
///
/// # Errors
///
/// Returns a boxed string error when the selected candidate is not a
/// directory.
pub fn get_data_dir(
    udf_env: &str,
    submodule_data: &str,
) -> Result<PathBuf, Box<dyn Error>> {
    // A variable that is unset, unreadable, or blank counts as "no override".
    let user_defined = std::env::var(udf_env)
        .ok()
        .map(|raw| raw.trim().to_string())
        .filter(|candidate| !candidate.is_empty());

    if let Some(candidate) = user_defined {
        let path = PathBuf::from(candidate);
        if !path.is_dir() {
            return Err(format!(
                "the data dir `{}` defined by env {} not found",
                path.display(),
                udf_env
            )
            .into());
        }
        return Ok(path);
    }

    // Fall back to the location inside the repository checkout.
    let fallback = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(submodule_data);
    if !fallback.is_dir() {
        return Err(format!(
            "env `{}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
             HINT: try running `git submodule update --init`",
            udf_env,
            fallback.display(),
        )
        .into());
    }
    Ok(fallback)
}
/// Builds a reference-counted arrow array of the given type from `$values`.
///
/// The first argument is a type tag (`Boolean`, `Int8` ... `UInt64`,
/// `Float16`, `Float32`, `Float64`, `Utf8`); the second is any value the
/// matching arrow array type can be built from via `From`. `Utf8` maps to
/// `StringArray`.
///
/// All paths in the expansion are anchored (`$crate::arrow::...` and
/// `::std::...`), so the macro expands correctly no matter which names the
/// call site has in scope; callers do not need their own `arrow` import.
#[macro_export]
macro_rules! create_array {
    (Boolean, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::BooleanArray::from($values))
    };
    (Int8, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Int8Array::from($values))
    };
    (Int16, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Int16Array::from($values))
    };
    (Int32, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Int32Array::from($values))
    };
    (Int64, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Int64Array::from($values))
    };
    (UInt8, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::UInt8Array::from($values))
    };
    (UInt16, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::UInt16Array::from($values))
    };
    (UInt32, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::UInt32Array::from($values))
    };
    (UInt64, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::UInt64Array::from($values))
    };
    (Float16, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Float16Array::from($values))
    };
    (Float32, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Float32Array::from($values))
    };
    (Float64, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::Float64Array::from($values))
    };
    (Utf8, $values: expr) => {
        ::std::sync::Arc::new($crate::arrow::array::StringArray::from($values))
    };
}
/// Builds a `RecordBatch` from a comma-separated list of
/// `(name, Type, values)` tuples.
///
/// For every tuple a nullable field called `name` with data type
/// `DataType::Type` is added to the schema, and the column is built with
/// `create_array!(Type, values)`, so `Type` must be one of the tags that
/// macro supports.
///
/// The macro evaluates to the `Result` returned by `RecordBatch::try_new`;
/// it does not unwrap it, so callers decide how to handle a failure.
///
/// All paths in the expansion are anchored (`$crate::arrow::...` and
/// `::std::...`), so callers do not need `arrow`, `arrow_schema`, or
/// `arrow_array` in scope.
#[macro_export]
macro_rules! record_batch {
    ($(($name: expr, $type: ident, $values: expr)),*) => {
        {
            let schema = ::std::sync::Arc::new($crate::arrow::datatypes::Schema::new(vec![
                $(
                    $crate::arrow::datatypes::Field::new(
                        $name,
                        $crate::arrow::datatypes::DataType::$type,
                        true,
                    ),
                )*
            ]));

            $crate::arrow::record_batch::RecordBatch::try_new(
                schema,
                vec![$(
                    $crate::create_array!($type, $values),
                )*]
            )
        }
    }
}
// Unit tests for the data-directory helpers and the `record_batch!` macro.
#[cfg(test)]
mod tests {
use crate::cast::{as_float64_array, as_int32_array, as_string_array};
use crate::error::Result;
use super::*;
use std::env;
// Walks `get_data_dir` through each state of the override variable:
// pointing at a missing dir, empty, blank, pointing at an existing dir,
// and unset. The steps mutate process-wide environment state, so their
// order matters.
#[test]
fn test_data_dir() {
// Name of the environment variable used as the override in this test.
let udf_env = "get_data_dir";
let cwd = env::current_dir().unwrap();
// The parent of the current directory serves as the "existing" directory.
// Because it is built from `current_dir()`, joining it onto the manifest
// dir inside `get_data_dir` yields this same path, which is what the
// equality assertions below rely on.
let existing_pb = cwd.join("..");
let existing = existing_pb.display().to_string();
let existing_str = existing.as_str();
let non_existing = cwd.join("non-existing-dir").display().to_string();
let non_existing_str = non_existing.as_str();
// Override set to a missing dir: error, even though the fallback exists.
env::set_var(udf_env, non_existing_str);
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_err());
// Override set but empty: the fallback is used.
env::set_var(udf_env, "");
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
// Override set to whitespace only: treated like empty, fallback is used.
env::set_var(udf_env, " ");
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
// Override set to an existing dir: that dir is returned.
env::set_var(udf_env, existing_str);
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
// Override unset: the result depends solely on the fallback path.
env::remove_var(udf_env);
let res = get_data_dir(udf_env, non_existing_str);
assert!(res.is_err());
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
}
// Checks that the arrow and parquet test data directories resolve to real
// directories. Only built with the `parquet` feature.
#[test]
#[cfg(feature = "parquet")]
fn test_happy() {
let res = arrow_test_data();
assert!(PathBuf::from(res).is_dir());
let res = parquet_test_data();
assert!(PathBuf::from(res).is_dir());
}
// Builds a three-column batch with `record_batch!` and checks its shape and
// the contents of every column.
#[test]
fn test_create_record_batch() -> Result<()> {
use arrow_array::Array;
let batch = record_batch!(
("a", Int32, vec![1, 2, 3, 4]),
("b", Float64, vec![Some(4.0), None, Some(5.0), None]),
("c", Utf8, vec!["alpha", "beta", "gamma", "delta"])
)?;
assert_eq!(3, batch.num_columns());
assert_eq!(4, batch.num_rows());
// Column "a": plain Int32 values.
let values: Vec<_> = as_int32_array(batch.column(0))?
.values()
.iter()
.map(|v| v.to_owned())
.collect();
assert_eq!(values, vec![1, 2, 3, 4]);
// Column "b": the raw values buffer is expected to hold 0.0 in the slots
// that were given as `None`.
let values: Vec<_> = as_float64_array(batch.column(1))?
.values()
.iter()
.map(|v| v.to_owned())
.collect();
assert_eq!(values, vec![4.0, 0.0, 5.0, 0.0]);
// Column "b": the null buffer yields `true` for the `Some` slots and
// `false` for the `None` slots.
let nulls: Vec<_> = as_float64_array(batch.column(1))?
.nulls()
.unwrap()
.iter()
.collect();
assert_eq!(nulls, vec![true, false, true, false]);
// Column "c": `flatten` drops any null entries; none are expected here.
let values: Vec<_> = as_string_array(batch.column(2))?.iter().flatten().collect();
assert_eq!(values, vec!["alpha", "beta", "gamma", "delta"]);
Ok(())
}
}