如何设计分布式锁
一、问题描述
1.1 业务背景
在分布式系统中,多个节点可能同时访问共享资源,需要通过分布式锁来保证互斥访问。
典型场景:
- 库存扣减:秒杀时防止超卖
- 订单号生成:保证订单号唯一
- 定时任务:防止多个节点重复执行
- 缓存更新:防止缓存击穿
1.2 核心需求
必须满足:
- 互斥性:同一时刻只有一个客户端持有锁
- 防死锁:锁最终一定会被释放
- 容错性:即使持锁客户端宕机,锁也能释放
最好满足: 4. 可重入:同一客户端可以多次获取锁 5. 高性能:加锁解锁速度快 6. 阻塞/非阻塞:支持tryLock和阻塞lock
1.3 技术挑战
互斥性保证:
- 如何保证只有一个客户端能加锁成功
死锁避免:
- 客户端宕机如何自动释放锁
- 如何避免误删其他客户端的锁
高可用性:
- Redis主从切换时的一致性问题
- 如何实现分布式环境下的共识
二、方案对比
2.1 常见方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 数据库 | 简单、强一致 | 性能差、锁表风险 | 低并发 |
| Redis SETNX | 高性能 | 主从不一致 | 高并发(推荐) |
| Redlock | 强一致 | 复杂、性能差 | 强一致性要求 |
| Zookeeper | 强一致、支持阻塞 | 性能一般 | CP场景 |
| etcd | 强一致、天然支持租约 | 重量级 | K8s场景 |
2.2 推荐方案
单Redis实例:Redis SETNX + Lua(推荐) Redis集群:Redlock算法 强一致性:Zookeeper
三、Redis分布式锁
3.1 基础版(有问题)
go
// ❌ 错误示例
func Lock(key string, expireTime int) bool {
// SETNX设置锁
success := redis.SetNX(key, "1", 0)
if success {
// 设置过期时间
redis.Expire(key, expireTime)
return true
}
return false
}问题:SETNX和Expire不是原子操作,如果SETNX成功后进程崩溃,锁永远不会释放!
3.2 改进版1(SET NX EX)
go
// ✅ 正确:SET NX EX原子操作
func Lock(key string, value string, expireTime int) bool {
// SET key value NX EX seconds
result := redis.Set(key, value, expireTime, "NX")
return result == "OK"
}
func Unlock(key string, value string) bool {
// 直接删除(有问题)
redis.Del(key)
}问题:可能误删其他客户端的锁!
- A加锁成功,设置30秒过期
- A业务执行40秒(超时)
- 30秒时锁自动过期
- B加锁成功
- A执行完,删除锁(实际删除的是B的锁)
3.3 改进版2(Lua保证原子性)
go
// ✅ 完整的分布式锁
type RedisLock struct {
redis *redis.Client
key string
value string // 唯一标识(UUID)
expire time.Duration
}
// Lock 加锁
func (l *RedisLock) Lock(ctx context.Context) error {
// SET key value NX EX seconds
success, err := l.redis.SetNX(ctx, l.key, l.value, l.expire).Result()
if err != nil {
return err
}
if !success {
return errors.New("acquire lock failed")
}
return nil
}
// Unlock 解锁(Lua脚本保证原子性)
func (l *RedisLock) Unlock(ctx context.Context) error {
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
result, err := l.redis.Eval(ctx, script, []string{l.key}, l.value).Result()
if err != nil {
return err
}
if result == int64(0) {
return errors.New("unlock failed: lock not owned")
}
return nil
}3.4 完整实现(Go)
点击查看完整实现
go
package distlock
import (
"context"
"errors"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)
// RedisLock Redis分布式锁
type RedisLock struct {
redis *redis.Client
key string
value string // 锁的唯一标识
expiration time.Duration
// 看门狗相关
watchdog *time.Ticker
stopCh chan struct{}
}
// NewRedisLock 创建Redis锁
func NewRedisLock(redis *redis.Client, key string, expiration time.Duration) *RedisLock {
return &RedisLock{
redis: redis,
key: key,
value: uuid.New().String(),
expiration: expiration,
stopCh: make(chan struct{}),
}
}
// Lock 阻塞加锁
func (l *RedisLock) Lock(ctx context.Context) error {
for {
success, err := l.TryLock(ctx)
if err != nil {
return err
}
if success {
// 启动看门狗(自动续期)
l.startWatchdog(ctx)
return nil
}
// 等待一段时间后重试
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
continue
}
}
}
// TryLock 尝试加锁(非阻塞)
func (l *RedisLock) TryLock(ctx context.Context) (bool, error) {
// SET key value NX EX seconds
success, err := l.redis.SetNX(ctx, l.key, l.value, l.expiration).Result()
if err != nil {
return false, err
}
return success, nil
}
// LockWithTimeout 带超时的加锁
func (l *RedisLock) LockWithTimeout(ctx context.Context, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return l.Lock(ctx)
}
// Unlock 解锁
func (l *RedisLock) Unlock(ctx context.Context) error {
// 停止看门狗
l.stopWatchdog()
// Lua脚本保证原子性
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
result, err := l.redis.Eval(ctx, script, []string{l.key}, l.value).Result()
if err != nil {
return err
}
if result == int64(0) {
return errors.New("unlock failed: lock not owned or already released")
}
return nil
}
// startWatchdog 启动看门狗(自动续期)
func (l *RedisLock) startWatchdog(ctx context.Context) {
// 每1/3过期时间续期一次
renewInterval := l.expiration / 3
l.watchdog = time.NewTicker(renewInterval)
go func() {
for {
select {
case <-l.watchdog.C:
// 续期
l.renew(ctx)
case <-l.stopCh:
return
}
}
}()
}
// stopWatchdog 停止看门狗
func (l *RedisLock) stopWatchdog() {
if l.watchdog != nil {
l.watchdog.Stop()
close(l.stopCh)
}
}
// renew 续期锁
func (l *RedisLock) renew(ctx context.Context) error {
// Lua脚本续期
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("EXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`
result, err := l.redis.Eval(ctx, script,
[]string{l.key},
l.value,
int(l.expiration.Seconds())).Result()
if err != nil {
return err
}
if result == int64(0) {
return errors.New("renew failed: lock not owned")
}
return nil
}
// IsLocked 检查是否持有锁
func (l *RedisLock) IsLocked(ctx context.Context) (bool, error) {
value, err := l.redis.Get(ctx, l.key).Result()
if err == redis.Nil {
return false, nil
}
if err != nil {
return false, err
}
return value == l.value, nil
}3.5 可重入锁实现
go
// ReentrantLock 可重入锁
type ReentrantLock struct {
redis *redis.Client
key string
value string
expiration time.Duration
}
// Lock 加锁(支持可重入)
func (l *ReentrantLock) Lock(ctx context.Context) error {
script := `
if redis.call("EXISTS", KEYS[1]) == 0 then
redis.call("HSET", KEYS[1], ARGV[1], 1)
redis.call("EXPIRE", KEYS[1], ARGV[2])
return 1
elseif redis.call("HEXISTS", KEYS[1], ARGV[1]) == 1 then
redis.call("HINCRBY", KEYS[1], ARGV[1], 1)
redis.call("EXPIRE", KEYS[1], ARGV[2])
return 1
else
return 0
end
`
result, err := l.redis.Eval(ctx, script,
[]string{l.key},
l.value,
int(l.expiration.Seconds())).Result()
if err != nil {
return err
}
if result == int64(0) {
return errors.New("acquire lock failed")
}
return nil
}
// Unlock 解锁(支持可重入)
func (l *ReentrantLock) Unlock(ctx context.Context) error {
script := `
if redis.call("HEXISTS", KEYS[1], ARGV[1]) == 0 then
return nil
end
local counter = redis.call("HINCRBY", KEYS[1], ARGV[1], -1)
if counter > 0 then
return 0
else
redis.call("DEL", KEYS[1])
return 1
end
`
result, err := l.redis.Eval(ctx, script, []string{l.key}, l.value).Result()
if err != nil {
return err
}
if result == nil {
return errors.New("unlock failed: lock not owned")
}
return nil
}3.6 Java实现(Redisson)
java
@Service
public class RedisLockService {
@Autowired
private RedissonClient redissonClient;
/**
* 执行带锁的操作
*/
public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试加锁,最多等待3秒,锁30秒后自动释放
boolean success = lock.tryLock(3, 30, TimeUnit.SECONDS);
if (!success) {
throw new RuntimeException("获取锁失败");
}
// 执行业务逻辑
return supplier.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("加锁被中断", e);
} finally {
// 释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
/**
* 扣减库存(防止超卖)
*/
public boolean deductStock(Long productId, Integer quantity) {
String lockKey = "stock:lock:" + productId;
return executeWithLock(lockKey, () -> {
// 查询库存
Integer stock = stockService.getStock(productId);
if (stock < quantity) {
return false;
}
// 扣减库存
stockService.deduct(productId, quantity);
return true;
});
}
}java
@Component
public class CustomRedisLock {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加锁
*/
public boolean lock(String key, String value, long expireTime) {
// SET key value NX EX seconds
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, value, expireTime, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 解锁(Lua脚本保证原子性)
*/
public boolean unlock(String key, String value) {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
Long result = redisTemplate.execute(
redisScript,
Collections.singletonList(key),
value
);
return result != null && result == 1L;
}
/**
* 续期锁
*/
public boolean renew(String key, String value, long expireTime) {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
Long result = redisTemplate.execute(
redisScript,
Collections.singletonList(key),
value,
String.valueOf(expireTime)
);
return result != null && result == 1L;
}
}四、Redlock算法
4.1 为什么需要Redlock
Redis主从架构存在问题:
- 客户端A在Master加锁成功
- Master在同步到Slave前宕机
- Slave被提升为Master(锁丢失)
- 客户端B在新Master加锁成功
- A和B同时持有锁(违反互斥性)
4.2 Redlock算法流程
前提:部署N个独立的Redis Master(N=5,奇数)
加锁流程:
- 获取当前时间戳t1
- 依次向N个Redis实例请求加锁
- 设置较短的超时时间(避免阻塞)
- 计算加锁耗时:t2 - t1
- 判断是否加锁成功:
- 成功数 > N/2(至少3个)
- 总耗时 < 锁有效期
解锁流程: 向所有Redis实例发送解锁请求
4.3 Redlock实现
go
type Redlock struct {
clients []*redis.Client
}
func NewRedlock(addrs []string) *Redlock {
clients := make([]*redis.Client, len(addrs))
for i, addr := range addrs {
clients[i] = redis.NewClient(&redis.Options{Addr: addr})
}
return &Redlock{clients: clients}
}
func (r *Redlock) Lock(ctx context.Context, key, value string, expiration time.Duration) (bool, error) {
start := time.Now()
successCount := 0
// 向所有实例请求加锁
for _, client := range r.clients {
success, err := client.SetNX(ctx, key, value, expiration).Result()
if err == nil && success {
successCount++
}
}
// 计算耗时
cost := time.Since(start)
// 判断是否成功
if successCount >= len(r.clients)/2+1 && cost < expiration {
return true, nil
}
// 失败,释放已加锁的实例
r.Unlock(ctx, key, value)
return false, nil
}
func (r *Redlock) Unlock(ctx context.Context, key, value string) error {
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
// 向所有实例发送解锁请求
for _, client := range r.clients {
client.Eval(ctx, script, []string{key}, value)
}
return nil
}4.4 Redlock的争议
Martin Kleppmann的质疑:
- 依赖系统时钟(NTP同步问题)
- GC导致的STW可能破坏互斥性
- 复杂度高,性能差
Redis作者Antirez的回应:
- 使用单调时钟而非系统时钟
- GC问题所有方案都存在
- 适用于特定场景
结论:
- 如果需要强一致性,用Zookeeper
- 如果能容忍偶尔失败,用单Redis实例
五、Zookeeper分布式锁
5.1 原理
利用Zookeeper的临时顺序节点:
- 创建临时顺序节点:
/lock/seq-0000000001 - 获取所有子节点,判断自己是否最小
- 如果是最小节点,加锁成功
- 否则,监听前一个节点的删除事件
- 收到通知后,重新判断
优点:
- 强一致性(CP)
- 天然支持阻塞等待
- 自动释放(临时节点)
缺点:
- 性能较差
- 依赖Zookeeper
5.2 实现
go
type ZkLock struct {
conn *zk.Conn
path string
myNode string
}
func (l *ZkLock) Lock() error {
// 创建临时顺序节点
path, err := l.conn.Create(
l.path+"/seq-",
[]byte(""),
zk.FlagEphemeral|zk.FlagSequence,
zk.WorldACL(zk.PermAll),
)
if err != nil {
return err
}
l.myNode = path
for {
// 获取所有子节点
children, _, err := l.conn.Children(l.path)
if err != nil {
return err
}
// 排序
sort.Strings(children)
// 判断是否最小
if children[0] == filepath.Base(l.myNode) {
// 加锁成功
return nil
}
// 找到前一个节点
var prevNode string
for i, child := range children {
if child == filepath.Base(l.myNode) {
prevNode = children[i-1]
break
}
}
// 监听前一个节点
_, _, ch, err := l.conn.ExistsW(l.path + "/" + prevNode)
if err != nil {
return err
}
// 等待通知
<-ch
}
}
func (l *ZkLock) Unlock() error {
return l.conn.Delete(l.myNode, -1)
}六、性能对比
| 方案 | TPS | 延迟 | 一致性 | 可用性 |
|---|---|---|---|---|
| Redis单实例 | 10万 | <1ms | 弱 | 高 |
| Redlock | 1万 | 5-10ms | 较强 | 高 |
| Zookeeper | 1000 | 10-50ms | 强 | 中 |
| etcd | 5000 | 5-20ms | 强 | 高 |
七、面试要点
7.1 常见追问
Q1: Redis分布式锁如何防止死锁?
A:
- 设置过期时间:避免持锁客户端宕机
- 看门狗机制:自动续期(Redisson实现)
- Lua脚本:保证加锁和设置过期的原子性
Q2: 如何防止误删其他客户端的锁?
A: 使用唯一标识(UUID)
- 加锁时设置UUID为value
- 解锁时先判断value是否匹配
- 使用Lua脚本保证原子性
Q3: Redis分布式锁的缺陷?
A:
- 主从不一致:主从切换时可能丢锁
- 时钟依赖:依赖系统时钟(Redlock)
- 不支持阻塞:需要轮询(可用Redisson解决)
Q4: Redlock vs 单Redis?
A:
- 单Redis:简单、高性能,适合大多数场景
- Redlock:复杂、性能差,但更安全
- 建议:除非有强一致性要求,否则用单Redis
7.2 扩展知识点
八、总结
分布式锁选型建议:
- 高性能场景:Redis单实例(推荐)
- 强一致性场景:Zookeeper
- Redis集群:Redlock(慎用)
- 低并发场景:MySQL
Redis分布式锁三要素:
- SET NX EX:原子加锁+过期
- UUID value:防误删
- Lua解锁:原子判断+删除
相关场景题:
