流管理
StreamManager 是 Monibuca V6 的流管理核心,负责流的创建、发布、订阅和销毁的全生命周期管理。
┌─────────┐ ┌───────────┐ ┌─────────────┐ ┌──────────┐│ Create │────▶│ Publish │────▶│ Subscribe │────▶│ Destroy ││ (创建) │ │ (推流) │ │ (订阅) │ │ (销毁) │└─────────┘ └───────────┘ └─────────────┘ └──────────┘ │ │ │ ▲ │ │ │ │ │ ▼ ▼ │ │ Init Tracks Read Frames Dispose │ Set Codec Wait For Frames Clean Up │ Write Frames Unsubscribe Remove from Registry详细状态流转
Section titled “详细状态流转”Publisher 状态机:
Init │ ▼ (init_video_track / init_audio_track) TrackAdded │ ▼ (add_subscriber) Subscribed ◀─────────────────────────┐ │ │ ▼ (所有订阅者离开) │ WaitingSubscriber │ │ │ ├─── 新订阅者加入 ───────────────┘ │ ▼ (推流断开,启用断连续推) WaitingReconnect │ ├─── 重新推流 ──▶ Subscribed │ ▼ (超时) Disposed
特殊状态: Paused ──── 暂停数据超时检测(VOD/回放场景)StreamManager 职责
Section titled “StreamManager 职责”StreamManager 是流管理的统一入口,由三个子组件组成:
StreamManager├── StreamRegistry # 流注册表(DashMap 高并发存储)├── StreamLifecycle # 生命周期管理(创建/销毁/断连续推)└── WaitingQueue # 等待队列(订阅者等待尚未发布的流)核心 API
Section titled “核心 API”impl StreamManager { // 创建流(推流端调用) fn create_publisher(stream_path) -> Publisher
// 获取流(订阅端调用) fn get_publisher(stream_path) -> Option<Publisher>
// 带 Dispatcher 的创建 fn create_publisher_with_dispatcher(stream_path) -> (Publisher, Dispatcher)
// 事件订阅 fn subscribe_events() -> Receiver<StreamEvent>
// 流信息查询 fn stream_exists(stream_path) -> bool fn stream_count() -> usize fn list_streams() -> Vec<StreamInfo>}Publisher(推流者)
Section titled “Publisher(推流者)”Publisher 负责管理一路流的推流端:
pub struct Publisher { stream_path: String, // 流路径(如 "live/camera1") plugin_name: String, // 创建此流的插件名 stream_type: String, // 流类型(如 "rtmp", "rtsp") video_track: Option<Arc<VideoTrack>>, // 视频轨道 audio_track: Option<Arc<AudioTrack>>, // 音频轨道 subscribers: DashMap<u64, SubscriberInfo>, // 订阅者列表 frame_notify: watch::Sender<u64>, // 帧通知 channel config: PublisherConfig, // 配置 task: Option<Arc<Task>>, // 关联的任务(层级取消)}PublisherConfig { video_buffer_capacity: 1024, // 视频缓冲区大小 audio_buffer_capacity: 64, // 音频缓冲区大小 default_buffer_time: 2s, // 默认缓冲时间 max_buffer_time: 10s, // 最大缓冲时间 wait_timeout: 30s, // 无订阅者等待超时 max_fps: 0, // 最大帧率(0=不限) data_timeout: 60s, // 数据超时(0=不超时) continue_push_timeout: 30s, // 断连续推超时}// 1. 创建 Publisherlet publisher = stream_manager.create_publisher("live/stream1")?;
// 2. 初始化轨道let mut pub_guard = publisher.write();pub_guard.init_video_track();pub_guard.init_audio_track();
// 3. 设置编码器信息pub_guard.set_video_codec(VideoCodec::new(H264, 1920, 1080));pub_guard.set_audio_codec(AudioCodec::new(AAC, 44100, 2));
// 4. 写入帧数据(在推流循环中)pub_guard.write_video(IDR, timestamp, dts, data)?;pub_guard.write_audio(timestamp, data)?;Subscriber(订阅者)
Section titled “Subscriber(订阅者)”Subscriber 从 Publisher 的 RingBuffer 中消费帧数据:
三种订阅模式
Section titled “三种订阅模式”| 模式 | 说明 |
|---|---|
| RealTime | 从最新的 IDR 帧开始播放(默认) |
| Buffer | 从指定缓冲时间前的 IDR 帧开始 |
| WaitKeyframe | 等待下一个关键帧 |
SubscriberConfig { mode: SubscribeMode::RealTime, buffer_time: Duration::from_secs(2), receive_video: true, receive_audio: true, keyframe_timeout: Some(Duration::from_secs(30)),}// 1. 创建 Subscriberlet mut subscriber = Subscriber::new("live/stream1");
// 2. 订阅 Publisherlet publisher = stream_manager.get_publisher("live/stream1")?;subscriber.subscribe(&publisher)?;
// 3. 读取帧数据(在播放循环中)loop { subscriber.wait_for_frames().await?;
while let Ok(Some(frame)) = subscriber.read_video() { // 处理视频帧 }
while let Some(frame) = subscriber.read_audio() { // 处理音频帧 }}
// 4. 取消订阅subscriber.unsubscribe(&publisher);自动 Opus→AAC 转码
Section titled “自动 Opus→AAC 转码”当源流音频为 Opus 编码(如 WebRTC 推流)而订阅者需要 AAC(如 RTMP/FLV 播放)时,Subscriber 会自动进行转码:
WebRTC 推流 (Opus) ──▶ Publisher ──▶ Subscriber ──▶ AAC 自动转码 ──▶ RTMP 拉流 │ skip_opus_transcode=true │ WebRTC 订阅者直接消费 Opus音视频轨道管理
Section titled “音视频轨道管理”VideoTrack
Section titled “VideoTrack”VideoTrack├── RingBuffer (1024 slots) # 帧存储├── VideoCodec # 编码信息(H.264/H.265/AV1)├── SequenceGenerator # 帧序列号生成器├── Stats # BPS/FPS 统计└── IDR Tracking # 关键帧追踪VideoTrack 支持两种帧写入格式:
- AVCC 格式: 来自 RTMP/FLV(长度前缀 NAL 单元)
- Raw NAL 格式: 来自 RTSP/RTP(裸 NAL 单元列表)
AudioTrack
Section titled “AudioTrack”AudioTrack├── RingBuffer (64 slots) # 帧存储├── AudioCodec # 编码信息(AAC/Opus/G.711/...)├── SequenceGenerator # 帧序列号生成器└── Stats # BPS 统计序列头(Sequence Header)
Section titled “序列头(Sequence Header)”Publisher 维护视频和音频的序列头:
- 视频序列头: AVC/HEVC Decoder Configuration Record
- 音频序列头: AAC AudioSpecificConfig
新订阅者连接时,Dispatcher 会先发送序列头,确保解码器正确初始化。
Pull/Push 代理
Section titled “Pull/Push 代理”Pull Proxy(拉流代理)
Section titled “Pull Proxy(拉流代理)”从远程源拉流到本地发布:
远程 RTMP 服务器 ──拉流──▶ Monibuca Publisher ──▶ 本地订阅者支持的拉流协议:RTMP、RTSP、SRT、HLS、HTTP-FLV、MP4
Push Proxy(推流代理)
Section titled “Push Proxy(推流代理)”将本地流推送到远程服务器:
本地 Publisher ──推流──▶ 远程 RTMP/RTSP/SRT 服务器代理配置支持自动重试、重连间隔和最大重试次数。
录制模块将流数据写入文件:
Publisher ──▶ Subscriber (录制) ──▶ FileWriter │ ┌─────┼─────┐ ▼ ▼ ▼ FLV MP4 HLS │ fMP4 / Raw支持的录制格式:
| 格式 | 说明 |
|---|---|
| FLV | FLV 文件录制 |
| MP4 | 标准 MP4 文件 |
| fMP4 | 碎片化 MP4(Fragmented MP4) |
| HLS | HLS TS 分片 |
| Raw | 原始帧数据 |
录制支持 Normal(可自动删除)、High(受保护)和 Event(事件触发)三种优先级。
转换器(Transformer)
Section titled “转换器(Transformer)”Transformer 订阅源流,处理后发布为新流:
源流 "live/camera1" │ ▼Transformer (订阅 → 处理 → 发布) │ ▼目标流 "live/camera1_720p"应用场景:
- 视频转码(分辨率/编码格式转换)
- 添加水印/叠加层
- SEI 数据注入
- 格式转换
等待队列(WaitQueue)
Section titled “等待队列(WaitQueue)”当订阅者请求一个尚未发布的流时,不会立即失败,而是进入等待队列:
订阅者请求 "live/camera1" │ ├── 流已存在? ──是──▶ 立即订阅 │ └── 流不存在? ──▶ 加入 WaitQueue │ ▼ 等待流发布... │ ┌───────┴───────┐ │ │ 流已发布 等待超时 │ │ ▼ ▼ 自动订阅 返回错误WaitQueue 使流的发布和订阅解耦 — 订阅者可以先于推流端启动。这在以下场景特别有用:
- 按需拉流: 订阅触发 Pull Proxy 从远程源拉流
- 设备重连: 设备断线后订阅者等待重连
- 直播预约: 观众提前进入直播间等待开播
当推流端断开连接时,Monibuca 不会立即销毁流,而是进入 WaitingReconnect 状态:
推流端断开 │ ▼Publisher 状态 → WaitingReconnect │ ├── 在 continue_push_timeout 内重连 │ │ │ ▼ │ take_over(接管) │ • 继承时间戳基准 │ • 转移序列头 │ • 恢复订阅者列表 │ • 订阅者无感知切换 │ └── 超时未重连 │ ▼ Publisher → Disposed 通知所有订阅者 EOS关键设计: 断连续推保证了观看端在推流短暂中断时不需要重新连接,提供了无缝的观看体验。
默认断连续推超时为 30 秒,可通过 continue_push_timeout 配置。设置为 0 禁用此功能。
数据超时检测
Section titled “数据超时检测”Publisher 会检测推流端是否长时间未发送数据:
// 配置数据超时(默认 60 秒)data_timeout: Duration::from_secs(60)- 每次写入帧时更新
last_data_time - 超时后自动 Dispose Publisher
- 在 Paused 状态下超时检测被禁用(用于 VOD/回放场景)
- 设置为 0 禁用超时检测
StreamManager 通过 broadcast channel 发布流事件:
let mut events = stream_manager.subscribe_events();
loop { match events.recv().await { Ok(StreamEvent::Published { stream_path, .. }) => { println!("流发布: {}", stream_path); } Ok(StreamEvent::Unpublished { stream_path, .. }) => { println!("流停止: {}", stream_path); } Ok(StreamEvent::Subscribed { stream_path, subscriber_id, .. }) => { println!("新订阅: {} #{}", stream_path, subscriber_id); } _ => {} }}插件可以监听流事件来触发相应的业务逻辑(如按需录制、自动转码等)。
联系我们