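Plugin Development in Practice
This page walks through three complete plugin examples, ordered from simple to complex, that exercise the core development patterns of the Monibuca SDK. Each example is written to build against the SDK as shown; the one exception is the frame-parsing step in Example 3, which is left as a placeholder for your own protocol.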
Example 1: Stream Monitor Plugin
A practical entry-level plugin: it tracks the creation and disposal of every stream and exposes live statistics through an HTTP API.
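The statistics side of such a plugin comes down to two pieces: lock-free counters and a bounded history that evicts its oldest entry when full. The following is a minimal, std-only sketch of that idea and not the plugin's actual code; `Stats`, `record_created`, and `snapshot` are hypothetical names, and `std::sync::Mutex` stands in for whichever lock a real plugin uses.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for a plugin's shared statistics.
struct Stats {
    total_created: AtomicU64,
    history: Mutex<VecDeque<String>>,
    history_size: usize,
}

impl Stats {
    fn new(history_size: usize) -> Self {
        Self {
            total_created: AtomicU64::new(0),
            history: Mutex::new(VecDeque::with_capacity(history_size)),
            history_size,
        }
    }

    /// Count a "created" event and remember its path,
    /// evicting the oldest entry once the history is full.
    fn record_created(&self, stream_path: &str) {
        self.total_created.fetch_add(1, Ordering::Relaxed);
        let mut history = self.history.lock().unwrap();
        if history.len() >= self.history_size {
            history.pop_front();
        }
        history.push_back(stream_path.to_string());
    }

    /// Copy out the counter and the retained history, oldest first.
    fn snapshot(&self) -> (u64, Vec<String>) {
        let history = self.history.lock().unwrap();
        (
            self.total_created.load(Ordering::Relaxed),
            history.iter().cloned().collect(),
        )
    }
}

fn main() {
    let stats = Stats::new(2);
    for path in ["live/a", "live/b", "live/c"] {
        stats.record_created(path);
    }
    let (total, recent) = stats.snapshot();
    println!("total={total} recent={recent:?}");
}
```

The counter keeps growing while the history stays capped, so an API endpoint can report lifetime totals without unbounded memory use.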
    plugins/monitor/
    ├── Cargo.toml
    └── src/
        ├── lib.rs
        ├── config.rs
        └── plugin.rs

Cargo.toml

    [package]
    name = "plugin-monitor"
    version = "0.1.0"
    edition = "2024"

    [dependencies]
    sdk = { path = "../../crates/monibuca-sdk" }
    async-trait = "0.1"
    # chrono and parking_lot are used by src/plugin.rs
    # (event timestamps and the history lock)
    chrono = "0.4"
    parking_lot = "0.12"
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
    tokio = { version = "1", features = ["full"] }
    tokio-util = "0.7"
    tracing = "0.1"

    [lib]
    crate-type = ["cdylib", "rlib"]

src/lib.rs
    mod config;
    mod plugin;

    pub use plugin::MonitorPlugin;

    sdk::export_plugin!(MonitorPlugin);

src/config.rs
    use sdk::ConfigSchema;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
    #[schema(
        plugin = "monitor",
        title = "Stream Monitor",
        description = "Real-time stream monitoring and statistics",
        sdk_path = "sdk"
    )]
    pub struct MonitorConfig {
        #[serde(default = "default_enable")]
        #[schema(label = "Enable", desc = "Whether the monitor plugin is enabled")]
        pub enable: bool,

        #[serde(default = "default_history_size")]
        #[schema(
            label = "History size",
            desc = "Number of most recent events to keep",
            min = 10,
            max = 10000
        )]
        pub history_size: usize,
    }

    fn default_enable() -> bool { true }
    fn default_history_size() -> usize { 100 }

    impl Default for MonitorConfig {
        fn default() -> Self {
            Self {
                enable: default_enable(),
                history_size: default_history_size(),
            }
        }
    }

src/plugin.rs
    use std::any::Any;
    use std::collections::VecDeque;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    use async_trait::async_trait;
    use parking_lot::Mutex;
    use sdk::prelude::*;
    use tokio_util::sync::CancellationToken;

    use crate::config::MonitorConfig;

    /// A recorded stream event
    #[derive(Debug, Clone, serde::Serialize)]
    struct EventRecord {
        timestamp: String,
        event_type: String,
        stream_path: String,
    }

    /// Statistics shared between the event listener and the HTTP handler
    struct MonitorStats {
        total_created: AtomicU64,
        total_disposed: AtomicU64,
        total_subscribers: AtomicU64,
        event_history: Mutex<VecDeque<EventRecord>>,
        history_size: usize,
    }

    impl MonitorStats {
        fn new(history_size: usize) -> Self {
            Self {
                total_created: AtomicU64::new(0),
                total_disposed: AtomicU64::new(0),
                total_subscribers: AtomicU64::new(0),
                event_history: Mutex::new(VecDeque::with_capacity(history_size)),
                history_size,
            }
        }

        /// Append an event, dropping the oldest one once the history is full
        fn record_event(&self, event_type: &str, stream_path: &str) {
            let record = EventRecord {
                timestamp: chrono::Utc::now().to_rfc3339(),
                event_type: event_type.to_string(),
                stream_path: stream_path.to_string(),
            };
            let mut history = self.event_history.lock();
            if history.len() >= self.history_size {
                history.pop_front();
            }
            history.push_back(record);
        }
    }
    pub struct MonitorPlugin {
        state: PluginState,
        config: MonitorConfig,
        stream_manager: Option<Arc<dyn StreamManagerApi>>,
        stats: Option<Arc<MonitorStats>>,
    }

    impl Default for MonitorPlugin {
        fn default() -> Self {
            Self {
                state: PluginState::Created,
                config: MonitorConfig::default(),
                stream_manager: None,
                stats: None,
            }
        }
    }

    #[async_trait]
    impl Plugin for MonitorPlugin {
        fn info(&self) -> PluginInfo {
            PluginInfo {
                name: "monitor",
                version: "1.0.0",
                description: "Real-time stream monitoring",
                author: "Monibuca Team",
            }
        }

        fn state(&self) -> PluginState { self.state }

        async fn init(
            &mut self,
            manager: Arc<dyn StreamManagerApi>,
            config: Arc<dyn ConfigProvider>,
        ) -> Result<()> {
            // Load the configuration; fall back to defaults if it is missing or invalid
            self.config = config
                .get_plugin_config("monitor")
                .and_then(|v| serde_json::from_value(v).ok())
                .unwrap_or_default();

            if !self.config.enable {
                self.state = PluginState::Disabled;
                return Ok(());
            }

            self.stream_manager = Some(manager);
            self.stats = Some(Arc::new(MonitorStats::new(self.config.history_size)));
            self.state = PluginState::Initialized;

            tracing::info!(
                "Monitor plugin initialized, history_size={}",
                self.config.history_size
            );
            Ok(())
        }

        async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
            if self.state == PluginState::Disabled {
                return Ok(());
            }

            // Return an error instead of panicking if start() runs before init()
            let not_initialized =
                || MonibucaError::Plugin("monitor: start() called before init()".to_string());
            let manager = self.stream_manager.clone().ok_or_else(not_initialized)?;
            let stats = self.stats.clone().ok_or_else(not_initialized)?;

            // Spawn the event listener task
            let mut event_rx = manager.subscribe_events();
            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        biased;
                        _ = cancel.cancelled() => {
                            tracing::info!("Monitor event listener stopped");
                            break;
                        }
                        event = event_rx.recv() => {
                            match event {
                                Ok(StreamEvent::Created(path)) => {
                                    stats.total_created.fetch_add(1, Ordering::Relaxed);
                                    stats.record_event("created", &path);
                                    tracing::info!("[Monitor] Stream created: {}", path);
                                }
                                Ok(StreamEvent::Disposed(path)) => {
                                    stats.total_disposed.fetch_add(1, Ordering::Relaxed);
                                    stats.record_event("disposed", &path);
                                    tracing::info!("[Monitor] Stream disposed: {}", path);
                                }
                                Ok(StreamEvent::SubscriberAdded { stream, .. }) => {
                                    stats.total_subscribers.fetch_add(1, Ordering::Relaxed);
                                    stats.record_event("subscriber_added", &stream);
                                }
                                Ok(StreamEvent::SubscriberRemoved { stream, .. }) => {
                                    stats.record_event("subscriber_removed", &stream);
                                }
                                // Any receive error ends the listener
                                Err(_) => break,
                            }
                        }
                    }
                }
            });

            self.state = PluginState::Running;
            tracing::info!("Monitor plugin started");
            Ok(())
        }

        async fn stop(&mut self) -> Result<()> {
            self.state = PluginState::Stopped;
            Ok(())
        }

        /// Register the HTTP API
        fn http_routes(&self) -> Vec<HttpRoute> {
            if let (Some(sm), Some(stats)) = (&self.stream_manager, &self.stats) {
                let handler = MonitorHandler {
                    stream_manager: sm.clone(),
                    stats: stats.clone(),
                };
                vec![HttpRoute {
                    method: HttpMethod::Get,
                    path: "/monitor/**".to_string(),
                    handler: Box::new(handler),
                }]
            } else {
                // Not initialized (or disabled): expose no routes
                Vec::new()
            }
        }

        fn as_any(&self) -> &dyn Any { self }
        fn as_any_mut(&mut self) -> &mut dyn Any { self }
    }
    /// HTTP API handler
    struct MonitorHandler {
        stream_manager: Arc<dyn StreamManagerApi>,
        stats: Arc<MonitorStats>,
    }

    #[async_trait]
    impl HttpHandler for MonitorHandler {
        async fn handle(&self, req: HttpRequest) -> HttpResponse {
            // Whether req.path still carries the "/monitor" route prefix is not
            // stated here, so skip it if present. An empty remainder means "summary".
            let action = req.path
                .split('/')
                .filter(|part| !part.is_empty())
                .find(|part| *part != "monitor")
                .unwrap_or("summary");

            match action {
                // GET /monitor/summary: statistics overview
                "summary" => {
                    let active_streams = self.stream_manager.stream_paths();
                    HttpResponse::ok().json(&serde_json::json!({
                        "code": 0,
                        "data": {
                            "active_streams": active_streams.len(),
                            "stream_list": active_streams,
                            "total_created": self.stats.total_created.load(Ordering::Relaxed),
                            "total_disposed": self.stats.total_disposed.load(Ordering::Relaxed),
                            "total_subscribers": self.stats.total_subscribers.load(Ordering::Relaxed),
                        }
                    }))
                }

                // GET /monitor/events: most recent event records
                "events" => {
                    let history = self.stats.event_history.lock();
                    let events: Vec<_> = history.iter().cloned().collect();
                    HttpResponse::ok().json(&serde_json::json!({
                        "code": 0,
                        "count": events.len(),
                        "events": events,
                    }))
                }

                _ => HttpResponse::not_found().json(&serde_json::json!({
                    "code": 404,
                    "message": "Unknown endpoint. Try /monitor/summary or /monitor/events"
                })),
            }
        }
    }

Once Monibuca is running, query the API:

    curl http://localhost:8180/monitor/summary
    # {"code":0,"data":{"active_streams":2,"stream_list":["live/test","live/camera"],...}}

    curl http://localhost:8180/monitor/events
    # {"code":0,"count":5,"events":[{"timestamp":"...","event_type":"created","stream_path":"live/test"},...]}

Example 2: Simple TCP Protocol Server Plugin
A complete TCP server plugin that implements a simple text protocol: clients send commands over TCP to publish or subscribe to streams.
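Parsing a line-based command protocol like this one can be kept separate from the networking code, which makes it easy to test. The sketch below is a std-only illustration using the command names from this example (PUBLISH, SUBSCRIBE, DATA, STOP); the `Command` enum and `parse_line` function are hypothetical helpers, not part of the plugin or the SDK, and unlike the plugin code this sketch treats every verb case-insensitively.

```rust
/// Hypothetical parsed form of one protocol line.
#[derive(Debug, PartialEq)]
enum Command<'a> {
    Publish(&'a str),
    Subscribe(&'a str),
    Data(&'a str),
    Stop,
    Unknown,
}

/// Split a line into a verb and an optional argument, then classify it.
/// Verbs that require an argument are rejected when it is missing.
fn parse_line(line: &str) -> Command<'_> {
    let line = line.trim();
    let (verb, arg) = match line.split_once(' ') {
        Some((verb, arg)) => (verb, arg.trim()),
        None => (line, ""),
    };
    let verb = verb.to_ascii_uppercase();
    match (verb.as_str(), arg) {
        ("PUBLISH", path) if !path.is_empty() => Command::Publish(path),
        ("SUBSCRIBE", path) if !path.is_empty() => Command::Subscribe(path),
        ("DATA", hex) if !hex.is_empty() => Command::Data(hex),
        ("STOP", _) => Command::Stop,
        _ => Command::Unknown,
    }
}

fn main() {
    for line in ["PUBLISH live/test", "data 00ff", "STOP", "PUBLISH"] {
        println!("{:?} -> {:?}", line, parse_line(line));
    }
}
```

Returning borrowed slices avoids allocating per command; the session loop can then match on the enum instead of comparing strings in several places.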
The protocol is plain text, one command per line:

    PUBLISH <stream_path>    → start publishing a stream
    SUBSCRIBE <stream_path>  → start subscribing to a stream
    DATA <hex_bytes>         → send a data frame (publish mode)
    STOP                     → stop the current operation
    QUIT                     → close the connection

The plugin implementation follows. Besides the dependencies from Example 1, it uses the bytes and hex crates.

    use std::any::Any;
    use std::net::SocketAddr;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    use async_trait::async_trait;
    use bytes::Bytes;
    use sdk::prelude::*;
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
    use tokio::net::TcpStream;
    use tokio_util::sync::CancellationToken;

    pub struct SimpleTcpPlugin {
        state: PluginState,
        listen_addr: SocketAddr,
        stream_manager: Option<Arc<dyn StreamManagerApi>>,
        task_work: Option<Arc<dyn TaskWork>>,
    }

    impl Default for SimpleTcpPlugin {
        fn default() -> Self {
            Self {
                state: PluginState::Created,
                listen_addr: "0.0.0.0:9090".parse().unwrap(),
                stream_manager: None,
                task_work: None,
            }
        }
    }

    #[async_trait]
    impl Plugin for SimpleTcpPlugin {
        fn info(&self) -> PluginInfo {
            PluginInfo {
                name: "simple-tcp",
                version: "0.1.0",
                description: "Simple TCP protocol plugin",
                author: "Tutorial",
            }
        }

        fn state(&self) -> PluginState { self.state }

        // Keep a reference to the TaskWork
        fn set_task_work(&mut self, work: Arc<dyn TaskWork>) {
            self.task_work = Some(work);
        }

        async fn init(
            &mut self,
            manager: Arc<dyn StreamManagerApi>,
            config: Arc<dyn ConfigProvider>,
        ) -> Result<()> {
            self.stream_manager = Some(manager);

            // Parse the listen address; ":9090" is shorthand for "0.0.0.0:9090"
            if let Some(cfg) = config.get_plugin_config("simple-tcp") {
                if let Some(addr) = cfg.get("listen_addr").and_then(|v| v.as_str()) {
                    let addr_str = if addr.starts_with(':') {
                        format!("0.0.0.0{}", addr)
                    } else {
                        addr.to_string()
                    };
                    // Keep the default address if the configured one does not parse
                    self.listen_addr = addr_str.parse().unwrap_or(self.listen_addr);
                }
            }

            self.state = PluginState::Initialized;
            Ok(())
        }
        async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
            let addr = self.listen_addr;
            // Return an error instead of panicking if start() runs before init()
            let manager = self.stream_manager.clone().ok_or_else(|| {
                MonibucaError::Plugin("simple-tcp: start() called before init()".to_string())
            })?;
            let task_work = self.task_work.clone();
            let session_counter = Arc::new(AtomicU64::new(0));

            // Create the TCP listener
            let listener = sdk::create_tcp_listener(addr).map_err(|e| {
                MonibucaError::Io(std::io::Error::new(
                    std::io::ErrorKind::AddrInUse,
                    format!("Failed to bind {}: {}", addr, e),
                ))
            })?;

            tracing::info!("Simple TCP server listening on {}", addr);

            // Main TCP accept loop
            let accept_loop = async move {
                loop {
                    tokio::select! {
                        biased;
                        _ = cancel.cancelled() => {
                            tracing::info!("TCP server shutting down");
                            break;
                        }
                        result = listener.accept() => {
                            match result {
                                Ok((stream, peer_addr)) => {
                                    let session_id =
                                        session_counter.fetch_add(1, Ordering::Relaxed);
                                    let manager = manager.clone();
                                    let tw = task_work.clone();

                                    // Create a child task for each connection
                                    let (session_cancel, child) = if let Some(ref tw) = tw {
                                        let child =
                                            tw.create_child_task(&format!("TCP:{}", peer_addr));
                                        child.start();
                                        let token = child.cancellation_token();
                                        (token, Some(child))
                                    } else {
                                        (cancel.child_token(), None)
                                    };

                                    tokio::spawn(async move {
                                        tracing::info!(
                                            "Session {} from {}", session_id, peer_addr
                                        );

                                        if let Err(e) = handle_session(
                                            stream, session_id, manager, session_cancel,
                                        ).await {
                                            tracing::warn!(
                                                "Session {} error: {}", session_id, e
                                            );
                                        }

                                        // Clean up the child task
                                        if let (Some(child), Some(tw)) = (child, tw) {
                                            child.stop();
                                            tw.remove_child_task(child.id());
                                        }

                                        tracing::info!("Session {} ended", session_id);
                                    });
                                }
                                Err(e) => {
                                    tracing::error!("Accept error: {}", e);
                                }
                            }
                        }
                    }
                }
            };

            // Register with TaskWork when available, otherwise spawn directly
            if let Some(ref tw) = self.task_work {
                tw.register_child("TCPListener", Box::pin(accept_loop));
            } else {
                tokio::spawn(accept_loop);
            }

            self.state = PluginState::Running;
            Ok(())
        }

        async fn stop(&mut self) -> Result<()> {
            self.state = PluginState::Stopped;
            tracing::info!("Simple TCP plugin stopped");
            Ok(())
        }

        fn as_any(&self) -> &dyn Any { self }
        fn as_any_mut(&mut self) -> &mut dyn Any { self }
    }
    /// Handle a single TCP session
    async fn handle_session(
        stream: TcpStream,
        session_id: u64,
        manager: Arc<dyn StreamManagerApi>,
        cancel: CancellationToken,
    ) -> Result<()> {
        let (reader, mut writer) = stream.into_split();
        let mut lines = BufReader::new(reader).lines();

        writer
            .write_all(b"WELCOME simple-tcp/1.0\n")
            .await
            .map_err(MonibucaError::Io)?;

        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => {
                    writer.write_all(b"BYE server shutting down\n").await.ok();
                    break;
                }
                line = lines.next_line() => {
                    match line {
                        Ok(Some(line)) => {
                            // Split into an upper-cased command and an optional argument
                            let parts: Vec<&str> = line.trim().splitn(2, ' ').collect();
                            let cmd = parts[0].to_uppercase();
                            let arg = parts.get(1).copied().unwrap_or("");

                            match cmd.as_str() {
                                "PUBLISH" => {
                                    if arg.is_empty() {
                                        writer.write_all(b"ERR missing stream path\n").await.ok();
                                        continue;
                                    }
                                    match do_publish(
                                        &manager, arg, &mut lines, &mut writer, &cancel,
                                    ).await {
                                        Ok(_) => {
                                            writer.write_all(b"OK publish ended\n").await.ok();
                                        }
                                        Err(e) => {
                                            let msg = format!("ERR publish failed: {}\n", e);
                                            writer.write_all(msg.as_bytes()).await.ok();
                                        }
                                    }
                                }
                                "SUBSCRIBE" => {
                                    if arg.is_empty() {
                                        writer.write_all(b"ERR missing stream path\n").await.ok();
                                        continue;
                                    }
                                    match do_subscribe(
                                        &manager, arg, &mut writer, &cancel,
                                    ).await {
                                        Ok(_) => {
                                            writer.write_all(b"OK subscribe ended\n").await.ok();
                                        }
                                        Err(e) => {
                                            let msg = format!("ERR subscribe failed: {}\n", e);
                                            writer.write_all(msg.as_bytes()).await.ok();
                                        }
                                    }
                                }
                                "QUIT" => break,
                                _ => {
                                    writer.write_all(b"ERR unknown command\n").await.ok();
                                }
                            }
                        }
                        Ok(None) => break, // connection closed
                        Err(e) => {
                            tracing::warn!("Session {} read error: {}", session_id, e);
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }
    /// Publishing logic
    async fn do_publish(
        manager: &Arc<dyn StreamManagerApi>,
        stream_path: &str,
        lines: &mut tokio::io::Lines<BufReader<tokio::net::tcp::OwnedReadHalf>>,
        writer: &mut tokio::net::tcp::OwnedWriteHalf,
        cancel: &CancellationToken,
    ) -> Result<()> {
        // Create the Publisher
        let publisher = manager.create_publisher_for_plugin(stream_path, "simple-tcp")?;
        {
            let mut pub_guard = publisher.write();
            pub_guard.init_video_track();
            pub_guard.init_audio_track();
        }

        writer
            .write_all(format!("OK publishing to {}\n", stream_path).as_bytes())
            .await
            .map_err(MonibucaError::Io)?;

        let mut frame_count: u64 = 0;

        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break,
                line = lines.next_line() => {
                    match line {
                        Ok(Some(line)) => {
                            let line = line.trim();
                            if line == "STOP" {
                                break;
                            }
                            if let Some(hex_data) = line.strip_prefix("DATA ") {
                                // Simplified for the example: treat the hex payload as a
                                // video frame at about 30 fps, with a keyframe every 30 frames
                                if let Ok(data) = hex::decode(hex_data) {
                                    let ts = Duration::from_millis(frame_count * 33);
                                    let frame_type = if frame_count % 30 == 0 {
                                        VideoFrameType::Keyframe
                                    } else {
                                        VideoFrameType::Interframe
                                    };
                                    let _ = publisher.read().write_video(
                                        frame_type, ts, ts, Bytes::from(data),
                                    );
                                    frame_count += 1;
                                }
                            }
                        }
                        Ok(None) => break,
                        Err(_) => break,
                    }
                }
            }
        }

        // Clean up
        manager.dispose_stream(stream_path);
        Ok(())
    }
    /// Subscribing logic
    async fn do_subscribe(
        manager: &Arc<dyn StreamManagerApi>,
        stream_path: &str,
        writer: &mut tokio::net::tcp::OwnedWriteHalf,
        cancel: &CancellationToken,
    ) -> Result<()> {
        let config = SubscriberConfig {
            mode: SubscribeMode::RealTime,
            receive_video: true,
            receive_audio: true,
            keyframe_timeout: Some(Duration::from_secs(10)),
            ..Default::default()
        };

        let (_pub_ref, mut subscriber) = manager.subscribe(stream_path, config).await?;

        writer
            .write_all(format!("OK subscribed to {}\n", stream_path).as_bytes())
            .await
            .map_err(MonibucaError::Io)?;

        loop {
            if subscriber.is_stopped() || cancel.is_cancelled() {
                break;
            }

            // Read video frames and forward a one-line summary of each
            while let Ok(Some(frame)) = subscriber.read_video() {
                let info = format!(
                    "FRAME video ts={} keyframe={} size={}\n",
                    frame.timestamp.as_millis(),
                    frame.is_keyframe(),
                    frame.get_raw().len(),
                );
                if writer.write_all(info.as_bytes()).await.is_err() {
                    // The client went away: unsubscribe and end quietly
                    manager.unsubscribe(stream_path, subscriber.as_mut());
                    return Ok(());
                }
            }

            // Read audio frames and forward a one-line summary of each
            while let Some(frame) = subscriber.read_audio() {
                let info = format!(
                    "FRAME audio ts={} size={}\n",
                    frame.timestamp.as_millis(),
                    frame.get_audio_raw().len(),
                );
                if writer.write_all(info.as_bytes()).await.is_err() {
                    manager.unsubscribe(stream_path, subscriber.as_mut());
                    return Ok(());
                }
            }

            // Wait for new frames; wake up every 100 ms to re-check cancellation
            match tokio::time::timeout(
                Duration::from_millis(100),
                subscriber.wait_for_frames(),
            ).await {
                Ok(Ok(())) => continue,
                Ok(Err(_)) => break,
                Err(_) => continue, // timed out
            }
        }

        manager.unsubscribe(stream_path, subscriber.as_mut());
        Ok(())
    }

Example 3: Protocol Puller Factory Plugin
Implement PullerFactory so that other plugins (such as the cluster plugin) can pull streams from a remote source over your protocol.
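A puller factory has to decide whether a URL belongs to its protocol and, if so, extract the address to connect to. The sketch below is a std-only illustration of that step for the `myproto://` scheme used in this example; `parse_myproto_url` is a hypothetical helper and not an SDK function, and the assumption that the URL has the shape `myproto://host:port/stream/path` comes from the usage shown on this page.

```rust
/// Split "myproto://host:port/stream/path" into ("host:port", "stream/path").
/// Returns None when the scheme does not match or the address part is empty.
fn parse_myproto_url(url: &str) -> Option<(&str, &str)> {
    let rest = url.strip_prefix("myproto://")?;
    let (addr, path) = match rest.split_once('/') {
        Some((addr, path)) => (addr, path),
        None => (rest, ""),
    };
    if addr.is_empty() {
        None
    } else {
        Some((addr, path))
    }
}

fn main() {
    let url = "myproto://192.168.1.100:9090/live/test";
    match parse_myproto_url(url) {
        Some((addr, path)) => println!("connect to {addr}, remote stream {path:?}"),
        None => println!("unsupported URL: {url}"),
    }
}
```

Separating the address from the path matters because only `host:port` is a valid argument for a TCP connect call; passing the whole remainder would fail whenever the URL carries a stream path.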
    use std::any::Any;
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use sdk::prelude::*;
    use sdk::services::{PullerFactory, StreamPuller};
    use tokio_util::sync::CancellationToken;

    // ════════════════════════════════════════════════════════════════
    // Plugin body
    // ════════════════════════════════════════════════════════════════

    pub struct MyProtocolPlugin {
        state: PluginState,
        stream_manager: Option<Arc<dyn StreamManagerApi>>,
        engine_context: Option<EngineContext>,
    }

    impl Default for MyProtocolPlugin {
        fn default() -> Self {
            Self {
                state: PluginState::Created,
                stream_manager: None,
                engine_context: None,
            }
        }
    }

    #[async_trait]
    impl Plugin for MyProtocolPlugin {
        fn info(&self) -> PluginInfo {
            PluginInfo {
                name: "myproto",
                version: "0.1.0",
                description: "My custom protocol plugin",
                author: "Tutorial",
            }
        }

        fn state(&self) -> PluginState { self.state }

        // Receive the EngineContext, which gives access to the ServiceRegistry
        fn set_engine_context(&mut self, ctx: EngineContext) {
            self.engine_context = Some(ctx);
        }

        async fn init(
            &mut self,
            manager: Arc<dyn StreamManagerApi>,
            _config: Arc<dyn ConfigProvider>,
        ) -> Result<()> {
            self.stream_manager = Some(manager.clone());

            // Register the puller factory with the ServiceRegistry
            if let Some(ref ctx) = self.engine_context {
                if let Some(registry) = ctx.service_registry() {
                    registry.register_puller_factory(Arc::new(MyProtoPullerFactory {
                        default_manager: manager,
                    }));
                    tracing::info!("Registered myproto:// puller factory");
                }
            }

            self.state = PluginState::Initialized;
            Ok(())
        }

        async fn start(&mut self, _cancel: CancellationToken) -> Result<()> {
            self.state = PluginState::Running;
            Ok(())
        }

        async fn stop(&mut self) -> Result<()> {
            self.state = PluginState::Stopped;
            Ok(())
        }

        fn as_any(&self) -> &dyn Any { self }
        fn as_any_mut(&mut self) -> &mut dyn Any { self }
    }
    // ════════════════════════════════════════════════════════════════
    // PullerFactory implementation
    // ════════════════════════════════════════════════════════════════

    struct MyProtoPullerFactory {
        default_manager: Arc<dyn StreamManagerApi>,
    }

    #[async_trait]
    impl PullerFactory for MyProtoPullerFactory {
        fn protocol(&self) -> &str { "myproto" }

        fn supports_url(&self, url: &str) -> bool {
            url.starts_with("myproto://")
        }

        async fn create_puller(
            &self,
            url: &str,
            stream_path: &str,
            config: serde_json::Value,
            manager: Arc<dyn StreamManagerApi>,
            cancel_token: CancellationToken,
        ) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>> {
            // Parse the URL. Keep only host:port: a trailing "/live/test" is not
            // part of a socket address and would make the TCP connect fail.
            let remote_addr = url
                .strip_prefix("myproto://")
                .map(|rest| rest.split('/').next().unwrap_or(rest))
                .filter(|addr| !addr.is_empty())
                .ok_or_else(|| MonibucaError::Plugin("Invalid URL".to_string()))?;

            // Read optional parameters from the JSON config
            let timeout_ms = config
                .get("timeout_ms")
                .and_then(|v| v.as_u64())
                .unwrap_or(5000);

            let puller = MyProtoPuller {
                remote_addr: remote_addr.to_string(),
                stream_path: stream_path.to_string(),
                manager,
                cancel_token,
                timeout: Duration::from_millis(timeout_ms),
                running: false,
            };

            Ok(Arc::new(tokio::sync::RwLock::new(
                Box::new(puller) as Box<dyn StreamPuller>
            )))
        }
    }
    // ════════════════════════════════════════════════════════════════
    // StreamPuller implementation
    // ════════════════════════════════════════════════════════════════

    struct MyProtoPuller {
        remote_addr: String,
        stream_path: String,
        manager: Arc<dyn StreamManagerApi>,
        cancel_token: CancellationToken,
        timeout: Duration,
        running: bool,
    }

    #[async_trait]
    impl StreamPuller for MyProtoPuller {
        async fn start(&mut self) -> Result<()> {
            tracing::info!(
                "Pulling from myproto://{} to {}",
                self.remote_addr, self.stream_path
            );

            // 1. Connect to the remote server
            let stream = tokio::time::timeout(
                self.timeout,
                tokio::net::TcpStream::connect(&self.remote_addr),
            )
            .await
            .map_err(|_| MonibucaError::Plugin("Connect timeout".to_string()))?
            .map_err(MonibucaError::Io)?;

            // 2. Create the local Publisher
            let publisher = self
                .manager
                .create_publisher_for_plugin(&self.stream_path, "myproto")?;
            {
                let mut pub_guard = publisher.write();
                pub_guard.init_video_track();
                pub_guard.init_audio_track();
            }
            // Only report "running" once the connection and publisher exist
            self.running = true;

            // 3. Read data from the remote side and write it to the Publisher
            let cancel = self.cancel_token.clone();
            let (reader, _writer) = stream.into_split();
            let mut buf_reader = tokio::io::BufReader::new(reader);

            loop {
                tokio::select! {
                    biased;
                    _ = cancel.cancelled() => break,
                    // Your custom protocol parsing goes here.
                    // read_protocol_frame is a placeholder, not an SDK function:
                    // implement it for your own wire format.
                    result = read_protocol_frame(&mut buf_reader) => {
                        match result {
                            Ok(frame) => {
                                // Break instead of returning early so that the
                                // cleanup in step 4 always runs
                                if let Err(e) = publisher.read().write_video(
                                    frame.frame_type, frame.pts, frame.dts, frame.data,
                                ) {
                                    tracing::warn!("write_video failed: {}", e);
                                    break;
                                }
                            }
                            Err(_) => break,
                        }
                    }
                }
            }

            // 4. Clean up
            self.manager.dispose_stream(&self.stream_path);
            self.running = false;
            Ok(())
        }

        fn stop(&self) -> Result<()> {
            self.cancel_token.cancel();
            Ok(())
        }

        fn stream_path(&self) -> &str { &self.stream_path }

        fn is_running(&self) -> bool { self.running }
    }

How other plugins use your puller factory
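    // In any other plugin (for example, the cluster plugin):
    if let Some(registry) = engine_context.service_registry() {
        // The factory is matched automatically by URL
        if let Some(factory) = registry.get_puller_factory_for_url("myproto://192.168.1.100:9090") {
            let puller = factory.create_puller(
                "myproto://192.168.1.100:9090/live/test",
                "live/remote-test",
                serde_json::json!({"timeout_ms": 3000}),
                stream_manager.clone(),
                cancel_token.clone(),
            ).await?;

            // Start pulling. In this example start() returns only when the pull
            // ends, and the write lock is held for that whole time, so stop the
            // pull by cancelling cancel_token.
            puller.write().await.start().await?;
        }
    }

Having worked through these three examples, you have seen the core patterns of Monibuca plugin development: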
| Pattern | Example | Key techniques |
|---|---|---|
| HTTP API plugin | Stream monitor | http_routes(), HttpHandler, StreamEvent |
| TCP server plugin | Simple TCP | create_tcp_listener, TaskWork, publish/subscribe |
| Protocol factory plugin | Puller factory | EngineContext, ServiceRegistry, PullerFactory |
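All three patterns share the same lifecycle: a plugin starts as Created, becomes Initialized (or Disabled) in init, Running in start, and Stopped in stop. As a rough illustration, that lifecycle can be modeled as a small state machine. The sketch below is std-only; `State`, `Event`, and `next` are hypothetical local names that mirror the state names used in the examples rather than the SDK's own types, and ignoring out-of-order events is a choice of this sketch, not documented SDK behavior.

```rust
/// Local mirror of the state names used in the examples (not the SDK type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Created,
    Initialized,
    Running,
    Stopped,
    Disabled,
}

/// Hypothetical lifecycle events, named after the Plugin methods.
enum Event {
    Init { enabled: bool },
    Start,
    Stop,
}

/// Compute the next state; events that do not apply leave the state unchanged.
fn next(state: State, event: Event) -> State {
    match (state, event) {
        (State::Created, Event::Init { enabled: true }) => State::Initialized,
        (State::Created, Event::Init { enabled: false }) => State::Disabled,
        (State::Initialized, Event::Start) => State::Running,
        (_, Event::Stop) => State::Stopped,
        (state, _) => state,
    }
}

fn main() {
    let mut state = State::Created;
    for event in [Event::Init { enabled: true }, Event::Start, Event::Stop] {
        state = next(state, event);
        println!("{state:?}");
    }
}
```

Keeping the transitions in one function makes it easy to see that a disabled plugin never reaches Running, which matches how the monitor example returns early from start.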
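For more information, see:
- Plugin Development Guide: complete explanations of the concepts and API usage
- SDK API Reference: full documentation of all types and interfaces
- Plugin System: the architecture and design of the plugin system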