use std::{
any,
sync::{Arc, Weak},
};
use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use datafusion_common::Result;
use crate::datasource::{MemTable, TableProvider};
use crate::logical_expr::TableType;
use super::{
catalog::{CatalogList, CatalogProvider},
schema::SchemaProvider,
};
/// Name of the virtual schema that every wrapped catalog exposes.
const INFORMATION_SCHEMA: &str = "information_schema";
/// Name of the `information_schema` table that lists tables and views.
const TABLES: &str = "tables";
/// Name of the `information_schema` table that lists table columns.
const COLUMNS: &str = "columns";
/// Wraps a [`CatalogProvider`] so that, in addition to its own schemas, it
/// exposes a virtual `information_schema` schema describing `catalog_list`.
///
/// NOTE(review): the catalog list is held weakly, presumably to avoid a
/// reference cycle between the list and the catalogs it contains — confirm.
pub(crate) struct CatalogWithInformationSchema {
    /// Catalog list introspected when `information_schema` is requested.
    catalog_list: Weak<dyn CatalogList>,
    /// Wrapped catalog that serves every other schema.
    inner: Arc<dyn CatalogProvider>,
}

impl CatalogWithInformationSchema {
    /// Creates a wrapper around `inner` whose `information_schema` reports on
    /// `catalog_list`.
    pub(crate) fn new(
        catalog_list: Weak<dyn CatalogList>,
        inner: Arc<dyn CatalogProvider>,
    ) -> Self {
        Self {
            inner,
            catalog_list,
        }
    }
}
impl CatalogProvider for CatalogWithInformationSchema {
    fn as_any(&self) -> &dyn any::Any {
        self
    }

    /// Returns the wrapped catalog's schema names with `information_schema`
    /// appended at the end.
    fn schema_names(&self) -> Vec<String> {
        let mut names = self.inner.schema_names();
        names.push(INFORMATION_SCHEMA.to_string());
        names
    }

    /// Resolves a schema by name.
    ///
    /// `information_schema` is matched ASCII case-insensitively and yields a
    /// freshly built [`InformationSchemaProvider`], or `None` if the catalog
    /// list has already been dropped. Every other name is delegated to the
    /// wrapped catalog.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        if !name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
            return self.inner.schema(name);
        }
        let catalog_list = self.catalog_list.upgrade()?;
        let provider: Arc<dyn SchemaProvider> =
            Arc::new(InformationSchemaProvider { catalog_list });
        Some(provider)
    }

    /// Registers `schema` under `name` in the wrapped catalog.
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        self.inner.register_schema(name, schema)
    }
}
/// Virtual schema that serves the `information_schema.tables` and
/// `information_schema.columns` tables.
///
/// Every lookup rebuilds the requested table from the current contents of
/// `catalog_list`.
struct InformationSchemaProvider {
    /// Catalogs to describe.
    catalog_list: Arc<dyn CatalogList>,
}

impl InformationSchemaProvider {
    /// Builds `information_schema.tables`.
    ///
    /// For every catalog this emits one row per table in each of its schemas
    /// other than `information_schema`, followed by one `VIEW` row each for
    /// `information_schema.tables` and `information_schema.columns`.
    ///
    /// A catalog, schema, or table that disappears between being listed and
    /// being looked up (e.g. deregistered concurrently) is skipped instead of
    /// causing a panic.
    fn make_tables(&self) -> Arc<dyn TableProvider> {
        let mut builder = InformationSchemaTablesBuilder::new();
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = match self.catalog_list.catalog(&catalog_name) {
                Some(catalog) => catalog,
                None => continue,
            };
            for schema_name in catalog.schema_names() {
                // The virtual schema is described by the fixed rows added
                // below rather than by introspecting itself.
                if schema_name == INFORMATION_SCHEMA {
                    continue;
                }
                let schema = match catalog.schema(&schema_name) {
                    Some(schema) => schema,
                    None => continue,
                };
                for table_name in schema.table_names() {
                    if let Some(table) = schema.table(&table_name) {
                        builder.add_table(
                            &catalog_name,
                            &schema_name,
                            &table_name,
                            table.table_type(),
                            table.get_table_definition(),
                        );
                    }
                }
            }
            for info_table in [TABLES, COLUMNS] {
                builder.add_table(
                    &catalog_name,
                    INFORMATION_SCHEMA,
                    info_table,
                    TableType::View,
                    None::<&str>,
                );
            }
        }
        let mem_table: MemTable = builder.into();
        Arc::new(mem_table)
    }

    /// Builds `information_schema.columns`: one row per field of every table
    /// in every schema other than `information_schema`, in every catalog.
    ///
    /// The position passed as `ordinal_position` is the field's 0-based index
    /// in the table schema. NOTE(review): the SQL standard numbers columns
    /// from 1 — confirm that 0-based is intended before changing it.
    ///
    /// A catalog, schema, or table that disappears between being listed and
    /// being looked up is skipped instead of causing a panic.
    fn make_columns(&self) -> Arc<dyn TableProvider> {
        let mut builder = InformationSchemaColumnsBuilder::new();
        for catalog_name in self.catalog_list.catalog_names() {
            let catalog = match self.catalog_list.catalog(&catalog_name) {
                Some(catalog) => catalog,
                None => continue,
            };
            for schema_name in catalog.schema_names() {
                if schema_name == INFORMATION_SCHEMA {
                    continue;
                }
                let schema = match catalog.schema(&schema_name) {
                    Some(schema) => schema,
                    None => continue,
                };
                for table_name in schema.table_names() {
                    let table = match schema.table(&table_name) {
                        Some(table) => table,
                        None => continue,
                    };
                    for (i, field) in table.schema().fields().iter().enumerate() {
                        builder.add_column(
                            &catalog_name,
                            &schema_name,
                            &table_name,
                            field.name(),
                            i,
                            field.is_nullable(),
                            field.data_type(),
                        );
                    }
                }
            }
        }
        let mem_table: MemTable = builder.into();
        Arc::new(mem_table)
    }
}
impl SchemaProvider for InformationSchemaProvider {
    fn as_any(&self) -> &(dyn any::Any + 'static) {
        self
    }

    /// Lists the two virtual tables this schema exposes.
    fn table_names(&self) -> Vec<String> {
        vec![TABLES.to_string(), COLUMNS.to_string()]
    }

    /// Builds a fresh snapshot of `tables` or `columns`. The name is matched
    /// ASCII case-insensitively. Any other name yields `None`.
    fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        if name.eq_ignore_ascii_case(TABLES) {
            Some(self.make_tables())
        } else if name.eq_ignore_ascii_case(COLUMNS) {
            Some(self.make_columns())
        } else {
            None
        }
    }

    /// Reports whether `name` (ASCII case-insensitive) is one of the virtual
    /// tables. Uses the same matching rule as [`Self::table`] and does not
    /// allocate.
    fn table_exist(&self, name: &str) -> bool {
        name.eq_ignore_ascii_case(TABLES) || name.eq_ignore_ascii_case(COLUMNS)
    }
}
/// Column-wise accumulator for the rows of `information_schema.tables`.
struct InformationSchemaTablesBuilder {
    /// `table_catalog` column.
    catalog_names: StringBuilder,
    /// `table_schema` column.
    schema_names: StringBuilder,
    /// `table_name` column.
    table_names: StringBuilder,
    /// `table_type` column (`BASE TABLE`, `VIEW` or `LOCAL TEMPORARY`).
    table_types: StringBuilder,
    /// Nullable `definition` column.
    definitions: StringBuilder,
}

impl InformationSchemaTablesBuilder {
    /// Creates an empty builder. Each column builder starts with an initial
    /// capacity of 10 values.
    fn new() -> Self {
        const INITIAL_CAPACITY: usize = 10;
        Self {
            catalog_names: StringBuilder::new(INITIAL_CAPACITY),
            schema_names: StringBuilder::new(INITIAL_CAPACITY),
            table_names: StringBuilder::new(INITIAL_CAPACITY),
            table_types: StringBuilder::new(INITIAL_CAPACITY),
            definitions: StringBuilder::new(INITIAL_CAPACITY),
        }
    }

    /// Appends a single row describing `catalog_name.schema_name.table_name`.
    ///
    /// `table_type` is rendered as `"BASE TABLE"`, `"VIEW"` or
    /// `"LOCAL TEMPORARY"`. A `None` definition is stored as null.
    ///
    /// # Panics
    /// Panics if any underlying builder rejects an append, because the returned
    /// `Result`s are unwrapped.
    fn add_table(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        table_type: TableType,
        definition: Option<impl AsRef<str>>,
    ) {
        let type_label = match table_type {
            TableType::Base => "BASE TABLE",
            TableType::View => "VIEW",
            TableType::Temporary => "LOCAL TEMPORARY",
        };

        self.catalog_names.append_value(catalog_name.as_ref()).unwrap();
        self.schema_names.append_value(schema_name.as_ref()).unwrap();
        self.table_names.append_value(table_name.as_ref()).unwrap();
        self.table_types.append_value(type_label).unwrap();
        self.definitions.append_option(definition.as_ref()).unwrap();
    }
}
impl From<InformationSchemaTablesBuilder> for MemTable {
    /// Finishes every column builder in `value` and packages the rows as a
    /// single-partition, single-batch [`MemTable`] with the
    /// `information_schema.tables` schema.
    ///
    /// # Panics
    /// Panics if `RecordBatch::try_new` or `MemTable::try_new` returns an
    /// error. `add_table` appends to all five builders once per row, so the
    /// column lengths are expected to match.
    fn from(value: InformationSchemaTablesBuilder) -> MemTable {
        let InformationSchemaTablesBuilder {
            mut catalog_names,
            mut schema_names,
            mut table_names,
            mut table_types,
            mut definitions,
        } = value;

        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("table_type", DataType::Utf8, false),
            Field::new("definition", DataType::Utf8, true),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(catalog_names.finish()),
                Arc::new(schema_names.finish()),
                Arc::new(table_names.finish()),
                Arc::new(table_types.finish()),
                Arc::new(definitions.finish()),
            ],
        )
        .unwrap();

        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
    }
}
/// Column-wise accumulator for the rows of `information_schema.columns`.
struct InformationSchemaColumnsBuilder {
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    column_names: StringBuilder,
    ordinal_positions: UInt64Builder,
    column_defaults: StringBuilder,
    is_nullables: StringBuilder,
    data_types: StringBuilder,
    character_maximum_lengths: UInt64Builder,
    character_octet_lengths: UInt64Builder,
    numeric_precisions: UInt64Builder,
    numeric_precision_radixes: UInt64Builder,
    numeric_scales: UInt64Builder,
    datetime_precisions: UInt64Builder,
    interval_types: StringBuilder,
}

impl InformationSchemaColumnsBuilder {
    /// Creates an empty builder. Each column builder starts with an initial
    /// capacity of 10 values.
    fn new() -> Self {
        let default_capacity = 10;
        Self {
            catalog_names: StringBuilder::new(default_capacity),
            schema_names: StringBuilder::new(default_capacity),
            table_names: StringBuilder::new(default_capacity),
            column_names: StringBuilder::new(default_capacity),
            ordinal_positions: UInt64Builder::new(default_capacity),
            column_defaults: StringBuilder::new(default_capacity),
            is_nullables: StringBuilder::new(default_capacity),
            data_types: StringBuilder::new(default_capacity),
            character_maximum_lengths: UInt64Builder::new(default_capacity),
            character_octet_lengths: UInt64Builder::new(default_capacity),
            numeric_precisions: UInt64Builder::new(default_capacity),
            numeric_precision_radixes: UInt64Builder::new(default_capacity),
            numeric_scales: UInt64Builder::new(default_capacity),
            datetime_precisions: UInt64Builder::new(default_capacity),
            interval_types: StringBuilder::new(default_capacity),
        }
    }

    /// Appends one row describing column `column_name` of
    /// `catalog_name.schema_name.table_name`.
    ///
    /// - `column_position` is written verbatim as `ordinal_position`.
    /// - `is_nullable` is rendered as `"YES"` / `"NO"`.
    /// - `data_type` is rendered with its `Debug` formatting.
    /// - `column_default`, `character_maximum_length`, `datetime_precision`
    ///   and `interval_type` are always null.
    ///
    /// # Panics
    /// Panics if any underlying builder rejects an append, because the returned
    /// `Result`s are unwrapped.
    #[allow(clippy::too_many_arguments)]
    fn add_column(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        column_name: impl AsRef<str>,
        column_position: usize,
        is_nullable: bool,
        data_type: &DataType,
    ) {
        use DataType::*;

        self.catalog_names
            .append_value(catalog_name.as_ref())
            .unwrap();
        self.schema_names
            .append_value(schema_name.as_ref())
            .unwrap();
        self.table_names.append_value(table_name.as_ref()).unwrap();
        self.column_names
            .append_value(column_name.as_ref())
            .unwrap();
        self.ordinal_positions
            .append_value(column_position as u64)
            .unwrap();
        self.column_defaults.append_null().unwrap();

        let nullable_str = if is_nullable { "YES" } else { "NO" };
        self.is_nullables.append_value(nullable_str).unwrap();

        self.data_types
            .append_value(format!("{:?}", data_type))
            .unwrap();

        self.character_maximum_lengths.append_option(None).unwrap();

        // Maximum byte length addressable by the string/binary offsets:
        // 32-bit offsets for the regular variants, 64-bit for the large ones.
        let char_len: Option<u64> = match data_type {
            Utf8 | Binary => Some(i32::MAX as u64),
            LargeBinary | LargeUtf8 => Some(i64::MAX as u64),
            _ => None,
        };
        self.character_octet_lengths
            .append_option(char_len)
            .unwrap();

        // Radix-2 precision is the bit width for integers. For floating point
        // it is the IEEE 754 significand precision (binary16: 11,
        // binary32: 24, binary64: 53), which matches what PostgreSQL reports
        // for `real` (24) and `double precision` (53). Decimals use radix 10
        // with their declared precision and scale.
        let (numeric_precision, numeric_radix, numeric_scale) = match data_type {
            Int8 | UInt8 => (Some(8), Some(2), None),
            Int16 | UInt16 => (Some(16), Some(2), None),
            Int32 | UInt32 => (Some(32), Some(2), None),
            Int64 | UInt64 => (Some(64), Some(2), None),
            Float16 => (Some(11), Some(2), None),
            Float32 => (Some(24), Some(2), None),
            Float64 => (Some(53), Some(2), None),
            Decimal(precision, scale) => {
                (Some(*precision as u64), Some(10), Some(*scale as u64))
            }
            _ => (None, None, None),
        };
        self.numeric_precisions
            .append_option(numeric_precision)
            .unwrap();
        self.numeric_precision_radixes
            .append_option(numeric_radix)
            .unwrap();
        self.numeric_scales.append_option(numeric_scale).unwrap();

        self.datetime_precisions.append_option(None).unwrap();
        self.interval_types.append_null().unwrap();
    }
}
impl From<InformationSchemaColumnsBuilder> for MemTable {
    /// Finishes every column builder in `value` and packages the rows as a
    /// single-partition, single-batch [`MemTable`] with the
    /// `information_schema.columns` schema.
    ///
    /// # Panics
    /// Panics if `RecordBatch::try_new` or `MemTable::try_new` returns an
    /// error. `add_column` appends to all fifteen builders once per row, so the
    /// column lengths are expected to match.
    fn from(value: InformationSchemaColumnsBuilder) -> MemTable {
        let InformationSchemaColumnsBuilder {
            mut catalog_names,
            mut schema_names,
            mut table_names,
            mut column_names,
            mut ordinal_positions,
            mut column_defaults,
            mut is_nullables,
            mut data_types,
            mut character_maximum_lengths,
            mut character_octet_lengths,
            mut numeric_precisions,
            mut numeric_precision_radixes,
            mut numeric_scales,
            mut datetime_precisions,
            mut interval_types,
        } = value;

        let schema = Arc::new(Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("column_name", DataType::Utf8, false),
            Field::new("ordinal_position", DataType::UInt64, false),
            Field::new("column_default", DataType::Utf8, true),
            Field::new("is_nullable", DataType::Utf8, false),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("character_maximum_length", DataType::UInt64, true),
            Field::new("character_octet_length", DataType::UInt64, true),
            Field::new("numeric_precision", DataType::UInt64, true),
            Field::new("numeric_precision_radix", DataType::UInt64, true),
            Field::new("numeric_scale", DataType::UInt64, true),
            Field::new("datetime_precision", DataType::UInt64, true),
            Field::new("interval_type", DataType::Utf8, true),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(catalog_names.finish()),
                Arc::new(schema_names.finish()),
                Arc::new(table_names.finish()),
                Arc::new(column_names.finish()),
                Arc::new(ordinal_positions.finish()),
                Arc::new(column_defaults.finish()),
                Arc::new(is_nullables.finish()),
                Arc::new(data_types.finish()),
                Arc::new(character_maximum_lengths.finish()),
                Arc::new(character_octet_lengths.finish()),
                Arc::new(numeric_precisions.finish()),
                Arc::new(numeric_precision_radixes.finish()),
                Arc::new(numeric_scales.finish()),
                Arc::new(datetime_precisions.finish()),
                Arc::new(interval_types.finish()),
            ],
        )
        .unwrap();

        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
    }
}