如何设计限流系统
一、问题描述
1.1 业务背景
限流(Rate Limiting)是保护系统稳定性的重要手段,在以下场景中广泛应用:
API网关场景:
- 限制单个用户的请求频率(如每秒10次)
- 限制单个IP的请求频率(防DDoS攻击)
- 限制某个API接口的总QPS(保护后端服务)
业务场景:
- 短信验证码:每个手机号每天最多5条
- 评论发布:每个用户每分钟最多5条
- 文件上传:每个用户每小时最多100MB
系统保护:
- 数据库连接数限制
- 线程池任务数限制
- 消息队列消费速率限制
1.2 核心功能
- 限流判断:判断当前请求是否超过限制
- 多维度限流:支持按用户、IP、接口等多个维度限流
- 动态调整:支持动态修改限流阈值
- 统计监控:统计限流效果和被限流的请求数
- 分布式限流:支持分布式环境下的全局限流
1.3 技术挑战
算法选择:
- 固定窗口、滑动窗口、漏桶、令牌桶各有什么优缺点?
- 如何选择合适的算法?
性能要求:
- 限流判断本身不能成为性能瓶颈
- 要求毫秒级响应时间
- 支持百万级QPS
分布式场景:
- 如何在多台服务器间共享限流数据?
- 如何保证限流的准确性?
- 如何避免单点故障?
用户体验:
- 如何友好地告知用户被限流?
- 如何处理突发流量?
- 如何避免误杀正常请求?
1.4 面试考察点
- 算法理解:能否深入理解各种限流算法的原理?
- 工程实现:能否实现高性能的限流器?
- 分布式能力:能否设计分布式限流方案?
- 权衡思维:能否根据场景选择合适的方案?
二、需求分析
2.1 功能性需求
| 需求 | 描述 | 优先级 |
|---|---|---|
| FR1 | 支持固定窗口限流 | P0 |
| FR2 | 支持滑动窗口限流 | P1 |
| FR3 | 支持漏桶限流 | P1 |
| FR4 | 支持令牌桶限流 | P1 |
| FR5 | 支持多维度限流(用户、IP、接口) | P0 |
| FR6 | 支持分布式限流 | P0 |
| FR7 | 支持动态调整限流规则 | P1 |
| FR8 | 提供限流统计和监控 | P1 |
2.2 非功能性需求
性能需求:
- 限流判断延迟:<1ms(P99)
- 支持QPS:单机10万+,集群100万+
- 内存占用:<100MB
准确性需求:
- 单机限流:100%准确
- 分布式限流:>95%准确(可接受小误差)
可用性需求:
- 限流服务可用性:99.99%
- 限流失败时的降级策略
2.3 约束条件
- 时钟同步:分布式环境下时钟可能不同步
- 网络延迟:分布式限流有网络延迟
- 存储限制:不能存储无限的历史数据
2.4 边界场景
- 时间边界:固定窗口边界处的流量突刺
- 分布式时钟:不同机器时间不一致
- 突发流量:瞬间大量请求
- 恶意攻击:分布式刷请求绕过限流
三、技术选型
3.1 算法方案对比
方案A:固定窗口计数器
原理: 将时间划分为固定窗口(如1分钟),统计窗口内的请求数。
时间轴: |--窗口1--|--窗口2--|--窗口3--|
请求数: 5次 10次 3次实现:
python
import time
class FixedWindowRateLimiter:
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.counters = {} # {window_start: count}
def allow(self, key):
now = int(time.time())
window_start = now - (now % self.window_seconds)
# 清理过期窗口
expired_windows = [w for w in self.counters if w < window_start]
for w in expired_windows:
del self.counters[w]
# 检查当前窗口
count = self.counters.get(window_start, 0)
if count >= self.max_requests:
return False
self.counters[window_start] = count + 1
return True优点:
- 实现简单
- 内存占用小(只需存储当前窗口)
- 性能高(O(1)时间复杂度)
缺点:
- 临界问题:窗口边界处可能超过限制
- 例如:限制60秒100次,在0:59秒和1:00秒各100次,实际1秒内200次
- 突发流量处理差
- 精度不高
适用场景:对精度要求不高、实现简单优先
方案B:滑动窗口计数器
原理: 将时间窗口划分为多个小格子,统计滑动窗口内所有格子的请求总数。
窗口: |--1--|--2--|--3--|--4--|--5--|
| | | |当前 | |
统计: [10, 15, 20, 25, 30] = 100次实现:
python
import time
from collections import deque
class SlidingWindowRateLimiter:
def __init__(self, max_requests, window_seconds, slots=10):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.slots = slots
self.slot_duration = window_seconds / slots
self.counters = {} # {key: deque([timestamp])}
def allow(self, key):
now = time.time()
if key not in self.counters:
self.counters[key] = deque()
# 移除过期的时间戳
cutoff = now - self.window_seconds
while self.counters[key] and self.counters[key][0] < cutoff:
self.counters[key].popleft()
# 检查是否超限
if len(self.counters[key]) >= self.max_requests:
return False
self.counters[key].append(now)
return True优点:
- 解决了固定窗口的临界问题
- 限流更加平滑
- 精度更高
缺点:
- 内存占用较大(需存储每个请求的时间戳)
- 时间复杂度O(N)(N为窗口内请求数)
- 实现相对复杂
适用场景:精度要求较高、内存充足
方案C:漏桶算法(Leaky Bucket)
原理: 请求进入漏桶,漏桶以恒定速率流出请求,桶满则拒绝。
请求流入 ↓
┌─────────┐
│ 漏桶 │ 容量:burst
│ ▓▓▓▓▓ │
└────┬────┘
↓ 恒定速率流出实现:
go
package ratelimit
import (
"sync"
"time"
)
type LeakyBucket struct {
capacity int // 桶容量
rate float64 // 流出速率(个/秒)
water float64 // 当前水量
lastTime time.Time // 上次流出时间
mutex sync.Mutex
}
func NewLeakyBucket(capacity int, rate float64) *LeakyBucket {
return &LeakyBucket{
capacity: capacity,
rate: rate,
water: 0,
lastTime: time.Now(),
}
}
func (lb *LeakyBucket) Allow() bool {
lb.mutex.Lock()
defer lb.mutex.Unlock()
now := time.Now()
elapsed := now.Sub(lb.lastTime).Seconds()
lb.lastTime = now
// 按速率流出
lb.water -= elapsed * lb.rate
if lb.water < 0 {
lb.water = 0
}
// 检查是否还有空间
if lb.water >= float64(lb.capacity) {
return false
}
// 加入新请求
lb.water += 1
return true
}优点:
- 流量平滑:输出速率恒定
- 能够应对突发流量(桶的容量)
- 实现相对简单
缺点:
- 不能处理突发流量(一旦满了就拒绝)
- 需要队列存储请求
- 响应时间不均匀(后面的请求需要等待)
适用场景:需要平滑流量的场景(如视频播放、消息发送)
方案D:令牌桶算法(Token Bucket)
原理: 以恒定速率生成令牌放入桶中,请求需要获取令牌才能通过,桶满则令牌溢出。
令牌生成器(恒定速率)
↓
┌─────────┐
│ 令牌桶 │ 容量:burst
│ ●●●●● │
└────┬────┘
↓ 请求消费令牌实现:
go
package ratelimit
import (
"sync"
"time"
)
type TokenBucket struct {
capacity int // 桶容量
tokens float64 // 当前令牌数
refillRate float64 // 令牌生成速率(个/秒)
lastRefill time.Time // 上次补充令牌时间
mutex sync.Mutex
}
func NewTokenBucket(capacity int, refillRate float64) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: float64(capacity),
refillRate: refillRate,
lastRefill: time.Now(),
}
}
func (tb *TokenBucket) Allow() bool {
return tb.AllowN(1)
}
func (tb *TokenBucket) AllowN(n int) bool {
tb.mutex.Lock()
defer tb.mutex.Unlock()
// 补充令牌
now := time.Now()
elapsed := now.Sub(tb.lastRefill).Seconds()
tb.tokens += elapsed * tb.refillRate
if tb.tokens > float64(tb.capacity) {
tb.tokens = float64(tb.capacity)
}
tb.lastRefill = now
// 检查令牌是否足够
if tb.tokens < float64(n) {
return false
}
// 消费令牌
tb.tokens -= float64(n)
return true
}优点:
- 允许突发流量:可以攒令牌,短时间内处理大量请求
- 实现简单
- 性能高(O(1))
- 应用广泛(Guava RateLimiter、Nginx等)
缺点:
- 不能保证流量绝对平滑
- 需要定期补充令牌
适用场景:大多数场景(推荐方案)
3.2 算法对比总结
| 算法 | 时间复杂度 | 空间复杂度 | 突发流量 | 平滑性 | 精度 | 实现难度 |
|---|---|---|---|---|---|---|
| 固定窗口 | O(1) | O(1) | 差 | 差 | 低 | 简单 |
| 滑动窗口 | O(N) | O(N) | 中 | 好 | 高 | 中 |
| 漏桶 | O(1) | O(N) | 差 | 最好 | 高 | 中 |
| 令牌桶 | O(1) | O(1) | 最好 | 中 | 高 | 简单 |
推荐方案:令牌桶(Token Bucket)
- 性能高、实现简单
- 支持突发流量
- 精度高、应用广泛
3.3 分布式限流方案
方案A:Redis + Lua脚本
实现:
lua
-- Redis Lua脚本:令牌桶限流
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
-- 获取当前状态
local token_key = key .. ":tokens"
local ts_key = key .. ":ts"
local tokens = tonumber(redis.call('get', token_key))
local last_ts = tonumber(redis.call('get', ts_key))
local now = tonumber(ARGV[4])
-- 初始化
if tokens == nil then
tokens = capacity
last_ts = now
end
-- 补充令牌
local elapsed = now - last_ts
local refill = math.floor(elapsed * rate)
tokens = math.min(capacity, tokens + refill)
-- 检查并消费令牌
if tokens >= requested then
tokens = tokens - requested
redis.call('set', token_key, tokens)
redis.call('set', ts_key, now)
redis.call('expire', token_key, 10)
redis.call('expire', ts_key, 10)
return 1
else
return 0
end优点:
- 原子性保证
- 性能高
- 易于扩展
缺点:
- 依赖Redis
- 网络延迟
方案B:本地限流 + 定期同步
思路:
- 每台服务器本地限流(配额 = 总配额 / 服务器数量)
- 定期(如每秒)向中心节点同步实际使用量
- 中心节点动态调整各服务器配额
优点:
- 性能最高(本地判断)
- 降低中心节点压力
缺点:
- 精度较低
- 实现复杂
3.4 技术栈清单
| 组件 | 技术选型 | 作用 |
|---|---|---|
| 单机限流 | Guava RateLimiter | 高性能令牌桶 |
| 分布式限流 | Redis + Lua | 分布式状态管理 |
| 限流中间件 | Sentinel / Hystrix | 成熟的限流框架 |
| API网关 | Nginx / Kong / Envoy | 网关层限流 |
| 监控 | Prometheus + Grafana | 限流效果监控 |
四、架构设计
4.1 系统架构图
mermaid
graph TB
subgraph 客户端层
A[用户请求]
end
subgraph 接入层
A --> B[Nginx/网关]
B --> C{限流检查}
end
subgraph 限流层
C --> D1[本地限流器]
C --> D2[Redis限流器]
D1 --> E{是否通过?}
D2 --> E
end
subgraph 业务层
E -->|是| F[业务服务]
E -->|否| G[返回429]
end
subgraph 配置层
H[限流配置中心] --> D1
H --> D2
end
subgraph 监控层
D1 --> I[Prometheus]
D2 --> I
I --> J[Grafana]
end4.2 核心流程
mermaid
sequenceDiagram
participant Client as 客户端
participant Gateway as 网关
participant Local as 本地限流器
participant Redis as Redis
participant Service as 业务服务
Client->>Gateway: 发送请求
Gateway->>Local: 本地限流检查
alt 本地限流通过
Local->>Redis: 分布式限流检查
alt 分布式限流通过
Redis-->>Local: 允许通过
Local-->>Gateway: 通过
Gateway->>Service: 转发请求
Service-->>Gateway: 返回响应
Gateway-->>Client: 返回结果
else 分布式限流拒绝
Redis-->>Local: 拒绝
Local-->>Gateway: 拒绝
Gateway-->>Client: 429 Too Many Requests
end
else 本地限流拒绝
Local-->>Gateway: 拒绝
Gateway-->>Client: 429 Too Many Requests
end4.3 数据结构设计
go
// 限流规则配置
type RateLimitRule struct {
Key string // 限流key(user:123, ip:1.2.3.4等)
Algorithm string // 算法类型(fixed_window, sliding_window, leaky_bucket, token_bucket)
Capacity int // 容量/阈值
Rate float64 // 速率(QPS)
Window time.Duration // 时间窗口
Distributed bool // 是否分布式限流
}
// 限流器接口
type RateLimiter interface {
Allow(key string) bool
AllowN(key string, n int) bool
GetRemaining(key string) int
}
// 限流统计
type RateLimitStats struct {
TotalRequests int64
AllowedRequests int64
RejectedRequests int64
AvgLatency time.Duration
}五、核心实现
5.1 单机令牌桶实现(Go语言)
点击查看完整实现
go
package ratelimit
import (
"sync"
"time"
)
// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {
capacity int // 桶容量
tokens float64 // 当前令牌数
refillRate float64 // 令牌生成速率(个/秒)
lastRefill time.Time // 上次补充时间
mutex sync.Mutex // 并发控制
}
// NewTokenBucketLimiter 创建令牌桶限流器
func NewTokenBucketLimiter(capacity int, refillRate float64) *TokenBucketLimiter {
return &TokenBucketLimiter{
capacity: capacity,
tokens: float64(capacity), // 初始满令牌
refillRate: refillRate,
lastRefill: time.Now(),
}
}
// Allow 判断是否允许通过(消费1个令牌)
func (tb *TokenBucketLimiter) Allow() bool {
return tb.AllowN(1)
}
// AllowN 判断是否允许通过(消费n个令牌)
func (tb *TokenBucketLimiter) AllowN(n int) bool {
tb.mutex.Lock()
defer tb.mutex.Unlock()
// 1. 补充令牌
now := time.Now()
elapsed := now.Sub(tb.lastRefill).Seconds()
// 根据时间和速率计算应补充的令牌数
tokensToAdd := elapsed * tb.refillRate
tb.tokens += tokensToAdd
// 令牌数不能超过容量
if tb.tokens > float64(tb.capacity) {
tb.tokens = float64(tb.capacity)
}
tb.lastRefill = now
// 2. 检查令牌是否足够
if tb.tokens < float64(n) {
return false // 令牌不足,拒绝请求
}
// 3. 消费令牌
tb.tokens -= float64(n)
return true
}
// GetRemaining 获取剩余令牌数
func (tb *TokenBucketLimiter) GetRemaining() int {
tb.mutex.Lock()
defer tb.mutex.Unlock()
// 先补充令牌
now := time.Now()
elapsed := now.Sub(tb.lastRefill).Seconds()
tokensToAdd := elapsed * tb.refillRate
tb.tokens += tokensToAdd
if tb.tokens > float64(tb.capacity) {
tb.tokens = float64(tb.capacity)
}
return int(tb.tokens)
}
// Wait 等待直到有可用令牌(阻塞式)
func (tb *TokenBucketLimiter) Wait(n int) time.Duration {
for {
if tb.AllowN(n) {
return 0
}
// 计算需要等待的时间
tb.mutex.Lock()
needed := float64(n) - tb.tokens
waitTime := time.Duration(needed/tb.refillRate) * time.Second
tb.mutex.Unlock()
time.Sleep(waitTime)
}
}5.2 分布式限流实现(Redis + Lua)
go
package ratelimit
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
// RedisRateLimiter Redis分布式限流器
type RedisRateLimiter struct {
client *redis.Client
capacity int
rate float64
}
// Lua脚本:令牌桶算法
const luaTokenBucketScript = `
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local token_key = key .. ":tokens"
local ts_key = key .. ":ts"
local tokens = tonumber(redis.call('get', token_key))
local last_ts = tonumber(redis.call('get', ts_key))
if tokens == nil then
tokens = capacity
last_ts = now
end
local elapsed = now - last_ts
local refill = elapsed * rate
tokens = math.min(capacity, tokens + refill)
if tokens >= requested then
tokens = tokens - requested
redis.call('set', token_key, tokens)
redis.call('set', ts_key, now)
redis.call('expire', token_key, 60)
redis.call('expire', ts_key, 60)
return 1
else
return 0
end
`
func NewRedisRateLimiter(client *redis.Client, capacity int, rate float64) *RedisRateLimiter {
return &RedisRateLimiter{
client: client,
capacity: capacity,
rate: rate,
}
}
func (r *RedisRateLimiter) Allow(ctx context.Context, key string) bool {
now := float64(time.Now().Unix())
result, err := r.client.Eval(ctx, luaTokenBucketScript,
[]string{key},
r.capacity, r.rate, 1, now,
).Int()
if err != nil {
// Redis失败时的降级策略:允许通过
return true
}
return result == 1
}java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class RedisRateLimiter {
private JedisPool jedisPool;
private int capacity;
private double rate;
private static final String LUA_SCRIPT =
"local key = KEYS[1]\n" +
"local capacity = tonumber(ARGV[1])\n" +
"local rate = tonumber(ARGV[2])\n" +
"local requested = tonumber(ARGV[3])\n" +
"local now = tonumber(ARGV[4])\n" +
"\n" +
"local token_key = key .. ':tokens'\n" +
"local ts_key = key .. ':ts'\n" +
"\n" +
"local tokens = tonumber(redis.call('get', token_key))\n" +
"local last_ts = tonumber(redis.call('get', ts_key))\n" +
"\n" +
"if tokens == nil then\n" +
" tokens = capacity\n" +
" last_ts = now\n" +
"end\n" +
"\n" +
"local elapsed = now - last_ts\n" +
"local refill = elapsed * rate\n" +
"tokens = math.min(capacity, tokens + refill)\n" +
"\n" +
"if tokens >= requested then\n" +
" tokens = tokens - requested\n" +
" redis.call('set', token_key, tokens)\n" +
" redis.call('set', ts_key, now)\n" +
" redis.call('expire', token_key, 60)\n" +
" redis.call('expire', ts_key, 60)\n" +
" return 1\n" +
"else\n" +
" return 0\n" +
"end";
public RedisRateLimiter(JedisPool jedisPool, int capacity, double rate) {
this.jedisPool = jedisPool;
this.capacity = capacity;
this.rate = rate;
}
public boolean allow(String key) {
try (Jedis jedis = jedisPool.getResource()) {
long now = System.currentTimeMillis() / 1000;
Object result = jedis.eval(
LUA_SCRIPT,
1,
key,
String.valueOf(capacity),
String.valueOf(rate),
"1",
String.valueOf(now)
);
return result.equals(1L);
} catch (Exception e) {
// Redis失败降级:允许通过
return true;
}
}
}python
import redis
import time
class RedisRateLimiter:
"""Redis分布式限流器"""
LUA_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local token_key = key .. ':tokens'
local ts_key = key .. ':ts'
local tokens = tonumber(redis.call('get', token_key))
local last_ts = tonumber(redis.call('get', ts_key))
if not tokens then
tokens = capacity
last_ts = now
end
local elapsed = now - last_ts
local refill = elapsed * rate
tokens = math.min(capacity, tokens + refill)
if tokens >= requested then
tokens = tokens - requested
redis.call('set', token_key, tokens)
redis.call('set', ts_key, now)
redis.call('expire', token_key, 60)
redis.call('expire', ts_key, 60)
return 1
else
return 0
end
"""
def __init__(self, redis_client, capacity, rate):
self.redis = redis_client
self.capacity = capacity
self.rate = rate
self.script = self.redis.register_script(self.LUA_SCRIPT)
def allow(self, key):
"""判断是否允许通过"""
try:
now = int(time.time())
result = self.script(
keys=[key],
args=[self.capacity, self.rate, 1, now]
)
return result == 1
except Exception as e:
# Redis失败降级:允许通过
return True5.3 性能对比
| 实现方式 | QPS | 延迟(P99) | 内存占用 |
|---|---|---|---|
| 单机令牌桶 | 100万+ | <0.1ms | <1MB |
| Redis限流 | 10万+ | <2ms | <10MB |
| 本地+Redis | 50万+ | <0.5ms | <5MB |
六、性能优化
6.1 优化策略
本地缓存 + Redis组合
- 本地限流器快速拒绝明显超限的请求
- Redis限流器保证全局准确性
限流器预热
- 启动时预创建限流器对象
- 避免运行时频繁创建销毁
批量操作
- 批量检查多个key的限流状态
- 减少Redis往返次数
异步统计
- 限流统计异步进行
- 避免影响主流程性能
七、运维监控
7.1 监控指标
yaml
metrics:
# 业务指标
- rate_limit_total_requests: 总请求数
- rate_limit_allowed_requests: 允许通过的请求数
- rate_limit_rejected_requests: 被拒绝的请求数
- rate_limit_reject_rate: 拒绝率
# 性能指标
- rate_limit_check_latency_p50: 限流检查延迟P50
- rate_limit_check_latency_p99: 限流检查延迟P99
# Redis指标
- rate_limit_redis_errors: Redis错误数
- rate_limit_redis_latency: Redis延迟7.2 告警规则
- 拒绝率 > 50%(可能遭受攻击)
- 限流检查延迟P99 > 10ms
- Redis错误率 > 1%
八、面试要点
8.1 常见追问
Q1: 令牌桶和漏桶的区别?
A:
- 令牌桶:允许突发流量,令牌按速率生成
- 漏桶:平滑流量,请求按速率流出
选择:API网关用令牌桶(允许突发),视频播放用漏桶(平滑)
Q2: 分布式限流如何保证准确性?
A:
- 使用Redis + Lua脚本保证原子性
- 可以接受一定误差(如5%)
- 本地限流 + 定期同步可以提升性能
Q3: 如何防止限流器本身成为瓶颈?
A:
- 使用高性能的限流算法(令牌桶O(1))
- 本地限流优先,减少网络开销
- Redis集群分片,避免单点
8.2 扩展知识点
九、相关资源
9.1 相关技术栈
9.2 开源项目
- Guava RateLimiter - Google限流库
- Bucket4j - Java限流库
- go-rate - Go限流库
总结:限流系统是保护系统的第一道防线,核心在于选择合适的算法(推荐令牌桶)和实现高性能的限流器。面试中要能说清楚各种算法的原理、优缺点和适用场景,展现你对性能和权衡的理解。
