Skip to content

Mutex 互斥锁

Mutex 的几种状态

Go 中的 sync.Mutex 是最基本的同步原语,它有几种不同的状态,理解这些状态对于深入掌握 Go 并发编程至关重要。

Mutex 的内部结构

go
type Mutex struct {
    state int32   // 状态字段
    sema  uint32  // 信号量,用于阻塞/唤醒 goroutine
}

状态位定义

Mutex 的状态用一个 32 位整数表示,不同的位有不同的含义:

go
const (
    mutexLocked = 1 << iota    // 第0位:是否已锁定 (0000...0001)
    mutexWoken                 // 第1位:是否有唤醒的 goroutine (0000...0010)
    mutexStarving              // 第2位:是否处于饥饿模式 (0000...0100)
    mutexWaiterShift = iota    // 等待者数量的位移量(从第3位开始)
)

状态详解

1. mutexLocked(已锁定状态)

  • 含义:表示互斥锁当前是否被某个 goroutine 持有
  • 位置:第 0 位
  • :1 表示已锁定,0 表示未锁定

2. mutexWoken(唤醒状态)

  • 含义:表示是否有 goroutine 已经被唤醒且正在尝试获取锁
  • 位置:第 1 位
  • 目的:避免不必要的唤醒操作

3. mutexStarving(饥饿状态)

  • 含义:表示互斥锁是否处于饥饿模式
  • 位置:第 2 位
  • 触发条件:等待超过 1ms 的 goroutine

4. Waiter Count(等待者计数)

  • 含义:记录当前等待获取锁的 goroutine 数量
  • 位置:第 3-31 位(29 位)
  • 最大值:2^29 - 1 个等待者

示例代码

点击查看完整代码实现
点击查看完整代码实现
go
package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
    "unsafe"
)

// 通过反射获取 Mutex 的内部状态
func getMutexState(m *sync.Mutex) int32 {
    return atomic.LoadInt32((*int32)(unsafe.Pointer(m)))
}

func analyzeState(state int32) {
    locked := state & 1
    woken := (state >> 1) & 1
    starving := (state >> 2) & 1
    waiters := state >> 3
    
    fmt.Printf("状态值: %032b\n", state)
    fmt.Printf("  锁定状态: %d (已锁定: %t)\n", locked, locked == 1)
    fmt.Printf("  唤醒状态: %d (有唤醒: %t)\n", woken, woken == 1)
    fmt.Printf("  饥饿状态: %d (饥饿模式: %t)\n", starving, starving == 1)
    fmt.Printf("  等待者数: %d\n", waiters)
    fmt.Println()
}

func testMutexStates() {
    var mu sync.Mutex
    
    fmt.Println("=== 初始状态 ===")
    analyzeState(getMutexState(&mu))
    
    fmt.Println("=== 锁定后状态 ===")
    mu.Lock()
    analyzeState(getMutexState(&mu))
    
    // 启动多个等待的 goroutine
    fmt.Println("=== 有等待者的状态 ===")
    done := make(chan bool)
    
    for i := 0; i < 3; i++ {
        go func(id int) {
            mu.Lock()
            fmt.Printf("Goroutine %d 获得锁\n", id)
            time.Sleep(100 * time.Millisecond)
            mu.Unlock()
            done <- true
        }(i)
    }
    
    // 等待 goroutine 启动并开始等待
    time.Sleep(10 * time.Millisecond)
    analyzeState(getMutexState(&mu))
    
    fmt.Println("=== 释放锁 ===")
    mu.Unlock()
    
    // 等待所有 goroutine 完成
    for i := 0; i < 3; i++ {
        <-done
    }
    
    fmt.Println("=== 最终状态 ===")
    analyzeState(getMutexState(&mu))
}

func main() {
    testMutexStates()
}

:::

Mutex 正常模式和饥饿模式

Mutex 有两种工作模式,这是 Go 1.9 引入的重要优化。

正常模式(Normal Mode)

特点:

  • 新来的 goroutine 和被唤醒的 goroutine 竞争锁
  • 新来的 goroutine 有优势(它们已经在 CPU 上运行)
  • 被唤醒的 goroutine 可能再次被阻塞
  • 吞吐量高,但可能导致尾部延迟较高

工作流程:

1. goroutine A 持有锁
2. goroutine B 尝试获取锁,进入等待队列
3. goroutine A 释放锁
4. goroutine B 被唤醒,同时 goroutine C 也尝试获取锁
5. goroutine C 可能抢先获得锁(因为它在 CPU 上运行)
6. goroutine B 再次进入等待

饥饿模式(Starvation Mode)

触发条件:

  • 等待锁的 goroutine 超过 1ms

特点:

  • 锁直接传递给队列头部的等待者
  • 新来的 goroutine 不会竞争,直接排队
  • 公平性高,避免长时间等待
  • 吞吐量可能降低

退出条件:

  • 等待时间少于 1ms
  • 或者它是队列中的最后一个等待者

模式切换示例

点击查看完整代码实现
点击查看完整代码实现
go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
    "unsafe"
)

func demonstrateStarvation() {
    var mu sync.Mutex
    var counter int64
    
    // 持续持有锁的 goroutine
    go func() {
        for i := 0; i < 100; i++ {
            mu.Lock()
            time.Sleep(2 * time.Millisecond)  // 持有锁较长时间
            atomic.AddInt64(&counter, 1)
            mu.Unlock()
            time.Sleep(1 * time.Millisecond)  // 短暂释放
        }
    }()
    
    // 竞争锁的 goroutine
    start := time.Now()
    var wg sync.WaitGroup
    wg.Add(5)
    
    for i := 0; i < 5; i++ {
        go func(id int) {
            defer wg.Done()
            
            waitStart := time.Now()
            mu.Lock()
            waitTime := time.Since(waitStart)
            
            fmt.Printf("Goroutine %d 等待时间: %v\n", id, waitTime)
            
            time.Sleep(1 * time.Millisecond)
            mu.Unlock()
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("总耗时: %v, Counter: %d\n", time.Since(start), counter)
}

func main() {
    demonstrateStarvation()
}

:::

Mutex 允许自旋的条件

自旋是一种优化技术,在某些条件下,goroutine 不会立即进入休眠,而是进行短暂的忙等待。

自旋的条件

必须同时满足以下所有条件:

  1. 锁已被占用,且不是饥饿模式
  2. 积累的自旋次数小于 4
  3. CPU 核数大于 1
  4. 至少有一个其他正在运行的 P(处理器)
  5. 当前 goroutine 所在的 P 的本地队列为空

自旋的源码逻辑

go
// 简化的自旋条件检查
func canSpin(iter int) bool {
    return iter < active_spin &&    // 自旋次数 < 4
           runtime.NumCPU() > 1 &&  // 多核
           runtime.GOMAXPROCS(0) > 1 && // 多个 P
           runtime.NumGoroutine() > 1   // 有其他 goroutine
}

自旋的好处

  1. 避免上下文切换:减少 goroutine 的阻塞和唤醒开销
  2. 提高响应速度:在锁快速释放的场景下效果明显
  3. 减少系统调用:避免进入内核态

自旋示例

点击查看完整代码实现
点击查看完整代码实现
go
package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

func testSpinning() {
    var mu sync.Mutex
    var spinCount int64
    
    fmt.Printf("CPU 核数: %d\n", runtime.NumCPU())
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
    
    // 快速释放锁的场景
    go func() {
        for i := 0; i < 1000; i++ {
            mu.Lock()
            time.Sleep(10 * time.Microsecond)  // 很短的持有时间
            mu.Unlock()
            time.Sleep(10 * time.Microsecond)  // 很短的间隔
        }
    }()
    
    // 多个竞争者
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            for j := 0; j < 100; j++ {
                start := time.Now()
                mu.Lock()
                duration := time.Since(start)
                
                if duration < 100*time.Microsecond {
                    atomic.AddInt64(&spinCount, 1)
                }
                
                mu.Unlock()
                time.Sleep(50 * time.Microsecond)
            }
        }(i)
    }
    
    wg.Wait()
    
    fmt.Printf("可能的自旋次数: %d\n", atomic.LoadInt64(&spinCount))
}

func main() {
    testSpinning()
}

:::

最佳实践

1. 锁的粒度

点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
// 错误:锁粒度太粗
type BadCounter struct {
    mu    sync.Mutex
    count int
    data  map[string]int
}

func (c *BadCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *BadCounter) GetData(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.data[key]  // 读操作也用了写锁
}

// 正确:合适的锁粒度
type GoodCounter struct {
    mu     sync.RWMutex
    count  int
    dataMu sync.RWMutex
    data   map[string]int
}

func (c *GoodCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *GoodCounter) GetData(key string) int {
    c.dataMu.RLock()
    defer c.dataMu.RUnlock()
    return c.data[key]
}

::: :::

2. 避免死锁

点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
// 错误:可能死锁
func transfer(from, to *Account, amount int) {
    from.mu.Lock()
    defer from.mu.Unlock()
    
    to.mu.Lock()  // 可能死锁
    defer to.mu.Unlock()
    
    from.balance -= amount
    to.balance += amount
}

// 正确:按固定顺序获取锁
func transferSafe(from, to *Account, amount int) {
    if from.id < to.id {
        from.mu.Lock()
        defer from.mu.Unlock()
        to.mu.Lock()
        defer to.mu.Unlock()
    } else {
        to.mu.Lock()
        defer to.mu.Unlock()
        from.mu.Lock()
        defer from.mu.Unlock()
    }
    
    from.balance -= amount
    to.balance += amount
}

::: :::

3. 锁的持有时间

点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
// 错误:持有锁时间过长
func processData(mu *sync.Mutex, data []int) {
    mu.Lock()
    defer mu.Unlock()
    
    // 长时间的计算
    result := heavyComputation(data)
    
    // 更新共享状态
    updateSharedState(result)
}

// 正确:最小化锁持有时间
func processDataOptimized(mu *sync.Mutex, data []int) {
    // 在锁外进行计算
    result := heavyComputation(data)
    
    // 仅在必要时持有锁
    mu.Lock()
    updateSharedState(result)
    mu.Unlock()
}

::: :::

面试要点

  1. Mutex 状态位

    • 32 位状态字段的不同位含义
    • 锁定、唤醒、饥饿状态的作用
  2. 工作模式

    • 正常模式:高吞吐量,可能有饥饿
    • 饥饿模式:保证公平性,避免长时间等待
  3. 自旋优化

    • 5 个自旋条件
    • 避免不必要的上下文切换
  4. 最佳实践

    • 合适的锁粒度
    • 避免死锁
    • 最小化锁持有时间

正在精进