Skip to content

如何设计IM即时通讯系统

一、问题描述

1.1 业务背景

IM(Instant Messaging)即时通讯系统是互联网最核心的应用之一,广泛应用于:

  • 社交产品:微信、QQ、钉钉、Slack
  • 电商平台:客服系统、买卖家沟通
  • 企业协作:内部沟通、远程办公
  • 游戏应用:团队语音、文字聊天

1.2 核心功能

基础功能

  1. 单聊:一对一文本、图片、语音、视频消息
  2. 群聊:多人聊天,支持@某人、群公告
  3. 离线消息:用户离线时消息存储,上线后拉取
  4. 已读回执:单聊已读、群聊已读
  5. 消息同步:多端登录,消息实时同步

进阶功能

  1. 语音视频通话:实时音视频通信
  2. 文件传输:大文件分片上传下载
  3. 消息搜索:全文搜索历史消息
  4. 消息撤回:2分钟内可撤回
  5. @提醒:群聊中@某人

1.3 技术挑战

海量在线连接

  • 百万级同时在线用户
  • 每个用户一个长连接
  • 连接保活和断线重连

消息可靠性

  • 消息不丢失(At Least Once)
  • 消息不重复(幂等)
  • 消息有序(同一会话)

高并发消息

  • 每秒百万级消息
  • 群聊消息扩散
  • 实时性要求高(<100ms)

存储挑战

  • 海量历史消息存储
  • 快速查询和检索
  • 成本控制

1.4 面试考察点

  • 架构设计能力:如何设计可扩展的IM架构?
  • 长连接管理:如何管理百万级长连接?
  • 消息可靠性:如何保证消息不丢失?
  • 性能优化:如何优化群聊性能?
  • 多端同步:如何实现多端消息同步?

二、需求分析

2.1 功能性需求

需求描述优先级
FR1支持单聊文本消息P0
FR2支持群聊(<500人)P0
FR3支持离线消息P0
FR4支持已读回执P1
FR5支持多端同步P1
FR6支持消息撤回P1
FR7支持文件传输P1
FR8支持消息搜索P2

2.2 非功能性需求

性能需求

  • 同时在线用户:100万+
  • 消息QPS:10万+
  • 消息延迟:<100ms(P95)
  • 连接保持:24小时+

可靠性需求

  • 消息不丢失:99.99%
  • 系统可用性:99.95%
  • 消息有序性:同一会话100%有序

存储需求

  • 消息保存:永久(重要消息)
  • 历史消息:快速查询
  • 存储成本:可控

安全需求

  • 消息加密传输(TLS)
  • 敏感内容过滤
  • 用户隐私保护

2.3 约束条件

  1. 网络环境:需要处理弱网、断网场景
  2. 设备多样:iOS、Android、Web、PC多端
  3. 流量成本:需要优化网络流量
  4. 电量消耗:移动端要节省电量

2.4 边界场景

  1. 连接断开:网络断开、切换网络
  2. 消息乱序:网络延迟导致消息乱序
  3. 重复消息:网络重传导致重复
  4. 大群聊:500人群聊消息风暴
  5. 多端登录:同一账号多设备登录

三、技术选型

3.1 通信协议选择

WebSocket vs TCP vs HTTP

协议优点缺点适用场景
WebSocket浏览器原生支持、双向通信不支持UDPWeb端
TCP可靠传输、自定义协议需要心跳保活移动端(推荐)
HTTP长轮询兼容性好延迟高、效率低降级方案
HTTP/2 Server Push服务器推送浏览器支持有限部分Web场景

推荐方案

  • 移动端:TCP长连接 + 自定义协议(protobuf)
  • Web端:WebSocket
  • 降级方案:HTTP长轮询

3.2 消息存储选择

存储优点缺点适用场景
MySQL事务支持、可靠性高性能一般、成本高关键元数据
MongoDB高性能、灵活无事务消息存储(推荐)
HBase海量存储、成本低查询受限历史消息归档
Redis极高性能易丢失、成本高在线状态、最近消息

推荐方案

  • 在线状态:Redis(临时数据)
  • 最近消息:MongoDB(7天内)
  • 历史消息:HBase(7天外)
  • 元数据:MySQL(用户、群组)

3.3 推送方案选择

方案实时性可靠性成本适用场景
长连接推送最高在线用户(推荐)
APNs/FCM离线iOS/Android
轮询降级方案

3.4 技术栈清单

组件技术选型作用
长连接Netty / Go TCP维持客户端连接
协议Protobuf高效序列化
消息队列Kafka消息缓冲和分发
存储MongoDB + HBase消息存储
缓存Redis在线状态、会话
数据库MySQL用户、群组元数据
推送APNs、FCM离线推送

四、架构设计

4.1 系统架构图

mermaid
graph TB
    subgraph 客户端
        A1[iOS App]
        A2[Android App]
        A3[Web浏览器]
        A4[PC客户端]
    end
    
    subgraph 接入层
        B[负载均衡Nginx]
        B --> C1[连接服务器1]
        B --> C2[连接服务器2]
        B --> C3[连接服务器N]
    end
    
    subgraph 业务层
        C1 --> D[消息服务]
        C2 --> D
        C3 --> D
        D --> E[会话服务]
        D --> F[群组服务]
        D --> G[离线服务]
    end
    
    subgraph 消息队列
        D --> H[Kafka集群]
        H --> I1[消息消费者1]
        H --> I2[消息消费者2]
    end
    
    subgraph 存储层
        I1 --> J1[MongoDB<br/>最近消息]
        I2 --> J1
        I1 --> J2[HBase<br/>历史消息]
        I2 --> J2
        E --> K[MySQL<br/>用户/群组]
        D --> L[Redis<br/>在线状态]
    end
    
    subgraph 推送层
        G --> M1[APNs]
        G --> M2[FCM]
    end
    
    A1 --> B
    A2 --> B
    A3 --> B
    A4 --> B

4.2 核心流程

单聊消息发送流程

mermaid
sequenceDiagram
    participant A as 用户A<br/>移动端
    participant CS1 as 连接服务器1
    participant MS as 消息服务
    participant Kafka as Kafka
    participant CS2 as 连接服务器2
    participant B as 用户B<br/>移动端
    participant Redis as Redis
    participant Mongo as MongoDB
    
    A->>CS1: 发送消息(TCP)
    CS1->>MS: 转发消息
    
    MS->>MS: 消息ID生成<br/>敏感词过滤
    MS->>Kafka: 写入消息队列
    MS-->>CS1: 返回ACK
    CS1-->>A: 发送成功
    
    Kafka->>Mongo: 消费者写入MongoDB
    
    MS->>Redis: 查询用户B在线状态
    alt 用户B在线
        Redis-->>MS: 在线,连接服务器2
        MS->>CS2: 推送消息
        CS2->>B: TCP推送
        B-->>CS2: 已收到ACK
        CS2-->>MS: 已送达
    else 用户B离线
        Redis-->>MS: 离线
        MS->>离线服务: 存储离线消息
        离线服务->>APNs/FCM: 推送通知
    end

群聊消息发送流程

mermaid
sequenceDiagram
    participant A as 用户A
    participant CS as 连接服务器
    participant MS as 消息服务
    participant GS as 群组服务
    participant Kafka as Kafka
    participant DB as MongoDB
    
    A->>CS: 发送群消息
    CS->>MS: 转发消息
    MS->>GS: 查询群成员列表
    GS-->>MS: 返回成员列表(N人)
    
    alt 小群(<100人)- 写扩散
        MS->>Kafka: 写入N份消息
        Note over MS,Kafka: 每个成员一份
        Kafka->>DB: 存储N份消息
    else 大群(>=100人)- 读扩散
        MS->>Kafka: 写入1份消息
        Note over MS,Kafka: 只存一份
        Kafka->>DB: 存储1份消息
        Note over DB: 读取时查询
    end
    
    MS->>CS: 遍历在线成员推送
    CS->>在线成员: TCP推送
    MS->>离线服务: 离线成员入库

4.3 数据库设计

用户表(MySQL)

sql
CREATE TABLE im_user (
    user_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) UNIQUE NOT NULL,
    nickname VARCHAR(100),
    avatar VARCHAR(255),
    status TINYINT DEFAULT 0 COMMENT '0正常 1禁用',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

群组表(MySQL)

sql
CREATE TABLE im_group (
    group_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    group_name VARCHAR(100) NOT NULL,
    owner_id BIGINT NOT NULL COMMENT '群主',
    avatar VARCHAR(255),
    member_count INT DEFAULT 0,
    max_members INT DEFAULT 500,
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_owner (owner_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE im_group_member (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    group_id BIGINT NOT NULL,
    user_id BIGINT NOT NULL,
    role TINYINT DEFAULT 0 COMMENT '0普通 1管理 2群主',
    join_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_group_user (group_id, user_id),
    INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

消息表(MongoDB)

javascript
// 单聊消息
{
    _id: ObjectId,
    msg_id: NumberLong,  // 消息ID(雪花算法)
    from_id: NumberLong, // 发送者
    to_id: NumberLong,   // 接收者
    msg_type: Number,    // 1文本 2图片 3语音 4视频
    content: String,     // 消息内容
    send_time: ISODate,  // 发送时间
    read_status: Number, // 0未读 1已读
    read_time: ISODate,  // 已读时间
    status: Number       // 0正常 1撤回 2删除
}

// 群聊消息
{
    _id: ObjectId,
    msg_id: NumberLong,
    group_id: NumberLong,  // 群组ID
    from_id: NumberLong,
    msg_type: Number,
    content: String,
    send_time: ISODate,
    read_users: Array,     // 已读用户列表(小群)
    status: Number
}

// 索引
db.im_message.createIndex({from_id: 1, to_id: 1, send_time: -1})
db.im_message.createIndex({msg_id: 1}, {unique: true})
db.im_group_message.createIndex({group_id: 1, send_time: -1})

在线状态(Redis)

redis
# 用户在线状态
Key: online:{user_id}
Value: {
    "server": "tcp-server-01",
    "device": "iOS",
    "login_time": "2024-01-01 12:00:00"
}
TTL: 300秒(心跳刷新)

# 会话最新消息ID(已读水位)
Key: session:{user_id}:{target_id}
Value: last_msg_id
TTL: 7天

五、核心实现

5.1 长连接管理(Go实现)

点击查看完整实现
go
package im

import (
    "context"
    "fmt"
    "net"
    "sync"
    "time"
)

// ConnectionManager 连接管理器
type ConnectionManager struct {
    connections sync.Map // key: userID, value: *Connection
    redis       *RedisClient
    kafka       *KafkaProducer
}

// Connection 用户连接
type Connection struct {
    UserID    int64
    Conn      net.Conn
    Device    string
    LastHeart time.Time
    SendChan  chan []byte
    ctx       context.Context
    cancel    context.CancelFunc
    mutex     sync.RWMutex
}

// NewConnectionManager 创建连接管理器
func NewConnectionManager(redis *RedisClient, kafka *KafkaProducer) *ConnectionManager {
    return &ConnectionManager{
        redis: redis,
        kafka: kafka,
    }
}

// AddConnection 添加连接
func (cm *ConnectionManager) AddConnection(userID int64, conn net.Conn, device string) *Connection {
    ctx, cancel := context.WithCancel(context.Background())
    
    connection := &Connection{
        UserID:    userID,
        Conn:      conn,
        Device:    device,
        LastHeart: time.Now(),
        SendChan:  make(chan []byte, 100),
        ctx:       ctx,
        cancel:    cancel,
    }
    
    // 踢掉同设备的旧连接
    if old, exists := cm.connections.LoadOrStore(userID, connection); exists {
        oldConn := old.(*Connection)
        if oldConn.Device == device {
            oldConn.Close()
        }
    }
    
    // 更新在线状态到Redis
    cm.redis.SetOnlineStatus(userID, device, "tcp-server-01")
    
    // 启动消息发送协程
    go connection.sendLoop()
    
    // 启动心跳检测
    go connection.heartbeatCheck(cm)
    
    return connection
}

// RemoveConnection 移除连接
func (cm *ConnectionManager) RemoveConnection(userID int64) {
    if conn, exists := cm.connections.LoadAndDelete(userID); exists {
        connection := conn.(*Connection)
        connection.Close()
        
        // 删除在线状态
        cm.redis.DelOnlineStatus(userID)
    }
}

// SendMessage 发送消息给指定用户
func (cm *ConnectionManager) SendMessage(userID int64, data []byte) error {
    conn, exists := cm.connections.Load(userID)
    if !exists {
        return fmt.Errorf("user %d not online", userID)
    }
    
    connection := conn.(*Connection)
    select {
    case connection.SendChan <- data:
        return nil
    case <-time.After(time.Second):
        return fmt.Errorf("send timeout")
    }
}

// sendLoop 消息发送循环
func (c *Connection) sendLoop() {
    defer c.Conn.Close()
    
    for {
        select {
        case msg := <-c.SendChan:
            // 设置写超时
            c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if _, err := c.Conn.Write(msg); err != nil {
                return
            }
        case <-c.ctx.Done():
            return
        }
    }
}

// heartbeatCheck 心跳检测
func (c *Connection) heartbeatCheck(cm *ConnectionManager) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            c.mutex.RLock()
            lastHeart := c.LastHeart
            c.mutex.RUnlock()
            
            // 超过2分钟没心跳,断开连接
            if time.Since(lastHeart) > 2*time.Minute {
                cm.RemoveConnection(c.UserID)
                return
            }
        case <-c.ctx.Done():
            return
        }
    }
}

// UpdateHeartbeat 更新心跳时间
func (c *Connection) UpdateHeartbeat() {
    c.mutex.Lock()
    c.LastHeart = time.Now()
    c.mutex.Unlock()
}

// Close 关闭连接
func (c *Connection) Close() {
    c.cancel()
    close(c.SendChan)
}

// HandleMessage 处理收到的消息
func (cm *ConnectionManager) HandleMessage(userID int64, msg *Message) error {
    // 1. 生成消息ID
    msg.MsgID = generateMsgID()
    msg.SendTime = time.Now()
    
    // 2. 敏感词过滤
    if containsSensitiveWord(msg.Content) {
        return fmt.Errorf("message contains sensitive words")
    }
    
    // 3. 写入Kafka
    if err := cm.kafka.SendMessage(msg); err != nil {
        return err
    }
    
    // 4. 推送给接收方
    if msg.ToID > 0 {
        // 单聊
        return cm.pushToUser(msg.ToID, msg)
    } else if msg.GroupID > 0 {
        // 群聊
        return cm.pushToGroup(msg.GroupID, msg, userID)
    }
    
    return nil
}

// pushToUser 推送消息给用户
func (cm *ConnectionManager) pushToUser(toID int64, msg *Message) error {
    // 检查是否在线
    if cm.IsOnline(toID) {
        // 在线,直接推送
        return cm.SendMessage(toID, msg.Serialize())
    } else {
        // 离线,存储离线消息并推送通知
        return cm.handleOfflineMessage(toID, msg)
    }
}

// pushToGroup 推送群消息
func (cm *ConnectionManager) pushToGroup(groupID, fromID int64, msg *Message) error {
    // 获取群成员列表
    members, err := cm.getGroupMembers(groupID)
    if err != nil {
        return err
    }
    
    // 遍历成员推送(排除发送者)
    for _, memberID := range members {
        if memberID == fromID {
            continue
        }
        
        if cm.IsOnline(memberID) {
            cm.SendMessage(memberID, msg.Serialize())
        }
    }
    
    return nil
}

// IsOnline 检查用户是否在线
func (cm *ConnectionManager) IsOnline(userID int64) bool {
    _, exists := cm.connections.Load(userID)
    return exists
}

5.2 消息可靠性保证

go
// 消息确认机制
type MessageAck struct {
    MsgID    int64
    Status   int // 1已发送 2已送达 3已读
    Time     time.Time
}

// SendWithAck 发送消息并等待确认
func (c *Connection) SendWithAck(msg *Message) error {
    // 1. 发送消息
    if err := c.Send(msg.Serialize()); err != nil {
        return err
    }
    
    // 2. 等待ACK(超时重试)
    timeout := time.After(10 * time.Second)
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    
    retryCount := 0
    maxRetry := 3
    
    for {
        select {
        case ack := <-c.AckChan:
            if ack.MsgID == msg.MsgID {
                return nil // 收到确认
            }
        case <-ticker.C:
            // 重试
            retryCount++
            if retryCount >= maxRetry {
                return fmt.Errorf("send timeout after %d retries", maxRetry)
            }
            c.Send(msg.Serialize())
        case <-timeout:
            return fmt.Errorf("send timeout")
        }
    }
}
java
@Service
public class MessageDeduplication {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * 消息去重:使用Redis SET判断
     */
    public boolean isDuplicate(Long msgId) {
        String key = "msg:dedup:" + msgId;
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", 5, TimeUnit.MINUTES);
        return !Boolean.TRUE.equals(success);
    }
    
    /**
     * 处理消息(幂等)
     */
    public void handleMessage(Message msg) {
        // 1. 去重检查
        if (isDuplicate(msg.getMsgId())) {
            log.info("Duplicate message: {}", msg.getMsgId());
            return;
        }
        
        // 2. 存储消息
        messageRepository.save(msg);
        
        // 3. 推送消息
        pushToUser(msg);
    }
}

六、性能优化

6.1 群聊优化策略

写扩散 vs 读扩散

策略原理优点缺点适用场景
写扩散发送时复制N份读取快写入慢、存储大小群(<100人)
读扩散存储1份,读时查询写入快、存储小读取慢大群(>=100人)
混合模式小群写扩散,大群读扩散平衡性能实现复杂推荐方案

6.2 离线消息优化

go
// 离线消息拉取优化
func (s *MessageService) PullOfflineMessages(userID int64, lastMsgID int64) ([]*Message, error) {
    // 1. 只拉取最近的消息(如最近100条)
    messages, err := s.repo.GetMessagesSince(userID, lastMsgID, 100)
    if err != nil {
        return nil, err
    }
    
    // 2. 对于大量离线消息,分批拉取
    if len(messages) >= 100 {
        // 返回分页标记,客户端继续拉取
        return messages, nil
    }
    
    return messages, nil
}

6.3 性能数据

指标目标值实际值
单机连接数10万15万
消息延迟P99<100ms85ms
消息吞吐10万QPS12万QPS
内存占用<8GB6GB

七、运维监控

7.1 监控指标

yaml
metrics:
  # 连接指标
  - online_users: 在线用户数
  - total_connections: 总连接数
  - connection_rate: 连接建立速率
  
  # 消息指标
  - message_qps: 消息QPS
  - message_latency_p99: 消息延迟P99
  - message_loss_rate: 消息丢失率
  
  # 系统指标
  - cpu_usage: CPU使用率
  - memory_usage: 内存使用率
  - goroutine_count: 协程数量

7.2 告警规则

  • 在线用户数骤降 > 20%
  • 消息延迟P99 > 500ms
  • 消息丢失率 > 0.01%
  • 连接服务器宕机

八、面试要点

8.1 常见追问

Q1: 如何保证消息不丢失?

A:

  1. 客户端到服务器:TCP可靠传输 + ACK确认
  2. 服务器内部:Kafka持久化 + 副本机制
  3. 服务器到客户端:推送失败存离线 + 重试
  4. 存储层:MongoDB副本集 + 定期备份

Q2: 如何实现多端消息同步?

A:

  1. 已读水位:每个端记录已读的最大消息ID
  2. 上线拉取:登录时拉取大于已读水位的消息
  3. 实时推送:消息实时推送到所有在线设备
  4. 冲突解决:以最新的已读水位为准

Q3: 如何优化群聊性能?

A:

  1. 分级策略:小群写扩散,大群读扩散
  2. 推送优化:只推送@我的和最新N条
  3. 离线优化:大群离线不推送通知
  4. 消息聚合:合并多条消息推送

8.2 扩展知识点

九、相关资源

9.1 相关技术栈

9.2 开源项目


总结:IM系统是最复杂的实时通信系统,涉及长连接管理、消息可靠性、性能优化等多个核心技术。面试中要能说清楚架构设计、消息流程、可靠性保证、性能优化等关键点,展现系统设计的全局思维。

正在精进