Skip to content

条件变量Cond详解 - Golang并发编程面试题

条件变量(Cond)是Go语言提供的一种同步原语,用于在某个条件满足时唤醒等待的goroutine。本章深入探讨Cond的工作原理、使用模式和实际应用场景。

📋 重点面试题

面试题 1:Cond的基本概念和工作原理

难度级别:⭐⭐⭐⭐
考察范围:同步原语/条件等待
技术标签sync.Cond condition variable wait-notify pattern spurious wakeup broadcast

问题分析

理解条件变量的设计理念和与其他同步原语的配合使用,掌握等待-通知模式是高级并发编程的重要技能。

详细解答

1. Cond基本概念

点击查看完整代码实现
点击查看完整代码实现
go
package main

import (
    "fmt"
    "sync"
    "time"
    "math/rand"
)

func demonstrateCondBasics() {
    fmt.Println("=== Cond基本概念演示 ===")
    
    // Cond的基本结构和使用
    demonstrateBasicStructure()
    
    // Wait-Signal模式
    demonstrateWaitSignalPattern()
    
    // Wait-Broadcast模式
    demonstrateWaitBroadcastPattern()
    
    // 条件检查的重要性
    demonstrateConditionCheck()
}

func demonstrateBasicStructure() {
    fmt.Println("\n--- Cond基本结构 ---")
    
    /*
    sync.Cond的基本结构:
    type Cond struct {
        noCopy noCopy
        L Locker    // 关联的锁(通常是*Mutex或*RWMutex)
        notify notifyList
        checker copyChecker
    }
    
    主要方法:
    - Wait(): 等待条件满足
    - Signal(): 唤醒一个等待的goroutine
    - Broadcast(): 唤醒所有等待的goroutine
    */
    
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    ready := false
    
    fmt.Println("创建条件变量,初始条件为false")
    
    // 启动等待者
    go func() {
        cond.L.Lock()
        defer cond.L.Unlock()
        
        fmt.Println("等待者: 开始等待条件满足")
        
        // 等待条件满足
        for !ready {
            fmt.Println("等待者: 条件未满足,进入等待")
            cond.Wait() // 释放锁并等待
            fmt.Println("等待者: 被唤醒,重新检查条件")
        }
        
        fmt.Println("等待者: 条件满足,继续执行")
    }()
    
    // 等待一段时间后设置条件
    time.Sleep(1 * time.Second)
    
    cond.L.Lock()
    ready = true
    fmt.Println("设置者: 条件已设置为true")
    cond.Signal() // 唤醒等待的goroutine
    fmt.Println("设置者: 已发送信号")
    cond.L.Unlock()
    
    time.Sleep(100 * time.Millisecond) // 等待等待者完成
}

func demonstrateWaitSignalPattern() {
    fmt.Println("\n--- Wait-Signal模式 ---")
    
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    // 共享资源和条件
    var queue []int
    var wg sync.WaitGroup
    
    // 消费者goroutine
    consumer := func(id int) {
        defer wg.Done()
        
        cond.L.Lock()
        defer cond.L.Unlock()
        
        // 等待队列中有数据
        for len(queue) == 0 {
            fmt.Printf("消费者 %d: 队列为空,等待数据\n", id)
            cond.Wait()
        }
        
        // 消费数据
        item := queue[0]
        queue = queue[1:]
        fmt.Printf("消费者 %d: 消费了数据 %d,队列长度: %d\n", id, item, len(queue))
    }
    
    // 生产者函数
    producer := func(data int) {
        cond.L.Lock()
        defer cond.L.Unlock()
        
        // 生产数据
        queue = append(queue, data)
        fmt.Printf("生产者: 生产了数据 %d,队列长度: %d\n", data, len(queue))
        
        // 通知一个等待的消费者
        cond.Signal()
    }
    
    // 启动多个消费者
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go consumer(i)
    }
    
    // 等待消费者准备好
    time.Sleep(200 * time.Millisecond)
    
    // 生产一些数据
    for i := 1; i <= 3; i++ {
        producer(i * 10)
        time.Sleep(300 * time.Millisecond)
    }
    
    wg.Wait()
}

func demonstrateWaitBroadcastPattern() {
    fmt.Println("\n--- Wait-Broadcast模式 ---")
    
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    var gameStarted bool
    var wg sync.WaitGroup
    
    // 玩家goroutine
    player := func(id int) {
        defer wg.Done()
        
        cond.L.Lock()
        defer cond.L.Unlock()
        
        fmt.Printf("玩家 %d: 准备就绪,等待游戏开始\n", id)
        
        // 等待游戏开始
        for !gameStarted {
            cond.Wait()
        }
        
        fmt.Printf("玩家 %d: 游戏开始!开始游戏\n", id)
        
        // 模拟游戏过程
        cond.L.Unlock() // 临时释放锁执行游戏逻辑
        time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
        cond.L.Lock() // 重新获取锁
        
        fmt.Printf("玩家 %d: 游戏结束\n", id)
    }
    
    // 启动多个玩家
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go player(i)
    }
    
    // 等待所有玩家准备
    time.Sleep(500 * time.Millisecond)
    
    // 游戏管理员启动游戏
    cond.L.Lock()
    gameStarted = true
    fmt.Println("游戏管理员: 所有玩家准备就绪,游戏开始!")
    cond.Broadcast() // 唤醒所有等待的玩家
    cond.L.Unlock()
    
    wg.Wait()
    fmt.Println("游戏管理员: 所有玩家游戏结束")
}

func demonstrateConditionCheck() {
    fmt.Println("\n--- 条件检查的重要性 ---")
    
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    var resource int
    var processed []int
    var wg sync.WaitGroup
    
    // 错误的条件检查(仅作演示)
    wrongWorker := func(id int) {
        defer wg.Done()
        
        cond.L.Lock()
        defer cond.L.Unlock()
        
        fmt.Printf("错误Worker %d: 等待资源\n", id)
        
        // 错误:使用if而不是for,可能导致虚假唤醒问题
        if resource <= 0 {
            cond.Wait()
        }
        
        if resource > 0 {
            resource--
            processed = append(processed, id)
            fmt.Printf("错误Worker %d: 处理了资源,剩余: %d\n", id, resource)
        } else {
            fmt.Printf("错误Worker %d: 被唤醒但没有资源可处理\n", id)
        }
    }
    
    // 正确的条件检查
    correctWorker := func(id int) {
        defer wg.Done()
        
        cond.L.Lock()
        defer cond.L.Unlock()
        
        fmt.Printf("正确Worker %d: 等待资源\n", id)
        
        // 正确:使用for循环检查条件,防止虚假唤醒
        for resource <= 0 {
            cond.Wait()
        }
        
        resource--
        processed = append(processed, id+100) // 区分正确worker
        fmt.Printf("正确Worker %d: 处理了资源,剩余: %d\n", id, resource)
    }
    
    // 演示错误方式
    fmt.Println("演示错误的条件检查:")
    resource = 2
    processed = nil
    
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go wrongWorker(i)
    }
    
    time.Sleep(200 * time.Millisecond)
    
    cond.L.Lock()
    fmt.Println("管理者: 广播唤醒所有worker")
    cond.Broadcast()
    cond.L.Unlock()
    
    wg.Wait()
    fmt.Printf("错误方式处理结果: %v\n", processed)
    
    // 演示正确方式
    fmt.Println("\n演示正确的条件检查:")
    resource = 2
    processed = nil
    
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go correctWorker(i)
    }
    
    time.Sleep(200 * time.Millisecond)
    
    cond.L.Lock()
    fmt.Println("管理者: 广播唤醒所有worker")
    cond.Broadcast()
    cond.L.Unlock()
    
    wg.Wait()
    fmt.Printf("正确方式处理结果: %v\n", processed)
}

:::

2. Cond的内部机制

点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateCondInternals() {
    fmt.Println("\n=== Cond内部机制演示 ===")
    
    // 演示Wait()的三步操作
    demonstrateWaitMechanism()
    
    // 演示Signal vs Broadcast的区别
    demonstrateSignalVsBroadcast()
    
    // 演示虚假唤醒问题
    demonstrateSpuriousWakeup()
}

func demonstrateWaitMechanism() {
    fmt.Println("\n--- Wait()机制详解 ---")
    
    /*
    Wait()的三步操作:
    1. 释放关联的锁 c.L.Unlock()
    2. 将当前goroutine加入等待队列并阻塞
    3. 被唤醒后重新获取锁 c.L.Lock()
    */
    
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    var step int
    var wg sync.WaitGroup
    
    // 监控锁状态的goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        for i := 0; i < 6; i++ {
            time.Sleep(200 * time.Millisecond)
            
            // 尝试获取锁来检查状态
            acquired := false
            done := make(chan struct{})
            
            go func() {
                mu.Lock()
                acquired = true
                mu.Unlock()
                close(done)
            }()
            
            select {
            case <-done:
                if acquired {
                    fmt.Printf("监控器: 锁当前是可用的 (步骤 %d)\n", step)
                }
            case <-time.After(50 * time.Millisecond):
                fmt.Printf("监控器: 锁当前被持有 (步骤 %d)\n", step)
            }
        }
    }()
    
    // 等待者
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        fmt.Println("等待者: 获取锁")
        cond.L.Lock()
        step = 1
        
        fmt.Println("等待者: 持有锁,准备等待")
        step = 2
        time.Sleep(300 * time.Millisecond)
        
        fmt.Println("等待者: 调用Wait() - 即将释放锁")
        step = 3
        cond.Wait() // 1.释放锁 2.等待 3.重新获取锁
        
        fmt.Println("等待者: Wait()返回,重新持有锁")
        step = 5
        time.Sleep(200 * time.Millisecond)
        
        fmt.Println("等待者: 释放锁")
        step = 6
        cond.L.Unlock()
    }()
    
    // 信号发送者
    time.Sleep(800 * time.Millisecond)
    
    fmt.Println("信号者: 获取锁并发送信号")
    cond.L.Lock()
    step = 4
    cond.Signal()
    fmt.Println("信号者: 信号已发送,释放锁")
    cond.L.Unlock()
    
    wg.Wait()
}

func demonstrateSignalVsBroadcast() {
    fmt.Println("\n--- Signal vs Broadcast 对比 ---")
    
    // 测试Signal:只唤醒一个goroutine
    fmt.Println("测试Signal:")
    testSignalBehavior()
    
    time.Sleep(500 * time.Millisecond)
    
    // 测试Broadcast:唤醒所有goroutine
    fmt.Println("\n测试Broadcast:")
    testBroadcastBehavior()
}

func testSignalBehavior() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    var ready bool
    var wg sync.WaitGroup
    
    // 启动多个等待者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            cond.L.Lock()
            defer cond.L.Unlock()
            
            fmt.Printf("Signal等待者 %d: 开始等待\n", id)
            
            for !ready {
                cond.Wait()
            }
            
            fmt.Printf("Signal等待者 %d: 被唤醒\n", id)
        }(i)
    }
    
    time.Sleep(200 * time.Millisecond)
    
    // 发送一个Signal
    cond.L.Lock()
    ready = true
    fmt.Println("Signal发送者: 发送Signal")
    cond.Signal() // 只唤醒一个
    cond.L.Unlock()
    
    time.Sleep(100 * time.Millisecond)
    
    // 再发送一个Signal
    cond.L.Lock()
    fmt.Println("Signal发送者: 再次发送Signal")
    cond.Signal()
    cond.L.Unlock()
    
    time.Sleep(100 * time.Millisecond)
    
    // 最后一个Signal
    cond.L.Lock()
    fmt.Println("Signal发送者: 最后一次发送Signal")
    cond.Signal()
    cond.L.Unlock()
    
    wg.Wait()
}

func testBroadcastBehavior() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    var ready bool
    var wg sync.WaitGroup
    
    // 启动多个等待者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            cond.L.Lock()
            defer cond.L.Unlock()
            
            fmt.Printf("Broadcast等待者 %d: 开始等待\n", id)
            
            for !ready {
                cond.Wait()
            }
            
            fmt.Printf("Broadcast等待者 %d: 被唤醒\n", id)
        }(i)
    }
    
    time.Sleep(200 * time.Millisecond)
    
    // 发送一个Broadcast
    cond.L.Lock()
    ready = true
    fmt.Println("Broadcast发送者: 发送Broadcast")
    cond.Broadcast() // 唤醒所有
    cond.L.Unlock()
    
    wg.Wait()
}

func demonstrateSpuriousWakeup() {
    fmt.Println("\n--- 虚假唤醒问题演示 ---")
    
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    
    var condition bool
    var wg sync.WaitGroup
    
    // 模拟可能的虚假唤醒情况
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        cond.L.Lock()
        defer cond.L.Unlock()
        
        fmt.Println("等待者: 开始等待条件")
        
        // 正确的做法:使用for循环而不是if
        waitCount := 0
        for !condition {
            waitCount++
            fmt.Printf("等待者: 第 %d 次等待\n", waitCount)
            cond.Wait()
            fmt.Printf("等待者: 第 %d 次被唤醒,检查条件: %v\n", waitCount, condition)
        }
        
        fmt.Printf("等待者: 条件满足,总共等待了 %d\n", waitCount)
    }()
    
    time.Sleep(200 * time.Millisecond)
    
    // 模拟虚假唤醒:发送信号但不改变条件
    fmt.Println("干扰者: 发送虚假信号(条件仍为false)")
    cond.Signal()
    
    time.Sleep(100 * time.Millisecond)
    
    // 再次虚假唤醒
    fmt.Println("干扰者: 再次发送虚假信号")
    cond.Signal()
    
    time.Sleep(100 * time.Millisecond)
    
    // 最终真正满足条件
    cond.L.Lock()
    condition = true
    fmt.Println("设置者: 条件设置为true并发送信号")
    cond.Signal()
    cond.L.Unlock()
    
    wg.Wait()
}

::: :::

面试题 2:Cond的实际应用场景和模式

难度级别:⭐⭐⭐⭐⭐
考察范围:实际应用/设计模式
技术标签producer-consumer reader-writer barrier pattern event notification thread pool

问题分析

理解Cond在实际场景中的应用,掌握常见的并发设计模式和最佳实践。

详细解答

1. 生产者-消费者模式

点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateProducerConsumer() {
    fmt.Println("\n=== 生产者-消费者模式 ===")
    
    // 有界缓冲区的实现
    demonstrateBoundedBuffer()
    
    // 多生产者多消费者模式
    demonstrateMultiProducerConsumer()
}

func demonstrateBoundedBuffer() {
    fmt.Println("\n--- 有界缓冲区实现 ---")
    
    type BoundedBuffer struct {
        buffer   []interface{}
        capacity int
        count    int
        in       int  // 生产者索引
        out      int  // 消费者索引
        mu       sync.Mutex
        notFull  *sync.Cond // 缓冲区不满的条件
        notEmpty *sync.Cond // 缓冲区不空的条件
    }
    
    NewBoundedBuffer := func(capacity int) *BoundedBuffer {
        bb := &BoundedBuffer{
            buffer:   make([]interface{}, capacity),
            capacity: capacity,
        }
        bb.notFull = sync.NewCond(&bb.mu)
        bb.notEmpty = sync.NewCond(&bb.mu)
        return bb
    }
    
    // 生产者操作
    Put := func(bb *BoundedBuffer, item interface{}) {
        bb.mu.Lock()
        defer bb.mu.Unlock()
        
        // 等待缓冲区不满
        for bb.count == bb.capacity {
            fmt.Printf("生产者: 缓冲区已满,等待空间\n")
            bb.notFull.Wait()
        }
        
        // 放入数据
        bb.buffer[bb.in] = item
        bb.in = (bb.in + 1) % bb.capacity
        bb.count++
        
        fmt.Printf("生产者: 放入 %v,缓冲区大小: %d/%d\n", 
            item, bb.count, bb.capacity)
        
        // 通知消费者
        bb.notEmpty.Signal()
    }
    
    // 消费者操作
    Get := func(bb *BoundedBuffer) interface{} {
        bb.mu.Lock()
        defer bb.mu.Unlock()
        
        // 等待缓冲区不空
        for bb.count == 0 {
            fmt.Printf("消费者: 缓冲区为空,等待数据\n")
            bb.notEmpty.Wait()
        }
        
        // 取出数据
        item := bb.buffer[bb.out]
        bb.buffer[bb.out] = nil // 清理引用
        bb.out = (bb.out + 1) % bb.capacity
        bb.count--
        
        fmt.Printf("消费者: 取出 %v,缓冲区大小: %d/%d\n", 
            item, bb.count, bb.capacity)
        
        // 通知生产者
        bb.notFull.Signal()
        
        return item
    }
    
    // 演示使用
    buffer := NewBoundedBuffer(3)
    var wg sync.WaitGroup
    
    // 启动生产者
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        for i := 1; i <= 6; i++ {
            Put(buffer, fmt.Sprintf("item-%d", i))
            time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
        }
    }()
    
    // 启动消费者
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        for i := 0; i < 6; i++ {
            item := Get(buffer)
            fmt.Printf("消费者处理: %v\n", item)
            time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
        }
    }()
    
    wg.Wait()
}

func demonstrateMultiProducerConsumer() {
    fmt.Println("\n--- 多生产者多消费者模式 ---")
    
    type WorkQueue struct {
        jobs     []string
        mu       sync.Mutex
        notEmpty *sync.Cond
        closed   bool
    }
    
    NewWorkQueue := func() *WorkQueue {
        wq := &WorkQueue{}
        wq.notEmpty = sync.NewCond(&wq.mu)
        return wq
    }
    
    // 添加工作
    AddJob := func(wq *WorkQueue, job string) bool {
        wq.mu.Lock()
        defer wq.mu.Unlock()
        
        if wq.closed {
            return false
        }
        
        wq.jobs = append(wq.jobs, job)
        fmt.Printf("添加工作: %s,队列长度: %d\n", job, len(wq.jobs))
        
        wq.notEmpty.Signal() // 通知一个等待的worker
        return true
    }
    
    // 获取工作
    GetJob := func(wq *WorkQueue) (string, bool) {
        wq.mu.Lock()
        defer wq.mu.Unlock()
        
        // 等待工作或队列关闭
        for len(wq.jobs) == 0 && !wq.closed {
            wq.notEmpty.Wait()
        }
        
        if len(wq.jobs) == 0 && wq.closed {
            return "", false // 队列已关闭且无工作
        }
        
        job := wq.jobs[0]
        wq.jobs = wq.jobs[1:]
        
        fmt.Printf("获取工作: %s,队列长度: %d\n", job, len(wq.jobs))
        return job, true
    }
    
    // 关闭队列
    Close := func(wq *WorkQueue) {
        wq.mu.Lock()
        defer wq.mu.Unlock()
        
        wq.closed = true
        wq.notEmpty.Broadcast() // 唤醒所有等待的worker
        fmt.Println("工作队列已关闭")
    }
    
    // 演示使用
    queue := NewWorkQueue()
    var wg sync.WaitGroup
    
    // 启动多个生产者
    for i := 1; i <= 2; i++ {
        wg.Add(1)
        go func(producerID int) {
            defer wg.Done()
            
            for j := 1; j <= 3; j++ {
                job := fmt.Sprintf("P%d-Job%d", producerID, j)
                AddJob(queue, job)
                time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
            }
        }(i)
    }
    
    // 启动多个消费者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            for {
                job, ok := GetJob(queue)
                if !ok {
                    fmt.Printf("Worker %d: 队列已关闭,退出\n", workerID)
                    break
                }
                
                fmt.Printf("Worker %d: 处理 %s\n", workerID, job)
                time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
            }
        }(i)
    }
    
    // 等待所有生产者完成
    time.Sleep(2 * time.Second)
    Close(queue)
    
    wg.Wait()
}

::: :::

2. 同步屏障模式

点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateBarrierPattern() {
    fmt.Println("\n=== 同步屏障模式 ===")
    
    // 实现一个同步屏障
    demonstrateSyncBarrier()
    
    // 分阶段执行模式
    demonstratePhaseExecution()
}

func demonstrateSyncBarrier() {
    fmt.Println("\n--- 同步屏障实现 ---")
    
    type Barrier struct {
        n       int         // 需要等待的goroutine数量
        count   int         // 当前已到达的goroutine数量
        mu      sync.Mutex
        cond    *sync.Cond
    }
    
    NewBarrier := func(n int) *Barrier {
        b := &Barrier{n: n}
        b.cond = sync.NewCond(&b.mu)
        return b
    }
    
    // 等待所有goroutine到达屏障点
    Wait := func(b *Barrier) {
        b.mu.Lock()
        defer b.mu.Unlock()
        
        b.count++
        fmt.Printf("Goroutine到达屏障,当前: %d/%d\n", b.count, b.n)
        
        if b.count < b.n {
            // 还没有全部到达,等待
            for b.count < b.n {
                b.cond.Wait()
            }
        } else {
            // 最后一个到达,唤醒所有等待的goroutine
            fmt.Println("所有goroutine已到达,释放屏障")
            b.cond.Broadcast()
        }
    }
    
    // 演示使用
    barrier := NewBarrier(4)
    var wg sync.WaitGroup
    
    for i := 1; i <= 4; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            // 第一阶段工作
            workTime := time.Duration(rand.Intn(1000)) * time.Millisecond
            fmt.Printf("Worker %d: 开始第一阶段工作 (%v)\n", id, workTime)
            time.Sleep(workTime)
            fmt.Printf("Worker %d: 完成第一阶段工作\n", id)
            
            // 等待所有worker完成第一阶段
            Wait(barrier)
            
            // 第二阶段工作
            fmt.Printf("Worker %d: 开始第二阶段工作\n", id)
            time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
            fmt.Printf("Worker %d: 完成第二阶段工作\n", id)
        }(i)
    }
    
    wg.Wait()
    fmt.Println("所有worker完成所有阶段工作")
}

func demonstratePhaseExecution() {
    fmt.Println("\n--- 分阶段执行模式 ---")
    
    type PhaseManager struct {
        currentPhase int
        participants int
        arrived      int
        mu           sync.Mutex
        cond         *sync.Cond
    }
    
    NewPhaseManager := func(participants int) *PhaseManager {
        pm := &PhaseManager{
            participants: participants,
        }
        pm.cond = sync.NewCond(&pm.mu)
        return pm
    }
    
    // 等待进入下一阶段
    WaitForPhase := func(pm *PhaseManager, expectedPhase int) {
        pm.mu.Lock()
        defer pm.mu.Unlock()
        
        // 等待指定阶段到来
        for pm.currentPhase < expectedPhase {
            pm.cond.Wait()
        }
    }
    
    // 完成当前阶段
    CompletePhase := func(pm *PhaseManager) {
        pm.mu.Lock()
        defer pm.mu.Unlock()
        
        pm.arrived++
        fmt.Printf("参与者完成阶段 %d,已完成: %d/%d\n", 
            pm.currentPhase, pm.arrived, pm.participants)
        
        if pm.arrived == pm.participants {
            // 所有参与者完成当前阶段
            pm.currentPhase++
            pm.arrived = 0
            fmt.Printf("所有参与者完成阶段 %d,进入阶段 %d\n", 
                pm.currentPhase-1, pm.currentPhase)
            pm.cond.Broadcast()
        }
    }
    
    GetCurrentPhase := func(pm *PhaseManager) int {
        pm.mu.Lock()
        defer pm.mu.Unlock()
        return pm.currentPhase
    }
    
    // 演示使用
    phaseManager := NewPhaseManager(3)
    var wg sync.WaitGroup
    
    phases := []string{"初始化", "数据处理", "结果汇总", "清理"}
    
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            for phase := 0; phase < len(phases); phase++ {
                // 等待进入阶段
                WaitForPhase(phaseManager, phase)
                
                // 执行阶段工作
                fmt.Printf("Worker %d: 执行%s阶段\n", workerID, phases[phase])
                workTime := time.Duration(rand.Intn(500)+200) * time.Millisecond
                time.Sleep(workTime)
                
                // 完成阶段
                CompletePhase(phaseManager)
            }
            
            fmt.Printf("Worker %d: 所有阶段完成\n", workerID)
        }(i)
    }
    
    // 监控进度
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        for {
            currentPhase := GetCurrentPhase(phaseManager)
            if currentPhase >= len(phases) {
                fmt.Println("监控器: 所有阶段完成")
                break
            }
            
            fmt.Printf("监控器: 当前阶段 %d (%s)\n", currentPhase, phases[currentPhase])
            time.Sleep(300 * time.Millisecond)
        }
    }()
    
    wg.Wait()
}

::: :::

3. 事件通知和状态管理

点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateEventNotification() {
    fmt.Println("\n=== 事件通知和状态管理 ===")
    
    // 状态机实现
    demonstrateStateMachine()
    
    // 事件总线实现
    demonstrateEventBus()
}

func demonstrateStateMachine() {
    fmt.Println("\n--- 状态机实现 ---")
    
    type State int
    
    const (
        StateIdle State = iota
        StateRunning
        StatePaused
        StateStopped
    )
    
    func (s State) String() string {
        switch s {
        case StateIdle:
            return "Idle"
        case StateRunning:
            return "Running"
        case StatePaused:
            return "Paused"
        case StateStopped:
            return "Stopped"
        default:
            return "Unknown"
        }
    }
    
    type StateMachine struct {
        currentState State
        mu           sync.Mutex
        stateChanged *sync.Cond
    }
    
    NewStateMachine := func() *StateMachine {
        sm := &StateMachine{
            currentState: StateIdle,
        }
        sm.stateChanged = sync.NewCond(&sm.mu)
        return sm
    }
    
    // 获取当前状态
    GetState := func(sm *StateMachine) State {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        return sm.currentState
    }
    
    // 改变状态
    ChangeState := func(sm *StateMachine, newState State) bool {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        
        // 检查状态转换是否合法
        if !isValidTransition(sm.currentState, newState) {
            return false
        }
        
        oldState := sm.currentState
        sm.currentState = newState
        
        fmt.Printf("状态转换: %s -> %s\n", oldState, newState)
        sm.stateChanged.Broadcast()
        return true
    }
    
    // 等待特定状态
    WaitForState := func(sm *StateMachine, targetState State) {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        
        for sm.currentState != targetState {
            sm.stateChanged.Wait()
        }
    }
    
    // 等待任意状态变化
    WaitForStateChange := func(sm *StateMachine, fromState State) State {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        
        for sm.currentState == fromState {
            sm.stateChanged.Wait()
        }
        
        return sm.currentState
    }
    
    isValidTransition := func(from, to State) bool {
        switch from {
        case StateIdle:
            return to == StateRunning
        case StateRunning:
            return to == StatePaused || to == StateStopped
        case StatePaused:
            return to == StateRunning || to == StateStopped
        case StateStopped:
            return to == StateIdle
        default:
            return false
        }
    }
    
    // 演示使用
    sm := NewStateMachine()
    var wg sync.WaitGroup
    
    // 状态监控器
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        for i := 0; i < 5; i++ {
            currentState := GetState(sm)
            fmt.Printf("监控器: 当前状态 %s\n", currentState)
            
            newState := WaitForStateChange(sm, currentState)
            fmt.Printf("监控器: 状态已变化为 %s\n", newState)
        }
    }()
    
    // 等待特定状态的观察者
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        fmt.Println("观察者: 等待Running状态")
        WaitForState(sm, StateRunning)
        fmt.Println("观察者: 检测到Running状态")
        
        fmt.Println("观察者: 等待Stopped状态")
        WaitForState(sm, StateStopped)
        fmt.Println("观察者: 检测到Stopped状态")
    }()
    
    // 状态控制器
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        transitions := []State{
            StateRunning,
            StatePaused,
            StateRunning,
            StateStopped,
            StateIdle,
        }
        
        for _, newState := range transitions {
            time.Sleep(300 * time.Millisecond)
            
            if ChangeState(sm, newState) {
                fmt.Printf("控制器: 成功转换到 %s\n", newState)
            } else {
                fmt.Printf("控制器: 无法转换到 %s\n", newState)
            }
        }
    }()
    
    wg.Wait()
}

func demonstrateEventBus() {
    fmt.Println("\n--- 事件总线实现 ---")
    
    type Event struct {
        Type string
        Data interface{}
    }
    
    type EventBus struct {
        subscribers map[string][]*Subscriber
        mu          sync.Mutex
        eventCond   *sync.Cond
    }
    
    type Subscriber struct {
        ID      string
        EventCh chan Event
        Active  bool
        cond    *sync.Cond
    }
    
    NewEventBus := func() *EventBus {
        eb := &EventBus{
            subscribers: make(map[string][]*Subscriber),
        }
        eb.eventCond = sync.NewCond(&eb.mu)
        return eb
    }
    
    NewSubscriber := func(id string) *Subscriber {
        s := &Subscriber{
            ID:      id,
            EventCh: make(chan Event, 10),
            Active:  true,
        }
        s.cond = sync.NewCond(&sync.Mutex{})
        return s
    }
    
    // 订阅事件
    Subscribe := func(eb *EventBus, eventType string, subscriber *Subscriber) {
        eb.mu.Lock()
        defer eb.mu.Unlock()
        
        eb.subscribers[eventType] = append(eb.subscribers[eventType], subscriber)
        fmt.Printf("订阅者 %s 订阅了事件类型: %s\n", subscriber.ID, eventType)
    }
    
    // 发布事件
    Publish := func(eb *EventBus, event Event) {
        eb.mu.Lock()
        subscribers := eb.subscribers[event.Type]
        eb.mu.Unlock()
        
        fmt.Printf("发布事件: %s, 数据: %v\n", event.Type, event.Data)
        
        for _, subscriber := range subscribers {
            if subscriber.Active {
                select {
                case subscriber.EventCh <- event:
                    fmt.Printf("事件已发送给订阅者 %s\n", subscriber.ID)
                default:
                    fmt.Printf("订阅者 %s 的通道已满,丢弃事件\n", subscriber.ID)
                }
            }
        }
        
        eb.mu.Lock()
        eb.eventCond.Broadcast()
        eb.mu.Unlock()
    }
    
    // 等待特定事件
    WaitForEvent := func(eb *EventBus, eventType string) {
        eb.mu.Lock()
        defer eb.mu.Unlock()
        
        // 简化的实现:在实际使用中需要更复杂的事件匹配
        eb.eventCond.Wait()
    }
    
    // 演示使用
    eventBus := NewEventBus()
    var wg sync.WaitGroup
    
    // 创建订阅者
    subscriber1 := NewSubscriber("Logger")
    subscriber2 := NewSubscriber("MetricsCollector")
    subscriber3 := NewSubscriber("Alerter")
    
    // 订阅不同事件
    Subscribe(eventBus, "user.login", subscriber1)
    Subscribe(eventBus, "user.login", subscriber2)
    Subscribe(eventBus, "system.error", subscriber1)
    Subscribe(eventBus, "system.error", subscriber3)
    
    // 启动订阅者处理器
    for _, sub := range []*Subscriber{subscriber1, subscriber2, subscriber3} {
        wg.Add(1)
        go func(s *Subscriber) {
            defer wg.Done()
            defer close(s.EventCh)
            
            fmt.Printf("订阅者 %s 开始处理事件\n", s.ID)
            
            eventCount := 0
            for event := range s.EventCh {
                eventCount++
                fmt.Printf("订阅者 %s 处理事件: %s, 数据: %v\n", 
                    s.ID, event.Type, event.Data)
                
                // 模拟处理时间
                time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
                
                if eventCount >= 3 { // 处理3个事件后退出
                    s.Active = false
                    break
                }
            }
            
            fmt.Printf("订阅者 %s 停止处理事件\n", s.ID)
        }(sub)
    }
    
    // 发布事件
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        events := []Event{
            {Type: "user.login", Data: map[string]string{"user": "alice", "ip": "192.168.1.1"}},
            {Type: "system.error", Data: map[string]string{"error": "database connection failed"}},
            {Type: "user.login", Data: map[string]string{"user": "bob", "ip": "192.168.1.2"}},
            {Type: "system.error", Data: map[string]string{"error": "memory usage high"}},
            {Type: "user.login", Data: map[string]string{"user": "charlie", "ip": "192.168.1.3"}},
        }
        
        for _, event := range events {
            time.Sleep(400 * time.Millisecond)
            Publish(eventBus, event)
        }
    }()
    
    wg.Wait()
}

func main() {
    rand.Seed(time.Now().UnixNano())
    
    demonstrateCondBasics()
    demonstrateCondInternals()
    demonstrateProducerConsumer()
    demonstrateBarrierPattern()
    demonstrateEventNotification()
}

:::

🎯 核心知识点总结

Cond基础要点

  1. 等待-通知模式: Cond实现了经典的等待-通知同步模式
  2. 必须配合锁使用: Cond必须与Mutex或RWMutex配合使用
  3. 三个核心方法: Wait()、Signal()、Broadcast()
  4. 条件检查: 使用for循环而不是if检查条件,防止虚假唤醒

工作机制要点

  1. Wait()的三步操作: 释放锁、等待、重新获取锁
  2. Signal vs Broadcast: Signal唤醒一个,Broadcast唤醒所有
  3. 虚假唤醒: 系统可能在条件未满足时唤醒goroutine
  4. 原子性: 条件检查和Wait()调用需要在同一锁保护下

应用场景要点

  1. 生产者-消费者: 经典的缓冲区管理模式
  2. 同步屏障: 等待所有线程到达同步点
  3. 状态机: 等待状态变化的通知机制
  4. 事件通知: 实现事件驱动的架构模式

最佳实践要点

  1. 正确的条件检查: 始终使用for循环检查条件
  2. 避免死锁: 确保Wait()和Signal()在相同锁保护下
  3. 合理的粒度: 不要在持有锁时执行长时间操作
  4. 资源清理: 确保goroutine能够正确退出和清理

🔍 面试准备建议

  1. 理解设计原理: 深入理解条件变量的设计理念和工作机制
  2. 掌握经典模式: 熟练使用生产者-消费者、屏障等经典模式
  3. 避免常见陷阱: 了解虚假唤醒、死锁等常见问题
  4. 实际应用能力: 能够在实际场景中正确使用Cond
  5. 性能考虑: 理解Cond的性能特点和适用场景

正在精进