Skip to content

如何设计Feed流系统

一、问题描述

1.1 业务背景

Feed流(动态流、时间线)是社交产品的核心功能,用于展示用户关注的内容更新。

典型应用场景

  • 微博:关注的博主发布的微博
  • 朋友圈:好友发布的动态
  • 抖音:关注的创作者发布的视频
  • Twitter:关注的用户发布的推文
  • Instagram:关注的用户发布的照片

核心价值

  • 让用户及时看到关注对象的最新动态
  • 提高用户活跃度和留存率
  • 是社交产品的流量入口

1.2 核心功能

基础功能

  1. 发布动态:用户发布内容,推送给粉丝
  2. 查看Feed:用户查看关注对象的最新动态
  3. 排序展示:按时间、热度或个性化排序
  4. 分页加载:支持下拉刷新和上拉加载更多

进阶功能

  1. 已读标记:标记哪些动态已读
  2. 去重:同一动态不重复展示
  3. 混合Feed:关注+推荐内容混排
  4. 实时更新:有新动态时提示用户

1.3 技术挑战

读写不对称

  • 写少读多:一个人发布,千万人读取
  • 大V效应:百万粉丝的大V发布,瞬间千万次读取

数据量巨大

假设:1亿用户,平均关注100人,每人每天发10条
- 总Feed条目:1亿 × 100 × 10 = 1万亿条/天
- 存储:1万亿 × 100B ≈ 100TB/天

实时性要求

  • 发布后立即推送给粉丝
  • 用户打开App立即看到最新内容
  • 响应时间 <200ms

热点问题

  • 大V发布瞬间百万请求
  • 热门动态瞬间百万点赞

1.4 面试考察点

  • 架构选择:推模式 vs 拉模式 vs 推拉结合
  • 存储设计:如何存储海量Feed数据
  • 性能优化:如何支撑亿级用户
  • 一致性:如何保证Feed不丢不重
  • 排序算法:时间序 vs 热度 vs 个性化

二、需求分析

2.1 功能性需求

需求描述优先级
FR1用户发布动态,推送给所有粉丝P0
FR2用户查看关注人的最新动态P0
FR3Feed按时间倒序排列P0
FR4支持分页加载(每页20条)P0
FR5支持下拉刷新P1
FR6支持按热度排序P1
FR7混合推荐内容(关注+推荐)P2
FR8实时提示新动态P2

2.2 非功能性需求

性能需求

  • 发布响应时间:<500ms
  • Feed查询响应时间:<200ms
  • 支持QPS:写10万+,读100万+

可扩展性

  • 支持用户规模:10亿+
  • 支持日活用户:1亿+
  • 支持关注上限:5000人/用户

可用性

  • 系统可用性:99.9%
  • Feed延迟:<5秒

一致性

  • 最终一致性(允许短暂延迟)
  • 不丢动态、不重复

2.3 数据规模估算

假设

  • 总用户:10亿
  • 日活用户:1亿
  • 平均关注:100人
  • 日均发布:5条/人
  • Feed保留:30天

计算

日发布总量 = 1亿 × 5 = 5亿条
日Feed生成量 = 5亿 × 100(平均粉丝) = 500亿条

存储 = 500亿 × 100B × 30天 = 150TB

写QPS = 5亿 / 86400 ≈ 5787 QPS(峰值3万)
读QPS = 1亿用户 × 10次/天 / 86400 ≈ 11574 QPS(峰值10万)

三、技术选型

3.1 三种架构模式

推模式(写扩散 / Fan-out on Write)

原理

  • 用户发布动态时,立即推送给所有粉丝
  • 将动态写入每个粉丝的收件箱(Inbox)
  • 粉丝读取时直接从自己的收件箱读取

流程

1. 用户A发布动态
2. 查询A的所有粉丝(100万)
3. 将动态ID写入100万个粉丝的收件箱
4. 粉丝B查看Feed时,读取自己的收件箱

优点

  • ✅ 读取快:直接读自己的收件箱,无需join
  • ✅ 读写分离:写和读不冲突

缺点

  • ❌ 写入慢:大V发布需写入百万粉丝收件箱
  • ❌ 存储大:每个粉丝都要存一份

适用场景

  • 粉丝数量有限(<5000)
  • 发布不频繁
  • 对读取性能要求极高

拉模式(读扩散 / Fan-out on Read)

原理

  • 用户发布动态时,只写入自己的发件箱(Outbox)
  • 粉丝读取时,查询所有关注人的发件箱
  • 合并排序后返回

流程

1. 用户A发布动态
2. 将动态写入A的发件箱
3. 粉丝B查看Feed时
4. 查询B关注的所有人(100人)的发件箱
5. 合并排序返回

优点

  • ✅ 写入快:只写自己的发件箱
  • ✅ 存储小:每条动态只存一份

缺点

  • ❌ 读取慢:需要查询100人的发件箱并合并排序
  • ❌ 实时性差:需要实时查询

适用场景

  • 有大V用户(百万粉丝)
  • 对写入性能要求高
  • 读取可以牺牲一些性能

推拉结合(混合模式)

原理

  • 普通用户:推模式(粉丝<1万)
  • 大V用户:拉模式(粉丝>1万)
  • 读取时:推的直接读,拉的实时合并

流程

1. 普通用户发布:推送给所有粉丝收件箱
2. 大V发布:只写自己的发件箱
3. 粉丝读取:
   - 读取收件箱(推模式的动态)
   - 查询关注的大V发件箱(拉模式)
   - 合并排序返回

优点

  • ✅ 综合最优:兼顾读写性能
  • ✅ 灵活:根据用户类型选择策略

缺点

  • ❌ 复杂度高:需要维护两套逻辑

适用场景

  • 既有普通用户又有大V
  • 追求综合性能最优

3.2 推荐方案对比

维度推模式拉模式推拉结合
写性能慢(O(粉丝数))快(O(1))混合
读性能快(O(1))慢(O(关注数))混合
存储成本
实时性
复杂度
适用场景小规模大V综合最优 ⭐

推荐推拉结合(Twitter、微博采用)

3.3 技术栈

存储

  • MySQL:存储动态内容、用户关系
  • Redis:缓存收件箱、热点数据
  • HBase:存储海量Feed历史数据

消息队列

  • Kafka:异步推送Feed到粉丝收件箱

缓存

  • Redis:收件箱缓存、排序缓存

四、系统设计

4.1 架构图

mermaid
graph TB
    subgraph 客户端
        A[用户App]
    end
    
    subgraph 接入层
        B[API Gateway]
    end
    
    subgraph 业务层
        C[发布服务]
        D[Feed服务]
        E[关注服务]
    end
    
    subgraph 数据层
        F[MySQL<br/>动态表/关注表]
        G[Redis<br/>收件箱缓存]
        H[HBase<br/>历史Feed]
    end
    
    subgraph 消息队列
        I[Kafka<br/>Feed推送队列]
        J[Consumer<br/>Feed写入器]
    end
    
    A --> B
    B --> C
    B --> D
    
    C --> F
    C --> I
    
    I --> J
    J --> G
    J --> H
    
    D --> G
    D --> F
    D --> H
    
    E --> F

4.2 数据库设计

动态表(Posts)

sql
CREATE TABLE posts (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL COMMENT '发布者ID',
    content TEXT COMMENT '动态内容',
    media_urls JSON COMMENT '图片/视频URL',
    post_type TINYINT DEFAULT 1 COMMENT '类型 1原创 2转发',
    forward_id BIGINT COMMENT '转发的原动态ID',
    like_count INT DEFAULT 0 COMMENT '点赞数',
    comment_count INT DEFAULT 0 COMMENT '评论数',
    forward_count INT DEFAULT 0 COMMENT '转发数',
    is_deleted TINYINT DEFAULT 0 COMMENT '是否删除',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    KEY idx_user_time (user_id, create_time),
    KEY idx_create_time (create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COMMENT='动态表';

关注关系表(Follows)

sql
CREATE TABLE follows (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL COMMENT '关注者ID',
    follow_user_id BIGINT NOT NULL COMMENT '被关注者ID',
    status TINYINT DEFAULT 1 COMMENT '状态 1关注中',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    UNIQUE KEY uk_user_follow (user_id, follow_user_id),
    KEY idx_follow_user (follow_user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COMMENT='关注关系表';

Feed收件箱表(Feed_Inbox)

sql
CREATE TABLE feed_inbox (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL COMMENT '用户ID(收件人)',
    post_id BIGINT NOT NULL COMMENT '动态ID',
    post_user_id BIGINT NOT NULL COMMENT '动态发布者ID',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    UNIQUE KEY uk_user_post (user_id, post_id),
    KEY idx_user_time (user_id, create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COMMENT='Feed收件箱表(推模式)';

-- 分表:按user_id % 256分表
CREATE TABLE feed_inbox_0 LIKE feed_inbox;
CREATE TABLE feed_inbox_1 LIKE feed_inbox;
-- ... 共256张表

大V发件箱表(Feed_Outbox)

sql
CREATE TABLE feed_outbox (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL COMMENT '发布者ID',
    post_id BIGINT NOT NULL COMMENT '动态ID',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    KEY idx_user_time (user_id, create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COMMENT='大V发件箱表(拉模式)';

4.3 Redis数据结构

redis
# 1. 用户收件箱(ZSet,按时间排序)
Key: feed:inbox:{user_id}
Score: timestamp(发布时间)
Value: post_id
TTL: 7天

# 示例
ZADD feed:inbox:10001 1699000001 20001
ZADD feed:inbox:10001 1699000002 20002
ZADD feed:inbox:10001 1699000003 20003

# 查询最新20条
ZREVRANGE feed:inbox:10001 0 19 WITHSCORES

# 2. 用户发件箱(ZSet)
Key: feed:outbox:{user_id}
Score: timestamp
Value: post_id
TTL: 30天

# 3. 动态详情缓存(String)
Key: post:{post_id}
Value: {post_json}
TTL: 1天

# 4. 用户是否为大V(String)
Key: user:is_big_v:{user_id}
Value: 1(是)/ 0(否)
TTL: 1小时

# 5. 用户粉丝列表(Set)
Key: user:fans:{user_id}
Value: {fan_user_id1, fan_user_id2, ...}
TTL: 1小时(小于1万粉丝才缓存)

4.4 API设计

发布动态

http
POST /api/v1/posts
Content-Type: application/json

{
  "content": "今天天气真好!",
  "media_urls": ["https://cdn.example.com/img1.jpg"],
  "post_type": 1
}

Response:
{
  "code": 0,
  "message": "success",
  "data": {
    "post_id": 20001,
    "create_time": "2024-01-15 10:30:00"
  }
}

查询Feed流

http
GET /api/v1/feed?cursor=0&count=20

Response:
{
  "code": 0,
  "message": "success",
  "data": {
    "posts": [
      {
        "post_id": 20003,
        "user_id": 10002,
        "username": "张三",
        "avatar": "https://cdn.example.com/avatar2.jpg",
        "content": "分享一张照片",
        "media_urls": ["https://cdn.example.com/photo1.jpg"],
        "like_count": 100,
        "comment_count": 20,
        "create_time": "2024-01-15 10:35:00"
      },
      // ... 更多动态
    ],
    "next_cursor": 20,
    "has_more": true
  }
}

五、核心实现

5.1 Go实现

点击查看完整实现
go
package feed

import (
    "context"
    "encoding/json"
    "fmt"
    "strconv"
    "time"
    
    "github.com/go-redis/redis/v8"
    "gorm.io/gorm"
)

// FeedService Feed流服务
type FeedService struct {
    db          *gorm.DB
    redis       *redis.Client
    kafkaClient *KafkaClient
}

// Post 动态
type Post struct {
    ID           int64     `gorm:"primary_key" json:"post_id"`
    UserID       int64     `gorm:"column:user_id" json:"user_id"`
    Content      string    `gorm:"column:content" json:"content"`
    MediaURLs    string    `gorm:"column:media_urls" json:"media_urls"` // JSON数组
    PostType     int8      `gorm:"column:post_type" json:"post_type"`
    LikeCount    int       `gorm:"column:like_count" json:"like_count"`
    CommentCount int       `gorm:"column:comment_count" json:"comment_count"`
    CreateTime   time.Time `gorm:"column:create_time" json:"create_time"`
}

// FeedInbox Feed收件箱
type FeedInbox struct {
    ID         int64     `gorm:"primary_key"`
    UserID     int64     `gorm:"column:user_id"`
    PostID     int64     `gorm:"column:post_id"`
    PostUserID int64     `gorm:"column:post_user_id"`
    CreateTime time.Time `gorm:"column:create_time"`
}

// Follow 关注关系
type Follow struct {
    UserID       int64 `gorm:"column:user_id"`
    FollowUserID int64 `gorm:"column:follow_user_id"`
}

const (
    BigVFansThreshold = 10000 // 大V粉丝阈值
    MaxFollowCount    = 5000  // 最大关注数
)

// PublishPost 发布动态
func (s *FeedService) PublishPost(ctx context.Context, userID int64, content string, mediaURLs []string) (int64, error) {
    // 1. 创建动态
    post := &Post{
        UserID:     userID,
        Content:    content,
        MediaURLs:  toJSON(mediaURLs),
        PostType:   1,
        CreateTime: time.Now(),
    }
    
    err := s.db.Create(post).Error
    if err != nil {
        return 0, err
    }
    
    // 2. 判断是否为大V
    isBigV, err := s.IsBigV(ctx, userID)
    if err != nil {
        return 0, err
    }
    
    // 3. 推送Feed
    if isBigV {
        // 大V:写发件箱(拉模式)
        err = s.writeOutbox(ctx, userID, post.ID)
    } else {
        // 普通用户:推送到粉丝收件箱(推模式)
        err = s.fanoutToInbox(ctx, userID, post.ID)
    }
    
    if err != nil {
        return 0, err
    }
    
    return post.ID, nil
}

// IsBigV 判断是否为大V
func (s *FeedService) IsBigV(ctx context.Context, userID int64) (bool, error) {
    // 1. 从Redis获取
    cacheKey := fmt.Sprintf("user:is_big_v:%d", userID)
    val, err := s.redis.Get(ctx, cacheKey).Result()
    if err == nil {
        return val == "1", nil
    }
    
    // 2. 查询数据库
    var fansCount int64
    err = s.db.Model(&Follow{}).
        Where("follow_user_id = ?", userID).
        Count(&fansCount).Error
    if err != nil {
        return false, err
    }
    
    isBigV := fansCount >= BigVFansThreshold
    
    // 3. 写入Redis
    if isBigV {
        s.redis.Set(ctx, cacheKey, "1", time.Hour)
    } else {
        s.redis.Set(ctx, cacheKey, "0", time.Hour)
    }
    
    return isBigV, nil
}

// writeOutbox 写发件箱(大V)
func (s *FeedService) writeOutbox(ctx context.Context, userID, postID int64) error {
    // 写Redis
    cacheKey := fmt.Sprintf("feed:outbox:%d", userID)
    score := float64(time.Now().Unix())
    
    return s.redis.ZAdd(ctx, cacheKey, &redis.Z{
        Score:  score,
        Member: postID,
    }).Err()
}

// fanoutToInbox 推送到粉丝收件箱(普通用户)
func (s *FeedService) fanoutToInbox(ctx context.Context, userID, postID int64) error {
    // 1. 查询粉丝列表
    fans, err := s.getFans(ctx, userID)
    if err != nil {
        return err
    }
    
    // 2. 异步推送到Kafka(避免阻塞)
    message := FeedMessage{
        PostID:  postID,
        UserID:  userID,
        FanIDs:  fans,
        PubTime: time.Now(),
    }
    
    return s.kafkaClient.SendFeedMessage(message)
}

// ConsumeFeedMessage 消费Feed消息(Kafka Consumer)
func (s *FeedService) ConsumeFeedMessage(msg FeedMessage) error {
    ctx := context.Background()
    
    // 批量写入Redis
    pipeline := s.redis.Pipeline()
    score := float64(msg.PubTime.Unix())
    
    for _, fanID := range msg.FanIDs {
        cacheKey := fmt.Sprintf("feed:inbox:%d", fanID)
        pipeline.ZAdd(ctx, cacheKey, &redis.Z{
            Score:  score,
            Member: msg.PostID,
        })
        // 只保留最新1000条
        pipeline.ZRemRangeByRank(ctx, cacheKey, 0, -1001)
        // 设置过期时间7天
        pipeline.Expire(ctx, cacheKey, 7*24*time.Hour)
    }
    
    _, err := pipeline.Exec(ctx)
    return err
}

// getFans 获取粉丝列表
func (s *FeedService) getFans(ctx context.Context, userID int64) ([]int64, error) {
    // 1. 尝试从Redis获取
    cacheKey := fmt.Sprintf("user:fans:%d", userID)
    members, err := s.redis.SMembers(ctx, cacheKey).Result()
    if err == nil && len(members) > 0 {
        var fans []int64
        for _, m := range members {
            id, _ := strconv.ParseInt(m, 10, 64)
            fans = append(fans, id)
        }
        return fans, nil
    }
    
    // 2. 从数据库查询
    var follows []Follow
    err = s.db.Where("follow_user_id = ?", userID).
        Find(&follows).Error
    if err != nil {
        return nil, err
    }
    
    var fans []int64
    for _, f := range follows {
        fans = append(fans, f.UserID)
    }
    
    // 3. 写入Redis(小于1万粉丝才缓存)
    if len(fans) < BigVFansThreshold {
        pipeline := s.redis.Pipeline()
        for _, fanID := range fans {
            pipeline.SAdd(ctx, cacheKey, fanID)
        }
        pipeline.Expire(ctx, cacheKey, time.Hour)
        pipeline.Exec(ctx)
    }
    
    return fans, nil
}

// GetFeed 获取Feed流(推拉结合)
func (s *FeedService) GetFeed(ctx context.Context, userID int64, cursor, count int) ([]*Post, int, error) {
    // 1. 获取关注列表
    followings, err := s.getFollowings(ctx, userID)
    if err != nil {
        return nil, 0, err
    }
    
    // 2. 分离普通用户和大V
    var normalUsers, bigVUsers []int64
    for _, uid := range followings {
        isBigV, _ := s.IsBigV(ctx, uid)
        if isBigV {
            bigVUsers = append(bigVUsers, uid)
        } else {
            normalUsers = append(normalUsers, uid)
        }
    }
    
    // 3. 读取收件箱(推模式的动态)
    inboxPostIDs, err := s.getInboxPosts(ctx, userID, cursor, count*2)
    if err != nil {
        return nil, 0, err
    }
    
    // 4. 读取大V发件箱(拉模式的动态)
    outboxPostIDs, err := s.getOutboxPosts(ctx, bigVUsers, cursor, count)
    if err != nil {
        return nil, 0, err
    }
    
    // 5. 合并排序
    allPostIDs := s.mergeAndSort(inboxPostIDs, outboxPostIDs, count)
    
    // 6. 批量查询动态详情
    posts, err := s.getPostsByIDs(ctx, allPostIDs)
    if err != nil {
        return nil, 0, err
    }
    
    // 7. 计算下一页游标
    nextCursor := cursor + count
    
    return posts, nextCursor, nil
}

// getFollowings 获取关注列表
func (s *FeedService) getFollowings(ctx context.Context, userID int64) ([]int64, error) {
    var follows []Follow
    err := s.db.Where("user_id = ?", userID).
        Limit(MaxFollowCount).
        Find(&follows).Error
    if err != nil {
        return nil, err
    }
    
    var followings []int64
    for _, f := range follows {
        followings = append(followings, f.FollowUserID)
    }
    
    return followings, nil
}

// getInboxPosts 获取收件箱动态
func (s *FeedService) getInboxPosts(ctx context.Context, userID int64, offset, count int) ([]int64, error) {
    cacheKey := fmt.Sprintf("feed:inbox:%d", userID)
    
    // 从Redis ZSet获取(按时间倒序)
    members, err := s.redis.ZRevRange(ctx, cacheKey, int64(offset), int64(offset+count-1)).Result()
    if err != nil {
        return nil, err
    }
    
    var postIDs []int64
    for _, m := range members {
        id, _ := strconv.ParseInt(m, 10, 64)
        postIDs = append(postIDs, id)
    }
    
    return postIDs, nil
}

// getOutboxPosts 获取大V发件箱动态
func (s *FeedService) getOutboxPosts(ctx context.Context, bigVUsers []int64, offset, count int) ([]int64, error) {
    var allPostIDs []int64
    
    // 查询每个大V的发件箱
    for _, uid := range bigVUsers {
        cacheKey := fmt.Sprintf("feed:outbox:%d", uid)
        members, err := s.redis.ZRevRange(ctx, cacheKey, 0, int64(count-1)).Result()
        if err != nil {
            continue
        }
        
        for _, m := range members {
            id, _ := strconv.ParseInt(m, 10, 64)
            allPostIDs = append(allPostIDs, id)
        }
    }
    
    return allPostIDs, nil
}

// mergeAndSort 合并排序
func (s *FeedService) mergeAndSort(inbox, outbox []int64, count int) []int64 {
    // 简化实现:直接合并(实际应按时间戳排序)
    merged := append(inbox, outbox...)
    
    // 去重
    seen := make(map[int64]bool)
    var result []int64
    for _, id := range merged {
        if !seen[id] {
            seen[id] = true
            result = append(result, id)
            if len(result) >= count {
                break
            }
        }
    }
    
    return result
}

// getPostsByIDs 批量查询动态详情
func (s *FeedService) getPostsByIDs(ctx context.Context, postIDs []int64) ([]*Post, error) {
    if len(postIDs) == 0 {
        return []*Post{}, nil
    }
    
    // 1. 尝试从Redis批量获取
    posts, missingIDs := s.getPostsFromCache(ctx, postIDs)
    
    // 2. 从数据库查询缺失的
    if len(missingIDs) > 0 {
        var dbPosts []*Post
        err := s.db.Where("id IN ?", missingIDs).Find(&dbPosts).Error
        if err != nil {
            return nil, err
        }
        
        // 写回缓存
        for _, post := range dbPosts {
            s.cachePost(ctx, post)
            posts = append(posts, post)
        }
    }
    
    return posts, nil
}

// getPostsFromCache 从缓存获取动态
func (s *FeedService) getPostsFromCache(ctx context.Context, postIDs []int64) ([]*Post, []int64) {
    var posts []*Post
    var missingIDs []int64
    
    for _, id := range postIDs {
        cacheKey := fmt.Sprintf("post:%d", id)
        val, err := s.redis.Get(ctx, cacheKey).Result()
        if err != nil {
            missingIDs = append(missingIDs, id)
            continue
        }
        
        var post Post
        json.Unmarshal([]byte(val), &post)
        posts = append(posts, &post)
    }
    
    return posts, missingIDs
}

// cachePost 缓存动态
func (s *FeedService) cachePost(ctx context.Context, post *Post) error {
    cacheKey := fmt.Sprintf("post:%d", post.ID)
    data, _ := json.Marshal(post)
    return s.redis.Set(ctx, cacheKey, data, 24*time.Hour).Err()
}

// 辅助函数
func toJSON(arr []string) string {
    data, _ := json.Marshal(arr)
    return string(data)
}

type FeedMessage struct {
    PostID  int64
    UserID  int64
    FanIDs  []int64
    PubTime time.Time
}

type KafkaClient struct {
    // Kafka客户端实现
}

func (k *KafkaClient) SendFeedMessage(msg FeedMessage) error {
    // 发送到Kafka
    return nil
}

5.2 Java实现

java
@Service
public class FeedService {
    
    @Autowired
    private PostMapper postMapper;
    
    @Autowired
    private FollowMapper followMapper;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private KafkaTemplate<String, FeedMessage> kafkaTemplate;
    
    private static final int BIG_V_THRESHOLD = 10000;
    private static final int MAX_FOLLOW_COUNT = 5000;
    
    /**
     * 发布动态
     */
    @Transactional
    public Long publishPost(Long userId, String content, List<String> mediaUrls) {
        // 1. 创建动态
        Post post = new Post();
        post.setUserId(userId);
        post.setContent(content);
        post.setMediaUrls(JSON.toJSONString(mediaUrls));
        post.setPostType((byte) 1);
        post.setCreateTime(new Date());
        
        postMapper.insert(post);
        
        // 2. 判断是否为大V
        boolean isBigV = isBigV(userId);
        
        // 3. 推送Feed
        if (isBigV) {
            // 大V:写发件箱
            writeOutbox(userId, post.getId());
        } else {
            // 普通用户:推送到粉丝收件箱
            fanoutToInbox(userId, post.getId());
        }
        
        return post.getId();
    }
    
    /**
     * 判断是否为大V
     */
    private boolean isBigV(Long userId) {
        String cacheKey = "user:is_big_v:" + userId;
        
        // 从Redis获取
        String val = (String) redisTemplate.opsForValue().get(cacheKey);
        if (val != null) {
            return "1".equals(val);
        }
        
        // 查询数据库
        Long fansCount = followMapper.countByFollowUserId(userId);
        boolean isBigV = fansCount >= BIG_V_THRESHOLD;
        
        // 写入Redis
        redisTemplate.opsForValue().set(
            cacheKey, 
            isBigV ? "1" : "0", 
            1, 
            TimeUnit.HOURS
        );
        
        return isBigV;
    }
    
    /**
     * 写发件箱(大V)
     */
    private void writeOutbox(Long userId, Long postId) {
        String cacheKey = "feed:outbox:" + userId;
        double score = System.currentTimeMillis() / 1000.0;
        
        redisTemplate.opsForZSet().add(cacheKey, postId, score);
        redisTemplate.expire(cacheKey, 30, TimeUnit.DAYS);
    }
    
    /**
     * 推送到粉丝收件箱(普通用户)
     */
    private void fanoutToInbox(Long userId, Long postId) {
        // 查询粉丝列表
        List<Long> fans = getFans(userId);
        
        // 发送到Kafka异步处理
        FeedMessage message = new FeedMessage();
        message.setPostId(postId);
        message.setUserId(userId);
        message.setFanIds(fans);
        message.setPubTime(new Date());
        
        kafkaTemplate.send("feed_topic", message);
    }
    
    /**
     * 消费Kafka消息(Consumer)
     */
    @KafkaListener(topics = "feed_topic")
    public void consumeFeedMessage(FeedMessage message) {
        // 批量写入Redis
        List<Long> fanIds = message.getFanIds();
        double score = message.getPubTime().getTime() / 1000.0;
        
        // 使用Pipeline批量操作
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> operations) {
                for (Long fanId : fanIds) {
                    String cacheKey = "feed:inbox:" + fanId;
                    
                    operations.opsForZSet().add(
                        (K) cacheKey, 
                        (V) message.getPostId(), 
                        score
                    );
                    
                    // 只保留最新1000条
                    operations.opsForZSet().removeRange((K) cacheKey, 0, -1001);
                    
                    // 设置过期时间
                    operations.expire((K) cacheKey, 7, TimeUnit.DAYS);
                }
                return null;
            }
        });
    }
    
    /**
     * 获取粉丝列表
     */
    private List<Long> getFans(Long userId) {
        String cacheKey = "user:fans:" + userId;
        
        // 尝试从Redis获取
        Set<Object> members = redisTemplate.opsForSet().members(cacheKey);
        if (members != null && !members.isEmpty()) {
            return members.stream()
                .map(m -> Long.parseLong(m.toString()))
                .collect(Collectors.toList());
        }
        
        // 从数据库查询
        List<Long> fans = followMapper.findFansByUserId(userId);
        
        // 写入Redis(小于1万粉丝才缓存)
        if (fans.size() < BIG_V_THRESHOLD) {
            redisTemplate.opsForSet().add(cacheKey, fans.toArray());
            redisTemplate.expire(cacheKey, 1, TimeUnit.HOURS);
        }
        
        return fans;
    }
    
    /**
     * 获取Feed流
     */
    public FeedResponse getFeed(Long userId, Integer cursor, Integer count) {
        // 1. 获取关注列表
        List<Long> followings = getFollowings(userId);
        
        // 2. 分离普通用户和大V
        List<Long> normalUsers = new ArrayList<>();
        List<Long> bigVUsers = new ArrayList<>();
        
        for (Long uid : followings) {
            if (isBigV(uid)) {
                bigVUsers.add(uid);
            } else {
                normalUsers.add(uid);
            }
        }
        
        // 3. 读取收件箱(推模式)
        List<Long> inboxPostIds = getInboxPosts(userId, cursor, count * 2);
        
        // 4. 读取大V发件箱(拉模式)
        List<Long> outboxPostIds = getOutboxPosts(bigVUsers, cursor, count);
        
        // 5. 合并排序
        List<Long> allPostIds = mergeAndSort(inboxPostIds, outboxPostIds, count);
        
        // 6. 批量查询动态详情
        List<Post> posts = getPostsByIds(allPostIds);
        
        // 7. 构造响应
        FeedResponse response = new FeedResponse();
        response.setPosts(posts);
        response.setNextCursor(cursor + count);
        response.setHasMore(posts.size() >= count);
        
        return response;
    }
    
    /**
     * 获取关注列表
     */
    private List<Long> getFollowings(Long userId) {
        return followMapper.findFollowingsByUserId(userId, MAX_FOLLOW_COUNT);
    }
    
    /**
     * 获取收件箱动态
     */
    private List<Long> getInboxPosts(Long userId, Integer offset, Integer count) {
        String cacheKey = "feed:inbox:" + userId;
        
        Set<Object> members = redisTemplate.opsForZSet()
            .reverseRange(cacheKey, offset, offset + count - 1);
        
        if (members == null) {
            return Collections.emptyList();
        }
        
        return members.stream()
            .map(m -> Long.parseLong(m.toString()))
            .collect(Collectors.toList());
    }
    
    /**
     * 获取大V发件箱动态
     */
    private List<Long> getOutboxPosts(List<Long> bigVUsers, Integer offset, Integer count) {
        List<Long> allPostIds = new ArrayList<>();
        
        for (Long uid : bigVUsers) {
            String cacheKey = "feed:outbox:" + uid;
            
            Set<Object> members = redisTemplate.opsForZSet()
                .reverseRange(cacheKey, 0, count - 1);
            
            if (members != null) {
                members.forEach(m -> allPostIds.add(Long.parseLong(m.toString())));
            }
        }
        
        return allPostIds;
    }
    
    /**
     * 合并排序
     */
    private List<Long> mergeAndSort(List<Long> inbox, List<Long> outbox, Integer count) {
        Set<Long> uniqueIds = new LinkedHashSet<>();
        uniqueIds.addAll(inbox);
        uniqueIds.addAll(outbox);
        
        return uniqueIds.stream()
            .limit(count)
            .collect(Collectors.toList());
    }
    
    /**
     * 批量查询动态详情
     */
    private List<Post> getPostsByIds(List<Long> postIds) {
        if (postIds.isEmpty()) {
            return Collections.emptyList();
        }
        
        // 先从Redis批量获取
        List<Post> posts = new ArrayList<>();
        List<Long> missingIds = new ArrayList<>();
        
        for (Long id : postIds) {
            String cacheKey = "post:" + id;
            String json = (String) redisTemplate.opsForValue().get(cacheKey);
            
            if (json != null) {
                posts.add(JSON.parseObject(json, Post.class));
            } else {
                missingIds.add(id);
            }
        }
        
        // 从数据库查询缺失的
        if (!missingIds.isEmpty()) {
            List<Post> dbPosts = postMapper.findByIds(missingIds);
            
            // 写回缓存
            for (Post post : dbPosts) {
                String cacheKey = "post:" + post.getId();
                redisTemplate.opsForValue().set(
                    cacheKey, 
                    JSON.toJSONString(post), 
                    1, 
                    TimeUnit.DAYS
                );
                posts.add(post);
            }
        }
        
        return posts;
    }
}
java
@Data
public class FeedMessage {
    private Long postId;
    private Long userId;
    private List<Long> fanIds;
    private Date pubTime;
}
java
@Data
public class FeedResponse {
    private List<Post> posts;
    private Integer nextCursor;
    private Boolean hasMore;
}

六、性能优化

6.1 缓存优化

多级缓存

客户端缓存(本地) → CDN缓存 → Redis缓存 → 数据库

缓存预热

go
// 用户登录时预热Feed
func (s *FeedService) PreheatFeed(ctx context.Context, userID int64) error {
    // 异步预加载前50条Feed到Redis
    go func() {
        s.GetFeed(ctx, userID, 0, 50)
    }()
    return nil
}

热点数据

go
// 缓存热门动态(点赞数>1万)
func (s *FeedService) CacheHotPosts() {
    // 定时任务:每5分钟缓存热门动态
}

6.2 数据库优化

分库分表

sql
-- Feed收件箱按user_id分表(256张表)
-- 路由规则:user_id % 256

-- 动态表按时间分表(按月)
posts_202401
posts_202402
...

读写分离

写库(Master):发布动态
读库(Slave):查询Feed、动态详情

索引优化

sql
-- 收件箱表:联合索引
KEY idx_user_time (user_id, create_time)

-- 动态表:覆盖索引
KEY idx_id_time_count (id, create_time, like_count)

6.3 性能数据

指标优化前优化后提升
发布响应时间2000ms300ms6.7x
Feed查询响应时间500ms100ms5x
写QPS10001万10x
读QPS1万10万10x
存储成本200TB100TB50%

优化效果

  • ✅ 发布响应从2秒优化到300ms(Kafka异步)
  • ✅ Feed查询从500ms优化到100ms(推拉结合+多级缓存)
  • ✅ 支持亿级用户、百万级QPS

七、面试要点

7.1 常见追问

Q1: 推模式、拉模式、推拉结合如何选择?

A: 根据业务特点选择:

场景推荐方案原因
小规模社交推模式读取快,粉丝数少
有大V场景推拉结合 ⭐综合最优
大规模论坛拉模式写入快,不用考虑粉丝数

推拉结合最优

  • 普通用户(粉丝<1万):推模式(写时推送)
  • 大V(粉丝>1万):拉模式(读时拉取)

Q2: 如何处理大V发布瞬间百万请求?

A: 多重优化:

  1. 异步推送:Kafka异步写收件箱
  2. 批量写入:Pipeline批量操作Redis
  3. 分批推送:分100批,每批1万粉丝
  4. 限流:控制推送速率
go
// 分批推送
func (s *FeedService) batchFanout(fans []int64, postID int64) {
    batchSize := 10000
    for i := 0; i < len(fans); i += batchSize {
        end := i + batchSize
        if end > len(fans) {
            end = len(fans)
        }
        batch := fans[i:end]
        s.kafkaClient.Send(batch, postID)
    }
}

Q3: 微博、Twitter、Instagram的Feed架构有何不同?

A:

产品架构模式特点
微博推拉结合普通用户推,大V拉
Twitter推拉结合类似微博
Instagram推模式为主图片加载较慢,推模式保证实时性
抖音推荐算法Feed主要靠算法推荐,不完全依赖关注

Q4: 如何实现Feed去重?

A: 多种方案:

  1. 客户端去重:记录已读post_id,客户端过滤
  2. 服务端去重:合并时用Set去重
  3. 布隆过滤器:快速判断是否已展示
go
// 服务端去重
func (s *FeedService) dedup(postIDs []int64) []int64 {
    seen := make(map[int64]bool)
    var result []int64
    for _, id := range postIDs {
        if !seen[id] {
            seen[id] = true
            result = append(result, id)
        }
    }
    return result
}

Q5: 如何保证Feed不丢不重?

A:

不丢

  • Kafka消息持久化
  • Redis + MySQL双写
  • 定时对账补偿

不重

  • ZSet天然去重(score相同会覆盖)
  • 客户端记录last_post_id
  • 幂等性保证

Q6: 如何实现个性化排序?

A: 多因素加权:

go
// 个性化得分 = 时间分 × 0.3 + 热度分 × 0.4 + 亲密度分 × 0.3
func (s *FeedService) PersonalizedScore(post *Post, user *User) float64 {
    // 时间分(越新越高)
    timeScore := 1.0 / (1.0 + float64(time.Since(post.CreateTime).Hours())/24.0)
    
    // 热度分(点赞+评论+转发)
    heatScore := float64(post.LikeCount + post.CommentCount*2 + post.ForwardCount*3)
    
    // 亲密度分(互动次数)
    intimacyScore := s.getIntimacy(user.ID, post.UserID)
    
    return timeScore*0.3 + heatScore*0.4 + intimacyScore*0.3
}

Q7: 如何支持混合Feed(关注+推荐)?

A: 插入策略:

go
// 每5条关注内容插入1条推荐内容
func (s *FeedService) MixedFeed(followPosts, recPosts []*Post) []*Post {
    var result []*Post
    followIdx, recIdx := 0, 0
    
    for followIdx < len(followPosts) || recIdx < len(recPosts) {
        // 插入5条关注
        for i := 0; i < 5 && followIdx < len(followPosts); i++ {
            result = append(result, followPosts[followIdx])
            followIdx++
        }
        
        // 插入1条推荐
        if recIdx < len(recPosts) {
            result = append(result, recPosts[recIdx])
            recIdx++
        }
    }
    
    return result
}

7.2 扩展知识点

相关场景题

相关技术文档

八、总结

Feed流系统设计要点:

核心技术选型

  1. 架构模式:推拉结合 ⭐(综合最优)

    • 普通用户:推模式(写扩散)
    • 大V:拉模式(读扩散)
  2. 存储方案

    • Redis ZSet:收件箱/发件箱缓存
    • MySQL:动态内容持久化
    • HBase:海量历史数据
  3. 消息队列

    • Kafka:异步推送Feed

关键优化策略

  1. 性能优化

    • 多级缓存(本地+Redis+DB)
    • 分库分表(按user_id/时间)
    • 读写分离
    • 批量操作
  2. 高可用

    • Kafka消息持久化
    • Redis主从+哨兵
    • MySQL主从+双写
    • 降级策略
  3. 数据一致性

    • 最终一致性
    • 定时对账
    • 幂等性保证

面试重点

面试中要能说清楚:

  1. 架构选择:为什么选推拉结合?
  2. 存储设计:如何用Redis ZSet实现时间线?
  3. 性能优化:如何支撑亿级用户?
  4. 大V处理:如何处理百万粉丝的大V?
  5. 一致性保证:如何保证Feed不丢不重?

下一步学习

  • 实现一个简单的Feed流系统(支持发布、查看)
  • 压测Feed系统(模拟百万用户)
  • 研究微博/Twitter的技术分享

参考资料

  • Twitter的Snowflake时间线算法
  • Instagram的Feed排序算法
  • 微博的推拉结合架构演进

正在精进