如何设计Feed流系统
一、问题描述
1.1 业务背景
Feed流(动态流、时间线)是社交产品的核心功能,用于展示用户关注的内容更新。
典型应用场景:
- 微博:关注的博主发布的微博
- 朋友圈:好友发布的动态
- 抖音:关注的创作者发布的视频
- Twitter:关注的用户发布的推文
- Instagram:关注的用户发布的照片
核心价值:
- 让用户及时看到关注对象的最新动态
- 提高用户活跃度和留存率
- 是社交产品的流量入口
1.2 核心功能
基础功能:
- 发布动态:用户发布内容,推送给粉丝
- 查看Feed:用户查看关注对象的最新动态
- 排序展示:按时间、热度或个性化排序
- 分页加载:支持下拉刷新和上拉加载更多
进阶功能:
- 已读标记:标记哪些动态已读
- 去重:同一动态不重复展示
- 混合Feed:关注+推荐内容混排
- 实时更新:有新动态时提示用户
1.3 技术挑战
读写不对称:
- 写少读多:一个人发布,千万人读取
- 大V效应:百万粉丝的大V发布,瞬间千万次读取
数据量巨大:
假设:1亿用户,平均关注100人,每人每天发10条
- 总Feed条目:1亿 × 100 × 10 = 1万亿条/天
- 存储:1万亿 × 100B ≈ 100TB/天实时性要求:
- 发布后立即推送给粉丝
- 用户打开App立即看到最新内容
- 响应时间 <200ms
热点问题:
- 大V发布瞬间百万请求
- 热门动态瞬间百万点赞
1.4 面试考察点
- 架构选择:推模式 vs 拉模式 vs 推拉结合
- 存储设计:如何存储海量Feed数据
- 性能优化:如何支撑亿级用户
- 一致性:如何保证Feed不丢不重
- 排序算法:时间序 vs 热度 vs 个性化
二、需求分析
2.1 功能性需求
| 需求 | 描述 | 优先级 |
|---|---|---|
| FR1 | 用户发布动态,推送给所有粉丝 | P0 |
| FR2 | 用户查看关注人的最新动态 | P0 |
| FR3 | Feed按时间倒序排列 | P0 |
| FR4 | 支持分页加载(每页20条) | P0 |
| FR5 | 支持下拉刷新 | P1 |
| FR6 | 支持按热度排序 | P1 |
| FR7 | 混合推荐内容(关注+推荐) | P2 |
| FR8 | 实时提示新动态 | P2 |
2.2 非功能性需求
性能需求:
- 发布响应时间:<500ms
- Feed查询响应时间:<200ms
- 支持QPS:写10万+,读100万+
可扩展性:
- 支持用户规模:10亿+
- 支持日活用户:1亿+
- 支持关注上限:5000人/用户
可用性:
- 系统可用性:99.9%
- Feed延迟:<5秒
一致性:
- 最终一致性(允许短暂延迟)
- 不丢动态、不重复
2.3 数据规模估算
假设:
- 总用户:10亿
- 日活用户:1亿
- 平均关注:100人
- 日均发布:5条/人
- Feed保留:30天
计算:
日发布总量 = 1亿 × 5 = 5亿条
日Feed生成量 = 5亿 × 100(平均粉丝) = 500亿条
存储 = 500亿 × 100B × 30天 = 150TB
写QPS = 5亿 / 86400 ≈ 5787 QPS(峰值3万)
读QPS = 1亿用户 × 10次/天 / 86400 ≈ 11574 QPS(峰值10万)三、技术选型
3.1 三种架构模式
推模式(写扩散 / Fan-out on Write)
原理:
- 用户发布动态时,立即推送给所有粉丝
- 将动态写入每个粉丝的收件箱(Inbox)
- 粉丝读取时直接从自己的收件箱读取
流程:
1. 用户A发布动态
2. 查询A的所有粉丝(100万)
3. 将动态ID写入100万个粉丝的收件箱
4. 粉丝B查看Feed时,读取自己的收件箱优点:
- ✅ 读取快:直接读自己的收件箱,无需join
- ✅ 读写分离:写和读不冲突
缺点:
- ❌ 写入慢:大V发布需写入百万粉丝收件箱
- ❌ 存储大:每个粉丝都要存一份
适用场景:
- 粉丝数量有限(<5000)
- 发布不频繁
- 对读取性能要求极高
拉模式(读扩散 / Fan-out on Read)
原理:
- 用户发布动态时,只写入自己的发件箱(Outbox)
- 粉丝读取时,查询所有关注人的发件箱
- 合并排序后返回
流程:
1. 用户A发布动态
2. 将动态写入A的发件箱
3. 粉丝B查看Feed时
4. 查询B关注的所有人(100人)的发件箱
5. 合并排序返回优点:
- ✅ 写入快:只写自己的发件箱
- ✅ 存储小:每条动态只存一份
缺点:
- ❌ 读取慢:需要查询100人的发件箱并合并排序
- ❌ 实时性差:需要实时查询
适用场景:
- 有大V用户(百万粉丝)
- 对写入性能要求高
- 读取可以牺牲一些性能
推拉结合(混合模式)
原理:
- 普通用户:推模式(粉丝<1万)
- 大V用户:拉模式(粉丝>1万)
- 读取时:推的直接读,拉的实时合并
流程:
1. 普通用户发布:推送给所有粉丝收件箱
2. 大V发布:只写自己的发件箱
3. 粉丝读取:
- 读取收件箱(推模式的动态)
- 查询关注的大V发件箱(拉模式)
- 合并排序返回优点:
- ✅ 综合最优:兼顾读写性能
- ✅ 灵活:根据用户类型选择策略
缺点:
- ❌ 复杂度高:需要维护两套逻辑
适用场景:
- 既有普通用户又有大V
- 追求综合性能最优
3.2 推荐方案对比
| 维度 | 推模式 | 拉模式 | 推拉结合 |
|---|---|---|---|
| 写性能 | 慢(O(粉丝数)) | 快(O(1)) | 混合 |
| 读性能 | 快(O(1)) | 慢(O(关注数)) | 混合 |
| 存储成本 | 高 | 低 | 中 |
| 实时性 | 高 | 低 | 高 |
| 复杂度 | 低 | 低 | 高 |
| 适用场景 | 小规模 | 大V | 综合最优 ⭐ |
推荐:推拉结合(Twitter、微博采用)
3.3 技术栈
存储:
- MySQL:存储动态内容、用户关系
- Redis:缓存收件箱、热点数据
- HBase:存储海量Feed历史数据
消息队列:
- Kafka:异步推送Feed到粉丝收件箱
缓存:
- Redis:收件箱缓存、排序缓存
四、系统设计
4.1 架构图
mermaid
graph TB
subgraph 客户端
A[用户App]
end
subgraph 接入层
B[API Gateway]
end
subgraph 业务层
C[发布服务]
D[Feed服务]
E[关注服务]
end
subgraph 数据层
F[MySQL<br/>动态表/关注表]
G[Redis<br/>收件箱缓存]
H[HBase<br/>历史Feed]
end
subgraph 消息队列
I[Kafka<br/>Feed推送队列]
J[Consumer<br/>Feed写入器]
end
A --> B
B --> C
B --> D
C --> F
C --> I
I --> J
J --> G
J --> H
D --> G
D --> F
D --> H
E --> F4.2 数据库设计
动态表(Posts)
sql
CREATE TABLE posts (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL COMMENT '发布者ID',
content TEXT COMMENT '动态内容',
media_urls JSON COMMENT '图片/视频URL',
post_type TINYINT DEFAULT 1 COMMENT '类型 1原创 2转发',
forward_id BIGINT COMMENT '转发的原动态ID',
like_count INT DEFAULT 0 COMMENT '点赞数',
comment_count INT DEFAULT 0 COMMENT '评论数',
forward_count INT DEFAULT 0 COMMENT '转发数',
is_deleted TINYINT DEFAULT 0 COMMENT '是否删除',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
KEY idx_user_time (user_id, create_time),
KEY idx_create_time (create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COMMENT='动态表';关注关系表(Follows)
sql
CREATE TABLE follows (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL COMMENT '关注者ID',
follow_user_id BIGINT NOT NULL COMMENT '被关注者ID',
status TINYINT DEFAULT 1 COMMENT '状态 1关注中',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_user_follow (user_id, follow_user_id),
KEY idx_follow_user (follow_user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COMMENT='关注关系表';Feed收件箱表(Feed_Inbox)
sql
CREATE TABLE feed_inbox (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL COMMENT '用户ID(收件人)',
post_id BIGINT NOT NULL COMMENT '动态ID',
post_user_id BIGINT NOT NULL COMMENT '动态发布者ID',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_user_post (user_id, post_id),
KEY idx_user_time (user_id, create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COMMENT='Feed收件箱表(推模式)';
-- 分表:按user_id % 256分表
CREATE TABLE feed_inbox_0 LIKE feed_inbox;
CREATE TABLE feed_inbox_1 LIKE feed_inbox;
-- ... 共256张表大V发件箱表(Feed_Outbox)
sql
CREATE TABLE feed_outbox (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL COMMENT '发布者ID',
post_id BIGINT NOT NULL COMMENT '动态ID',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
KEY idx_user_time (user_id, create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COMMENT='大V发件箱表(拉模式)';4.3 Redis数据结构
redis
# 1. 用户收件箱(ZSet,按时间排序)
Key: feed:inbox:{user_id}
Score: timestamp(发布时间)
Value: post_id
TTL: 7天
# 示例
ZADD feed:inbox:10001 1699000001 20001
ZADD feed:inbox:10001 1699000002 20002
ZADD feed:inbox:10001 1699000003 20003
# 查询最新20条
ZREVRANGE feed:inbox:10001 0 19 WITHSCORES
# 2. 用户发件箱(ZSet)
Key: feed:outbox:{user_id}
Score: timestamp
Value: post_id
TTL: 30天
# 3. 动态详情缓存(String)
Key: post:{post_id}
Value: {post_json}
TTL: 1天
# 4. 用户是否为大V(String)
Key: user:is_big_v:{user_id}
Value: 1(是)/ 0(否)
TTL: 1小时
# 5. 用户粉丝列表(Set)
Key: user:fans:{user_id}
Value: {fan_user_id1, fan_user_id2, ...}
TTL: 1小时(小于1万粉丝才缓存)4.4 API设计
发布动态
http
POST /api/v1/posts
Content-Type: application/json
{
"content": "今天天气真好!",
"media_urls": ["https://cdn.example.com/img1.jpg"],
"post_type": 1
}
Response:
{
"code": 0,
"message": "success",
"data": {
"post_id": 20001,
"create_time": "2024-01-15 10:30:00"
}
}查询Feed流
http
GET /api/v1/feed?cursor=0&count=20
Response:
{
"code": 0,
"message": "success",
"data": {
"posts": [
{
"post_id": 20003,
"user_id": 10002,
"username": "张三",
"avatar": "https://cdn.example.com/avatar2.jpg",
"content": "分享一张照片",
"media_urls": ["https://cdn.example.com/photo1.jpg"],
"like_count": 100,
"comment_count": 20,
"create_time": "2024-01-15 10:35:00"
},
// ... 更多动态
],
"next_cursor": 20,
"has_more": true
}
}五、核心实现
5.1 Go实现
点击查看完整实现
go
package feed
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis/v8"
"gorm.io/gorm"
)
// FeedService Feed流服务
type FeedService struct {
db *gorm.DB
redis *redis.Client
kafkaClient *KafkaClient
}
// Post 动态
type Post struct {
ID int64 `gorm:"primary_key" json:"post_id"`
UserID int64 `gorm:"column:user_id" json:"user_id"`
Content string `gorm:"column:content" json:"content"`
MediaURLs string `gorm:"column:media_urls" json:"media_urls"` // JSON数组
PostType int8 `gorm:"column:post_type" json:"post_type"`
LikeCount int `gorm:"column:like_count" json:"like_count"`
CommentCount int `gorm:"column:comment_count" json:"comment_count"`
CreateTime time.Time `gorm:"column:create_time" json:"create_time"`
}
// FeedInbox Feed收件箱
type FeedInbox struct {
ID int64 `gorm:"primary_key"`
UserID int64 `gorm:"column:user_id"`
PostID int64 `gorm:"column:post_id"`
PostUserID int64 `gorm:"column:post_user_id"`
CreateTime time.Time `gorm:"column:create_time"`
}
// Follow 关注关系
type Follow struct {
UserID int64 `gorm:"column:user_id"`
FollowUserID int64 `gorm:"column:follow_user_id"`
}
const (
BigVFansThreshold = 10000 // 大V粉丝阈值
MaxFollowCount = 5000 // 最大关注数
)
// PublishPost 发布动态
func (s *FeedService) PublishPost(ctx context.Context, userID int64, content string, mediaURLs []string) (int64, error) {
// 1. 创建动态
post := &Post{
UserID: userID,
Content: content,
MediaURLs: toJSON(mediaURLs),
PostType: 1,
CreateTime: time.Now(),
}
err := s.db.Create(post).Error
if err != nil {
return 0, err
}
// 2. 判断是否为大V
isBigV, err := s.IsBigV(ctx, userID)
if err != nil {
return 0, err
}
// 3. 推送Feed
if isBigV {
// 大V:写发件箱(拉模式)
err = s.writeOutbox(ctx, userID, post.ID)
} else {
// 普通用户:推送到粉丝收件箱(推模式)
err = s.fanoutToInbox(ctx, userID, post.ID)
}
if err != nil {
return 0, err
}
return post.ID, nil
}
// IsBigV 判断是否为大V
func (s *FeedService) IsBigV(ctx context.Context, userID int64) (bool, error) {
// 1. 从Redis获取
cacheKey := fmt.Sprintf("user:is_big_v:%d", userID)
val, err := s.redis.Get(ctx, cacheKey).Result()
if err == nil {
return val == "1", nil
}
// 2. 查询数据库
var fansCount int64
err = s.db.Model(&Follow{}).
Where("follow_user_id = ?", userID).
Count(&fansCount).Error
if err != nil {
return false, err
}
isBigV := fansCount >= BigVFansThreshold
// 3. 写入Redis
if isBigV {
s.redis.Set(ctx, cacheKey, "1", time.Hour)
} else {
s.redis.Set(ctx, cacheKey, "0", time.Hour)
}
return isBigV, nil
}
// writeOutbox 写发件箱(大V)
func (s *FeedService) writeOutbox(ctx context.Context, userID, postID int64) error {
// 写Redis
cacheKey := fmt.Sprintf("feed:outbox:%d", userID)
score := float64(time.Now().Unix())
return s.redis.ZAdd(ctx, cacheKey, &redis.Z{
Score: score,
Member: postID,
}).Err()
}
// fanoutToInbox 推送到粉丝收件箱(普通用户)
func (s *FeedService) fanoutToInbox(ctx context.Context, userID, postID int64) error {
// 1. 查询粉丝列表
fans, err := s.getFans(ctx, userID)
if err != nil {
return err
}
// 2. 异步推送到Kafka(避免阻塞)
message := FeedMessage{
PostID: postID,
UserID: userID,
FanIDs: fans,
PubTime: time.Now(),
}
return s.kafkaClient.SendFeedMessage(message)
}
// ConsumeFeedMessage 消费Feed消息(Kafka Consumer)
func (s *FeedService) ConsumeFeedMessage(msg FeedMessage) error {
ctx := context.Background()
// 批量写入Redis
pipeline := s.redis.Pipeline()
score := float64(msg.PubTime.Unix())
for _, fanID := range msg.FanIDs {
cacheKey := fmt.Sprintf("feed:inbox:%d", fanID)
pipeline.ZAdd(ctx, cacheKey, &redis.Z{
Score: score,
Member: msg.PostID,
})
// 只保留最新1000条
pipeline.ZRemRangeByRank(ctx, cacheKey, 0, -1001)
// 设置过期时间7天
pipeline.Expire(ctx, cacheKey, 7*24*time.Hour)
}
_, err := pipeline.Exec(ctx)
return err
}
// getFans 获取粉丝列表
func (s *FeedService) getFans(ctx context.Context, userID int64) ([]int64, error) {
// 1. 尝试从Redis获取
cacheKey := fmt.Sprintf("user:fans:%d", userID)
members, err := s.redis.SMembers(ctx, cacheKey).Result()
if err == nil && len(members) > 0 {
var fans []int64
for _, m := range members {
id, _ := strconv.ParseInt(m, 10, 64)
fans = append(fans, id)
}
return fans, nil
}
// 2. 从数据库查询
var follows []Follow
err = s.db.Where("follow_user_id = ?", userID).
Find(&follows).Error
if err != nil {
return nil, err
}
var fans []int64
for _, f := range follows {
fans = append(fans, f.UserID)
}
// 3. 写入Redis(小于1万粉丝才缓存)
if len(fans) < BigVFansThreshold {
pipeline := s.redis.Pipeline()
for _, fanID := range fans {
pipeline.SAdd(ctx, cacheKey, fanID)
}
pipeline.Expire(ctx, cacheKey, time.Hour)
pipeline.Exec(ctx)
}
return fans, nil
}
// GetFeed 获取Feed流(推拉结合)
func (s *FeedService) GetFeed(ctx context.Context, userID int64, cursor, count int) ([]*Post, int, error) {
// 1. 获取关注列表
followings, err := s.getFollowings(ctx, userID)
if err != nil {
return nil, 0, err
}
// 2. 分离普通用户和大V
var normalUsers, bigVUsers []int64
for _, uid := range followings {
isBigV, _ := s.IsBigV(ctx, uid)
if isBigV {
bigVUsers = append(bigVUsers, uid)
} else {
normalUsers = append(normalUsers, uid)
}
}
// 3. 读取收件箱(推模式的动态)
inboxPostIDs, err := s.getInboxPosts(ctx, userID, cursor, count*2)
if err != nil {
return nil, 0, err
}
// 4. 读取大V发件箱(拉模式的动态)
outboxPostIDs, err := s.getOutboxPosts(ctx, bigVUsers, cursor, count)
if err != nil {
return nil, 0, err
}
// 5. 合并排序
allPostIDs := s.mergeAndSort(inboxPostIDs, outboxPostIDs, count)
// 6. 批量查询动态详情
posts, err := s.getPostsByIDs(ctx, allPostIDs)
if err != nil {
return nil, 0, err
}
// 7. 计算下一页游标
nextCursor := cursor + count
return posts, nextCursor, nil
}
// getFollowings 获取关注列表
func (s *FeedService) getFollowings(ctx context.Context, userID int64) ([]int64, error) {
var follows []Follow
err := s.db.Where("user_id = ?", userID).
Limit(MaxFollowCount).
Find(&follows).Error
if err != nil {
return nil, err
}
var followings []int64
for _, f := range follows {
followings = append(followings, f.FollowUserID)
}
return followings, nil
}
// getInboxPosts 获取收件箱动态
func (s *FeedService) getInboxPosts(ctx context.Context, userID int64, offset, count int) ([]int64, error) {
cacheKey := fmt.Sprintf("feed:inbox:%d", userID)
// 从Redis ZSet获取(按时间倒序)
members, err := s.redis.ZRevRange(ctx, cacheKey, int64(offset), int64(offset+count-1)).Result()
if err != nil {
return nil, err
}
var postIDs []int64
for _, m := range members {
id, _ := strconv.ParseInt(m, 10, 64)
postIDs = append(postIDs, id)
}
return postIDs, nil
}
// getOutboxPosts 获取大V发件箱动态
func (s *FeedService) getOutboxPosts(ctx context.Context, bigVUsers []int64, offset, count int) ([]int64, error) {
var allPostIDs []int64
// 查询每个大V的发件箱
for _, uid := range bigVUsers {
cacheKey := fmt.Sprintf("feed:outbox:%d", uid)
members, err := s.redis.ZRevRange(ctx, cacheKey, 0, int64(count-1)).Result()
if err != nil {
continue
}
for _, m := range members {
id, _ := strconv.ParseInt(m, 10, 64)
allPostIDs = append(allPostIDs, id)
}
}
return allPostIDs, nil
}
// mergeAndSort 合并排序
func (s *FeedService) mergeAndSort(inbox, outbox []int64, count int) []int64 {
// 简化实现:直接合并(实际应按时间戳排序)
merged := append(inbox, outbox...)
// 去重
seen := make(map[int64]bool)
var result []int64
for _, id := range merged {
if !seen[id] {
seen[id] = true
result = append(result, id)
if len(result) >= count {
break
}
}
}
return result
}
// getPostsByIDs 批量查询动态详情
func (s *FeedService) getPostsByIDs(ctx context.Context, postIDs []int64) ([]*Post, error) {
if len(postIDs) == 0 {
return []*Post{}, nil
}
// 1. 尝试从Redis批量获取
posts, missingIDs := s.getPostsFromCache(ctx, postIDs)
// 2. 从数据库查询缺失的
if len(missingIDs) > 0 {
var dbPosts []*Post
err := s.db.Where("id IN ?", missingIDs).Find(&dbPosts).Error
if err != nil {
return nil, err
}
// 写回缓存
for _, post := range dbPosts {
s.cachePost(ctx, post)
posts = append(posts, post)
}
}
return posts, nil
}
// getPostsFromCache 从缓存获取动态
func (s *FeedService) getPostsFromCache(ctx context.Context, postIDs []int64) ([]*Post, []int64) {
var posts []*Post
var missingIDs []int64
for _, id := range postIDs {
cacheKey := fmt.Sprintf("post:%d", id)
val, err := s.redis.Get(ctx, cacheKey).Result()
if err != nil {
missingIDs = append(missingIDs, id)
continue
}
var post Post
json.Unmarshal([]byte(val), &post)
posts = append(posts, &post)
}
return posts, missingIDs
}
// cachePost 缓存动态
func (s *FeedService) cachePost(ctx context.Context, post *Post) error {
cacheKey := fmt.Sprintf("post:%d", post.ID)
data, _ := json.Marshal(post)
return s.redis.Set(ctx, cacheKey, data, 24*time.Hour).Err()
}
// 辅助函数
func toJSON(arr []string) string {
data, _ := json.Marshal(arr)
return string(data)
}
type FeedMessage struct {
PostID int64
UserID int64
FanIDs []int64
PubTime time.Time
}
type KafkaClient struct {
// Kafka客户端实现
}
func (k *KafkaClient) SendFeedMessage(msg FeedMessage) error {
// 发送到Kafka
return nil
}5.2 Java实现
java
@Service
public class FeedService {
@Autowired
private PostMapper postMapper;
@Autowired
private FollowMapper followMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private KafkaTemplate<String, FeedMessage> kafkaTemplate;
private static final int BIG_V_THRESHOLD = 10000;
private static final int MAX_FOLLOW_COUNT = 5000;
/**
* 发布动态
*/
@Transactional
public Long publishPost(Long userId, String content, List<String> mediaUrls) {
// 1. 创建动态
Post post = new Post();
post.setUserId(userId);
post.setContent(content);
post.setMediaUrls(JSON.toJSONString(mediaUrls));
post.setPostType((byte) 1);
post.setCreateTime(new Date());
postMapper.insert(post);
// 2. 判断是否为大V
boolean isBigV = isBigV(userId);
// 3. 推送Feed
if (isBigV) {
// 大V:写发件箱
writeOutbox(userId, post.getId());
} else {
// 普通用户:推送到粉丝收件箱
fanoutToInbox(userId, post.getId());
}
return post.getId();
}
/**
* 判断是否为大V
*/
private boolean isBigV(Long userId) {
String cacheKey = "user:is_big_v:" + userId;
// 从Redis获取
String val = (String) redisTemplate.opsForValue().get(cacheKey);
if (val != null) {
return "1".equals(val);
}
// 查询数据库
Long fansCount = followMapper.countByFollowUserId(userId);
boolean isBigV = fansCount >= BIG_V_THRESHOLD;
// 写入Redis
redisTemplate.opsForValue().set(
cacheKey,
isBigV ? "1" : "0",
1,
TimeUnit.HOURS
);
return isBigV;
}
/**
* 写发件箱(大V)
*/
private void writeOutbox(Long userId, Long postId) {
String cacheKey = "feed:outbox:" + userId;
double score = System.currentTimeMillis() / 1000.0;
redisTemplate.opsForZSet().add(cacheKey, postId, score);
redisTemplate.expire(cacheKey, 30, TimeUnit.DAYS);
}
/**
* 推送到粉丝收件箱(普通用户)
*/
private void fanoutToInbox(Long userId, Long postId) {
// 查询粉丝列表
List<Long> fans = getFans(userId);
// 发送到Kafka异步处理
FeedMessage message = new FeedMessage();
message.setPostId(postId);
message.setUserId(userId);
message.setFanIds(fans);
message.setPubTime(new Date());
kafkaTemplate.send("feed_topic", message);
}
/**
* 消费Kafka消息(Consumer)
*/
@KafkaListener(topics = "feed_topic")
public void consumeFeedMessage(FeedMessage message) {
// 批量写入Redis
List<Long> fanIds = message.getFanIds();
double score = message.getPubTime().getTime() / 1000.0;
// 使用Pipeline批量操作
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) {
for (Long fanId : fanIds) {
String cacheKey = "feed:inbox:" + fanId;
operations.opsForZSet().add(
(K) cacheKey,
(V) message.getPostId(),
score
);
// 只保留最新1000条
operations.opsForZSet().removeRange((K) cacheKey, 0, -1001);
// 设置过期时间
operations.expire((K) cacheKey, 7, TimeUnit.DAYS);
}
return null;
}
});
}
/**
* 获取粉丝列表
*/
private List<Long> getFans(Long userId) {
String cacheKey = "user:fans:" + userId;
// 尝试从Redis获取
Set<Object> members = redisTemplate.opsForSet().members(cacheKey);
if (members != null && !members.isEmpty()) {
return members.stream()
.map(m -> Long.parseLong(m.toString()))
.collect(Collectors.toList());
}
// 从数据库查询
List<Long> fans = followMapper.findFansByUserId(userId);
// 写入Redis(小于1万粉丝才缓存)
if (fans.size() < BIG_V_THRESHOLD) {
redisTemplate.opsForSet().add(cacheKey, fans.toArray());
redisTemplate.expire(cacheKey, 1, TimeUnit.HOURS);
}
return fans;
}
/**
* 获取Feed流
*/
public FeedResponse getFeed(Long userId, Integer cursor, Integer count) {
// 1. 获取关注列表
List<Long> followings = getFollowings(userId);
// 2. 分离普通用户和大V
List<Long> normalUsers = new ArrayList<>();
List<Long> bigVUsers = new ArrayList<>();
for (Long uid : followings) {
if (isBigV(uid)) {
bigVUsers.add(uid);
} else {
normalUsers.add(uid);
}
}
// 3. 读取收件箱(推模式)
List<Long> inboxPostIds = getInboxPosts(userId, cursor, count * 2);
// 4. 读取大V发件箱(拉模式)
List<Long> outboxPostIds = getOutboxPosts(bigVUsers, cursor, count);
// 5. 合并排序
List<Long> allPostIds = mergeAndSort(inboxPostIds, outboxPostIds, count);
// 6. 批量查询动态详情
List<Post> posts = getPostsByIds(allPostIds);
// 7. 构造响应
FeedResponse response = new FeedResponse();
response.setPosts(posts);
response.setNextCursor(cursor + count);
response.setHasMore(posts.size() >= count);
return response;
}
/**
* 获取关注列表
*/
private List<Long> getFollowings(Long userId) {
return followMapper.findFollowingsByUserId(userId, MAX_FOLLOW_COUNT);
}
/**
* 获取收件箱动态
*/
private List<Long> getInboxPosts(Long userId, Integer offset, Integer count) {
String cacheKey = "feed:inbox:" + userId;
Set<Object> members = redisTemplate.opsForZSet()
.reverseRange(cacheKey, offset, offset + count - 1);
if (members == null) {
return Collections.emptyList();
}
return members.stream()
.map(m -> Long.parseLong(m.toString()))
.collect(Collectors.toList());
}
/**
* 获取大V发件箱动态
*/
private List<Long> getOutboxPosts(List<Long> bigVUsers, Integer offset, Integer count) {
List<Long> allPostIds = new ArrayList<>();
for (Long uid : bigVUsers) {
String cacheKey = "feed:outbox:" + uid;
Set<Object> members = redisTemplate.opsForZSet()
.reverseRange(cacheKey, 0, count - 1);
if (members != null) {
members.forEach(m -> allPostIds.add(Long.parseLong(m.toString())));
}
}
return allPostIds;
}
/**
* 合并排序
*/
private List<Long> mergeAndSort(List<Long> inbox, List<Long> outbox, Integer count) {
Set<Long> uniqueIds = new LinkedHashSet<>();
uniqueIds.addAll(inbox);
uniqueIds.addAll(outbox);
return uniqueIds.stream()
.limit(count)
.collect(Collectors.toList());
}
/**
* 批量查询动态详情
*/
private List<Post> getPostsByIds(List<Long> postIds) {
if (postIds.isEmpty()) {
return Collections.emptyList();
}
// 先从Redis批量获取
List<Post> posts = new ArrayList<>();
List<Long> missingIds = new ArrayList<>();
for (Long id : postIds) {
String cacheKey = "post:" + id;
String json = (String) redisTemplate.opsForValue().get(cacheKey);
if (json != null) {
posts.add(JSON.parseObject(json, Post.class));
} else {
missingIds.add(id);
}
}
// 从数据库查询缺失的
if (!missingIds.isEmpty()) {
List<Post> dbPosts = postMapper.findByIds(missingIds);
// 写回缓存
for (Post post : dbPosts) {
String cacheKey = "post:" + post.getId();
redisTemplate.opsForValue().set(
cacheKey,
JSON.toJSONString(post),
1,
TimeUnit.DAYS
);
posts.add(post);
}
}
return posts;
}
}java
@Data
public class FeedMessage {
private Long postId;
private Long userId;
private List<Long> fanIds;
private Date pubTime;
}java
@Data
public class FeedResponse {
private List<Post> posts;
private Integer nextCursor;
private Boolean hasMore;
}六、性能优化
6.1 缓存优化
多级缓存:
客户端缓存(本地) → CDN缓存 → Redis缓存 → 数据库缓存预热:
go
// 用户登录时预热Feed
func (s *FeedService) PreheatFeed(ctx context.Context, userID int64) error {
// 异步预加载前50条Feed到Redis
go func() {
s.GetFeed(ctx, userID, 0, 50)
}()
return nil
}热点数据:
go
// 缓存热门动态(点赞数>1万)
func (s *FeedService) CacheHotPosts() {
// 定时任务:每5分钟缓存热门动态
}6.2 数据库优化
分库分表:
sql
-- Feed收件箱按user_id分表(256张表)
-- 路由规则:user_id % 256
-- 动态表按时间分表(按月)
posts_202401
posts_202402
...读写分离:
写库(Master):发布动态
读库(Slave):查询Feed、动态详情索引优化:
sql
-- 收件箱表:联合索引
KEY idx_user_time (user_id, create_time)
-- 动态表:覆盖索引
KEY idx_id_time_count (id, create_time, like_count)6.3 性能数据
| 指标 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| 发布响应时间 | 2000ms | 300ms | 6.7x |
| Feed查询响应时间 | 500ms | 100ms | 5x |
| 写QPS | 1000 | 1万 | 10x |
| 读QPS | 1万 | 10万 | 10x |
| 存储成本 | 200TB | 100TB | 50% |
优化效果:
- ✅ 发布响应从2秒优化到300ms(Kafka异步)
- ✅ Feed查询从500ms优化到100ms(推拉结合+多级缓存)
- ✅ 支持亿级用户、百万级QPS
七、面试要点
7.1 常见追问
Q1: 推模式、拉模式、推拉结合如何选择?
A: 根据业务特点选择:
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 小规模社交 | 推模式 | 读取快,粉丝数少 |
| 有大V场景 | 推拉结合 ⭐ | 综合最优 |
| 大规模论坛 | 拉模式 | 写入快,不用考虑粉丝数 |
推拉结合最优:
- 普通用户(粉丝<1万):推模式(写时推送)
- 大V(粉丝>1万):拉模式(读时拉取)
Q2: 如何处理大V发布瞬间百万请求?
A: 多重优化:
- 异步推送:Kafka异步写收件箱
- 批量写入:Pipeline批量操作Redis
- 分批推送:分100批,每批1万粉丝
- 限流:控制推送速率
go
// 分批推送
func (s *FeedService) batchFanout(fans []int64, postID int64) {
batchSize := 10000
for i := 0; i < len(fans); i += batchSize {
end := i + batchSize
if end > len(fans) {
end = len(fans)
}
batch := fans[i:end]
s.kafkaClient.Send(batch, postID)
}
}Q3: 微博、Twitter、Instagram的Feed架构有何不同?
A:
| 产品 | 架构模式 | 特点 |
|---|---|---|
| 微博 | 推拉结合 | 普通用户推,大V拉 |
| 推拉结合 | 类似微博 | |
| 推模式为主 | 图片加载较慢,推模式保证实时性 | |
| 抖音 | 推荐算法 | Feed主要靠算法推荐,不完全依赖关注 |
Q4: 如何实现Feed去重?
A: 多种方案:
- 客户端去重:记录已读post_id,客户端过滤
- 服务端去重:合并时用Set去重
- 布隆过滤器:快速判断是否已展示
go
// 服务端去重
func (s *FeedService) dedup(postIDs []int64) []int64 {
seen := make(map[int64]bool)
var result []int64
for _, id := range postIDs {
if !seen[id] {
seen[id] = true
result = append(result, id)
}
}
return result
}Q5: 如何保证Feed不丢不重?
A:
不丢:
- Kafka消息持久化
- Redis + MySQL双写
- 定时对账补偿
不重:
- ZSet天然去重(score相同会覆盖)
- 客户端记录last_post_id
- 幂等性保证
Q6: 如何实现个性化排序?
A: 多因素加权:
go
// 个性化得分 = 时间分 × 0.3 + 热度分 × 0.4 + 亲密度分 × 0.3
func (s *FeedService) PersonalizedScore(post *Post, user *User) float64 {
// 时间分(越新越高)
timeScore := 1.0 / (1.0 + float64(time.Since(post.CreateTime).Hours())/24.0)
// 热度分(点赞+评论+转发)
heatScore := float64(post.LikeCount + post.CommentCount*2 + post.ForwardCount*3)
// 亲密度分(互动次数)
intimacyScore := s.getIntimacy(user.ID, post.UserID)
return timeScore*0.3 + heatScore*0.4 + intimacyScore*0.3
}Q7: 如何支持混合Feed(关注+推荐)?
A: 插入策略:
go
// 每5条关注内容插入1条推荐内容
func (s *FeedService) MixedFeed(followPosts, recPosts []*Post) []*Post {
var result []*Post
followIdx, recIdx := 0, 0
for followIdx < len(followPosts) || recIdx < len(recPosts) {
// 插入5条关注
for i := 0; i < 5 && followIdx < len(followPosts); i++ {
result = append(result, followPosts[followIdx])
followIdx++
}
// 插入1条推荐
if recIdx < len(recPosts) {
result = append(result, recPosts[recIdx])
recIdx++
}
}
return result
}7.2 扩展知识点
相关场景题:
相关技术文档:
八、总结
Feed流系统设计要点:
核心技术选型
架构模式:推拉结合 ⭐(综合最优)
- 普通用户:推模式(写扩散)
- 大V:拉模式(读扩散)
存储方案:
- Redis ZSet:收件箱/发件箱缓存
- MySQL:动态内容持久化
- HBase:海量历史数据
消息队列:
- Kafka:异步推送Feed
关键优化策略
性能优化:
- 多级缓存(本地+Redis+DB)
- 分库分表(按user_id/时间)
- 读写分离
- 批量操作
高可用:
- Kafka消息持久化
- Redis主从+哨兵
- MySQL主从+双写
- 降级策略
数据一致性:
- 最终一致性
- 定时对账
- 幂等性保证
面试重点
面试中要能说清楚:
- 架构选择:为什么选推拉结合?
- 存储设计:如何用Redis ZSet实现时间线?
- 性能优化:如何支撑亿级用户?
- 大V处理:如何处理百万粉丝的大V?
- 一致性保证:如何保证Feed不丢不重?
下一步学习:
- 实现一个简单的Feed流系统(支持发布、查看)
- 压测Feed系统(模拟百万用户)
- 研究微博/Twitter的技术分享
参考资料:
- Twitter的Snowflake时间线算法
- Instagram的Feed排序算法
- 微博的推拉结合架构演进
