Skip to content

如何设计分布式锁

一、问题描述

1.1 业务背景

在分布式系统中,多个节点可能同时访问共享资源,需要通过分布式锁来保证互斥访问。

典型场景

  • 库存扣减:秒杀时防止超卖
  • 订单号生成:保证订单号唯一
  • 定时任务:防止多个节点重复执行
  • 缓存更新:防止缓存击穿

1.2 核心需求

必须满足

  1. 互斥性:同一时刻只有一个客户端持有锁
  2. 防死锁:锁最终一定会被释放
  3. 容错性:即使持锁客户端宕机,锁也能释放

最好满足: 4. 可重入:同一客户端可以多次获取锁 5. 高性能:加锁解锁速度快 6. 阻塞/非阻塞:支持tryLock和阻塞lock

1.3 技术挑战

互斥性保证

  • 如何保证只有一个客户端能加锁成功

死锁避免

  • 客户端宕机如何自动释放锁
  • 如何避免误删其他客户端的锁

高可用性

  • Redis主从切换时的一致性问题
  • 如何实现分布式环境下的共识

二、方案对比

2.1 常见方案对比

方案优点缺点适用场景
数据库简单、强一致性能差、锁表风险低并发
Redis SETNX高性能主从不一致高并发(推荐)
Redlock强一致复杂、性能差强一致性要求
Zookeeper强一致、支持阻塞性能一般CP场景
etcd强一致、天然支持租约重量级K8s场景

2.2 推荐方案

单Redis实例:Redis SETNX + Lua(推荐) Redis集群:Redlock算法 强一致性:Zookeeper

三、Redis分布式锁

3.1 基础版(有问题)

go
// ❌ 错误示例
func Lock(key string, expireTime int) bool {
    // SETNX设置锁
    success := redis.SetNX(key, "1", 0)
    if success {
        // 设置过期时间
        redis.Expire(key, expireTime)
        return true
    }
    return false
}

问题:SETNX和Expire不是原子操作,如果SETNX成功后进程崩溃,锁永远不会释放!

3.2 改进版1(SET NX EX)

go
// ✅ 正确:SET NX EX原子操作
func Lock(key string, value string, expireTime int) bool {
    // SET key value NX EX seconds
    result := redis.Set(key, value, expireTime, "NX")
    return result == "OK"
}

func Unlock(key string, value string) bool {
    // 直接删除(有问题)
    redis.Del(key)
}

问题:可能误删其他客户端的锁!

  • A加锁成功,设置30秒过期
  • A业务执行40秒(超时)
  • 30秒时锁自动过期
  • B加锁成功
  • A执行完,删除锁(实际删除的是B的锁)

3.3 改进版2(Lua保证原子性)

go
// ✅ 完整的分布式锁
type RedisLock struct {
    redis  *redis.Client
    key    string
    value  string // 唯一标识(UUID)
    expire time.Duration
}

// Lock 加锁
func (l *RedisLock) Lock(ctx context.Context) error {
    // SET key value NX EX seconds
    success, err := l.redis.SetNX(ctx, l.key, l.value, l.expire).Result()
    if err != nil {
        return err
    }
    if !success {
        return errors.New("acquire lock failed")
    }
    return nil
}

// Unlock 解锁(Lua脚本保证原子性)
func (l *RedisLock) Unlock(ctx context.Context) error {
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `
    
    result, err := l.redis.Eval(ctx, script, []string{l.key}, l.value).Result()
    if err != nil {
        return err
    }
    if result == int64(0) {
        return errors.New("unlock failed: lock not owned")
    }
    return nil
}

3.4 完整实现(Go)

点击查看完整实现
go
package distlock

import (
    "context"
    "errors"
    "time"
    
    "github.com/go-redis/redis/v8"
    "github.com/google/uuid"
)

// RedisLock Redis分布式锁
type RedisLock struct {
    redis      *redis.Client
    key        string
    value      string // 锁的唯一标识
    expiration time.Duration
    // 看门狗相关
    watchdog   *time.Ticker
    stopCh     chan struct{}
}

// NewRedisLock 创建Redis锁
func NewRedisLock(redis *redis.Client, key string, expiration time.Duration) *RedisLock {
    return &RedisLock{
        redis:      redis,
        key:        key,
        value:      uuid.New().String(),
        expiration: expiration,
        stopCh:     make(chan struct{}),
    }
}

// Lock 阻塞加锁
func (l *RedisLock) Lock(ctx context.Context) error {
    for {
        success, err := l.TryLock(ctx)
        if err != nil {
            return err
        }
        if success {
            // 启动看门狗(自动续期)
            l.startWatchdog(ctx)
            return nil
        }
        
        // 等待一段时间后重试
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(100 * time.Millisecond):
            continue
        }
    }
}

// TryLock 尝试加锁(非阻塞)
func (l *RedisLock) TryLock(ctx context.Context) (bool, error) {
    // SET key value NX EX seconds
    success, err := l.redis.SetNX(ctx, l.key, l.value, l.expiration).Result()
    if err != nil {
        return false, err
    }
    return success, nil
}

// LockWithTimeout 带超时的加锁
func (l *RedisLock) LockWithTimeout(ctx context.Context, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    
    return l.Lock(ctx)
}

// Unlock 解锁
func (l *RedisLock) Unlock(ctx context.Context) error {
    // 停止看门狗
    l.stopWatchdog()
    
    // Lua脚本保证原子性
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `
    
    result, err := l.redis.Eval(ctx, script, []string{l.key}, l.value).Result()
    if err != nil {
        return err
    }
    
    if result == int64(0) {
        return errors.New("unlock failed: lock not owned or already released")
    }
    
    return nil
}

// startWatchdog 启动看门狗(自动续期)
func (l *RedisLock) startWatchdog(ctx context.Context) {
    // 每1/3过期时间续期一次
    renewInterval := l.expiration / 3
    l.watchdog = time.NewTicker(renewInterval)
    
    go func() {
        for {
            select {
            case <-l.watchdog.C:
                // 续期
                l.renew(ctx)
            case <-l.stopCh:
                return
            }
        }
    }()
}

// stopWatchdog 停止看门狗
func (l *RedisLock) stopWatchdog() {
    if l.watchdog != nil {
        l.watchdog.Stop()
        close(l.stopCh)
    }
}

// renew 续期锁
func (l *RedisLock) renew(ctx context.Context) error {
    // Lua脚本续期
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("EXPIRE", KEYS[1], ARGV[2])
        else
            return 0
        end
    `
    
    result, err := l.redis.Eval(ctx, script, 
        []string{l.key}, 
        l.value, 
        int(l.expiration.Seconds())).Result()
    
    if err != nil {
        return err
    }
    
    if result == int64(0) {
        return errors.New("renew failed: lock not owned")
    }
    
    return nil
}

// IsLocked 检查是否持有锁
func (l *RedisLock) IsLocked(ctx context.Context) (bool, error) {
    value, err := l.redis.Get(ctx, l.key).Result()
    if err == redis.Nil {
        return false, nil
    }
    if err != nil {
        return false, err
    }
    return value == l.value, nil
}

3.5 可重入锁实现

go
// ReentrantLock 可重入锁
type ReentrantLock struct {
    redis      *redis.Client
    key        string
    value      string
    expiration time.Duration
}

// Lock 加锁(支持可重入)
func (l *ReentrantLock) Lock(ctx context.Context) error {
    script := `
        if redis.call("EXISTS", KEYS[1]) == 0 then
            redis.call("HSET", KEYS[1], ARGV[1], 1)
            redis.call("EXPIRE", KEYS[1], ARGV[2])
            return 1
        elseif redis.call("HEXISTS", KEYS[1], ARGV[1]) == 1 then
            redis.call("HINCRBY", KEYS[1], ARGV[1], 1)
            redis.call("EXPIRE", KEYS[1], ARGV[2])
            return 1
        else
            return 0
        end
    `
    
    result, err := l.redis.Eval(ctx, script, 
        []string{l.key}, 
        l.value, 
        int(l.expiration.Seconds())).Result()
    
    if err != nil {
        return err
    }
    
    if result == int64(0) {
        return errors.New("acquire lock failed")
    }
    
    return nil
}

// Unlock 解锁(支持可重入)
func (l *ReentrantLock) Unlock(ctx context.Context) error {
    script := `
        if redis.call("HEXISTS", KEYS[1], ARGV[1]) == 0 then
            return nil
        end
        
        local counter = redis.call("HINCRBY", KEYS[1], ARGV[1], -1)
        if counter > 0 then
            return 0
        else
            redis.call("DEL", KEYS[1])
            return 1
        end
    `
    
    result, err := l.redis.Eval(ctx, script, []string{l.key}, l.value).Result()
    if err != nil {
        return err
    }
    
    if result == nil {
        return errors.New("unlock failed: lock not owned")
    }
    
    return nil
}

3.6 Java实现(Redisson)

java
@Service
public class RedisLockService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    /**
     * 执行带锁的操作
     */
    public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
        RLock lock = redissonClient.getLock(lockKey);
        
        try {
            // 尝试加锁,最多等待3秒,锁30秒后自动释放
            boolean success = lock.tryLock(3, 30, TimeUnit.SECONDS);
            if (!success) {
                throw new RuntimeException("获取锁失败");
            }
            
            // 执行业务逻辑
            return supplier.get();
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("加锁被中断", e);
        } finally {
            // 释放锁
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
    
    /**
     * 扣减库存(防止超卖)
     */
    public boolean deductStock(Long productId, Integer quantity) {
        String lockKey = "stock:lock:" + productId;
        
        return executeWithLock(lockKey, () -> {
            // 查询库存
            Integer stock = stockService.getStock(productId);
            if (stock < quantity) {
                return false;
            }
            
            // 扣减库存
            stockService.deduct(productId, quantity);
            return true;
        });
    }
}
java
@Component
public class CustomRedisLock {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 加锁
     */
    public boolean lock(String key, String value, long expireTime) {
        // SET key value NX EX seconds
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(key, value, expireTime, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }
    
    /**
     * 解锁(Lua脚本保证原子性)
     */
    public boolean unlock(String key, String value) {
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "  return redis.call('del', KEYS[1]) " +
            "else " +
            "  return 0 " +
            "end";
        
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        redisScript.setResultType(Long.class);
        
        Long result = redisTemplate.execute(
            redisScript, 
            Collections.singletonList(key), 
            value
        );
        
        return result != null && result == 1L;
    }
    
    /**
     * 续期锁
     */
    public boolean renew(String key, String value, long expireTime) {
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "  return redis.call('expire', KEYS[1], ARGV[2]) " +
            "else " +
            "  return 0 " +
            "end";
        
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        redisScript.setResultType(Long.class);
        
        Long result = redisTemplate.execute(
            redisScript, 
            Collections.singletonList(key), 
            value, 
            String.valueOf(expireTime)
        );
        
        return result != null && result == 1L;
    }
}

四、Redlock算法

4.1 为什么需要Redlock

Redis主从架构存在问题:

  1. 客户端A在Master加锁成功
  2. Master在同步到Slave前宕机
  3. Slave被提升为Master(锁丢失)
  4. 客户端B在新Master加锁成功
  5. A和B同时持有锁(违反互斥性)

4.2 Redlock算法流程

前提:部署N个独立的Redis Master(N=5,奇数)

加锁流程

  1. 获取当前时间戳t1
  2. 依次向N个Redis实例请求加锁
  3. 设置较短的超时时间(避免阻塞)
  4. 计算加锁耗时:t2 - t1
  5. 判断是否加锁成功:
    • 成功数 > N/2(至少3个)
    • 总耗时 < 锁有效期

解锁流程: 向所有Redis实例发送解锁请求

4.3 Redlock实现

go
type Redlock struct {
    clients []*redis.Client
}

func NewRedlock(addrs []string) *Redlock {
    clients := make([]*redis.Client, len(addrs))
    for i, addr := range addrs {
        clients[i] = redis.NewClient(&redis.Options{Addr: addr})
    }
    return &Redlock{clients: clients}
}

func (r *Redlock) Lock(ctx context.Context, key, value string, expiration time.Duration) (bool, error) {
    start := time.Now()
    successCount := 0
    
    // 向所有实例请求加锁
    for _, client := range r.clients {
        success, err := client.SetNX(ctx, key, value, expiration).Result()
        if err == nil && success {
            successCount++
        }
    }
    
    // 计算耗时
    cost := time.Since(start)
    
    // 判断是否成功
    if successCount >= len(r.clients)/2+1 && cost < expiration {
        return true, nil
    }
    
    // 失败,释放已加锁的实例
    r.Unlock(ctx, key, value)
    return false, nil
}

func (r *Redlock) Unlock(ctx context.Context, key, value string) error {
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `
    
    // 向所有实例发送解锁请求
    for _, client := range r.clients {
        client.Eval(ctx, script, []string{key}, value)
    }
    
    return nil
}

4.4 Redlock的争议

Martin Kleppmann的质疑

  1. 依赖系统时钟(NTP同步问题)
  2. GC导致的STW可能破坏互斥性
  3. 复杂度高,性能差

Redis作者Antirez的回应

  1. 使用单调时钟而非系统时钟
  2. GC问题所有方案都存在
  3. 适用于特定场景

结论

  • 如果需要强一致性,用Zookeeper
  • 如果能容忍偶尔失败,用单Redis实例

五、Zookeeper分布式锁

5.1 原理

利用Zookeeper的临时顺序节点:

  1. 创建临时顺序节点:/lock/seq-0000000001
  2. 获取所有子节点,判断自己是否最小
  3. 如果是最小节点,加锁成功
  4. 否则,监听前一个节点的删除事件
  5. 收到通知后,重新判断

优点

  • 强一致性(CP)
  • 天然支持阻塞等待
  • 自动释放(临时节点)

缺点

  • 性能较差
  • 依赖Zookeeper

5.2 实现

go
type ZkLock struct {
    conn    *zk.Conn
    path    string
    myNode  string
}

func (l *ZkLock) Lock() error {
    // 创建临时顺序节点
    path, err := l.conn.Create(
        l.path+"/seq-",
        []byte(""),
        zk.FlagEphemeral|zk.FlagSequence,
        zk.WorldACL(zk.PermAll),
    )
    if err != nil {
        return err
    }
    l.myNode = path
    
    for {
        // 获取所有子节点
        children, _, err := l.conn.Children(l.path)
        if err != nil {
            return err
        }
        
        // 排序
        sort.Strings(children)
        
        // 判断是否最小
        if children[0] == filepath.Base(l.myNode) {
            // 加锁成功
            return nil
        }
        
        // 找到前一个节点
        var prevNode string
        for i, child := range children {
            if child == filepath.Base(l.myNode) {
                prevNode = children[i-1]
                break
            }
        }
        
        // 监听前一个节点
        _, _, ch, err := l.conn.ExistsW(l.path + "/" + prevNode)
        if err != nil {
            return err
        }
        
        // 等待通知
        <-ch
    }
}

func (l *ZkLock) Unlock() error {
    return l.conn.Delete(l.myNode, -1)
}

六、性能对比

方案TPS延迟一致性可用性
Redis单实例10万<1ms
Redlock1万5-10ms较强
Zookeeper100010-50ms
etcd50005-20ms

七、面试要点

7.1 常见追问

Q1: Redis分布式锁如何防止死锁?

A:

  1. 设置过期时间:避免持锁客户端宕机
  2. 看门狗机制:自动续期(Redisson实现)
  3. Lua脚本:保证加锁和设置过期的原子性

Q2: 如何防止误删其他客户端的锁?

A: 使用唯一标识(UUID)

  • 加锁时设置UUID为value
  • 解锁时先判断value是否匹配
  • 使用Lua脚本保证原子性

Q3: Redis分布式锁的缺陷?

A:

  1. 主从不一致:主从切换时可能丢锁
  2. 时钟依赖:依赖系统时钟(Redlock)
  3. 不支持阻塞:需要轮询(可用Redisson解决)

Q4: Redlock vs 单Redis?

A:

  • 单Redis:简单、高性能,适合大多数场景
  • Redlock:复杂、性能差,但更安全
  • 建议:除非有强一致性要求,否则用单Redis

7.2 扩展知识点

八、总结

分布式锁选型建议:

  1. 高性能场景:Redis单实例(推荐)
  2. 强一致性场景:Zookeeper
  3. Redis集群:Redlock(慎用)
  4. 低并发场景:MySQL

Redis分布式锁三要素

  1. SET NX EX:原子加锁+过期
  2. UUID value:防误删
  3. Lua解锁:原子判断+删除

相关场景题

正在精进