跳转到内容

流管理

StreamManager 是 Monibuca V6 的流管理核心,负责流的创建、发布、订阅和销毁的全生命周期管理。

┌─────────┐ ┌───────────┐ ┌─────────────┐ ┌──────────┐
│ Create │────▶│ Publish │────▶│ Subscribe │────▶│ Destroy │
│ (创建) │ │ (推流) │ │ (订阅) │ │ (销毁) │
└─────────┘ └───────────┘ └─────────────┘ └──────────┘
│ │ │ ▲
│ │ │ │
│ ▼ ▼ │
│ Init Tracks Read Frames Dispose
│ Set Codec Wait For Frames Clean Up
│ Write Frames Unsubscribe Remove from Registry
Publisher 状态机:
Init
▼ (init_video_track / init_audio_track)
TrackAdded
▼ (add_subscriber)
Subscribed ◀─────────────────────────┐
│ │
▼ (所有订阅者离开) │
WaitingSubscriber │
│ │
├─── 新订阅者加入 ───────────────┘
▼ (推流断开,启用断连续推)
WaitingReconnect
├─── 重新推流 ──▶ Subscribed
▼ (超时)
Disposed
特殊状态:
Paused ──── 暂停数据超时检测(VOD/回放场景)

StreamManager 是流管理的统一入口,由三个子组件组成:

StreamManager
├── StreamRegistry # 流注册表(DashMap 高并发存储)
├── StreamLifecycle # 生命周期管理(创建/销毁/断连续推)
└── WaitingQueue # 等待队列(订阅者等待尚未发布的流)
impl StreamManager {
// 创建流(推流端调用)
fn create_publisher(stream_path) -> Publisher
// 获取流(订阅端调用)
fn get_publisher(stream_path) -> Option<Publisher>
// 带 Dispatcher 的创建
fn create_publisher_with_dispatcher(stream_path)
-> (Publisher, Dispatcher)
// 事件订阅
fn subscribe_events() -> Receiver<StreamEvent>
// 流信息查询
fn stream_exists(stream_path) -> bool
fn stream_count() -> usize
fn list_streams() -> Vec<StreamInfo>
}

Publisher 负责管理一路流的推流端:

pub struct Publisher {
stream_path: String, // 流路径(如 "live/camera1")
plugin_name: String, // 创建此流的插件名
stream_type: String, // 流类型(如 "rtmp", "rtsp")
video_track: Option<Arc<VideoTrack>>, // 视频轨道
audio_track: Option<Arc<AudioTrack>>, // 音频轨道
subscribers: DashMap<u64, SubscriberInfo>, // 订阅者列表
frame_notify: watch::Sender<u64>, // 帧通知 channel
config: PublisherConfig, // 配置
task: Option<Arc<Task>>, // 关联的任务(层级取消)
}
PublisherConfig {
video_buffer_capacity: 1024, // 视频缓冲区大小
audio_buffer_capacity: 64, // 音频缓冲区大小
default_buffer_time: 2s, // 默认缓冲时间
max_buffer_time: 10s, // 最大缓冲时间
wait_timeout: 30s, // 无订阅者等待超时
max_fps: 0, // 最大帧率(0=不限)
data_timeout: 60s, // 数据超时(0=不超时)
continue_push_timeout: 30s, // 断连续推超时
}
// 1. 创建 Publisher
let publisher = stream_manager.create_publisher("live/stream1")?;
// 2. 初始化轨道
let mut pub_guard = publisher.write();
pub_guard.init_video_track();
pub_guard.init_audio_track();
// 3. 设置编码器信息
pub_guard.set_video_codec(VideoCodec::new(H264, 1920, 1080));
pub_guard.set_audio_codec(AudioCodec::new(AAC, 44100, 2));
// 4. 写入帧数据(在推流循环中)
pub_guard.write_video(IDR, timestamp, dts, data)?;
pub_guard.write_audio(timestamp, data)?;

Subscriber 从 Publisher 的 RingBuffer 中消费帧数据:

模式说明
RealTime从最新的 IDR 帧开始播放(默认)
Buffer从指定缓冲时间前的 IDR 帧开始
WaitKeyframe等待下一个关键帧
SubscriberConfig {
mode: SubscribeMode::RealTime,
buffer_time: Duration::from_secs(2),
receive_video: true,
receive_audio: true,
keyframe_timeout: Some(Duration::from_secs(30)),
}
// 1. 创建 Subscriber
let mut subscriber = Subscriber::new("live/stream1");
// 2. 订阅 Publisher
let publisher = stream_manager.get_publisher("live/stream1")?;
subscriber.subscribe(&publisher)?;
// 3. 读取帧数据(在播放循环中)
loop {
subscriber.wait_for_frames().await?;
while let Ok(Some(frame)) = subscriber.read_video() {
// 处理视频帧
}
while let Some(frame) = subscriber.read_audio() {
// 处理音频帧
}
}
// 4. 取消订阅
subscriber.unsubscribe(&publisher);

当源流音频为 Opus 编码(如 WebRTC 推流)而订阅者需要 AAC(如 RTMP/FLV 播放)时,Subscriber 会自动进行转码:

WebRTC 推流 (Opus) ──▶ Publisher ──▶ Subscriber ──▶ AAC 自动转码 ──▶ RTMP 拉流
skip_opus_transcode=true
WebRTC 订阅者直接消费 Opus
VideoTrack
├── RingBuffer (1024 slots) # 帧存储
├── VideoCodec # 编码信息(H.264/H.265/AV1)
├── SequenceGenerator # 帧序列号生成器
├── Stats # BPS/FPS 统计
└── IDR Tracking # 关键帧追踪

VideoTrack 支持两种帧写入格式:

  • AVCC 格式: 来自 RTMP/FLV(长度前缀 NAL 单元)
  • Raw NAL 格式: 来自 RTSP/RTP(裸 NAL 单元列表)
AudioTrack
├── RingBuffer (64 slots) # 帧存储
├── AudioCodec # 编码信息(AAC/Opus/G.711/...)
├── SequenceGenerator # 帧序列号生成器
└── Stats # BPS 统计

Publisher 维护视频和音频的序列头:

  • 视频序列头: AVC/HEVC Decoder Configuration Record
  • 音频序列头: AAC AudioSpecificConfig

新订阅者连接时,Dispatcher 会先发送序列头,确保解码器正确初始化。

从远程源拉流到本地发布:

远程 RTMP 服务器 ──拉流──▶ Monibuca Publisher ──▶ 本地订阅者

支持的拉流协议:RTMP、RTSP、SRT、HLS、HTTP-FLV、MP4

将本地流推送到远程服务器:

本地 Publisher ──推流──▶ 远程 RTMP/RTSP/SRT 服务器

代理配置支持自动重试、重连间隔和最大重试次数。

录制模块将流数据写入文件:

Publisher ──▶ Subscriber (录制) ──▶ FileWriter
┌─────┼─────┐
▼ ▼ ▼
FLV MP4 HLS
fMP4 / Raw

支持的录制格式:

格式说明
FLVFLV 文件录制
MP4标准 MP4 文件
fMP4碎片化 MP4(Fragmented MP4)
HLSHLS TS 分片
Raw原始帧数据

录制支持 Normal(可自动删除)、High(受保护)和 Event(事件触发)三种优先级。

Transformer 订阅源流,处理后发布为新流:

源流 "live/camera1"
Transformer (订阅 → 处理 → 发布)
目标流 "live/camera1_720p"

应用场景:

  • 视频转码(分辨率/编码格式转换)
  • 添加水印/叠加层
  • SEI 数据注入
  • 格式转换

当订阅者请求一个尚未发布的流时,不会立即失败,而是进入等待队列:

订阅者请求 "live/camera1"
├── 流已存在? ──是──▶ 立即订阅
└── 流不存在? ──▶ 加入 WaitQueue
等待流发布...
┌───────┴───────┐
│ │
流已发布 等待超时
│ │
▼ ▼
自动订阅 返回错误

WaitQueue 使流的发布和订阅解耦 — 订阅者可以先于推流端启动。这在以下场景特别有用:

  • 按需拉流: 订阅触发 Pull Proxy 从远程源拉流
  • 设备重连: 设备断线后订阅者等待重连
  • 直播预约: 观众提前进入直播间等待开播

当推流端断开连接时,Monibuca 不会立即销毁流,而是进入 WaitingReconnect 状态:

推流端断开
Publisher 状态 → WaitingReconnect
├── 在 continue_push_timeout 内重连
│ │
│ ▼
│ take_over(接管)
│ • 继承时间戳基准
│ • 转移序列头
│ • 恢复订阅者列表
│ • 订阅者无感知切换
└── 超时未重连
Publisher → Disposed
通知所有订阅者 EOS

关键设计: 断连续推保证了观看端在推流短暂中断时不需要重新连接,提供了无缝的观看体验。

默认断连续推超时为 30 秒,可通过 continue_push_timeout 配置。设置为 0 禁用此功能。

Publisher 会检测推流端是否长时间未发送数据:

// 配置数据超时(默认 60 秒)
data_timeout: Duration::from_secs(60)
  • 每次写入帧时更新 last_data_time
  • 超时后自动 Dispose Publisher
  • Paused 状态下超时检测被禁用(用于 VOD/回放场景)
  • 设置为 0 禁用超时检测

StreamManager 通过 broadcast channel 发布流事件:

let mut events = stream_manager.subscribe_events();
loop {
match events.recv().await {
Ok(StreamEvent::Published { stream_path, .. }) => {
println!("流发布: {}", stream_path);
}
Ok(StreamEvent::Unpublished { stream_path, .. }) => {
println!("流停止: {}", stream_path);
}
Ok(StreamEvent::Subscribed { stream_path, subscriber_id, .. }) => {
println!("新订阅: {} #{}", stream_path, subscriber_id);
}
_ => {}
}
}

插件可以监听流事件来触发相应的业务逻辑(如按需录制、自动转码等)。

联系我们

微信公众号:不卡科技 微信公众号二维码
腾讯频道:流媒体技术 腾讯频道二维码
QQ 频道:p0qq0crz08 QQ 频道二维码
QQ 群:751639168 QQ 群二维码