
Live Streaming System Design

📺 Large-Scale Real-Time Live Streaming Architecture

Streaming Media System Design

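Q1: Design a live streaming system that supports one million concurrent viewers. How would you handle stream ingest, transcoding, and distribution?

Difficulty: ⭐⭐⭐⭐⭐

Answer: A large-scale live streaming system has to solve three core problems: low latency, high concurrency, and stable transport. The design spans audio/video processing, CDN distribution, and real-time interaction.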
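To put "one million concurrent viewers" in perspective, a back-of-the-envelope egress estimate helps. The sketch below reuses the bitrate ladder from the transcoding section (6000/3000/1500/800 kbps video plus 128 kbps audio). The viewer split across renditions is an assumption made up for illustration, not a measured distribution.

```java
class EgressEstimate {
    // Video bitrates (kbps) for 1080p, 720p, 480p, 360p, matching the transcoding ladder
    static final int[] VIDEO_KBPS = {6000, 3000, 1500, 800};
    static final int AUDIO_KBPS = 128;

    /**
     * Returns total CDN egress in Gbps.
     * sharePercent[i] is the percentage of viewers on rendition i; the shares must sum to 100.
     */
    static double totalEgressGbps(long viewers, int[] sharePercent) {
        if (sharePercent.length != VIDEO_KBPS.length) {
            throw new IllegalArgumentException("one share per rendition is required");
        }
        int shareSum = 0;
        long weightedKbps = 0;
        for (int i = 0; i < VIDEO_KBPS.length; i++) {
            shareSum += sharePercent[i];
            weightedKbps += (long) sharePercent[i] * (VIDEO_KBPS[i] + AUDIO_KBPS);
        }
        if (shareSum != 100) {
            throw new IllegalArgumentException("shares must sum to 100");
        }
        double averageKbps = weightedKbps / 100.0;
        return viewers * averageKbps / 1_000_000.0; // kbps -> Gbps
    }
}
```

With a hypothetical 10/40/35/15 percent split, `EgressEstimate.totalEgressGbps(1_000_000, new int[]{10, 40, 35, 15})` comes to roughly 2.6 Tbps. That is far more than a single origin cluster can serve, which is why the architecture relies on CDN edge nodes for fan-out.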

1. Overall system architecture:

mermaid
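graph TB
    A[Broadcaster client] --> B[Ingest service RTMP]
    B --> C[Streaming media server cluster]
    
    C --> D[Transcoding service cluster]
    D --> E[Storage service OSS/S3]
    
    C --> F[CDN edge nodes]
    F --> G[Viewer player]
    
    C --> H[Chat service]
    C --> I[Danmaku service]
    C --> J[Gift and tipping system]
    
    K[Admin console] --> C
    L[Data analytics] --> C
    
    M[(MySQL cluster)] --> C
    N[(Redis cluster)] --> C
    O[(Kafka message queue)] --> C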

2. Stream push and ingest services:

Core implementation of the push service:

java
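import java.util.Comparator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class StreamingService {
    
    private static final Logger log = LoggerFactory.getLogger(StreamingService.class);
    
    @Autowired private RedisTemplate<String, String> redisTemplate;
    @Autowired private StreamServerManager serverManager;
    @Autowired private AuthenticationService authService;
    // Per-room services started with each broadcast (type names inferred from their usage below)
    @Autowired private DanmakuService danmakuService;
    @Autowired private ChatService chatService;
    @Autowired private GiftService giftService;
    @Autowired private AudienceService audienceService;
    
    public StreamingResult startStreaming(StartStreamRequest request) {
        try {
            // 1. Authenticate the broadcaster by stream key
            StreamerInfo streamer = authService.validateStreamer(request.getStreamKey());
            if (streamer == null) {
                return StreamingResult.fail("Invalid stream key");
            }
            
            // 2. Reject a second concurrent broadcast from the same broadcaster
            if (isStreamerAlreadyLive(streamer.getId())) {
                return StreamingResult.fail("Streamer is already live");
            }
            
            // 3. Pick the best ingest server for this broadcaster
            StreamServer server = allocateOptimalServer(
                streamer.getRegion(), 
                request.getStreamConfig()
            );
            
            // 4. Generate the push URL
            String pushUrl = generatePushUrl(server, streamer, request);
            
            // 5. Initialize the live room
            LiveRoom room = initializeLiveRoom(streamer, request);
            
            // 6. Start the supporting services
            startAncillaryServices(room);
            
            return StreamingResult.success(pushUrl, room.getRoomId());
            
        } catch (Exception e) {
            log.error("Start streaming failed", e);
            return StreamingResult.fail("System error");
        }
    }
    
    private StreamServer allocateOptimalServer(String region, StreamConfig config) {
        // Among servers in the region that can handle this stream config,
        // choose the one with the lowest current load
        List<StreamServer> candidates = serverManager.getServersByRegion(region);
        
        return candidates.stream()
            .filter(server -> server.canHandle(config))
            .min(Comparator.comparing(StreamServer::getCurrentLoad))
            .orElseThrow(() -> new IllegalStateException("No available stream server"));
    }
    
    private void startAncillaryServices(LiveRoom room) {
        // Start the danmaku service
        danmakuService.initializeRoom(room.getRoomId());
        
        // Start the chat service
        chatService.initializeRoom(room.getRoomId());
        
        // Start the gift system
        giftService.initializeRoom(room.getRoomId());
        
        // Start audience counting
        audienceService.initializeRoom(room.getRoomId());
    }
}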
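Step 2 above checks whether the broadcaster is already live, but a separate check followed by a later write can race when two push requests arrive at the same time. A minimal sketch of an atomic claim with a time-to-live follows. `LiveStateRegistry` and its method names are hypothetical, and the in-memory map stands in for Redis, where the same idea is usually expressed as `SET key value NX EX seconds`.

```java
import java.util.concurrent.ConcurrentHashMap;

class LiveStateRegistry {
    // streamerId -> timestamp (ms) at which the live claim expires
    private final ConcurrentHashMap<String, Long> expiryByStreamer = new ConcurrentHashMap<>();
    private final long ttlMillis;

    LiveStateRegistry(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Atomically claims the live slot; returns false if an unexpired claim already exists. */
    boolean tryGoLive(String streamerId, long nowMillis) {
        boolean[] claimed = {false};
        expiryByStreamer.compute(streamerId, (id, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                claimed[0] = true;
                return nowMillis + ttlMillis;
            }
            return expiry; // someone else holds a valid claim
        });
        return claimed[0];
    }

    /** Extends a claim that is still valid; returns false if it has already expired. */
    boolean heartbeat(String streamerId, long nowMillis) {
        Long updated = expiryByStreamer.computeIfPresent(streamerId, (id, expiry) -> {
            if (expiry > nowMillis) {
                return nowMillis + ttlMillis;
            }
            return null; // expired: drop the entry
        });
        return updated != null;
    }

    /** Releases the claim when the broadcast ends normally. */
    void endLive(String streamerId) {
        expiryByStreamer.remove(streamerId);
    }
}
```

The TTL means a crashed ingest server cannot leave a broadcaster locked out forever: once heartbeats stop, the claim lapses and the next push attempt succeeds.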

RTMP ingest server:

java
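@Component
public class RTMPStreamServer {
    
    private static final int RTMP_PORT = 1935; // default RTMP port
    
    // One acceptor thread; the worker group defaults to 2 x CPU cores
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    @Autowired private TranscodeService transcodeService;
    @Autowired private RecordingService recordingService;
    
    @PostConstruct
    public void startServer() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new RTMPChannelInitializer())
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true);
            
        // Block until the port is bound so a bind failure surfaces at startup
        bootstrap.bind(RTMP_PORT).sync();
    }
    
    @PreDestroy
    public void stopServer() {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
    
    public class RTMPChannelInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            
            // RTMP protocol codec (project classes, not part of Netty)
            pipeline.addLast("rtmp-decoder", new RTMPDecoder());
            pipeline.addLast("rtmp-encoder", new RTMPEncoder());
            
            // Stream handling
            pipeline.addLast("stream-handler", new StreamHandler());
            
            // Business logic
            pipeline.addLast("business-handler", new RTMPBusinessHandler());
        }
    }
    
    public class RTMPBusinessHandler extends SimpleChannelInboundHandler<RTMPMessage> {
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RTMPMessage msg) {
            switch (msg.getType()) {
                case CONNECT:
                    handleConnect(ctx, msg);
                    break;
                case PUBLISH:
                    handlePublish(ctx, msg);
                    break;
                case PLAY:
                    handlePlay(ctx, msg);
                    break;
                case AUDIO_DATA:
                case VIDEO_DATA:
                    handleMediaData(ctx, msg);
                    break;
                default:
                    // Other message types are ignored in this sketch
                    break;
            }
        }
        
        private void handleMediaData(ChannelHandlerContext ctx, RTMPMessage msg) {
            // 1. Validate the frame; drop the connection on malformed data
            if (!validateMediaData(msg)) {
                ctx.close();
                return;
            }
            
            // These calls run on the Netty event loop, so each must hand off quickly
            // (for example, enqueue the frame) rather than block.
            
            // 2. Forward to the transcoding service
            transcodeService.processMediaFrame(msg);
            
            // 3. Forward to viewers
            distributeToViewers(msg);
            
            // 4. Record to storage
            recordingService.saveFrame(msg);
        }
    }
}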
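The `RTMPDecoder` in the pipeline above is not shown. As an illustration of the first thing such a decoder does, the sketch below parses the RTMP chunk basic header, which the RTMP specification defines as 1 to 3 bytes: a 2-bit format field plus a chunk stream ID that uses a 1-, 2-, or 3-byte encoding. The class name `RtmpBasicHeader` is hypothetical, and the rest of the chunk (message header, extended timestamp, payload) is out of scope here.

```java
class RtmpBasicHeader {
    final int fmt;            // chunk format 0-3: selects the message header layout that follows
    final int chunkStreamId;  // 2-65599 (2 is reserved for protocol control messages)
    final int length;         // bytes consumed by the basic header: 1, 2, or 3

    private RtmpBasicHeader(int fmt, int chunkStreamId, int length) {
        this.fmt = fmt;
        this.chunkStreamId = chunkStreamId;
        this.length = length;
    }

    static RtmpBasicHeader parse(byte[] buf, int offset) {
        require(buf, offset, 1);
        int first = buf[offset] & 0xFF;
        int fmt = first >>> 6;       // top 2 bits
        int low = first & 0x3F;      // low 6 bits

        if (low == 0) {
            // 2-byte form: ID = 64 + second byte (range 64-319)
            require(buf, offset, 2);
            return new RtmpBasicHeader(fmt, 64 + (buf[offset + 1] & 0xFF), 2);
        }
        if (low == 1) {
            // 3-byte form: ID = 64 + second byte + third byte * 256 (range 64-65599)
            require(buf, offset, 3);
            int id = 64 + (buf[offset + 1] & 0xFF) + ((buf[offset + 2] & 0xFF) << 8);
            return new RtmpBasicHeader(fmt, id, 3);
        }
        // 1-byte form: the low 6 bits are the ID itself (2-63)
        return new RtmpBasicHeader(fmt, low, 1);
    }

    private static void require(byte[] buf, int offset, int needed) {
        if (offset < 0 || buf.length - offset < needed) {
            throw new IllegalArgumentException("not enough bytes for chunk basic header");
        }
    }
}
```

In a real Netty decoder the "not enough bytes" case would normally mean "wait for more data" rather than an error; the exception here keeps the sketch short.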

3. Audio/video transcoding service:

Multi-bitrate transcoding implementation:

java
@Service
public class TranscodeService {
    
    private static final Logger log = LoggerFactory.getLogger(TranscodeService.class);
    
    @Autowired private FFmpegService ffmpegService;
    @Autowired private TaskScheduler taskScheduler;
    
    public void processStream(String inputUrl, String roomId) {
        TranscodeConfig config = TranscodeConfig.builder()
            .inputUrl(inputUrl)
            .outputFormats(Arrays.asList(
                // Bitrate ladder: name, width, height, video bitrate (kbps), video codec, audio codec
                new OutputFormat("1080p", 1920, 1080, 6000, "h264", "aac"),
                new OutputFormat("720p", 1280, 720, 3000, "h264", "aac"),
                new OutputFormat("480p", 854, 480, 1500, "h264", "aac"),
                new OutputFormat("360p", 640, 360, 800, "h264", "aac")
            ))
            .build();
        
        // Create the transcoding task and schedule it to run immediately
        TranscodeTask task = new TranscodeTask(config, roomId);
        taskScheduler.schedule(task, Instant.now());
    }
    
    public class TranscodeTask implements Runnable {
        private final TranscodeConfig config;
        private final String roomId;
        
        public TranscodeTask(TranscodeConfig config, String roomId) {
            this.config = config;
            this.roomId = roomId;
        }
        
        @Override
        public void run() {
            try {
                // Build the FFmpeg command line
                List<String> commands = buildFFmpegCommand();
                
                // Launch FFmpeg; stderr is merged into stdout so one reader can drain both
                Process process = new ProcessBuilder(commands)
                    .redirectErrorStream(true)
                    .start();
                
                // Monitor progress (must consume the process output, or FFmpeg blocks on a full pipe)
                monitorTranscodeProgress(process, roomId);
                
                // Wait for FFmpeg to exit (for a live input, that is when the stream ends)
                int exitCode = process.waitFor();
                
                if (exitCode == 0) {
                    log.info("Transcode completed successfully for room: {}", roomId);
                    notifyTranscodeSuccess(roomId);
                } else {
                    log.error("Transcode failed for room: {}, exit code: {}", roomId, exitCode);
                    notifyTranscodeFailed(roomId);
                }
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the scheduler
                notifyTranscodeFailed(roomId);
            } catch (Exception e) {
                log.error("Transcode task error", e);
                notifyTranscodeFailed(roomId);
            }
        }
        
        private List<String> buildFFmpegCommand() {
            List<String> commands = new ArrayList<>();
            commands.add("ffmpeg");
            commands.add("-i"); 
            commands.add(config.getInputUrl());
            
            // One output per rendition; options before each output URL apply to that output only
            for (OutputFormat format : config.getOutputFormats()) {
                commands.addAll(Arrays.asList(
                    "-c:v", "libx264",
                    "-preset", "ultrafast", // lowest encoding latency, at the cost of compression efficiency
                    "-tune", "zerolatency",
                    "-profile:v", "baseline",
                    "-s", format.getWidth() + "x" + format.getHeight(),
                    "-b:v", format.getBitrate() + "k",
                    "-maxrate", format.getBitrate() + "k",
                    "-bufsize", (format.getBitrate() * 2) + "k",
                    "-c:a", "aac",
                    "-b:a", "128k",
                    "-f", "flv",
                    generateOutputUrl(roomId, format.getName())
                ));
            }
            
            return commands;
        }
    }
}
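For adaptive playback over HLS, the renditions produced by the ladder above are advertised to the player through a master playlist. The sketch below builds one, assuming each rendition is also packaged as an HLS media playlist named `<roomId>_<quality>.m3u8` (the naming used by the play URL in the CDN section). `MasterPlaylist` and `Rendition` are hypothetical names; the `#EXTM3U` and `#EXT-X-STREAM-INF` tags come from the HLS specification.

```java
import java.util.List;

class MasterPlaylist {

    /** One entry of the bitrate ladder. */
    static final class Rendition {
        final String name;
        final int width;
        final int height;
        final int videoKbps;
        final int audioKbps;

        Rendition(String name, int width, int height, int videoKbps, int audioKbps) {
            this.name = name;
            this.width = width;
            this.height = height;
            this.videoKbps = videoKbps;
            this.audioKbps = audioKbps;
        }
    }

    /** Builds the master playlist text listing every rendition of a room. */
    static String build(String roomId, List<Rendition> renditions) {
        StringBuilder sb = new StringBuilder("#EXTM3U\n");
        for (Rendition r : renditions) {
            // BANDWIDTH is expressed in bits per second and covers video plus audio
            long bandwidth = (r.videoKbps + r.audioKbps) * 1000L;
            sb.append("#EXT-X-STREAM-INF:BANDWIDTH=").append(bandwidth)
              .append(",RESOLUTION=").append(r.width).append('x').append(r.height)
              .append('\n')
              .append(roomId).append('_').append(r.name).append(".m3u8\n");
        }
        return sb.toString();
    }
}
```

The player picks a rendition from this list based on its measured throughput, which is what makes the quality switching in the player section possible.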

GPU-accelerated transcoding:

java
@Service
public class GPUTranscodeService {
    
    public void processStreamWithGPU(String inputUrl, String roomId) {
        // Hardware-accelerated transcoding with NVIDIA NVENC.
        // Decoded frames stay in GPU memory (-hwaccel_output_format cuda), so scaling is done
        // with the scale_cuda filter rather than the CPU-side -s option. This assumes an
        // FFmpeg build that includes the CUDA filters.
        List<String> commands = Arrays.asList(
            "ffmpeg",
            "-hwaccel", "cuda",
            "-hwaccel_output_format", "cuda",
            "-i", inputUrl,
            
            // 1080p output with NVENC
            "-vf", "scale_cuda=1920:1080",
            "-c:v", "h264_nvenc",
            "-preset", "llhq", // low latency, high quality (legacy name; newer builds use p1-p7 with -tune ll)
            "-rc", "cbr",
            "-b:v", "6000k",
            "-maxrate", "6000k",
            "-bufsize", "12000k",
            "-c:a", "aac",
            "-b:a", "128k",
            "-f", "flv",
            generateOutputUrl(roomId, "1080p"),
            
            // 720p output
            "-vf", "scale_cuda=1280:720",
            "-c:v", "h264_nvenc",
            "-preset", "llhq",
            "-rc", "cbr",
            "-b:v", "3000k",
            "-maxrate", "3000k",
            "-bufsize", "6000k",
            "-c:a", "aac",
            "-b:a", "128k",
            "-f", "flv",
            generateOutputUrl(roomId, "720p")
        );
        
        executeTranscode(commands, roomId);
    }
}

4. CDN distribution and playback:

CDN configuration and load balancing:

java
@Service
public class CDNService {
    
    private static final Logger log = LoggerFactory.getLogger(CDNService.class);
    
    private final List<CDNNode> cdnNodes;
    private final LoadBalancer loadBalancer;
    
    public CDNService(List<CDNNode> cdnNodes, LoadBalancer loadBalancer) {
        this.cdnNodes = cdnNodes;
        this.loadBalancer = loadBalancer;
    }
    
    public String generatePlayUrl(String roomId, String quality, String region, String userIp) {
        // 1. Pick the closest CDN node for the viewer's location
        CDNNode nearestNode = findNearestCDNNode(region, userIp);
        
        // 2. Fall back to another node if the chosen one is unhealthy or overloaded
        if (!isNodeHealthy(nearestNode) || isNodeOverloaded(nearestNode)) {
            nearestNode = findAlternativeCDNNode(region);
        }
        
        // 3. Build the playback URL (HTTPS avoids mixed-content blocking on secure pages)
        String playUrl = String.format("https://%s/live/%s_%s.m3u8", 
            nearestNode.getDomain(), roomId, quality);
        
        // 4. Append anti-leech (hotlink protection) parameters
        playUrl = addAntiLeechParams(playUrl, roomId);
        
        return playUrl;
    }
    
    private CDNNode findNearestCDNNode(String region, String userIp) {
        // Among healthy nodes in the region, choose the one with the lowest estimated latency
        return cdnNodes.stream()
            .filter(node -> node.getRegion().equals(region))
            .filter(this::isNodeHealthy)
            .min(Comparator.comparing(node -> calculateLatency(node, userIp)))
            .orElseGet(this::getDefaultCDNNode); // resolved only when no regional node qualifies
    }
    
    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void monitorCDNNodes() {
        cdnNodes.parallelStream().forEach(node -> {
            try {
                // Health check
                boolean healthy = pingNode(node);
                node.setHealthy(healthy);
                
                // Load check
                CDNMetrics metrics = getNodeMetrics(node);
                node.setCurrentLoad(metrics.getCpuUsage());
                node.setBandwidthUsage(metrics.getBandwidthUsage());
                
                // Trigger auto-scaling above 80% CPU or 85% bandwidth usage
                if (metrics.getCpuUsage() > 80 || metrics.getBandwidthUsage() > 85) {
                    triggerAutoScaling(node);
                }
                
            } catch (Exception e) {
                log.error("CDN node health check failed: {}", node.getId(), e);
                node.setHealthy(false);
            }
        });
    }
}
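The `addAntiLeechParams` step is not shown above. One common approach is a signed, expiring URL: the origin appends an expiry time and an HMAC over the path, and the edge node recomputes the HMAC before serving. The sketch below illustrates that idea only; `PlayUrlSigner`, the `expires` and `sign` parameter names, and the signed-string layout are assumptions, and each commercial CDN defines its own scheme.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class PlayUrlSigner {
    private final byte[] secret;

    PlayUrlSigner(String secret) {
        this.secret = secret.getBytes(StandardCharsets.UTF_8);
    }

    /** Appends the expiry time and an HMAC-SHA256 signature to the path. */
    String sign(String path, long expiresEpochSec) {
        return path + "?expires=" + expiresEpochSec + "&sign=" + hmacHex(path, expiresEpochSec);
    }

    /** Edge-side check: rejects expired links and signatures that do not match. */
    boolean verify(String path, long expiresEpochSec, String signature, long nowEpochSec) {
        if (nowEpochSec > expiresEpochSec) {
            return false;
        }
        byte[] expected = hmacHex(path, expiresEpochSec).getBytes(StandardCharsets.UTF_8);
        // Constant-time comparison avoids leaking how many characters matched
        return MessageDigest.isEqual(expected, signature.getBytes(StandardCharsets.UTF_8));
    }

    private String hmacHex(String path, long expiresEpochSec) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret, "HmacSHA256"));
            byte[] raw = mac.doFinal((path + "\n" + expiresEpochSec).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(raw.length * 2);
            for (byte b : raw) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 unavailable", e);
        }
    }
}
```

Because the signature covers both the path and the expiry, a copied link stops working after the expiry time and cannot be retargeted at another room.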

Player integration:

javascript
// HLS player implementation (assumes hls.js is loaded and exposes the global Hls)
class LivePlayer {
    constructor(containerId, options) {
        this.container = document.getElementById(containerId);
        this.options = options;
        this.hls = null;
        this.video = null;
        this.isLive = true;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        
        this.init();
    }
    
    init() {
        // Create the <video> element
        this.video = document.createElement('video');
        this.video.controls = true;
        this.video.autoplay = true;
        // Browsers usually block autoplay with sound, so pass muted: true to autoplay reliably
        this.video.muted = this.options.muted || false;
        this.container.appendChild(this.video);
        
        // Initialize the HLS player
        if (Hls.isSupported()) {
            this.hls = new Hls({
                enableWorker: true,
                lowLatencyMode: true,
                backBufferLength: 90, // seconds of already-played media kept in the buffer
                maxBufferLength: 30,
                maxMaxBufferLength: 120,
                liveSyncDurationCount: 3,
                liveMaxLatencyDurationCount: 5,
                liveDurationInfinity: true,
                highBufferWatchdogPeriod: 1,
                nudgeOffset: 0.1,
                nudgeMaxRetry: 10
            });
            
            this.setupHLSEvents();
            
            // Load the playlist and bind the player to the <video> element;
            // without these two calls playback never starts
            this.hls.loadSource(this.options.playUrl);
            this.hls.attachMedia(this.video);
        } else if (this.video.canPlayType('application/vnd.apple.mpegurl')) {
            // Safari native HLS support
            this.video.src = this.options.playUrl;
        }
        
        this.setupVideoEvents();
    }
    
    setupHLSEvents() {
        this.hls.on(Hls.Events.MANIFEST_PARSED, () => {
            console.log('Manifest parsed, starting playback');
            // play() returns a promise that rejects when autoplay is blocked
            this.video.play().catch((err) => console.warn('Autoplay was blocked:', err));
        });
        
        // A successfully loaded fragment means the connection has recovered
        this.hls.on(Hls.Events.FRAG_LOADED, () => {
            this.reconnectAttempts = 0;
        });
        
        this.hls.on(Hls.Events.ERROR, (event, data) => {
            console.error('HLS Error:', data);
            
            if (data.fatal) {
                switch (data.type) {
                    case Hls.ErrorTypes.NETWORK_ERROR:
                        console.log('Network error, trying to recover...');
                        this.handleNetworkError();
                        break;
                    case Hls.ErrorTypes.MEDIA_ERROR:
                        console.log('Media error, trying to recover...');
                        this.hls.recoverMediaError();
                        break;
                    default:
                        console.log('Fatal error, destroying HLS...');
                        this.destroy();
                        break;
                }
            }
        });
        
        // Quality level switch
        this.hls.on(Hls.Events.LEVEL_SWITCHED, (event, data) => {
            console.log('Quality switched to level:', data.level);
            this.onQualityChanged(data.level);
        });
    }
    
    handleNetworkError() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            console.log(`Reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
            
            setTimeout(() => {
                this.hls.startLoad();
            }, 1000 * Math.pow(2, this.reconnectAttempts)); // exponential backoff: 2s, 4s, 8s, ...
        } else {
            console.error('Max reconnection attempts reached');
            this.onPlayError('Network connection failed');
        }
    }
    
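    // Adaptive bitrate adjustment
    enableAdaptiveBitrate() {
        this.hls.on(Hls.Events.FRAG_LOAD_EMERGENCY_ABORTED, () => {
            // On poor network conditions, cap the maximum level one step lower.
            // Capping keeps hls.js automatic selection active, whereas assigning
            // nextLevel directly would pin the player to a manual level.
            const currentLevel = this.hls.currentLevel;
            if (currentLevel > 0) {
                this.hls.autoLevelCapping = currentLevel - 1;
                console.log('Capped quality to a lower level due to network conditions');
            }
        });
    }
    
    // Latency optimization
    optimizeLatency() {
        this.video.addEventListener('canplay', () => {
            // Catch up with the live edge
            if (this.isLive && this.video.buffered.length > 0) {
                const bufferedEnd = this.video.buffered.end(this.video.buffered.length - 1);
                const currentTime = this.video.currentTime;
                const latency = bufferedEnd - currentTime;
                
                if (latency > 10) { // more than 10 seconds behind the buffered edge
                    this.video.currentTime = bufferedEnd - 3; // jump to 3 seconds before the edge
                    console.log('Latency catch-up performed');
                }
            }
        });
    }
    
    // Video element wiring, called once from init()
    setupVideoEvents() {
        this.optimizeLatency();
        this.video.addEventListener('error', () => this.onPlayError('Video element error'));
    }
    
    // Release the hls.js instance and remove the <video> element
    destroy() {
        if (this.hls) {
            this.hls.destroy();
            this.hls = null;
        }
        if (this.video) {
            this.video.remove();
            this.video = null;
        }
    }
    
    // Hooks for the embedding page; override as needed
    onQualityChanged(level) {}
    onPlayError(message) { console.error(message); }
}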
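The reconnect logic in the player uses exponential backoff: the delay doubles with each attempt (2 s, 4 s, 8 s, ...). The schedule is language-agnostic, so here is a small Java sketch of the same formula with an upper bound added. The cap is an assumption that is not in the player code above, and `ReconnectBackoff` is a hypothetical name.

```java
class ReconnectBackoff {
    /**
     * Delay before reconnect attempt number `attempt` (1-based):
     * baseMillis * 2^attempt, limited to capMillis.
     */
    static long delayMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 1 || baseMillis < 0 || capMillis < 0) {
            throw new IllegalArgumentException("attempt must be >= 1 and delays non-negative");
        }
        int shift = Math.min(attempt, 30); // bound the shift so the multiplication cannot overflow for typical bases
        long delay = baseMillis * (1L << shift);
        return Math.min(delay, capMillis);
    }
}
```

With a 1000 ms base and a 30-second cap, attempts 1 through 5 wait 2, 4, 8, 16, and 30 seconds. In production it is also common to add random jitter so that many viewers who dropped at the same moment do not all retry in lockstep.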

5. Real-time interaction features:

Danmaku (bullet comment) system:

java
@RestController
@RequestMapping("/api/danmaku")
public class DanmakuController {
    
    private static final Logger log = LoggerFactory.getLogger(DanmakuController.class);
    
    @Autowired private DanmakuService danmakuService;
    @Autowired private WebSocketManager webSocketManager;
    // Collaborators used below (type names inferred from their usage)
    @Autowired private ContentAuditService contentAuditService;
    @Autowired private UserService userService;
    @Autowired private KafkaTemplate<String, DanmakuMessage> kafkaTemplate;
    
    @PostMapping("/send")
    public ResponseEntity<DanmakuResult> sendDanmaku(@RequestBody SendDanmakuRequest request) {
        try {
            // 1. Validate parameters
            if (!validateDanmakuContent(request.getContent())) {
                return ResponseEntity.badRequest()
                    .body(DanmakuResult.fail("Danmaku content does not meet the posting rules"));
            }
            
            // 2. Check user permission
            if (!checkUserPermission(request.getUserId(), request.getRoomId())) {
                return ResponseEntity.status(403)
                    .body(DanmakuResult.fail("You do not have permission to post in this room"));
            }
            
            // 3. Rate limiting
            if (!checkRateLimit(request.getUserId())) {
                return ResponseEntity.status(429)
                    .body(DanmakuResult.fail("You are posting too frequently; please try again later"));
            }
            
            // 4. Content moderation
            AuditResult auditResult = contentAuditService.audit(request.getContent());
            if (!auditResult.isPass()) {
                return ResponseEntity.badRequest()
                    .body(DanmakuResult.fail("Danmaku content violates the content policy"));
            }
            
            // 5. Build the danmaku
            Danmaku danmaku = Danmaku.builder()
                .roomId(request.getRoomId())
                .userId(request.getUserId())
                .content(request.getContent())
                .color(request.getColor())
                .position(request.getPosition())
                .timestamp(System.currentTimeMillis())
                .build();
            
            // 6. Persist the danmaku
            danmakuService.saveDanmaku(danmaku);
            
            // 7. Push it to viewers in real time
            broadcastDanmaku(danmaku);
            
            return ResponseEntity.ok(DanmakuResult.success(danmaku));
            
        } catch (Exception e) {
            log.error("Send danmaku failed", e);
            return ResponseEntity.status(500)
                .body(DanmakuResult.fail("System error"));
        }
    }
    
    private void broadcastDanmaku(Danmaku danmaku) {
        // Build the push message
        DanmakuMessage message = DanmakuMessage.builder()
            .type("danmaku")
            .roomId(danmaku.getRoomId())
            .userId(danmaku.getUserId())
            .username(userService.getUsername(danmaku.getUserId()))
            .content(danmaku.getContent())
            .color(danmaku.getColor())
            .timestamp(danmaku.getTimestamp())
            .build();
        
        // Push to every viewer in the room
        webSocketManager.broadcastToRoom(danmaku.getRoomId(), message);
        
        // Publish to the message queue asynchronously for other services to consume
        kafkaTemplate.send("danmaku-events", message);
    }
}
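The `checkRateLimit` call in step 3 is not shown. A minimal single-node sketch of a per-user sliding-window limiter follows. `DanmakuRateLimiter` is a hypothetical name, and a multi-instance deployment would need shared state (for example in Redis) instead of this in-memory map.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DanmakuRateLimiter {
    private final int maxMessages;
    private final long windowMillis;
    // userId -> send times (ms) of that user's messages inside the current window
    private final Map<String, Deque<Long>> sentAtByUser = new ConcurrentHashMap<>();

    DanmakuRateLimiter(int maxMessages, long windowMillis) {
        this.maxMessages = maxMessages;
        this.windowMillis = windowMillis;
    }

    /** Returns true and records the message if the user is under the limit. */
    boolean tryAcquire(String userId, long nowMillis) {
        Deque<Long> sentAt = sentAtByUser.computeIfAbsent(userId, id -> new ArrayDeque<>());
        synchronized (sentAt) {
            // Evict timestamps that have slid out of the window
            while (!sentAt.isEmpty() && sentAt.peekFirst() <= nowMillis - windowMillis) {
                sentAt.pollFirst();
            }
            if (sentAt.size() >= maxMessages) {
                return false;
            }
            sentAt.addLast(nowMillis);
            return true;
        }
    }
}
```

A limiter created as `new DanmakuRateLimiter(3, 10_000)` allows three messages per user in any 10-second span. Entries for idle users are never removed in this sketch, so a long-running service would also need periodic cleanup.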

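Interview key points:

  • Understand the core components of a live streaming system: ingest, transcoding, distribution, and playback
  • Know the characteristics of RTMP and of the HLS/DASH streaming protocols
  • Understand the role of the CDN in live distribution and how to optimize it
  • Know how to implement real-time interaction features and tune their performance

This live streaming design covers the full audio/video pipeline, including stream ingest, transcoding, CDN distribution, and real-time interaction. It is suited to senior engineering interviews in the streaming media and audio/video domain.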
