如何设计IM即时通讯系统
一、问题描述
1.1 业务背景
IM(Instant Messaging)即时通讯系统是互联网最核心的应用之一,广泛应用于:
- 社交产品:微信、QQ、钉钉、Slack
- 电商平台:客服系统、买卖家沟通
- 企业协作:内部沟通、远程办公
- 游戏应用:团队语音、文字聊天
1.2 核心功能
基础功能:
- 单聊:一对一文本、图片、语音、视频消息
- 群聊:多人聊天,支持@某人、群公告
- 离线消息:用户离线时消息存储,上线后拉取
- 已读回执:单聊已读、群聊已读
- 消息同步:多端登录,消息实时同步
进阶功能:
- 语音视频通话:实时音视频通信
- 文件传输:大文件分片上传下载
- 消息搜索:全文搜索历史消息
- 消息撤回:2分钟内可撤回
- @提醒:群聊中@某人
1.3 技术挑战
海量在线连接:
- 百万级同时在线用户
- 每个用户一个长连接
- 连接保活和断线重连
消息可靠性:
- 消息不丢失(At Least Once)
- 消息不重复(幂等)
- 消息有序(同一会话)
高并发消息:
- 每秒百万级消息
- 群聊消息扩散
- 实时性要求高(<100ms)
存储挑战:
- 海量历史消息存储
- 快速查询和检索
- 成本控制
1.4 面试考察点
- 架构设计能力:如何设计可扩展的IM架构?
- 长连接管理:如何管理百万级长连接?
- 消息可靠性:如何保证消息不丢失?
- 性能优化:如何优化群聊性能?
- 多端同步:如何实现多端消息同步?
二、需求分析
2.1 功能性需求
| 需求 | 描述 | 优先级 |
|---|---|---|
| FR1 | 支持单聊文本消息 | P0 |
| FR2 | 支持群聊(<500人) | P0 |
| FR3 | 支持离线消息 | P0 |
| FR4 | 支持已读回执 | P1 |
| FR5 | 支持多端同步 | P1 |
| FR6 | 支持消息撤回 | P1 |
| FR7 | 支持文件传输 | P1 |
| FR8 | 支持消息搜索 | P2 |
2.2 非功能性需求
性能需求:
- 同时在线用户:100万+
- 消息QPS:10万+
- 消息延迟:<100ms(P95)
- 连接保持:24小时+
可靠性需求:
- 消息不丢失:99.99%
- 系统可用性:99.95%
- 消息有序性:同一会话100%有序
存储需求:
- 消息保存:永久(重要消息)
- 历史消息:快速查询
- 存储成本:可控
安全需求:
- 消息加密传输(TLS)
- 敏感内容过滤
- 用户隐私保护
2.3 约束条件
- 网络环境:需要处理弱网、断网场景
- 设备多样:iOS、Android、Web、PC多端
- 流量成本:需要优化网络流量
- 电量消耗:移动端要节省电量
2.4 边界场景
- 连接断开:网络断开、切换网络
- 消息乱序:网络延迟导致消息乱序
- 重复消息:网络重传导致重复
- 大群聊:500人群聊消息风暴
- 多端登录:同一账号多设备登录
三、技术选型
3.1 通信协议选择
WebSocket vs TCP vs HTTP
| 协议 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| WebSocket | 浏览器原生支持、双向通信 | 不支持UDP | Web端 |
| TCP | 可靠传输、自定义协议 | 需要心跳保活 | 移动端(推荐) |
| HTTP长轮询 | 兼容性好 | 延迟高、效率低 | 降级方案 |
| HTTP/2 Server Push | 服务器推送 | 浏览器支持有限 | 部分Web场景 |
推荐方案:
- 移动端:TCP长连接 + 自定义协议(protobuf)
- Web端:WebSocket
- 降级方案:HTTP长轮询
3.2 消息存储选择
| 存储 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| MySQL | 事务支持、可靠性高 | 性能一般、成本高 | 关键元数据 |
| MongoDB | 高性能、灵活 | 无事务 | 消息存储(推荐) |
| HBase | 海量存储、成本低 | 查询受限 | 历史消息归档 |
| Redis | 极高性能 | 易丢失、成本高 | 在线状态、最近消息 |
推荐方案:
- 在线状态:Redis(临时数据)
- 最近消息:MongoDB(7天内)
- 历史消息:HBase(7天外)
- 元数据:MySQL(用户、群组)
3.3 推送方案选择
| 方案 | 实时性 | 可靠性 | 成本 | 适用场景 |
|---|---|---|---|---|
| 长连接推送 | 最高 | 高 | 高 | 在线用户(推荐) |
| APNs/FCM | 中 | 中 | 低 | 离线iOS/Android |
| 轮询 | 低 | 高 | 中 | 降级方案 |
3.4 技术栈清单
| 组件 | 技术选型 | 作用 |
|---|---|---|
| 长连接 | Netty / Go TCP | 维持客户端连接 |
| 协议 | Protobuf | 高效序列化 |
| 消息队列 | Kafka | 消息缓冲和分发 |
| 存储 | MongoDB + HBase | 消息存储 |
| 缓存 | Redis | 在线状态、会话 |
| 数据库 | MySQL | 用户、群组元数据 |
| 推送 | APNs、FCM | 离线推送 |
四、架构设计
4.1 系统架构图
mermaid
graph TB
subgraph 客户端
A1[iOS App]
A2[Android App]
A3[Web浏览器]
A4[PC客户端]
end
subgraph 接入层
B[负载均衡Nginx]
B --> C1[连接服务器1]
B --> C2[连接服务器2]
B --> C3[连接服务器N]
end
subgraph 业务层
C1 --> D[消息服务]
C2 --> D
C3 --> D
D --> E[会话服务]
D --> F[群组服务]
D --> G[离线服务]
end
subgraph 消息队列
D --> H[Kafka集群]
H --> I1[消息消费者1]
H --> I2[消息消费者2]
end
subgraph 存储层
I1 --> J1[MongoDB<br/>最近消息]
I2 --> J1
I1 --> J2[HBase<br/>历史消息]
I2 --> J2
E --> K[MySQL<br/>用户/群组]
D --> L[Redis<br/>在线状态]
end
subgraph 推送层
G --> M1[APNs]
G --> M2[FCM]
end
A1 --> B
A2 --> B
A3 --> B
A4 --> B4.2 核心流程
单聊消息发送流程
mermaid
sequenceDiagram
participant A as 用户A<br/>移动端
participant CS1 as 连接服务器1
participant MS as 消息服务
participant Kafka as Kafka
participant CS2 as 连接服务器2
participant B as 用户B<br/>移动端
participant Redis as Redis
participant Mongo as MongoDB
A->>CS1: 发送消息(TCP)
CS1->>MS: 转发消息
MS->>MS: 消息ID生成<br/>敏感词过滤
MS->>Kafka: 写入消息队列
MS-->>CS1: 返回ACK
CS1-->>A: 发送成功
Kafka->>Mongo: 消费者写入MongoDB
MS->>Redis: 查询用户B在线状态
alt 用户B在线
Redis-->>MS: 在线,连接服务器2
MS->>CS2: 推送消息
CS2->>B: TCP推送
B-->>CS2: 已收到ACK
CS2-->>MS: 已送达
else 用户B离线
Redis-->>MS: 离线
MS->>离线服务: 存储离线消息
离线服务->>APNs/FCM: 推送通知
end群聊消息发送流程
mermaid
sequenceDiagram
participant A as 用户A
participant CS as 连接服务器
participant MS as 消息服务
participant GS as 群组服务
participant Kafka as Kafka
participant DB as MongoDB
A->>CS: 发送群消息
CS->>MS: 转发消息
MS->>GS: 查询群成员列表
GS-->>MS: 返回成员列表(N人)
alt 小群(<100人)- 写扩散
MS->>Kafka: 写入N份消息
Note over MS,Kafka: 每个成员一份
Kafka->>DB: 存储N份消息
else 大群(>=100人)- 读扩散
MS->>Kafka: 写入1份消息
Note over MS,Kafka: 只存一份
Kafka->>DB: 存储1份消息
Note over DB: 读取时查询
end
MS->>CS: 遍历在线成员推送
CS->>在线成员: TCP推送
MS->>离线服务: 离线成员入库4.3 数据库设计
用户表(MySQL)
sql
CREATE TABLE im_user (
user_id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
nickname VARCHAR(100),
avatar VARCHAR(255),
status TINYINT DEFAULT 0 COMMENT '0正常 1禁用',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_username (username)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;群组表(MySQL)
sql
CREATE TABLE im_group (
group_id BIGINT PRIMARY KEY AUTO_INCREMENT,
group_name VARCHAR(100) NOT NULL,
owner_id BIGINT NOT NULL COMMENT '群主',
avatar VARCHAR(255),
member_count INT DEFAULT 0,
max_members INT DEFAULT 500,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_owner (owner_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE im_group_member (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
group_id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
role TINYINT DEFAULT 0 COMMENT '0普通 1管理 2群主',
join_time DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_group_user (group_id, user_id),
INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;消息表(MongoDB)
javascript
// 单聊消息
{
_id: ObjectId,
msg_id: NumberLong, // 消息ID(雪花算法)
from_id: NumberLong, // 发送者
to_id: NumberLong, // 接收者
msg_type: Number, // 1文本 2图片 3语音 4视频
content: String, // 消息内容
send_time: ISODate, // 发送时间
read_status: Number, // 0未读 1已读
read_time: ISODate, // 已读时间
status: Number // 0正常 1撤回 2删除
}
// 群聊消息
{
_id: ObjectId,
msg_id: NumberLong,
group_id: NumberLong, // 群组ID
from_id: NumberLong,
msg_type: Number,
content: String,
send_time: ISODate,
read_users: Array, // 已读用户列表(小群)
status: Number
}
// 索引
db.im_message.createIndex({from_id: 1, to_id: 1, send_time: -1})
db.im_message.createIndex({msg_id: 1}, {unique: true})
db.im_group_message.createIndex({group_id: 1, send_time: -1})在线状态(Redis)
redis
# 用户在线状态
Key: online:{user_id}
Value: {
"server": "tcp-server-01",
"device": "iOS",
"login_time": "2024-01-01 12:00:00"
}
TTL: 300秒(心跳刷新)
# 会话最新消息ID(已读水位)
Key: session:{user_id}:{target_id}
Value: last_msg_id
TTL: 7天五、核心实现
5.1 长连接管理(Go实现)
点击查看完整实现
go
package im
import (
"context"
"fmt"
"net"
"sync"
"time"
)
// ConnectionManager 连接管理器
type ConnectionManager struct {
connections sync.Map // key: userID, value: *Connection
redis *RedisClient
kafka *KafkaProducer
}
// Connection 用户连接
type Connection struct {
UserID int64
Conn net.Conn
Device string
LastHeart time.Time
SendChan chan []byte
ctx context.Context
cancel context.CancelFunc
mutex sync.RWMutex
}
// NewConnectionManager 创建连接管理器
func NewConnectionManager(redis *RedisClient, kafka *KafkaProducer) *ConnectionManager {
return &ConnectionManager{
redis: redis,
kafka: kafka,
}
}
// AddConnection 添加连接
func (cm *ConnectionManager) AddConnection(userID int64, conn net.Conn, device string) *Connection {
ctx, cancel := context.WithCancel(context.Background())
connection := &Connection{
UserID: userID,
Conn: conn,
Device: device,
LastHeart: time.Now(),
SendChan: make(chan []byte, 100),
ctx: ctx,
cancel: cancel,
}
// 踢掉同设备的旧连接
if old, exists := cm.connections.LoadOrStore(userID, connection); exists {
oldConn := old.(*Connection)
if oldConn.Device == device {
oldConn.Close()
}
}
// 更新在线状态到Redis
cm.redis.SetOnlineStatus(userID, device, "tcp-server-01")
// 启动消息发送协程
go connection.sendLoop()
// 启动心跳检测
go connection.heartbeatCheck(cm)
return connection
}
// RemoveConnection 移除连接
func (cm *ConnectionManager) RemoveConnection(userID int64) {
if conn, exists := cm.connections.LoadAndDelete(userID); exists {
connection := conn.(*Connection)
connection.Close()
// 删除在线状态
cm.redis.DelOnlineStatus(userID)
}
}
// SendMessage 发送消息给指定用户
func (cm *ConnectionManager) SendMessage(userID int64, data []byte) error {
conn, exists := cm.connections.Load(userID)
if !exists {
return fmt.Errorf("user %d not online", userID)
}
connection := conn.(*Connection)
select {
case connection.SendChan <- data:
return nil
case <-time.After(time.Second):
return fmt.Errorf("send timeout")
}
}
// sendLoop 消息发送循环
func (c *Connection) sendLoop() {
defer c.Conn.Close()
for {
select {
case msg := <-c.SendChan:
// 设置写超时
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if _, err := c.Conn.Write(msg); err != nil {
return
}
case <-c.ctx.Done():
return
}
}
}
// heartbeatCheck 心跳检测
func (c *Connection) heartbeatCheck(cm *ConnectionManager) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.mutex.RLock()
lastHeart := c.LastHeart
c.mutex.RUnlock()
// 超过2分钟没心跳,断开连接
if time.Since(lastHeart) > 2*time.Minute {
cm.RemoveConnection(c.UserID)
return
}
case <-c.ctx.Done():
return
}
}
}
// UpdateHeartbeat 更新心跳时间
func (c *Connection) UpdateHeartbeat() {
c.mutex.Lock()
c.LastHeart = time.Now()
c.mutex.Unlock()
}
// Close 关闭连接
func (c *Connection) Close() {
c.cancel()
close(c.SendChan)
}
// HandleMessage 处理收到的消息
func (cm *ConnectionManager) HandleMessage(userID int64, msg *Message) error {
// 1. 生成消息ID
msg.MsgID = generateMsgID()
msg.SendTime = time.Now()
// 2. 敏感词过滤
if containsSensitiveWord(msg.Content) {
return fmt.Errorf("message contains sensitive words")
}
// 3. 写入Kafka
if err := cm.kafka.SendMessage(msg); err != nil {
return err
}
// 4. 推送给接收方
if msg.ToID > 0 {
// 单聊
return cm.pushToUser(msg.ToID, msg)
} else if msg.GroupID > 0 {
// 群聊
return cm.pushToGroup(msg.GroupID, msg, userID)
}
return nil
}
// pushToUser 推送消息给用户
func (cm *ConnectionManager) pushToUser(toID int64, msg *Message) error {
// 检查是否在线
if cm.IsOnline(toID) {
// 在线,直接推送
return cm.SendMessage(toID, msg.Serialize())
} else {
// 离线,存储离线消息并推送通知
return cm.handleOfflineMessage(toID, msg)
}
}
// pushToGroup 推送群消息
func (cm *ConnectionManager) pushToGroup(groupID, fromID int64, msg *Message) error {
// 获取群成员列表
members, err := cm.getGroupMembers(groupID)
if err != nil {
return err
}
// 遍历成员推送(排除发送者)
for _, memberID := range members {
if memberID == fromID {
continue
}
if cm.IsOnline(memberID) {
cm.SendMessage(memberID, msg.Serialize())
}
}
return nil
}
// IsOnline 检查用户是否在线
func (cm *ConnectionManager) IsOnline(userID int64) bool {
_, exists := cm.connections.Load(userID)
return exists
}5.2 消息可靠性保证
go
// 消息确认机制
type MessageAck struct {
MsgID int64
Status int // 1已发送 2已送达 3已读
Time time.Time
}
// SendWithAck 发送消息并等待确认
func (c *Connection) SendWithAck(msg *Message) error {
// 1. 发送消息
if err := c.Send(msg.Serialize()); err != nil {
return err
}
// 2. 等待ACK(超时重试)
timeout := time.After(10 * time.Second)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
retryCount := 0
maxRetry := 3
for {
select {
case ack := <-c.AckChan:
if ack.MsgID == msg.MsgID {
return nil // 收到确认
}
case <-ticker.C:
// 重试
retryCount++
if retryCount >= maxRetry {
return fmt.Errorf("send timeout after %d retries", maxRetry)
}
c.Send(msg.Serialize())
case <-timeout:
return fmt.Errorf("send timeout")
}
}
}java
@Service
public class MessageDeduplication {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 消息去重:使用Redis SET判断
*/
public boolean isDuplicate(Long msgId) {
String key = "msg:dedup:" + msgId;
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "1", 5, TimeUnit.MINUTES);
return !Boolean.TRUE.equals(success);
}
/**
* 处理消息(幂等)
*/
public void handleMessage(Message msg) {
// 1. 去重检查
if (isDuplicate(msg.getMsgId())) {
log.info("Duplicate message: {}", msg.getMsgId());
return;
}
// 2. 存储消息
messageRepository.save(msg);
// 3. 推送消息
pushToUser(msg);
}
}六、性能优化
6.1 群聊优化策略
写扩散 vs 读扩散
| 策略 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 写扩散 | 发送时复制N份 | 读取快 | 写入慢、存储大 | 小群(<100人) |
| 读扩散 | 存储1份,读时查询 | 写入快、存储小 | 读取慢 | 大群(>=100人) |
| 混合模式 | 小群写扩散,大群读扩散 | 平衡性能 | 实现复杂 | 推荐方案 |
6.2 离线消息优化
go
// 离线消息拉取优化
func (s *MessageService) PullOfflineMessages(userID int64, lastMsgID int64) ([]*Message, error) {
// 1. 只拉取最近的消息(如最近100条)
messages, err := s.repo.GetMessagesSince(userID, lastMsgID, 100)
if err != nil {
return nil, err
}
// 2. 对于大量离线消息,分批拉取
if len(messages) >= 100 {
// 返回分页标记,客户端继续拉取
return messages, nil
}
return messages, nil
}6.3 性能数据
| 指标 | 目标值 | 实际值 |
|---|---|---|
| 单机连接数 | 10万 | 15万 |
| 消息延迟P99 | <100ms | 85ms |
| 消息吞吐 | 10万QPS | 12万QPS |
| 内存占用 | <8GB | 6GB |
七、运维监控
7.1 监控指标
yaml
metrics:
# 连接指标
- online_users: 在线用户数
- total_connections: 总连接数
- connection_rate: 连接建立速率
# 消息指标
- message_qps: 消息QPS
- message_latency_p99: 消息延迟P99
- message_loss_rate: 消息丢失率
# 系统指标
- cpu_usage: CPU使用率
- memory_usage: 内存使用率
- goroutine_count: 协程数量7.2 告警规则
- 在线用户数骤降 > 20%
- 消息延迟P99 > 500ms
- 消息丢失率 > 0.01%
- 连接服务器宕机
八、面试要点
8.1 常见追问
Q1: 如何保证消息不丢失?
A:
- 客户端到服务器:TCP可靠传输 + ACK确认
- 服务器内部:Kafka持久化 + 副本机制
- 服务器到客户端:推送失败存离线 + 重试
- 存储层:MongoDB副本集 + 定期备份
Q2: 如何实现多端消息同步?
A:
- 已读水位:每个端记录已读的最大消息ID
- 上线拉取:登录时拉取大于已读水位的消息
- 实时推送:消息实时推送到所有在线设备
- 冲突解决:以最新的已读水位为准
Q3: 如何优化群聊性能?
A:
- 分级策略:小群写扩散,大群读扩散
- 推送优化:只推送@我的和最新N条
- 离线优化:大群离线不推送通知
- 消息聚合:合并多条消息推送
8.2 扩展知识点
- WebSocket协议 - 长连接原理
- Netty框架 - 高性能网络框架
- Kafka消息队列 - 消息缓冲
- MongoDB - 消息存储
九、相关资源
9.1 相关技术栈
9.2 开源项目
- Tinode - 开源IM服务器
- OpenIM - 企业级IM
- Rocket.Chat - 团队协作
总结:IM系统是最复杂的实时通信系统,涉及长连接管理、消息可靠性、性能优化等多个核心技术。面试中要能说清楚架构设计、消息流程、可靠性保证、性能优化等关键点,展现系统设计的全局思维。
