use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::{exec_err, DataFusionError};
use std::any::Any;
use std::sync::Arc;
/// A list of catalog providers held in memory and looked up by name.
#[derive(Debug)]
pub struct MemoryCatalogProviderList {
/// Map from catalog name to the provider registered under that name.
pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
}
impl MemoryCatalogProviderList {
    /// Creates a provider list with no catalogs registered.
    pub fn new() -> Self {
        let catalogs = DashMap::new();
        Self { catalogs }
    }
}
impl Default for MemoryCatalogProviderList {
fn default() -> Self {
Self::new()
}
}
impl CatalogProviderList for MemoryCatalogProviderList {
    /// Exposes this list as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }

    /// Stores `catalog` under `name`.
    ///
    /// Returns the provider previously stored under the same name, if any.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.insert(name, catalog)
    }

    /// Returns the names of all catalogs currently in the map.
    fn catalog_names(&self) -> Vec<String> {
        let mut names = Vec::new();
        for entry in self.catalogs.iter() {
            names.push(entry.key().clone());
        }
        names
    }

    /// Looks up the catalog stored under `name`, returning `None` when absent.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        let entry = self.catalogs.get(name)?;
        Some(Arc::clone(entry.value()))
    }
}
/// A catalog provider that keeps its schemas in memory, looked up by name.
#[derive(Debug)]
pub struct MemoryCatalogProvider {
/// Map from schema name to the provider registered under that name.
schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}
impl MemoryCatalogProvider {
    /// Creates a catalog with no schemas registered.
    pub fn new() -> Self {
        let schemas = DashMap::new();
        Self { schemas }
    }
}
impl Default for MemoryCatalogProvider {
fn default() -> Self {
Self::new()
}
}
impl CatalogProvider for MemoryCatalogProvider {
    /// Exposes this catalog as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the names of all schemas currently in the map.
    fn schema_names(&self) -> Vec<String> {
        self.schemas.iter().map(|s| s.key().clone()).collect()
    }

    /// Looks up the schema stored under `name`, returning `None` when absent.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas.get(name).map(|s| Arc::clone(s.value()))
    }

    /// Stores `schema` under `name`.
    ///
    /// Returns the provider previously stored under the same name, if any.
    /// This implementation never returns an error.
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
        Ok(self.schemas.insert(name.into(), schema))
    }

    /// Removes the schema stored under `name`.
    ///
    /// * Returns `Ok(None)` when no schema is stored under `name`.
    /// * Returns `Ok(Some(schema))` with the removed provider when the schema
    ///   reports no tables, or when `cascade` is `true`.
    ///
    /// # Errors
    ///
    /// Returns an execution error listing the table names when the schema
    /// still reports tables and `cascade` is `false`.
    fn deregister_schema(
        &self,
        name: &str,
        cascade: bool,
    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
        let schema = match self.schema(name) {
            Some(schema) => schema,
            None => return Ok(None),
        };

        let table_names = schema.table_names();
        if !cascade && !table_names.is_empty() {
            return exec_err!(
                "Cannot drop schema {} because other tables depend on it: {}",
                name,
                table_names.join(", ")
            );
        }

        // Map the removal result instead of unwrapping it: this method only
        // takes `&self`, so the entry may already have been removed by another
        // caller since the lookup above. Report that as `None` rather than
        // panicking.
        Ok(self.schemas.remove(name).map(|(_, removed)| removed))
    }
}
/// A schema provider that keeps its tables in memory, looked up by name.
#[derive(Debug)]
pub struct MemorySchemaProvider {
/// Map from table name to the provider registered under that name.
tables: DashMap<String, Arc<dyn TableProvider>>,
}
impl MemorySchemaProvider {
    /// Creates a schema with no tables registered.
    pub fn new() -> Self {
        let tables = DashMap::new();
        Self { tables }
    }
}
impl Default for MemorySchemaProvider {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.tables
.iter()
.map(|table| table.key().clone())
.collect()
}
async fn table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
}
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
if self.table_exist(name.as_str()) {
return exec_err!("The table {name} already exists");
}
Ok(self.tables.insert(name, table))
}
fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Ok(self.tables.remove(name).map(|(_, table)| table))
}
fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}