| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| use std::sync::atomic::{AtomicU8, Ordering}; |
| use std::sync::Arc; |
|
|
| use bex_types::BexError; |
| use dashmap::DashMap; |
| use tokio_util::sync::CancellationToken; |
|
|
| use crate::scheduler::{Scheduler, SchedulerConfig}; |
|
|
| |
|
|
// Lifecycle states stored in `RuntimeInner::state` as a raw `u8` so they fit
// in an `AtomicU8`. The numeric values are part of that encoding; do not
// renumber them.

/// State code 0.
// Not referenced anywhere in the code visible in this file, so the lint is
// silenced rather than leaving a gap in the numbering.
#[allow(dead_code)]
const STATE_NOT_READY: u8 = 0;
/// State code 1: the value a freshly constructed runtime is initialised with.
const STATE_READY: u8 = 1;
/// State code 2: stored at the start of `BexRuntime::shutdown`.
const STATE_DRAINING: u8 = 2;
/// State code 3: stored at the end of `BexRuntime::shutdown`.
const STATE_STOPPED: u8 = 3;
|
|
| |
|
|
| |
| |
| |
| |
| |
| pub struct BexRuntime { |
| inner: Arc<RuntimeInner>, |
| } |
|
|
| struct RuntimeInner { |
| |
| engine: bex_core::Engine, |
|
|
| |
| runtime: Arc<tokio::runtime::Runtime>, |
|
|
| |
| #[allow(dead_code)] |
| scheduler: Scheduler, |
|
|
| |
| cancellation: DashMap<u64, CancellationToken>, |
|
|
| |
| state: AtomicU8, |
| } |
|
|
| impl BexRuntime { |
| |
| pub fn new(config: bex_core::EngineConfig) -> Result<Self, BexError> { |
| Self::with_scheduler_config(config, SchedulerConfig::default()) |
| } |
|
|
| |
| pub fn with_scheduler_config( |
| config: bex_core::EngineConfig, |
| scheduler_config: SchedulerConfig, |
| ) -> Result<Self, BexError> { |
| let engine = bex_core::Engine::new(config)?; |
|
|
| let runtime = Arc::new( |
| tokio::runtime::Builder::new_multi_thread() |
| .worker_threads(4) |
| .enable_all() |
| .build() |
| .map_err(|e| BexError::Internal(format!("tokio runtime: {e}")))?, |
| ); |
|
|
| let scheduler = Scheduler::with_config(scheduler_config); |
|
|
| let inner = Arc::new(RuntimeInner { |
| engine, |
| runtime, |
| scheduler, |
| cancellation: DashMap::new(), |
| state: AtomicU8::new(STATE_READY), |
| }); |
|
|
| Ok(Self { inner }) |
| } |
|
|
| |
|
|
| |
| pub fn clone_engine(&self) -> bex_core::Engine { |
| self.inner.engine.clone() |
| } |
|
|
| |
| pub fn tokio_handle(&self) -> tokio::runtime::Handle { |
| self.inner.runtime.handle().clone() |
| } |
|
|
| |
| pub fn insert_cancellation(&self, request_id: u64, token: CancellationToken) { |
| self.inner.cancellation.insert(request_id, token); |
| } |
|
|
| |
| pub fn remove_cancellation(&self, request_id: u64) { |
| self.inner.cancellation.remove(&request_id); |
| } |
|
|
| |
|
|
| |
| pub fn cancel_request(&self, request_id: u64) -> bool { |
| if let Some((_, token)) = self.inner.cancellation.remove(&request_id) { |
| token.cancel(); |
| true |
| } else { |
| false |
| } |
| } |
|
|
| |
|
|
| |
| pub fn install_plugin(&self, path: &std::path::Path) -> Result<bex_types::plugin_info::PluginInfo, BexError> { |
| self.inner.engine.install_plugin(path) |
| } |
|
|
| |
| pub fn install_bytes(&self, data: &[u8]) -> Result<bex_types::plugin_info::PluginInfo, BexError> { |
| self.inner.engine.install_bytes(data) |
| } |
|
|
| |
| pub fn uninstall_plugin(&self, id: &str) -> Result<(), BexError> { |
| self.inner.engine.uninstall_plugin(id) |
| } |
|
|
| |
| pub fn list_plugins(&self) -> Vec<bex_types::plugin_info::PluginInfo> { |
| self.inner.engine.list_plugins() |
| } |
|
|
| |
| pub fn enable_plugin(&self, id: &str) -> Result<(), BexError> { |
| self.inner.engine.enable_plugin(id) |
| } |
|
|
| |
| pub fn disable_plugin(&self, id: &str) -> Result<(), BexError> { |
| self.inner.engine.disable_plugin(id) |
| } |
|
|
| |
| pub fn get_plugin_info(&self, id: &str) -> Option<bex_types::plugin_info::PluginInfo> { |
| self.inner.engine.get_plugin_info(id) |
| } |
|
|
| |
|
|
| |
| pub fn secret_set(&self, plugin_id: &str, key: &str, value: &str) -> Result<(), BexError> { |
| self.inner.engine.secret_set(plugin_id, key, value) |
| } |
|
|
| |
| pub fn secret_get(&self, plugin_id: &str, key: &str) -> Result<Option<String>, BexError> { |
| self.inner.engine.secret_get(plugin_id, key) |
| } |
|
|
| |
| pub fn secret_remove(&self, plugin_id: &str, key: &str) -> Result<bool, BexError> { |
| self.inner.engine.secret_remove(plugin_id, key) |
| } |
|
|
| |
| pub fn secret_keys(&self, plugin_id: &str) -> Result<Vec<String>, BexError> { |
| self.inner.engine.secret_keys(plugin_id) |
| } |
|
|
| |
|
|
| |
| pub fn call_search_json(&self, plugin_id: &str, query: &str) -> Result<String, BexError> { |
| self.inner.engine.call_search_json(plugin_id, query) |
| } |
|
|
| |
| pub fn call_get_home_json(&self, plugin_id: &str) -> Result<String, BexError> { |
| self.inner.engine.call_get_home_json(plugin_id) |
| } |
|
|
| |
| |
| pub fn call_get_info_json(&self, plugin_id: &str, media_id: &str) -> Result<String, BexError> { |
| self.inner.engine.call_get_info_json(plugin_id, media_id) |
| } |
|
|
| |
| |
| pub fn call_get_servers_json(&self, plugin_id: &str, id: &str) -> Result<String, BexError> { |
| self.inner.engine.call_get_servers_json(plugin_id, id) |
| } |
|
|
| |
| pub fn call_resolve_stream_json(&self, plugin_id: &str, server_json: &str) -> Result<String, BexError> { |
| self.inner.engine.call_resolve_stream_json(plugin_id, server_json) |
| } |
|
|
| |
|
|
| |
| pub fn stats(&self) -> bex_types::engine_types::EngineStats { |
| self.inner.engine.stats() |
| } |
|
|
| |
| pub fn shutdown(&self) { |
| tracing::info!("BexRuntime shutting down..."); |
| self.inner.state.store(STATE_DRAINING, Ordering::Release); |
|
|
| |
| let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500); |
| while std::time::Instant::now() < deadline { |
| std::thread::sleep(std::time::Duration::from_millis(50)); |
| } |
|
|
| self.inner.state.store(STATE_STOPPED, Ordering::Release); |
| tracing::info!("BexRuntime shut down complete"); |
| } |
| } |
|
|