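// NOTE: the next line is page-header residue from the file listing, not source code;
// it is kept here as a comment so the file parses.
// krystv's picture / Upload 107 files / 3374e90 verified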
//! The BexRuntime — async callback-driven wrapper around bex_core::Engine.
//!
//! BexRuntime adds these capabilities on top of the sync bex_core::Engine:
//!
//! 1. **Callback-based async**: Plugin call results are delivered directly
//! to a C function pointer callback from a background Tokio thread.
//! No event queue, no polling.
//!
//! 2. **Lane-based scheduling**: Plugin calls are dispatched to tokio tasks
//! with concurrency limits per priority lane (Control, User, Background).
//!
//! 3. **Cancellation**: Each request gets a `CancellationToken`. The C++
//! backend can cancel via `bex_cancel_request()`.
//!
//! ## Architecture
//!
//! ```text
//! C++ Backend
//!   │
//!   ├── bex_submit_search(engine, plugin_id, query, callback, user_data)
//!   │     → returns request_id immediately
//!   │
//!   │     [Rust Tokio background thread]
//!   │       ├── Acquires scheduler permit
//!   │       ├── Executes plugin call via spawn_blocking
//!   │       └── Invokes callback(user_data, request_id, success, payload, len)
//!   │
//!   ├── bex_cancel_request(engine, request_id)
//!   │
//! BexRuntime (this crate)
//!   ┌───────────────────────────────────┐
//!   │ Scheduler (lane semaphores)       │
//!   │ Cancellation Tokens (DashMap)     │
//!   │ bex_core::Engine (inner)          │
//!   │ Tokio Runtime (owned)             │
//!   └───────────────────────────────────┘
//! ```
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use bex_types::BexError;
use dashmap::DashMap;
use tokio_util::sync::CancellationToken;
use crate::scheduler::{Scheduler, SchedulerConfig};
// ── Runtime State Machine ────────────────────────────────────────────
// Values stored in `RuntimeInner::state` (an `AtomicU8`).

/// Not assigned or read anywhere in the visible code, hence the lint allowance.
/// Presumably a pre-initialisation state — TODO confirm before relying on it.
#[allow(dead_code)]
const STATE_NOT_READY: u8 = 0;
/// Initial state stored when a `BexRuntime` is constructed.
const STATE_READY: u8 = 1;
/// Stored at the start of `BexRuntime::shutdown`, before the grace period.
const STATE_DRAINING: u8 = 2;
/// Stored at the end of `BexRuntime::shutdown`, after the grace period.
const STATE_STOPPED: u8 = 3;
// ── BexRuntime ───────────────────────────────────────────────────────
/// The async runtime that wraps `bex_core::Engine` with callback-driven scheduling.
///
/// Created once at application startup. The C++ backend (via FFI)
/// calls the `bex_submit_*` functions to kick off plugin operations, and
/// receives results via the callback function pointer.
///
/// The struct itself is a thin handle: all state lives in the private
/// `RuntimeInner`, held behind an `Arc`.
pub struct BexRuntime {
// Reference-counted handle to the engine, Tokio runtime, scheduler,
// cancellation map and state flag.
inner: Arc<RuntimeInner>,
}
/// State behind a [`BexRuntime`] handle: the engine, the Tokio runtime,
/// the scheduler, per-request cancellation tokens and the lifecycle flag.
struct RuntimeInner {
/// The underlying sync engine from bex-core.
engine: bex_core::Engine,
/// Owned tokio runtime — keeps the async threads alive.
runtime: Arc<tokio::runtime::Runtime>,
/// Lane-based scheduler for concurrency control.
// The field is constructed but never read in this file, which is why the
// dead-code lint is silenced here.
#[allow(dead_code)]
scheduler: Scheduler,
/// Cancellation tokens per request, keyed by request id.
/// Entries are added by `insert_cancellation` and removed by
/// `remove_cancellation` or `cancel_request`.
cancellation: DashMap<u64, CancellationToken>,
/// Runtime state machine; holds one of the `STATE_*` constants.
state: AtomicU8,
}
impl BexRuntime {
/// Create a new BexRuntime from the given engine config.
pub fn new(config: bex_core::EngineConfig) -> Result<Self, BexError> {
Self::with_scheduler_config(config, SchedulerConfig::default())
}
/// Create a new BexRuntime with custom scheduler limits.
pub fn with_scheduler_config(
config: bex_core::EngineConfig,
scheduler_config: SchedulerConfig,
) -> Result<Self, BexError> {
let engine = bex_core::Engine::new(config)?;
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.map_err(|e| BexError::Internal(format!("tokio runtime: {e}")))?,
);
let scheduler = Scheduler::with_config(scheduler_config);
let inner = Arc::new(RuntimeInner {
engine,
runtime,
scheduler,
cancellation: DashMap::new(),
state: AtomicU8::new(STATE_READY),
});
Ok(Self { inner })
}
// ── Accessors for FFI layer ──────────────────────────────────────
/// Get a clone of the underlying bex-core Engine for use in spawned tasks.
pub fn clone_engine(&self) -> bex_core::Engine {
self.inner.engine.clone()
}
/// Get a handle to the Tokio runtime for spawning tasks.
pub fn tokio_handle(&self) -> tokio::runtime::Handle {
self.inner.runtime.handle().clone()
}
/// Insert a cancellation token for a request.
pub fn insert_cancellation(&self, request_id: u64, token: CancellationToken) {
self.inner.cancellation.insert(request_id, token);
}
/// Remove a cancellation token (after request completes).
pub fn remove_cancellation(&self, request_id: u64) {
self.inner.cancellation.remove(&request_id);
}
// ── Cancellation ────────────────────────────────────────────────
/// Cancel a pending request.
pub fn cancel_request(&self, request_id: u64) -> bool {
if let Some((_, token)) = self.inner.cancellation.remove(&request_id) {
token.cancel();
true
} else {
false
}
}
// ── Plugin Management (delegated to bex_core::Engine) ───────────
/// Install a plugin from a file path.
pub fn install_plugin(&self, path: &std::path::Path) -> Result<bex_types::plugin_info::PluginInfo, BexError> {
self.inner.engine.install_plugin(path)
}
/// Install a plugin from raw bytes.
pub fn install_bytes(&self, data: &[u8]) -> Result<bex_types::plugin_info::PluginInfo, BexError> {
self.inner.engine.install_bytes(data)
}
/// Uninstall a plugin.
pub fn uninstall_plugin(&self, id: &str) -> Result<(), BexError> {
self.inner.engine.uninstall_plugin(id)
}
/// List all installed plugins.
pub fn list_plugins(&self) -> Vec<bex_types::plugin_info::PluginInfo> {
self.inner.engine.list_plugins()
}
/// Enable a plugin.
pub fn enable_plugin(&self, id: &str) -> Result<(), BexError> {
self.inner.engine.enable_plugin(id)
}
/// Disable a plugin.
pub fn disable_plugin(&self, id: &str) -> Result<(), BexError> {
self.inner.engine.disable_plugin(id)
}
/// Get plugin info.
pub fn get_plugin_info(&self, id: &str) -> Option<bex_types::plugin_info::PluginInfo> {
self.inner.engine.get_plugin_info(id)
}
// ── Secret / API Key Management ────────────────────────────────
/// Set a secret/API key for a plugin.
pub fn secret_set(&self, plugin_id: &str, key: &str, value: &str) -> Result<(), BexError> {
self.inner.engine.secret_set(plugin_id, key, value)
}
/// Get a secret/API key for a plugin. Returns the value or None.
pub fn secret_get(&self, plugin_id: &str, key: &str) -> Result<Option<String>, BexError> {
self.inner.engine.secret_get(plugin_id, key)
}
/// Delete a secret/API key for a plugin. Returns true if the key existed.
pub fn secret_remove(&self, plugin_id: &str, key: &str) -> Result<bool, BexError> {
self.inner.engine.secret_remove(plugin_id, key)
}
/// List all secret keys for a plugin.
pub fn secret_keys(&self, plugin_id: &str) -> Result<Vec<String>, BexError> {
self.inner.engine.secret_keys(plugin_id)
}
// ── JSON-based API calls (used by FFI) ─────────────────────────
/// Search for media. Returns JSON string.
pub fn call_search_json(&self, plugin_id: &str, query: &str) -> Result<String, BexError> {
self.inner.engine.call_search_json(plugin_id, query)
}
/// Get home page. Returns JSON string.
pub fn call_get_home_json(&self, plugin_id: &str) -> Result<String, BexError> {
self.inner.engine.call_get_home_json(plugin_id)
}
/// Get media info. Returns JSON string.
/// The media_id is opaque β€” the plugin knows how to interpret it.
pub fn call_get_info_json(&self, plugin_id: &str, media_id: &str) -> Result<String, BexError> {
self.inner.engine.call_get_info_json(plugin_id, media_id)
}
/// Get servers for an episode. Returns JSON string.
/// The id is self-describing β€” the plugin knows how to parse its own IDs.
pub fn call_get_servers_json(&self, plugin_id: &str, id: &str) -> Result<String, BexError> {
self.inner.engine.call_get_servers_json(plugin_id, id)
}
/// Resolve a stream URL. Returns JSON string.
pub fn call_resolve_stream_json(&self, plugin_id: &str, server_json: &str) -> Result<String, BexError> {
self.inner.engine.call_resolve_stream_json(plugin_id, server_json)
}
// ── Stats and Shutdown ──────────────────────────────────────────
/// Get engine stats.
pub fn stats(&self) -> bex_types::engine_types::EngineStats {
self.inner.engine.stats()
}
/// Shut down the runtime gracefully.
pub fn shutdown(&self) {
tracing::info!("BexRuntime shutting down...");
self.inner.state.store(STATE_DRAINING, Ordering::Release);
// Give active tasks a brief window to complete
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
while std::time::Instant::now() < deadline {
std::thread::sleep(std::time::Duration::from_millis(50));
}
self.inner.state.store(STATE_STOPPED, Ordering::Release);
tracing::info!("BexRuntime shut down complete");
}
}