use async_trait::async_trait;
use datafusion_common::config::{ConfigOptions, TableOptions};
use datafusion_common::{DFSchema, Result};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
use parking_lot::{Mutex, RwLock};
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
/// Abstraction over the state of a query session.
///
/// Implementors expose the session's configuration, its registered
/// functions, its runtime environment, and the ability to turn logical
/// plans and expressions into their physical counterparts. The trait
/// requires `Send + Sync`, so implementations can be shared across threads.
#[async_trait]
pub trait Session: Send + Sync {
    /// Returns the identifier string of this session.
    fn session_id(&self) -> &str;

    /// Returns the [`SessionConfig`] associated with this session.
    fn config(&self) -> &SessionConfig;

    /// Returns the [`ConfigOptions`] held by this session's [`SessionConfig`].
    ///
    /// The default implementation forwards to `self.config().options()`.
    fn config_options(&self) -> &ConfigOptions {
        let session_config = self.config();
        session_config.options()
    }

    /// Asynchronously produces an [`ExecutionPlan`] for `logical_plan`.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementor fails to build the plan.
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Produces a [`PhysicalExpr`] from the logical `expr`.
    ///
    /// `df_schema` is supplied alongside the expression; presumably it is the
    /// schema the expression is resolved against (confirm with implementors).
    ///
    /// # Errors
    ///
    /// Returns an error if the implementor fails to build the expression.
    fn create_physical_expr(
        &self,
        expr: Expr,
        df_schema: &DFSchema,
    ) -> Result<Arc<dyn PhysicalExpr>>;

    /// Returns the scalar UDFs known to this session, keyed by `String`
    /// (presumably the function name).
    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>>;

    /// Returns the aggregate UDFs known to this session, keyed by `String`
    /// (presumably the function name).
    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>>;

    /// Returns the window UDFs known to this session, keyed by `String`
    /// (presumably the function name).
    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>>;

    /// Returns the shared [`RuntimeEnv`] used by this session.
    fn runtime_env(&self) -> &Arc<RuntimeEnv>;

    /// Returns the [`ExecutionProps`] of this session.
    fn execution_props(&self) -> &ExecutionProps;

    /// Returns `self` as [`Any`] so callers can attempt a downcast to the
    /// concrete session type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the [`TableOptions`] stored in this session.
    fn table_options(&self) -> &TableOptions;

    /// Returns a new [`TableOptions`] obtained by combining the session's
    /// table options with its [`ConfigOptions`].
    ///
    /// The default implementation calls
    /// `combine_with_session_config` on [`Session::table_options`], passing
    /// [`Session::config_options`].
    fn default_table_options(&self) -> TableOptions {
        // Fetch both inputs in the same order the combined call needs them.
        let base_options = self.table_options();
        let session_options = self.config_options();
        base_options.combine_with_session_config(session_options)
    }

    /// Returns a mutable reference to the [`TableOptions`] stored in this
    /// session.
    fn table_options_mut(&mut self) -> &mut TableOptions;

    /// Returns a reference-counted [`TaskContext`] for this session.
    fn task_ctx(&self) -> Arc<TaskContext>;
}
/// Builds a [`TaskContext`] from a borrowed [`Session`].
///
/// The resulting context has no task id. The session id, session
/// configuration and the three function maps are cloned out of the session,
/// while the runtime environment is shared through `Arc::clone`.
impl From<&dyn Session> for TaskContext {
    fn from(state: &dyn Session) -> Self {
        // Gather the pieces in the same order they are passed below.
        let session_id = state.session_id().to_owned();
        let session_config = state.config().clone();
        let scalar_udfs = state.scalar_functions().clone();
        let aggregate_udfs = state.aggregate_functions().clone();
        let window_udfs = state.window_functions().clone();
        let runtime = Arc::clone(state.runtime_env());

        TaskContext::new(
            None, // no task id is assigned here
            session_id,
            session_config,
            scalar_udfs,
            aggregate_udfs,
            window_udfs,
            runtime,
        )
    }
}
/// A shared, mutex-guarded slot that optionally holds a weak reference to an
/// `RwLock`-wrapped [`Session`].
///
/// The outer `Arc` lets the slot itself be shared; the `Weak` means the slot
/// does not keep the referenced session alive.
type SessionRefLock = Arc<Mutex<Option<Weak<RwLock<dyn Session>>>>>;
/// Holder for a weak reference to a [`Session`] that can be filled in after
/// the store has been created.
///
/// The store starts out empty (see `new`) and is populated through
/// `with_state`.
#[derive(Debug)]
pub struct SessionStore {
/// Slot containing the weak session reference, or `None` while unset.
session: SessionRefLock,
}
impl SessionStore {
    /// Creates a store that does not yet reference any session.
    pub fn new() -> Self {
        Self {
            session: Arc::new(Mutex::new(None)),
        }
    }

    /// Records `state` as the session held by this store, replacing any
    /// reference stored earlier.
    ///
    /// Takes `&self` because the slot is protected by an internal mutex.
    pub fn with_state(&self, state: Weak<RwLock<dyn Session>>) {
        *self.session.lock() = Some(state);
    }

    /// Returns a clone of the weak session reference held by this store.
    ///
    /// The returned [`Weak`] may still fail to upgrade if the session it
    /// points at has already been dropped.
    ///
    /// # Panics
    ///
    /// Panics if no session has been stored yet, i.e. if
    /// [`SessionStore::with_state`] has not been called on this store.
    pub fn get_session(&self) -> Weak<RwLock<dyn Session>> {
        // An unset slot is a caller bug (wrong initialisation order), so name
        // the violated precondition rather than a bare `unwrap` panic.
        self.session.lock().clone().expect(
            "SessionStore::get_session called before a session was set via with_state",
        )
    }
}
impl Default for SessionStore {
fn default() -> Self {
Self::new()
}
}