如何设计消息推送系统
一、问题描述
1.1 业务背景
消息推送是移动应用的核心功能,用于实时通知用户。
典型场景:
- App通知:新闻推送、活动通知
- 系统消息:订单状态、物流更新
- 营销消息:优惠券、促销活动
1.2 核心功能
- 单推:推送给单个用户
- 群推:推送给指定用户群
- 全量推送:推送给所有用户
- 定时推送:定时发送
1.3 技术挑战
到达率:
发送100万条推送
实际到达:70万(70%到达率)
如何提高到达率?高并发:
双十一:瞬间推送给1亿用户
如何快速推送?1.4 面试考察点
- 推送架构:客户端SDK、网关、第三方
- 推送协议:APNs、FCM接入
- 长连接管理:心跳保活
- 离线消息:存储和重试
二、系统架构
2.1 架构图
mermaid
graph TB
subgraph 客户端
A[iOS App]
B[Android App]
end
subgraph 推送系统
C[推送API]
D[消息队列]
E[推送Gateway]
end
subgraph 第三方Push
F[APNs<br/>iOS推送]
G[FCM<br/>Android推送]
H[厂商Push<br/>小米/华为]
end
subgraph 存储
I[Redis<br/>设备Token]
J[MySQL<br/>推送记录]
end
C --> D
D --> E
E --> F
E --> G
E --> H
E --> I
E --> J
F -.-> A
G -.-> B
H -.-> B三、推送协议
3.1 APNs(iOS推送)
流程:
1. App启动 → 注册APNs → 获取DeviceToken
2. DeviceToken上报给服务器
3. 服务器调用APNs推送
4. APNs推送到设备Go实现:
go
import "github.com/sideshow/apns2"
func (s *PushService) PushToiOS(deviceToken, message string) error {
notification := &apns2.Notification{
DeviceToken: deviceToken,
Topic: "com.example.app",
Payload: map[string]interface{}{
"aps": map[string]interface{}{
"alert": message,
"badge": 1,
"sound": "default",
},
},
}
client := apns2.NewClient(cert)
res, err := client.Push(notification)
if err != nil {
return err
}
if res.StatusCode != 200 {
return fmt.Errorf("推送失败: %s", res.Reason)
}
return nil
}3.2 FCM(Android推送)
Go实现:
go
import "firebase.google.com/go/messaging"
func (s *PushService) PushToAndroid(deviceToken, message string) error {
msg := &messaging.Message{
Token: deviceToken,
Notification: &messaging.Notification{
Title: "新消息",
Body: message,
},
Android: &messaging.AndroidConfig{
Priority: "high",
},
}
_, err := s.fcmClient.Send(context.Background(), msg)
return err
}四、核心实现
4.1 Go实现
go
package push
import (
"context"
"fmt"
)
type PushService struct {
db *gorm.DB
redis *redis.Client
mq *MessageQueue
}
// 推送消息
type PushMessage struct {
ID int64
Title string
Content string
UserIDs []int64
PushType int8 // 1单推 2群推 3全量
ScheduleTime time.Time
Status int8 // 1待推送 2推送中 3已完成
}
// 设备Token
type DeviceToken struct {
UserID int64
DeviceToken string
Platform string // ios/android
IsActive bool
}
// Push 推送消息
func (s *PushService) Push(ctx context.Context, msg *PushMessage) error {
// 1. 保存推送记录
err := s.db.Create(msg).Error
if err != nil {
return err
}
// 2. 查询设备Token
tokens, err := s.getDeviceTokens(msg.UserIDs)
if err != nil {
return err
}
// 3. 分批推送
batchSize := 1000
for i := 0; i < len(tokens); i += batchSize {
end := i + batchSize
if end > len(tokens) {
end = len(tokens)
}
batch := tokens[i:end]
// 发送到消息队列
s.mq.Send("push_topic", PushTask{
MessageID: msg.ID,
Tokens: batch,
Content: msg.Content,
})
}
return nil
}
// 批量推送
func (s *PushService) BatchPush(task *PushTask) error {
for _, token := range task.Tokens {
var err error
if token.Platform == "ios" {
err = s.PushToiOS(token.DeviceToken, task.Content)
} else {
err = s.PushToAndroid(token.DeviceToken, task.Content)
}
// 记录推送结果
s.recordPushResult(task.MessageID, token.UserID, err)
// 失败存储到离线消息
if err != nil {
s.saveOfflineMessage(token.UserID, task.Content)
}
}
return nil
}
// 离线消息存储
func (s *PushService) saveOfflineMessage(userID int64, content string) error {
key := fmt.Sprintf("offline:msg:%d", userID)
// 存储到Redis List,最多保留100条
s.redis.LPush(ctx, key, content)
s.redis.LTrim(ctx, key, 0, 99)
s.redis.Expire(ctx, key, 7*24*time.Hour)
return nil
}
// 获取离线消息
func (s *PushService) GetOfflineMessages(userID int64) ([]string, error) {
key := fmt.Sprintf("offline:msg:%d", userID)
messages, err := s.redis.LRange(ctx, key, 0, -1).Result()
if err != nil {
return nil, err
}
// 清空已读
s.redis.Del(ctx, key)
return messages, nil
}五、长连接管理
5.1 心跳保活
客户端:
go
// 每30秒发送心跳
func (c *PushClient) startHeartbeat() {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
c.conn.WriteMessage(websocket.PingMessage, []byte{})
}
}服务端:
go
// 检测心跳超时
func (s *PushServer) checkHeartbeat() {
for conn := range s.connections {
lastHeartbeat := conn.GetLastHeartbeat()
if time.Since(lastHeartbeat) > 90*time.Second {
// 超时,关闭连接
conn.Close()
}
}
}5.2 断线重连
go
func (c *PushClient) connect() {
for {
err := c.dial()
if err == nil {
break
}
// 指数退避重试
time.Sleep(c.retryDelay)
c.retryDelay *= 2
if c.retryDelay > 60*time.Second {
c.retryDelay = 60 * time.Second
}
}
}六、性能优化
6.1 批量推送
go
// 每批1000个设备
func (s *PushService) batchPush(tokens []string, message string) {
for i := 0; i < len(tokens); i += 1000 {
batch := tokens[i:min(i+1000, len(tokens))]
go s.pushBatch(batch, message)
}
}6.2 限流控制
go
// 每秒最多推送1万条
limiter := rate.NewLimiter(10000, 10000)
for _, token := range tokens {
limiter.Wait(ctx)
go s.pushToDevice(token, message)
}6.3 性能数据
| 指标 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| 推送速度 | 100/s | 10000/s | 100x |
| 到达率 | 60% | 85% | +25% |
| 延迟 | 5s | 500ms | 10x |
七、面试要点
7.1 常见追问
Q1: APNs和FCM有什么区别?
A:
| 维度 | APNs | FCM |
|---|---|---|
| 平台 | iOS | Android |
| 协议 | HTTP/2 | HTTP |
| 证书 | 需要证书 | API Key |
| 到达率 | 高(95%+) | 中(80%+) |
Q2: 如何提高消息到达率?
A:
- 离线消息:存储未到达的消息
- 重试机制:失败后重试3次
- 多通道:厂商Push + FCM双通道
- 降级方案:Push失败时短信通知
Q3: 如何处理大量推送?
A:
- 消息队列:Kafka削峰
- 批量推送:每批1000个
- 限流控制:控制推送速率
- 分布式部署:多台Push Gateway
7.2 扩展知识点
相关场景题:
八、总结
消息推送系统设计要点:
- 推送协议:APNs(iOS)+ FCM(Android)
- 长连接:心跳保活 + 断线重连
- 离线消息:Redis存储 + 7天过期
- 性能优化:批量推送 + 限流控制
面试重点:
- 能画出推送架构图
- 能说出APNs和FCM的接入方式
- 能设计离线消息存储
- 能解释如何提高到达率
参考资料:
- APNs官方文档
- FCM官方文档
- 极光推送技术分享
