跳转到内容

SDK API 参考

本页面提供 monibuca-sdk crate 的完整 API 参考,包括所有公开类型、Trait 接口和常用模块。

// 推荐:使用 prelude 一次性导入所有常用类型
use sdk::prelude::*;

prelude 包含:

| 类别 | 类型 |
| --- | --- |
| 插件核心 | Plugin, PluginInfo, PluginState, PluginFactory, PluginCreateFn, PluginExport |
| 任务管理 | TaskWork, TaskHandle, ChildTaskHandle |
| 配置 | ConfigProvider, CommonConfig, TcpConfig, UdpConfig, ConfigReloadable |
| 流管理 | StreamManagerApi, SharedStreamManager, StreamEvent, StreamManagerStats |
| 发布/订阅 | Publisher, Subscriber, Dispatcher, DispatchFrame, SubscriberConfig, SubscribeMode |
| 帧数据 | AVFrame, FrameType, VideoFrameType |
| 编解码 | VideoCodec, VideoCodecType, AudioCodec, AudioCodecType |
| HTTP | HttpHandler, HttpMethod, HttpRequest, HttpResponse, HttpRoute, HttpRouteProvider |
| 错误 | MonibucaError, Result |
| 事件 | EventBus, EventEnvelope, EventPriority |
| 网络 | TcpSplitter |
| 第三方 re-export | async_trait, Bytes, BytesMut, CancellationToken, Mutex, RwLock, Serialize, Deserialize |
| 日志 | tracing::info!, warn!, error!, debug!, trace! |

/// Core plugin interface of the SDK (exported through the prelude).
///
/// Implementors must be `Send + Sync`. This listing is an API reference:
/// method bodies (including the default implementations mentioned below)
/// are not shown.
#[async_trait]
pub trait Plugin: Send + Sync {
/// Returns the plugin's descriptive metadata.
fn info(&self) -> PluginInfo;
/// Returns the plugin's current lifecycle state.
fn state(&self) -> PluginState;
/// Initializes the plugin with a stream manager handle and a configuration provider.
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>) -> Result<()>;
/// Starts the plugin.
/// NOTE(review): `cancel` is presumably signalled on shutdown — confirm against the engine.
async fn start(&mut self, cancel: CancellationToken) -> Result<()>;
/// Stops the plugin.
async fn stop(&mut self) -> Result<()>;
/// Exposes the plugin as `&dyn Any` (presumably for downcasting to the concrete type).
fn as_any(&self) -> &dyn Any;
/// Mutable counterpart of `as_any`.
fn as_any_mut(&mut self) -> &mut dyn Any;
// Optional methods (default implementations are provided)
fn name(&self) -> &'static str;
fn is_enabled(&self) -> bool;
fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()>;
fn set_task_work(&mut self, work: Arc<dyn TaskWork>);
fn set_engine_context(&mut self, ctx: EngineContext);
fn http_routes(&self) -> Vec<HttpRoute>;
fn register_http_handlers(&mut self, server: SharedHttpServer);
}
/// Static descriptive metadata for a plugin, as returned by `Plugin::info`.
///
/// All fields are `&'static str`.
pub struct PluginInfo {
pub name: &'static str,
pub version: &'static str,
pub description: &'static str,
pub author: &'static str,
}
/// Lifecycle state of a plugin, as reported by `Plugin::state`.
///
/// NOTE(review): the allowed transitions between states are not shown in
/// this reference — confirm against the engine implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PluginState {
Created,
Initialized,
Running,
Stopped,
Disabled,
Error,
}

/// Stream management interface handed to plugins (see `Plugin::init`).
///
/// Groups operations for publishing, subscribing, querying, event
/// subscription and stream lifecycle. All `path` arguments are stream paths.
#[async_trait]
pub trait StreamManagerApi: Send + Sync + 'static {
// ── Publishing ───────────────────────────────────────────────
fn create_publisher(&self, path: &str)
-> Result<Arc<RwLock<dyn Publisher>>>;
/// Like `create_publisher`, but also returns the associated dispatcher.
fn create_publisher_with_dispatcher(&self, path: &str)
-> Result<(Arc<RwLock<dyn Publisher>>, Arc<dyn Dispatcher>)>;
/// Like `create_publisher`, additionally taking the name of the owning plugin.
fn create_publisher_for_plugin(&self, path: &str, plugin_name: &str)
-> Result<Arc<RwLock<dyn Publisher>>>;
fn get_publisher(&self, path: &str)
-> Option<Arc<RwLock<dyn Publisher>>>;
fn get_dispatcher(&self, path: &str)
-> Option<Arc<dyn Dispatcher>>;
// ── Subscribing ──────────────────────────────────────────────
/// Async subscription; yields the publisher handle together with a new subscriber.
async fn subscribe(&self, path: &str, config: SubscriberConfig)
-> Result<(Arc<RwLock<dyn Publisher>>, Box<dyn Subscriber>)>;
/// Synchronous variant of `subscribe` with the same return shape.
fn subscribe_sync(&self, path: &str, config: SubscriberConfig)
-> Result<(Arc<RwLock<dyn Publisher>>, Box<dyn Subscriber>)>;
fn unsubscribe(&self, path: &str, subscriber: &mut dyn Subscriber);
// ── Queries ──────────────────────────────────────────────────
fn has_stream(&self, path: &str) -> bool;
fn stream_count(&self) -> usize;
fn stream_paths(&self) -> Vec<String>;
fn stats(&self) -> StreamManagerStats;
fn video_track(&self, path: &str) -> Option<Arc<dyn VideoTrack>>;
fn audio_track(&self, path: &str) -> Option<Arc<dyn AudioTrack>>;
// ── Events ───────────────────────────────────────────────────
/// Returns a broadcast receiver of `StreamEvent`s.
fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent>;
// ── Lifecycle ────────────────────────────────────────────────
// NOTE(review): the difference between `dispose_stream` and
// `force_dispose_stream`, and the meaning of the returned bool, are not
// shown in this reference — confirm against the implementation.
fn dispose_stream(&self, path: &str) -> bool;
fn force_dispose_stream(&self, path: &str) -> bool;
fn is_running(&self) -> bool;
}
/// Events delivered through `StreamManagerApi::subscribe_events`.
///
/// The `..` in the struct-like variants is an abbreviation used by this
/// reference: further fields exist but are not listed here.
pub enum StreamEvent {
Created(String), // stream path
Disposed(String), // stream path
SubscriberAdded { stream: String, .. }, // a new subscriber joined
SubscriberRemoved { stream: String, .. }, // a subscriber left
}

/// Publishing side of a stream: track/codec setup and frame writing.
///
/// Note that the frame-writing methods and `dispose` take `&self`, while
/// the setup methods take `&mut self`.
pub trait Publisher: Send + Sync {
fn path(&self) -> &str;
fn subscriber_count(&self) -> usize;
fn plugin_name(&self) -> &str;
fn set_plugin_name(&mut self, name: &str);
fn state(&self) -> PublisherState;
// Track initialization
fn init_video_track(&mut self);
fn init_audio_track(&mut self);
// Codec setup
fn set_video_codec(&mut self, codec: VideoCodec);
fn set_audio_codec(&mut self, codec: AudioCodec);
fn set_video_seq_header(&mut self, data: Bytes);
fn set_audio_seq_header(&mut self, data: Bytes);
fn video_seq_header(&self) -> Option<Bytes>;
fn audio_seq_header(&self) -> Option<Bytes>;
fn audio_codec(&self) -> Option<AudioCodec>;
// Frame writing
fn write_video(&self, frame_type: VideoFrameType,
pts: Duration, dts: Duration, data: Bytes) -> Result<()>;
// NOTE(review): `timestamp` here appears to play the role of `pts` in
// `write_video`, and `nalus` presumably holds one NAL unit per element —
// confirm both against the implementation.
fn write_video_raw(&self, frame_type: VideoFrameType,
timestamp: Duration, dts: Duration,
nalus: Vec<Bytes>) -> Result<()>;
fn write_audio(&self, timestamp: Duration, data: Bytes) -> Result<()>;
// Lifecycle
fn dispose(&self);
// Extra parameters
fn set_extra_param(&mut self, key: &str, value: &str);
fn set_extra_params(&mut self, params: HashMap<String, String>);
}

/// Subscribing side of a stream: waits for and reads frames.
pub trait Subscriber: Send + Sync {
fn id(&self) -> u64;
fn path(&self) -> &str;
fn mode(&self) -> SubscribeMode;
// Async waiting
/// Returns a boxed future that borrows `self` (`'_`) and resolves to `Result<()>`.
fn wait_for_frames(&mut self)
-> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
// Frame reading
// NOTE(review): `read_video` is fallible (`Result<Option<_>>`) whereas
// `read_audio` returns a plain `Option` — the asymmetry is as listed in
// this reference; confirm it is intentional.
fn read_video(&mut self) -> Result<Option<AVFrame>>;
fn read_audio(&mut self) -> Option<AVFrame>;
// State queries
fn is_stopped(&self) -> bool;
fn has_video(&self) -> bool;
fn has_audio(&self) -> bool;
// Lifecycle
fn unsubscribe(&mut self, publisher: &dyn Publisher);
}
/// Options passed to `StreamManagerApi::subscribe` / `subscribe_sync`.
///
/// The defaults noted per field are those stated by this reference.
pub struct SubscriberConfig {
pub mode: SubscribeMode,
pub buffer_time: Duration, // default: 2s
pub receive_video: bool, // default: true
pub receive_audio: bool, // default: true
pub keyframe_timeout: Option<Duration>, // default: Some(30s)
}
/// Selects where in the stream a new subscriber starts reading.
pub enum SubscribeMode {
RealTime, // start from the most recent IDR keyframe
Buffer, // start from the IDR that lies `buffer_time` in the past
WaitKeyframe, // wait for the next keyframe
}

/// A media frame as returned by `Subscriber::read_video` / `read_audio`.
///
/// Only the public fields are listed; the type has further internal fields.
pub struct AVFrame {
pub timestamp: Duration,
pub video_frame_type: VideoFrameType,
pub video_codec: Option<Arc<VideoCodec>>,
pub audio_codec: Option<AudioCodec>,
// ... internal fields
}
/// Accessors that return the frame payload in various container/bitstream formats.
impl AVFrame {
// Video format conversions (lazily computed and cached)
pub fn get_flv_video_tag(&self) -> Bytes;
pub fn get_annexb(&self) -> Bytes;
pub fn get_avcc(&self) -> Bytes;
pub fn get_raw(&self) -> Bytes;
pub fn is_keyframe(&self) -> bool;
// Audio format conversions
pub fn get_flv_audio_tag(&self) -> Bytes;
pub fn get_audio_raw(&self) -> Bytes;
}

/// An incoming HTTP request as passed to `HttpHandler::handle`.
///
/// NOTE(review): `params` presumably holds path parameters extracted from
/// the route pattern (see `path_param`) — confirm.
pub struct HttpRequest {
pub method: String,
pub path: String,
pub query: Option<String>,
pub headers: HashMap<String, String>,
pub body: Bytes,
pub params: HashMap<String, String>,
pub peer_addr: SocketAddr,
}
/// Convenience accessors for the body, query string and path parameters.
impl HttpRequest {
/// Returns the body as `&str`, or `None` (presumably when it is not valid UTF-8 — confirm).
pub fn body_str(&self) -> Option<&str>;
/// Deserializes the body as JSON into `T`.
pub fn parse_json_body<T: DeserializeOwned>(&self) -> Result<T>;
pub fn query_param(&self, key: &str) -> Option<String>;
pub fn path_param(&self, key: &str) -> Option<String>;
}
/// Constructors and by-value builder methods for `HttpResponse`.
///
/// Each builder method consumes `self` and returns `Self`, so calls can be chained.
impl HttpResponse {
// Constructors
pub fn ok() -> Self; // 200
pub fn not_found() -> Self; // 404
pub fn bad_request() -> Self; // 400
pub fn internal_error() -> Self; // 500
pub fn new(status: u16) -> Self; // custom status code
// Builder methods
pub fn json<T: Serialize>(self, data: &T) -> Self;
pub fn text(self, text: &str) -> Self;
pub fn body(self, data: Bytes) -> Self;
pub fn content_type(self, ct: &str) -> Self;
pub fn header(self, key: &str, value: &str) -> Self;
pub fn cors_origin(self, origin: &str) -> Self;
}
/// Binds an HTTP method and path pattern to a handler (see `Plugin::http_routes`).
pub struct HttpRoute {
pub method: HttpMethod,
pub path: String, // wildcards are supported, e.g. "/api/**"
pub handler: Box<dyn HttpHandler>,
}
/// HTTP request method used by `HttpRoute::method`.
pub enum HttpMethod { Get, Post, Put, Delete, Patch, Options, Head }
/// Async request handler stored in `HttpRoute::handler`.
#[async_trait]
pub trait HttpHandler: Send + Sync {
/// Takes ownership of the request and produces the response.
async fn handle(&self, req: HttpRequest) -> HttpResponse;
}

/// Container of engine-provided capabilities, handed to plugins via
/// `Plugin::set_engine_context`. Internally a `TypeId`-keyed map.
pub struct EngineContext { /* TypeId-keyed map */ }
/// Typed storage API plus named accessors for the predefined capabilities.
///
/// Every predefined accessor returns `Option`, and each has a matching
/// `has_*` method returning `bool`.
impl EngineContext {
// Generic API
pub fn set<T: Any + Send + Sync>(&mut self, value: T);
pub fn get<T: Any + Send + Sync>(&self) -> Option<&T>;
pub fn has<T: Any + Send + Sync>(&self) -> bool;
// Predefined capabilities
pub fn database_arc(&self) -> Option<Arc<dyn DatabaseApi>>;
pub fn transform(&self) -> Option<Arc<dyn TransformApi>>;
pub fn playback(&self) -> Option<Arc<dyn PlaybackApi>>;
pub fn service_registry(&self) -> Option<Arc<dyn ServiceRegistry>>;
pub fn room_api(&self) -> Option<Arc<dyn RoomApi>>;
pub fn proxy_manager(&self) -> Option<Arc<dyn ProxyManager>>;
pub fn collection(&self) -> Option<Arc<dyn CollectionApi>>;
pub fn reporting(&self) -> Option<Arc<dyn ReportingApi>>;
pub fn utility_provider(&self) -> Option<Arc<dyn UtilityProvider>>;
pub fn http_registry(&self) -> Option<Arc<dyn HttpRegistry>>;
// Presence checks
pub fn has_database(&self) -> bool;
pub fn has_transform(&self) -> bool;
pub fn has_playback(&self) -> bool;
pub fn has_service_registry(&self) -> bool;
pub fn has_room_api(&self) -> bool;
pub fn has_proxy_manager(&self) -> bool;
pub fn has_collection(&self) -> bool;
pub fn has_reporting(&self) -> bool;
pub fn has_utility_provider(&self) -> bool;
pub fn has_http_registry(&self) -> bool;
}

/// Task-management handle given to a plugin via `Plugin::set_task_work`.
///
/// Child tasks are associated with an `owner` string.
/// NOTE(review): `owner` is presumably the plugin name — confirm against callers.
pub trait TaskWork: Send + Sync + Any {
fn start(&self);
/// Registers a boxed future as a child task under `owner`.
fn register_child(&self, owner: &str, task: Pin<Box<dyn Future<Output=()> + Send>>);
/// Creates a child task under `owner` and returns its handle.
fn create_child_task(&self, owner: &str) -> Box<dyn ChildTaskHandle>;
/// Removes a child task by id (see `ChildTaskHandle::id`).
fn remove_child_task(&self, child_id: u64);
fn task(&self) -> Option<Arc<dyn TaskHandle>>;
}
/// Handle to a child task created by `TaskWork::create_child_task`.
pub trait ChildTaskHandle: Send + Sync + Any {
fn start(&self);
fn stop(&self);
/// Numeric id of this child task (the type accepted by `TaskWork::remove_child_task`).
fn id(&self) -> u64;
/// Returns a cancellation token associated with this child task.
fn cancellation_token(&self) -> CancellationToken;
}

/// Read access to engine and plugin configuration (see `Plugin::init`).
///
/// Configuration values are exposed as `serde_json::Value`.
pub trait ConfigProvider: Send + Sync + Any {
fn get(&self, key: &str) -> Option<serde_json::Value>;
fn get_plugin_config(&self, plugin_name: &str) -> Option<serde_json::Value>;
/// Loads a plugin's default configuration from a YAML string.
fn load_default_config(&self, plugin_name: &str, default_yaml: &str) -> Result<()>;
fn is_plugin_enabled(&self, plugin_name: &str) -> bool;
// NOTE(review): the tuple is presumably (enabled, allowed origins) — confirm.
fn get_cors_config(&self) -> (bool, Vec<String>);
}

/// Registry of puller/pusher protocol factories and of generic named services.
///
/// Obtainable through `EngineContext::service_registry`.
pub trait ServiceRegistry: Send + Sync {
// Protocol factory registration
fn register_puller_factory(&self, factory: Arc<dyn PullerFactory>);
fn register_pusher_factory(&self, factory: Arc<dyn PusherFactory>);
// Protocol factory lookup
fn get_puller_factory(&self, protocol: &str) -> Option<Arc<dyn PullerFactory>>;
fn get_pusher_factory(&self, protocol: &str) -> Option<Arc<dyn PusherFactory>>;
fn get_puller_factory_for_url(&self, url: &str) -> Option<Arc<dyn PullerFactory>>;
fn get_pusher_factory_for_url(&self, url: &str) -> Option<Arc<dyn PusherFactory>>;
// List registered protocols
fn list_puller_protocols(&self) -> Vec<String>;
fn list_pusher_protocols(&self) -> Vec<String>;
// Generic service registration (`Any`-typed)
fn register_service(&self, name: &str, service: Arc<dyn Any + Send + Sync>);
fn get_service(&self, name: &str) -> Option<Arc<dyn Any + Send + Sync>>;
}
/// Factory that creates `StreamPuller`s for one protocol; registered via
/// `ServiceRegistry::register_puller_factory`.
#[async_trait]
pub trait PullerFactory: Send + Sync {
/// Protocol identifier of this factory.
fn protocol(&self) -> &str;
/// Reports whether this factory can handle `url`.
fn supports_url(&self, url: &str) -> bool;
/// Creates a puller for `url` targeting `stream_path`, wrapped in a shared
/// `tokio::sync::RwLock`.
async fn create_puller(
&self, url: &str, stream_path: &str,
config: serde_json::Value,
manager: Arc<dyn StreamManagerApi>,
cancel_token: CancellationToken,
) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>>;
}
/// Factory that creates `StreamPusher`s for one protocol; registered via
/// `ServiceRegistry::register_pusher_factory`.
#[async_trait]
pub trait PusherFactory: Send + Sync {
/// Protocol identifier of this factory.
fn protocol(&self) -> &str;
/// Reports whether this factory can handle `url`.
fn supports_url(&self, url: &str) -> bool;
/// Creates a pusher for `url` sourced from `stream_path`, wrapped in a
/// shared `tokio::sync::RwLock`.
async fn create_pusher(
&self, url: &str, stream_path: &str,
config: serde_json::Value,
manager: Arc<dyn StreamManagerApi>,
cancel_token: CancellationToken,
) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPusher>>>>;
}
/// A running (or startable) pull session produced by `PullerFactory::create_puller`.
///
/// `start` needs `&mut self` and is async; `stop` only needs `&self`.
#[async_trait]
pub trait StreamPuller: Send + Sync {
async fn start(&mut self) -> Result<()>;
fn stop(&self) -> Result<()>;
fn stream_path(&self) -> &str;
fn is_running(&self) -> bool;
}
/// A running (or startable) push session produced by `PusherFactory::create_pusher`.
///
/// `start` needs `&mut self` and is async; `stop` only needs `&self`.
#[async_trait]
pub trait StreamPusher: Send + Sync {
async fn start(&mut self) -> Result<()>;
fn stop(&self) -> Result<()>;
fn stream_path(&self) -> &str;
fn is_running(&self) -> bool;
}

/// Creates a TCP listener with `SO_REUSEPORT` enabled, bound to `addr`.
///
/// # Errors
/// Returns the underlying `std::io::Error` on failure.
pub fn create_tcp_listener(addr: SocketAddr) -> std::io::Result<TcpListener>;
/// Trait for splitting a TCP byte stream into frames.
pub trait TcpSplitter: Send {
/// Feeds `data` into the splitter and returns the frames produced.
/// NOTE(review): incomplete trailing bytes are presumably retained and
/// exposed through `pending` — confirm against an implementation.
fn split(&mut self, data: &[u8]) -> Vec<Vec<u8>>;
/// Returns the currently buffered bytes.
fn pending(&self) -> &[u8];
fn clear(&mut self);
}

/// Room-scoped event bus: users register a byte sender per room and events
/// are dispatched to one user or broadcast to the room.
///
/// The event methods are generic over `T: Serialize + Send + Sync`.
#[async_trait]
pub trait EventBus: Send + Sync {
/// Registers `user_id` in `room_id` with the channel sender used to reach that user.
async fn register_user(&self, room_id: &str, user_id: String,
sender: mpsc::UnboundedSender<Bytes>) -> Result<()>;
async fn unregister_user(&self, room_id: &str, user_id: &str) -> Result<()>;
// NOTE(review): the behavior when `target_user_id` is `None` is not shown
// in this reference — confirm against the implementation.
async fn dispatch_event<T: Serialize + Send + Sync>(
&self, room_id: &str, target_user_id: Option<&str>,
event_type: &str, data: &T) -> Result<()>;
async fn broadcast_event<T: Serialize + Send + Sync>(
&self, room_id: &str, event_type: &str, data: &T) -> Result<()>;
}
/// Envelope carrying an event's type, optional metadata and JSON payload.
///
/// NOTE(review): the unit/epoch of `timestamp` is not stated in this
/// reference — confirm before relying on it.
pub struct EventEnvelope {
pub event_type: String,
pub seq: Option<u64>,
pub timestamp: Option<i64>,
pub sender_id: Option<String>,
pub data: serde_json::Value,
}

/// Error type of the SDK. Only a subset of the variants is listed here.
pub enum MonibucaError {
Plugin(String), // plugin-defined error
Io(std::io::Error), // I/O error
Config(String), // configuration error
Stream(String), // stream operation error
Codec(String), // codec error
// ... other variants
}
/// Result alias used throughout the SDK, with `MonibucaError` as the error type.
pub type Result<T> = std::result::Result<T, MonibucaError>;

/// Video codec kinds (used by `VideoCodec::codec_type`).
pub enum VideoCodecType { H264, H265, AV1, VP8, VP9, }
/// Audio codec kinds (used by `AudioCodec::codec_type`).
pub enum AudioCodecType { AAC, Opus, G711A, G711U, MP3, }
/// Distinguishes keyframes from inter frames (see `Publisher::write_video`).
pub enum VideoFrameType { Keyframe, Interframe, }
/// Video codec description: codec kind plus frame dimensions.
///
/// Further fields exist but are not listed in this reference.
pub struct VideoCodec {
pub codec_type: VideoCodecType,
pub width: u32,
pub height: u32,
// ...
}
impl VideoCodec {
/// Creates a codec description from its kind and dimensions.
pub fn new(codec_type: VideoCodecType, width: u32, height: u32) -> Self;
/// Builder-style method that attaches `data` and returns the updated value.
/// NOTE(review): `data` is presumably the codec configuration record — confirm.
pub fn with_record(self, data: Bytes) -> Self;
}
/// Audio codec description: codec kind, sample rate and channel count.
pub struct AudioCodec {
pub codec_type: AudioCodecType,
pub sample_rate: u32,
pub channels: u8,
}
impl AudioCodec {
/// Creates a codec description from its kind, sample rate and channel count.
pub fn new(codec_type: AudioCodecType, sample_rate: u32, channels: u8) -> Self;
}

// Usage examples for the SDK's macros (three independent snippets).
sdk::export_plugin!(MyPlugin);
// Requires MyPlugin: Default + Plugin
// Generates the C ABI symbols: plugin_api_version, plugin_create, plugin_metadata
#[derive(ConfigSchema)]
#[schema(plugin = "name", title = "Title", description = "Desc", sdk_path = "sdk")]
pub struct MyConfig { /* ... */ }
#[monibuca_plugin(name = "my-plugin", version = "0.1.0")]
pub struct MyPlugin { /* ... */ }
// Automatically generates a Default impl plus the PLUGIN_NAME/PLUGIN_VERSION constants

联系我们

微信公众号:不卡科技 微信公众号二维码
腾讯频道:流媒体技术 腾讯频道二维码
QQ 频道:p0qq0crz08 QQ 频道二维码
QQ 群:751639168 QQ 群二维码