Skip to content

如何设计限流系统

一、问题描述

1.1 业务背景

限流(Rate Limiting)是保护系统稳定性的重要手段,在以下场景中广泛应用:

API网关场景

  • 限制单个用户的请求频率(如每秒10次)
  • 限制单个IP的请求频率(防DDoS攻击)
  • 限制某个API接口的总QPS(保护后端服务)

业务场景

  • 短信验证码:每个手机号每天最多5条
  • 评论发布:每个用户每分钟最多5条
  • 文件上传:每个用户每小时最多100MB

系统保护

  • 数据库连接数限制
  • 线程池任务数限制
  • 消息队列消费速率限制

1.2 核心功能

  1. 限流判断:判断当前请求是否超过限制
  2. 多维度限流:支持按用户、IP、接口等多个维度限流
  3. 动态调整:支持动态修改限流阈值
  4. 统计监控:统计限流效果和被限流的请求数
  5. 分布式限流:支持分布式环境下的全局限流

1.3 技术挑战

算法选择

  • 固定窗口、滑动窗口、漏桶、令牌桶各有什么优缺点?
  • 如何选择合适的算法?

性能要求

  • 限流判断本身不能成为性能瓶颈
  • 要求毫秒级响应时间
  • 支持百万级QPS

分布式场景

  • 如何在多台服务器间共享限流数据?
  • 如何保证限流的准确性?
  • 如何避免单点故障?

用户体验

  • 如何友好地告知用户被限流?
  • 如何处理突发流量?
  • 如何避免误杀正常请求?

1.4 面试考察点

  • 算法理解:能否深入理解各种限流算法的原理?
  • 工程实现:能否实现高性能的限流器?
  • 分布式能力:能否设计分布式限流方案?
  • 权衡思维:能否根据场景选择合适的方案?

二、需求分析

2.1 功能性需求

需求描述优先级
FR1支持固定窗口限流P0
FR2支持滑动窗口限流P1
FR3支持漏桶限流P1
FR4支持令牌桶限流P1
FR5支持多维度限流(用户、IP、接口)P0
FR6支持分布式限流P0
FR7支持动态调整限流规则P1
FR8提供限流统计和监控P1

2.2 非功能性需求

性能需求

  • 限流判断延迟:<1ms(P99)
  • 支持QPS:单机10万+,集群100万+
  • 内存占用:<100MB

准确性需求

  • 单机限流:100%准确
  • 分布式限流:>95%准确(可接受小误差)

可用性需求

  • 限流服务可用性:99.99%
  • 限流失败时的降级策略

2.3 约束条件

  1. 时钟同步:分布式环境下时钟可能不同步
  2. 网络延迟:分布式限流有网络延迟
  3. 存储限制:不能存储无限的历史数据

2.4 边界场景

  1. 时间边界:固定窗口边界处的流量突刺
  2. 分布式时钟:不同机器时间不一致
  3. 突发流量:瞬间大量请求
  4. 恶意攻击:分布式刷请求绕过限流

三、技术选型

3.1 算法方案对比

方案A:固定窗口计数器

原理: 将时间划分为固定窗口(如1分钟),统计窗口内的请求数。

时间轴: |--窗口1--|--窗口2--|--窗口3--|
请求数:    5次      10次      3次

实现

python
import time

class FixedWindowRateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counters = {}  # {window_start: count}
    
    def allow(self, key):
        now = int(time.time())
        window_start = now - (now % self.window_seconds)
        
        # 清理过期窗口
        expired_windows = [w for w in self.counters if w < window_start]
        for w in expired_windows:
            del self.counters[w]
        
        # 检查当前窗口
        count = self.counters.get(window_start, 0)
        if count >= self.max_requests:
            return False
        
        self.counters[window_start] = count + 1
        return True

优点

  • 实现简单
  • 内存占用小(只需存储当前窗口)
  • 性能高(O(1)时间复杂度)

缺点

  • 临界问题:窗口边界处可能超过限制
    • 例如:限制60秒100次,在0:59秒和1:00秒各100次,实际1秒内200次
  • 突发流量处理差
  • 精度不高

适用场景:对精度要求不高、实现简单优先


方案B:滑动窗口计数器

原理: 将时间窗口划分为多个小格子,统计滑动窗口内所有格子的请求总数。

窗口: |--1--|--2--|--3--|--4--|--5--|
      |     |     |     |当前 |     |
统计: [10,  15,   20,   25,   30] = 100次

实现

python
import time
from collections import deque

class SlidingWindowRateLimiter:
    def __init__(self, max_requests, window_seconds, slots=10):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.slots = slots
        self.slot_duration = window_seconds / slots
        self.counters = {}  # {key: deque([timestamp])}
    
    def allow(self, key):
        now = time.time()
        
        if key not in self.counters:
            self.counters[key] = deque()
        
        # 移除过期的时间戳
        cutoff = now - self.window_seconds
        while self.counters[key] and self.counters[key][0] < cutoff:
            self.counters[key].popleft()
        
        # 检查是否超限
        if len(self.counters[key]) >= self.max_requests:
            return False
        
        self.counters[key].append(now)
        return True

优点

  • 解决了固定窗口的临界问题
  • 限流更加平滑
  • 精度更高

缺点

  • 内存占用较大(需存储每个请求的时间戳)
  • 时间复杂度O(N)(N为窗口内请求数)
  • 实现相对复杂

适用场景:精度要求较高、内存充足


方案C:漏桶算法(Leaky Bucket)

原理: 请求进入漏桶,漏桶以恒定速率流出请求,桶满则拒绝。

  请求流入 ↓
  ┌─────────┐
  │  漏桶   │ 容量:burst
  │  ▓▓▓▓▓  │
  └────┬────┘
       ↓ 恒定速率流出

实现

go
package ratelimit

import (
    "sync"
    "time"
)

type LeakyBucket struct {
    capacity  int        // 桶容量
    rate      float64    // 流出速率(个/秒)
    water     float64    // 当前水量
    lastTime  time.Time  // 上次流出时间
    mutex     sync.Mutex
}

func NewLeakyBucket(capacity int, rate float64) *LeakyBucket {
    return &LeakyBucket{
        capacity: capacity,
        rate:     rate,
        water:    0,
        lastTime: time.Now(),
    }
}

func (lb *LeakyBucket) Allow() bool {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    now := time.Now()
    elapsed := now.Sub(lb.lastTime).Seconds()
    lb.lastTime = now
    
    // 按速率流出
    lb.water -= elapsed * lb.rate
    if lb.water < 0 {
        lb.water = 0
    }
    
    // 检查是否还有空间
    if lb.water >= float64(lb.capacity) {
        return false
    }
    
    // 加入新请求
    lb.water += 1
    return true
}

优点

  • 流量平滑:输出速率恒定
  • 能够应对突发流量(桶的容量)
  • 实现相对简单

缺点

  • 不能处理突发流量(一旦满了就拒绝)
  • 需要队列存储请求
  • 响应时间不均匀(后面的请求需要等待)

适用场景:需要平滑流量的场景(如视频播放、消息发送)


方案D:令牌桶算法(Token Bucket)

原理: 以恒定速率生成令牌放入桶中,请求需要获取令牌才能通过,桶满则令牌溢出。

   令牌生成器(恒定速率)

  ┌─────────┐
  │ 令牌桶  │ 容量:burst
  │ ●●●●●   │
  └────┬────┘
       ↓ 请求消费令牌

实现

go
package ratelimit

import (
    "sync"
    "time"
)

type TokenBucket struct {
    capacity     int        // 桶容量
    tokens       float64    // 当前令牌数
    refillRate   float64    // 令牌生成速率(个/秒)
    lastRefill   time.Time  // 上次补充令牌时间
    mutex        sync.Mutex
}

func NewTokenBucket(capacity int, refillRate float64) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     float64(capacity),
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

func (tb *TokenBucket) Allow() bool {
    return tb.AllowN(1)
}

func (tb *TokenBucket) AllowN(n int) bool {
    tb.mutex.Lock()
    defer tb.mutex.Unlock()
    
    // 补充令牌
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill).Seconds()
    tb.tokens += elapsed * tb.refillRate
    if tb.tokens > float64(tb.capacity) {
        tb.tokens = float64(tb.capacity)
    }
    tb.lastRefill = now
    
    // 检查令牌是否足够
    if tb.tokens < float64(n) {
        return false
    }
    
    // 消费令牌
    tb.tokens -= float64(n)
    return true
}

优点

  • 允许突发流量:可以攒令牌,短时间内处理大量请求
  • 实现简单
  • 性能高(O(1))
  • 应用广泛(Guava RateLimiter、Nginx等)

缺点

  • 不能保证流量绝对平滑
  • 需要定期补充令牌

适用场景:大多数场景(推荐方案)


3.2 算法对比总结

算法时间复杂度空间复杂度突发流量平滑性精度实现难度
固定窗口O(1)O(1)简单
滑动窗口O(N)O(N)
漏桶O(1)O(N)最好
令牌桶O(1)O(1)最好简单

推荐方案:令牌桶(Token Bucket)

  • 性能高、实现简单
  • 支持突发流量
  • 精度高、应用广泛

3.3 分布式限流方案

方案A:Redis + Lua脚本

实现

lua
-- Redis Lua脚本:令牌桶限流
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])

-- 获取当前状态
local token_key = key .. ":tokens"
local ts_key = key .. ":ts"

local tokens = tonumber(redis.call('get', token_key))
local last_ts = tonumber(redis.call('get', ts_key))
local now = tonumber(ARGV[4])

-- 初始化
if tokens == nil then
    tokens = capacity
    last_ts = now
end

-- 补充令牌
local elapsed = now - last_ts
local refill = math.floor(elapsed * rate)
tokens = math.min(capacity, tokens + refill)

-- 检查并消费令牌
if tokens >= requested then
    tokens = tokens - requested
    redis.call('set', token_key, tokens)
    redis.call('set', ts_key, now)
    redis.call('expire', token_key, 10)
    redis.call('expire', ts_key, 10)
    return 1
else
    return 0
end

优点

  • 原子性保证
  • 性能高
  • 易于扩展

缺点

  • 依赖Redis
  • 网络延迟

方案B:本地限流 + 定期同步

思路

  1. 每台服务器本地限流(配额 = 总配额 / 服务器数量)
  2. 定期(如每秒)向中心节点同步实际使用量
  3. 中心节点动态调整各服务器配额

优点

  • 性能最高(本地判断)
  • 降低中心节点压力

缺点

  • 精度较低
  • 实现复杂

3.4 技术栈清单

组件技术选型作用
单机限流Guava RateLimiter高性能令牌桶
分布式限流Redis + Lua分布式状态管理
限流中间件Sentinel / Hystrix成熟的限流框架
API网关Nginx / Kong / Envoy网关层限流
监控Prometheus + Grafana限流效果监控

四、架构设计

4.1 系统架构图

mermaid
graph TB
    subgraph 客户端层
        A[用户请求]
    end
    
    subgraph 接入层
        A --> B[Nginx/网关]
        B --> C{限流检查}
    end
    
    subgraph 限流层
        C --> D1[本地限流器]
        C --> D2[Redis限流器]
        D1 --> E{是否通过?}
        D2 --> E
    end
    
    subgraph 业务层
        E -->|是| F[业务服务]
        E -->|否| G[返回429]
    end
    
    subgraph 配置层
        H[限流配置中心] --> D1
        H --> D2
    end
    
    subgraph 监控层
        D1 --> I[Prometheus]
        D2 --> I
        I --> J[Grafana]
    end

4.2 核心流程

mermaid
sequenceDiagram
    participant Client as 客户端
    participant Gateway as 网关
    participant Local as 本地限流器
    participant Redis as Redis
    participant Service as 业务服务
    
    Client->>Gateway: 发送请求
    Gateway->>Local: 本地限流检查
    alt 本地限流通过
        Local->>Redis: 分布式限流检查
        alt 分布式限流通过
            Redis-->>Local: 允许通过
            Local-->>Gateway: 通过
            Gateway->>Service: 转发请求
            Service-->>Gateway: 返回响应
            Gateway-->>Client: 返回结果
        else 分布式限流拒绝
            Redis-->>Local: 拒绝
            Local-->>Gateway: 拒绝
            Gateway-->>Client: 429 Too Many Requests
        end
    else 本地限流拒绝
        Local-->>Gateway: 拒绝
        Gateway-->>Client: 429 Too Many Requests
    end

4.3 数据结构设计

go
// 限流规则配置
type RateLimitRule struct {
    Key         string        // 限流key(user:123, ip:1.2.3.4等)
    Algorithm   string        // 算法类型(fixed_window, sliding_window, leaky_bucket, token_bucket)
    Capacity    int           // 容量/阈值
    Rate        float64       // 速率(QPS)
    Window      time.Duration // 时间窗口
    Distributed bool          // 是否分布式限流
}

// 限流器接口
type RateLimiter interface {
    Allow(key string) bool
    AllowN(key string, n int) bool
    GetRemaining(key string) int
}

// 限流统计
type RateLimitStats struct {
    TotalRequests   int64
    AllowedRequests int64
    RejectedRequests int64
    AvgLatency      time.Duration
}

五、核心实现

5.1 单机令牌桶实现(Go语言)

点击查看完整实现
go
package ratelimit

import (
    "sync"
    "time"
)

// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {
    capacity     int           // 桶容量
    tokens       float64       // 当前令牌数
    refillRate   float64       // 令牌生成速率(个/秒)
    lastRefill   time.Time     // 上次补充时间
    mutex        sync.Mutex    // 并发控制
}

// NewTokenBucketLimiter 创建令牌桶限流器
func NewTokenBucketLimiter(capacity int, refillRate float64) *TokenBucketLimiter {
    return &TokenBucketLimiter{
        capacity:   capacity,
        tokens:     float64(capacity), // 初始满令牌
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

// Allow 判断是否允许通过(消费1个令牌)
func (tb *TokenBucketLimiter) Allow() bool {
    return tb.AllowN(1)
}

// AllowN 判断是否允许通过(消费n个令牌)
func (tb *TokenBucketLimiter) AllowN(n int) bool {
    tb.mutex.Lock()
    defer tb.mutex.Unlock()
    
    // 1. 补充令牌
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill).Seconds()
    
    // 根据时间和速率计算应补充的令牌数
    tokensToAdd := elapsed * tb.refillRate
    tb.tokens += tokensToAdd
    
    // 令牌数不能超过容量
    if tb.tokens > float64(tb.capacity) {
        tb.tokens = float64(tb.capacity)
    }
    
    tb.lastRefill = now
    
    // 2. 检查令牌是否足够
    if tb.tokens < float64(n) {
        return false // 令牌不足,拒绝请求
    }
    
    // 3. 消费令牌
    tb.tokens -= float64(n)
    return true
}

// GetRemaining 获取剩余令牌数
func (tb *TokenBucketLimiter) GetRemaining() int {
    tb.mutex.Lock()
    defer tb.mutex.Unlock()
    
    // 先补充令牌
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill).Seconds()
    tokensToAdd := elapsed * tb.refillRate
    tb.tokens += tokensToAdd
    
    if tb.tokens > float64(tb.capacity) {
        tb.tokens = float64(tb.capacity)
    }
    
    return int(tb.tokens)
}

// Wait 等待直到有可用令牌(阻塞式)
func (tb *TokenBucketLimiter) Wait(n int) time.Duration {
    for {
        if tb.AllowN(n) {
            return 0
        }
        
        // 计算需要等待的时间
        tb.mutex.Lock()
        needed := float64(n) - tb.tokens
        waitTime := time.Duration(needed/tb.refillRate) * time.Second
        tb.mutex.Unlock()
        
        time.Sleep(waitTime)
    }
}

5.2 分布式限流实现(Redis + Lua)

go
package ratelimit

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "time"
)

// RedisRateLimiter Redis分布式限流器
type RedisRateLimiter struct {
    client   *redis.Client
    capacity int
    rate     float64
}

// Lua脚本:令牌桶算法
const luaTokenBucketScript = `
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

local token_key = key .. ":tokens"
local ts_key = key .. ":ts"

local tokens = tonumber(redis.call('get', token_key))
local last_ts = tonumber(redis.call('get', ts_key))

if tokens == nil then
    tokens = capacity
    last_ts = now
end

local elapsed = now - last_ts
local refill = elapsed * rate
tokens = math.min(capacity, tokens + refill)

if tokens >= requested then
    tokens = tokens - requested
    redis.call('set', token_key, tokens)
    redis.call('set', ts_key, now)
    redis.call('expire', token_key, 60)
    redis.call('expire', ts_key, 60)
    return 1
else
    return 0
end
`

func NewRedisRateLimiter(client *redis.Client, capacity int, rate float64) *RedisRateLimiter {
    return &RedisRateLimiter{
        client:   client,
        capacity: capacity,
        rate:     rate,
    }
}

func (r *RedisRateLimiter) Allow(ctx context.Context, key string) bool {
    now := float64(time.Now().Unix())
    
    result, err := r.client.Eval(ctx, luaTokenBucketScript,
        []string{key},
        r.capacity, r.rate, 1, now,
    ).Int()
    
    if err != nil {
        // Redis失败时的降级策略:允许通过
        return true
    }
    
    return result == 1
}
java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class RedisRateLimiter {
    
    private JedisPool jedisPool;
    private int capacity;
    private double rate;
    
    private static final String LUA_SCRIPT = 
        "local key = KEYS[1]\n" +
        "local capacity = tonumber(ARGV[1])\n" +
        "local rate = tonumber(ARGV[2])\n" +
        "local requested = tonumber(ARGV[3])\n" +
        "local now = tonumber(ARGV[4])\n" +
        "\n" +
        "local token_key = key .. ':tokens'\n" +
        "local ts_key = key .. ':ts'\n" +
        "\n" +
        "local tokens = tonumber(redis.call('get', token_key))\n" +
        "local last_ts = tonumber(redis.call('get', ts_key))\n" +
        "\n" +
        "if tokens == nil then\n" +
        "    tokens = capacity\n" +
        "    last_ts = now\n" +
        "end\n" +
        "\n" +
        "local elapsed = now - last_ts\n" +
        "local refill = elapsed * rate\n" +
        "tokens = math.min(capacity, tokens + refill)\n" +
        "\n" +
        "if tokens >= requested then\n" +
        "    tokens = tokens - requested\n" +
        "    redis.call('set', token_key, tokens)\n" +
        "    redis.call('set', ts_key, now)\n" +
        "    redis.call('expire', token_key, 60)\n" +
        "    redis.call('expire', ts_key, 60)\n" +
        "    return 1\n" +
        "else\n" +
        "    return 0\n" +
        "end";
    
    public RedisRateLimiter(JedisPool jedisPool, int capacity, double rate) {
        this.jedisPool = jedisPool;
        this.capacity = capacity;
        this.rate = rate;
    }
    
    public boolean allow(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            long now = System.currentTimeMillis() / 1000;
            
            Object result = jedis.eval(
                LUA_SCRIPT,
                1,
                key,
                String.valueOf(capacity),
                String.valueOf(rate),
                "1",
                String.valueOf(now)
            );
            
            return result.equals(1L);
        } catch (Exception e) {
            // Redis失败降级:允许通过
            return true;
        }
    }
}
python
import redis
import time

class RedisRateLimiter:
    """Redis分布式限流器"""
    
    LUA_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local rate = tonumber(ARGV[2])
    local requested = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])
    
    local token_key = key .. ':tokens'
    local ts_key = key .. ':ts'
    
    local tokens = tonumber(redis.call('get', token_key))
    local last_ts = tonumber(redis.call('get', ts_key))
    
    if not tokens then
        tokens = capacity
        last_ts = now
    end
    
    local elapsed = now - last_ts
    local refill = elapsed * rate
    tokens = math.min(capacity, tokens + refill)
    
    if tokens >= requested then
        tokens = tokens - requested
        redis.call('set', token_key, tokens)
        redis.call('set', ts_key, now)
        redis.call('expire', token_key, 60)
        redis.call('expire', ts_key, 60)
        return 1
    else
        return 0
    end
    """
    
    def __init__(self, redis_client, capacity, rate):
        self.redis = redis_client
        self.capacity = capacity
        self.rate = rate
        self.script = self.redis.register_script(self.LUA_SCRIPT)
    
    def allow(self, key):
        """判断是否允许通过"""
        try:
            now = int(time.time())
            result = self.script(
                keys=[key],
                args=[self.capacity, self.rate, 1, now]
            )
            return result == 1
        except Exception as e:
            # Redis失败降级:允许通过
            return True

5.3 性能对比

实现方式QPS延迟(P99)内存占用
单机令牌桶100万+<0.1ms<1MB
Redis限流10万+<2ms<10MB
本地+Redis50万+<0.5ms<5MB

六、性能优化

6.1 优化策略

  1. 本地缓存 + Redis组合

    • 本地限流器快速拒绝明显超限的请求
    • Redis限流器保证全局准确性
  2. 限流器预热

    • 启动时预创建限流器对象
    • 避免运行时频繁创建销毁
  3. 批量操作

    • 批量检查多个key的限流状态
    • 减少Redis往返次数
  4. 异步统计

    • 限流统计异步进行
    • 避免影响主流程性能

七、运维监控

7.1 监控指标

yaml
metrics:
  # 业务指标
  - rate_limit_total_requests: 总请求数
  - rate_limit_allowed_requests: 允许通过的请求数
  - rate_limit_rejected_requests: 被拒绝的请求数
  - rate_limit_reject_rate: 拒绝率
  
  # 性能指标
  - rate_limit_check_latency_p50: 限流检查延迟P50
  - rate_limit_check_latency_p99: 限流检查延迟P99
  
  # Redis指标
  - rate_limit_redis_errors: Redis错误数
  - rate_limit_redis_latency: Redis延迟

7.2 告警规则

  • 拒绝率 > 50%(可能遭受攻击)
  • 限流检查延迟P99 > 10ms
  • Redis错误率 > 1%

八、面试要点

8.1 常见追问

Q1: 令牌桶和漏桶的区别?

A:

  • 令牌桶:允许突发流量,令牌按速率生成
  • 漏桶:平滑流量,请求按速率流出

选择:API网关用令牌桶(允许突发),视频播放用漏桶(平滑)

Q2: 分布式限流如何保证准确性?

A:

  • 使用Redis + Lua脚本保证原子性
  • 可以接受一定误差(如5%)
  • 本地限流 + 定期同步可以提升性能

Q3: 如何防止限流器本身成为瓶颈?

A:

  • 使用高性能的限流算法(令牌桶O(1))
  • 本地限流优先,减少网络开销
  • Redis集群分片,避免单点

8.2 扩展知识点

九、相关资源

9.1 相关技术栈

9.2 开源项目


总结:限流系统是保护系统的第一道防线,核心在于选择合适的算法(推荐令牌桶)和实现高性能的限流器。面试中要能说清楚各种算法的原理、优缺点和适用场景,展现你对性能和权衡的理解。

正在精进