条件变量Cond详解 - Golang并发编程面试题
条件变量(Cond)是Go语言提供的一种同步原语,用于在某个条件满足时唤醒等待的goroutine。本章深入探讨Cond的工作原理、使用模式和实际应用场景。
📋 重点面试题
面试题 1:Cond的基本概念和工作原理
难度级别:⭐⭐⭐⭐
考察范围:同步原语/条件等待
技术标签:sync.Cond condition variable wait-notify pattern spurious wakeup broadcast
问题分析
理解条件变量的设计理念和与其他同步原语的配合使用,掌握等待-通知模式是高级并发编程的重要技能。
详细解答
1. Cond基本概念
点击查看完整代码实现
点击查看完整代码实现
go
package main
import (
"fmt"
"sync"
"time"
"math/rand"
)
func demonstrateCondBasics() {
fmt.Println("=== Cond基本概念演示 ===")
// Cond的基本结构和使用
demonstrateBasicStructure()
// Wait-Signal模式
demonstrateWaitSignalPattern()
// Wait-Broadcast模式
demonstrateWaitBroadcastPattern()
// 条件检查的重要性
demonstrateConditionCheck()
}
func demonstrateBasicStructure() {
fmt.Println("\n--- Cond基本结构 ---")
/*
sync.Cond的基本结构:
type Cond struct {
noCopy noCopy
L Locker // 关联的锁(通常是*Mutex或*RWMutex)
notify notifyList
checker copyChecker
}
主要方法:
- Wait(): 等待条件满足
- Signal(): 唤醒一个等待的goroutine
- Broadcast(): 唤醒所有等待的goroutine
*/
var mu sync.Mutex
cond := sync.NewCond(&mu)
ready := false
fmt.Println("创建条件变量,初始条件为false")
// 启动等待者
go func() {
cond.L.Lock()
defer cond.L.Unlock()
fmt.Println("等待者: 开始等待条件满足")
// 等待条件满足
for !ready {
fmt.Println("等待者: 条件未满足,进入等待")
cond.Wait() // 释放锁并等待
fmt.Println("等待者: 被唤醒,重新检查条件")
}
fmt.Println("等待者: 条件满足,继续执行")
}()
// 等待一段时间后设置条件
time.Sleep(1 * time.Second)
cond.L.Lock()
ready = true
fmt.Println("设置者: 条件已设置为true")
cond.Signal() // 唤醒等待的goroutine
fmt.Println("设置者: 已发送信号")
cond.L.Unlock()
time.Sleep(100 * time.Millisecond) // 等待等待者完成
}
func demonstrateWaitSignalPattern() {
fmt.Println("\n--- Wait-Signal模式 ---")
var mu sync.Mutex
cond := sync.NewCond(&mu)
// 共享资源和条件
var queue []int
var wg sync.WaitGroup
// 消费者goroutine
consumer := func(id int) {
defer wg.Done()
cond.L.Lock()
defer cond.L.Unlock()
// 等待队列中有数据
for len(queue) == 0 {
fmt.Printf("消费者 %d: 队列为空,等待数据\n", id)
cond.Wait()
}
// 消费数据
item := queue[0]
queue = queue[1:]
fmt.Printf("消费者 %d: 消费了数据 %d,队列长度: %d\n", id, item, len(queue))
}
// 生产者函数
producer := func(data int) {
cond.L.Lock()
defer cond.L.Unlock()
// 生产数据
queue = append(queue, data)
fmt.Printf("生产者: 生产了数据 %d,队列长度: %d\n", data, len(queue))
// 通知一个等待的消费者
cond.Signal()
}
// 启动多个消费者
for i := 0; i < 3; i++ {
wg.Add(1)
go consumer(i)
}
// 等待消费者准备好
time.Sleep(200 * time.Millisecond)
// 生产一些数据
for i := 1; i <= 3; i++ {
producer(i * 10)
time.Sleep(300 * time.Millisecond)
}
wg.Wait()
}
func demonstrateWaitBroadcastPattern() {
fmt.Println("\n--- Wait-Broadcast模式 ---")
var mu sync.Mutex
cond := sync.NewCond(&mu)
var gameStarted bool
var wg sync.WaitGroup
// 玩家goroutine
player := func(id int) {
defer wg.Done()
cond.L.Lock()
defer cond.L.Unlock()
fmt.Printf("玩家 %d: 准备就绪,等待游戏开始\n", id)
// 等待游戏开始
for !gameStarted {
cond.Wait()
}
fmt.Printf("玩家 %d: 游戏开始!开始游戏\n", id)
// 模拟游戏过程
cond.L.Unlock() // 临时释放锁执行游戏逻辑
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
cond.L.Lock() // 重新获取锁
fmt.Printf("玩家 %d: 游戏结束\n", id)
}
// 启动多个玩家
for i := 1; i <= 5; i++ {
wg.Add(1)
go player(i)
}
// 等待所有玩家准备
time.Sleep(500 * time.Millisecond)
// 游戏管理员启动游戏
cond.L.Lock()
gameStarted = true
fmt.Println("游戏管理员: 所有玩家准备就绪,游戏开始!")
cond.Broadcast() // 唤醒所有等待的玩家
cond.L.Unlock()
wg.Wait()
fmt.Println("游戏管理员: 所有玩家游戏结束")
}
func demonstrateConditionCheck() {
fmt.Println("\n--- 条件检查的重要性 ---")
var mu sync.Mutex
cond := sync.NewCond(&mu)
var resource int
var processed []int
var wg sync.WaitGroup
// 错误的条件检查(仅作演示)
wrongWorker := func(id int) {
defer wg.Done()
cond.L.Lock()
defer cond.L.Unlock()
fmt.Printf("错误Worker %d: 等待资源\n", id)
// 错误:使用if而不是for,可能导致虚假唤醒问题
if resource <= 0 {
cond.Wait()
}
if resource > 0 {
resource--
processed = append(processed, id)
fmt.Printf("错误Worker %d: 处理了资源,剩余: %d\n", id, resource)
} else {
fmt.Printf("错误Worker %d: 被唤醒但没有资源可处理\n", id)
}
}
// 正确的条件检查
correctWorker := func(id int) {
defer wg.Done()
cond.L.Lock()
defer cond.L.Unlock()
fmt.Printf("正确Worker %d: 等待资源\n", id)
// 正确:使用for循环检查条件,防止虚假唤醒
for resource <= 0 {
cond.Wait()
}
resource--
processed = append(processed, id+100) // 区分正确worker
fmt.Printf("正确Worker %d: 处理了资源,剩余: %d\n", id, resource)
}
// 演示错误方式
fmt.Println("演示错误的条件检查:")
resource = 2
processed = nil
for i := 1; i <= 3; i++ {
wg.Add(1)
go wrongWorker(i)
}
time.Sleep(200 * time.Millisecond)
cond.L.Lock()
fmt.Println("管理者: 广播唤醒所有worker")
cond.Broadcast()
cond.L.Unlock()
wg.Wait()
fmt.Printf("错误方式处理结果: %v\n", processed)
// 演示正确方式
fmt.Println("\n演示正确的条件检查:")
resource = 2
processed = nil
for i := 1; i <= 3; i++ {
wg.Add(1)
go correctWorker(i)
}
time.Sleep(200 * time.Millisecond)
cond.L.Lock()
fmt.Println("管理者: 广播唤醒所有worker")
cond.Broadcast()
cond.L.Unlock()
wg.Wait()
fmt.Printf("正确方式处理结果: %v\n", processed)
}:::
2. Cond的内部机制
点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateCondInternals() {
fmt.Println("\n=== Cond内部机制演示 ===")
// 演示Wait()的三步操作
demonstrateWaitMechanism()
// 演示Signal vs Broadcast的区别
demonstrateSignalVsBroadcast()
// 演示虚假唤醒问题
demonstrateSpuriousWakeup()
}
func demonstrateWaitMechanism() {
fmt.Println("\n--- Wait()机制详解 ---")
/*
Wait()的三步操作:
1. 释放关联的锁 c.L.Unlock()
2. 将当前goroutine加入等待队列并阻塞
3. 被唤醒后重新获取锁 c.L.Lock()
*/
var mu sync.Mutex
cond := sync.NewCond(&mu)
var step int
var wg sync.WaitGroup
// 监控锁状态的goroutine
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 6; i++ {
time.Sleep(200 * time.Millisecond)
// 尝试获取锁来检查状态
acquired := false
done := make(chan struct{})
go func() {
mu.Lock()
acquired = true
mu.Unlock()
close(done)
}()
select {
case <-done:
if acquired {
fmt.Printf("监控器: 锁当前是可用的 (步骤 %d)\n", step)
}
case <-time.After(50 * time.Millisecond):
fmt.Printf("监控器: 锁当前被持有 (步骤 %d)\n", step)
}
}
}()
// 等待者
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("等待者: 获取锁")
cond.L.Lock()
step = 1
fmt.Println("等待者: 持有锁,准备等待")
step = 2
time.Sleep(300 * time.Millisecond)
fmt.Println("等待者: 调用Wait() - 即将释放锁")
step = 3
cond.Wait() // 1.释放锁 2.等待 3.重新获取锁
fmt.Println("等待者: Wait()返回,重新持有锁")
step = 5
time.Sleep(200 * time.Millisecond)
fmt.Println("等待者: 释放锁")
step = 6
cond.L.Unlock()
}()
// 信号发送者
time.Sleep(800 * time.Millisecond)
fmt.Println("信号者: 获取锁并发送信号")
cond.L.Lock()
step = 4
cond.Signal()
fmt.Println("信号者: 信号已发送,释放锁")
cond.L.Unlock()
wg.Wait()
}
func demonstrateSignalVsBroadcast() {
fmt.Println("\n--- Signal vs Broadcast 对比 ---")
// 测试Signal:只唤醒一个goroutine
fmt.Println("测试Signal:")
testSignalBehavior()
time.Sleep(500 * time.Millisecond)
// 测试Broadcast:唤醒所有goroutine
fmt.Println("\n测试Broadcast:")
testBroadcastBehavior()
}
func testSignalBehavior() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
var wg sync.WaitGroup
// 启动多个等待者
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
cond.L.Lock()
defer cond.L.Unlock()
fmt.Printf("Signal等待者 %d: 开始等待\n", id)
for !ready {
cond.Wait()
}
fmt.Printf("Signal等待者 %d: 被唤醒\n", id)
}(i)
}
time.Sleep(200 * time.Millisecond)
// 发送一个Signal
cond.L.Lock()
ready = true
fmt.Println("Signal发送者: 发送Signal")
cond.Signal() // 只唤醒一个
cond.L.Unlock()
time.Sleep(100 * time.Millisecond)
// 再发送一个Signal
cond.L.Lock()
fmt.Println("Signal发送者: 再次发送Signal")
cond.Signal()
cond.L.Unlock()
time.Sleep(100 * time.Millisecond)
// 最后一个Signal
cond.L.Lock()
fmt.Println("Signal发送者: 最后一次发送Signal")
cond.Signal()
cond.L.Unlock()
wg.Wait()
}
func testBroadcastBehavior() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
var wg sync.WaitGroup
// 启动多个等待者
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
cond.L.Lock()
defer cond.L.Unlock()
fmt.Printf("Broadcast等待者 %d: 开始等待\n", id)
for !ready {
cond.Wait()
}
fmt.Printf("Broadcast等待者 %d: 被唤醒\n", id)
}(i)
}
time.Sleep(200 * time.Millisecond)
// 发送一个Broadcast
cond.L.Lock()
ready = true
fmt.Println("Broadcast发送者: 发送Broadcast")
cond.Broadcast() // 唤醒所有
cond.L.Unlock()
wg.Wait()
}
func demonstrateSpuriousWakeup() {
fmt.Println("\n--- 虚假唤醒问题演示 ---")
var mu sync.Mutex
cond := sync.NewCond(&mu)
var condition bool
var wg sync.WaitGroup
// 模拟可能的虚假唤醒情况
wg.Add(1)
go func() {
defer wg.Done()
cond.L.Lock()
defer cond.L.Unlock()
fmt.Println("等待者: 开始等待条件")
// 正确的做法:使用for循环而不是if
waitCount := 0
for !condition {
waitCount++
fmt.Printf("等待者: 第 %d 次等待\n", waitCount)
cond.Wait()
fmt.Printf("等待者: 第 %d 次被唤醒,检查条件: %v\n", waitCount, condition)
}
fmt.Printf("等待者: 条件满足,总共等待了 %d 次\n", waitCount)
}()
time.Sleep(200 * time.Millisecond)
// 模拟虚假唤醒:发送信号但不改变条件
fmt.Println("干扰者: 发送虚假信号(条件仍为false)")
cond.Signal()
time.Sleep(100 * time.Millisecond)
// 再次虚假唤醒
fmt.Println("干扰者: 再次发送虚假信号")
cond.Signal()
time.Sleep(100 * time.Millisecond)
// 最终真正满足条件
cond.L.Lock()
condition = true
fmt.Println("设置者: 条件设置为true并发送信号")
cond.Signal()
cond.L.Unlock()
wg.Wait()
}::: :::
面试题 2:Cond的实际应用场景和模式
难度级别:⭐⭐⭐⭐⭐
考察范围:实际应用/设计模式
技术标签:producer-consumer reader-writer barrier pattern event notification thread pool
问题分析
理解Cond在实际场景中的应用,掌握常见的并发设计模式和最佳实践。
详细解答
1. 生产者-消费者模式
点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateProducerConsumer() {
fmt.Println("\n=== 生产者-消费者模式 ===")
// 有界缓冲区的实现
demonstrateBoundedBuffer()
// 多生产者多消费者模式
demonstrateMultiProducerConsumer()
}
func demonstrateBoundedBuffer() {
fmt.Println("\n--- 有界缓冲区实现 ---")
type BoundedBuffer struct {
buffer []interface{}
capacity int
count int
in int // 生产者索引
out int // 消费者索引
mu sync.Mutex
notFull *sync.Cond // 缓冲区不满的条件
notEmpty *sync.Cond // 缓冲区不空的条件
}
NewBoundedBuffer := func(capacity int) *BoundedBuffer {
bb := &BoundedBuffer{
buffer: make([]interface{}, capacity),
capacity: capacity,
}
bb.notFull = sync.NewCond(&bb.mu)
bb.notEmpty = sync.NewCond(&bb.mu)
return bb
}
// 生产者操作
Put := func(bb *BoundedBuffer, item interface{}) {
bb.mu.Lock()
defer bb.mu.Unlock()
// 等待缓冲区不满
for bb.count == bb.capacity {
fmt.Printf("生产者: 缓冲区已满,等待空间\n")
bb.notFull.Wait()
}
// 放入数据
bb.buffer[bb.in] = item
bb.in = (bb.in + 1) % bb.capacity
bb.count++
fmt.Printf("生产者: 放入 %v,缓冲区大小: %d/%d\n",
item, bb.count, bb.capacity)
// 通知消费者
bb.notEmpty.Signal()
}
// 消费者操作
Get := func(bb *BoundedBuffer) interface{} {
bb.mu.Lock()
defer bb.mu.Unlock()
// 等待缓冲区不空
for bb.count == 0 {
fmt.Printf("消费者: 缓冲区为空,等待数据\n")
bb.notEmpty.Wait()
}
// 取出数据
item := bb.buffer[bb.out]
bb.buffer[bb.out] = nil // 清理引用
bb.out = (bb.out + 1) % bb.capacity
bb.count--
fmt.Printf("消费者: 取出 %v,缓冲区大小: %d/%d\n",
item, bb.count, bb.capacity)
// 通知生产者
bb.notFull.Signal()
return item
}
// 演示使用
buffer := NewBoundedBuffer(3)
var wg sync.WaitGroup
// 启动生产者
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; i <= 6; i++ {
Put(buffer, fmt.Sprintf("item-%d", i))
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
}
}()
// 启动消费者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 6; i++ {
item := Get(buffer)
fmt.Printf("消费者处理: %v\n", item)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}()
wg.Wait()
}
func demonstrateMultiProducerConsumer() {
fmt.Println("\n--- 多生产者多消费者模式 ---")
type WorkQueue struct {
jobs []string
mu sync.Mutex
notEmpty *sync.Cond
closed bool
}
NewWorkQueue := func() *WorkQueue {
wq := &WorkQueue{}
wq.notEmpty = sync.NewCond(&wq.mu)
return wq
}
// 添加工作
AddJob := func(wq *WorkQueue, job string) bool {
wq.mu.Lock()
defer wq.mu.Unlock()
if wq.closed {
return false
}
wq.jobs = append(wq.jobs, job)
fmt.Printf("添加工作: %s,队列长度: %d\n", job, len(wq.jobs))
wq.notEmpty.Signal() // 通知一个等待的worker
return true
}
// 获取工作
GetJob := func(wq *WorkQueue) (string, bool) {
wq.mu.Lock()
defer wq.mu.Unlock()
// 等待工作或队列关闭
for len(wq.jobs) == 0 && !wq.closed {
wq.notEmpty.Wait()
}
if len(wq.jobs) == 0 && wq.closed {
return "", false // 队列已关闭且无工作
}
job := wq.jobs[0]
wq.jobs = wq.jobs[1:]
fmt.Printf("获取工作: %s,队列长度: %d\n", job, len(wq.jobs))
return job, true
}
// 关闭队列
Close := func(wq *WorkQueue) {
wq.mu.Lock()
defer wq.mu.Unlock()
wq.closed = true
wq.notEmpty.Broadcast() // 唤醒所有等待的worker
fmt.Println("工作队列已关闭")
}
// 演示使用
queue := NewWorkQueue()
var wg sync.WaitGroup
// 启动多个生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go func(producerID int) {
defer wg.Done()
for j := 1; j <= 3; j++ {
job := fmt.Sprintf("P%d-Job%d", producerID, j)
AddJob(queue, job)
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
}
}(i)
}
// 启动多个消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for {
job, ok := GetJob(queue)
if !ok {
fmt.Printf("Worker %d: 队列已关闭,退出\n", workerID)
break
}
fmt.Printf("Worker %d: 处理 %s\n", workerID, job)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}(i)
}
// 等待所有生产者完成
time.Sleep(2 * time.Second)
Close(queue)
wg.Wait()
}::: :::
2. 同步屏障模式
点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateBarrierPattern() {
fmt.Println("\n=== 同步屏障模式 ===")
// 实现一个同步屏障
demonstrateSyncBarrier()
// 分阶段执行模式
demonstratePhaseExecution()
}
func demonstrateSyncBarrier() {
fmt.Println("\n--- 同步屏障实现 ---")
type Barrier struct {
n int // 需要等待的goroutine数量
count int // 当前已到达的goroutine数量
mu sync.Mutex
cond *sync.Cond
}
NewBarrier := func(n int) *Barrier {
b := &Barrier{n: n}
b.cond = sync.NewCond(&b.mu)
return b
}
// 等待所有goroutine到达屏障点
Wait := func(b *Barrier) {
b.mu.Lock()
defer b.mu.Unlock()
b.count++
fmt.Printf("Goroutine到达屏障,当前: %d/%d\n", b.count, b.n)
if b.count < b.n {
// 还没有全部到达,等待
for b.count < b.n {
b.cond.Wait()
}
} else {
// 最后一个到达,唤醒所有等待的goroutine
fmt.Println("所有goroutine已到达,释放屏障")
b.cond.Broadcast()
}
}
// 演示使用
barrier := NewBarrier(4)
var wg sync.WaitGroup
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 第一阶段工作
workTime := time.Duration(rand.Intn(1000)) * time.Millisecond
fmt.Printf("Worker %d: 开始第一阶段工作 (%v)\n", id, workTime)
time.Sleep(workTime)
fmt.Printf("Worker %d: 完成第一阶段工作\n", id)
// 等待所有worker完成第一阶段
Wait(barrier)
// 第二阶段工作
fmt.Printf("Worker %d: 开始第二阶段工作\n", id)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
fmt.Printf("Worker %d: 完成第二阶段工作\n", id)
}(i)
}
wg.Wait()
fmt.Println("所有worker完成所有阶段工作")
}
func demonstratePhaseExecution() {
fmt.Println("\n--- 分阶段执行模式 ---")
type PhaseManager struct {
currentPhase int
participants int
arrived int
mu sync.Mutex
cond *sync.Cond
}
NewPhaseManager := func(participants int) *PhaseManager {
pm := &PhaseManager{
participants: participants,
}
pm.cond = sync.NewCond(&pm.mu)
return pm
}
// 等待进入下一阶段
WaitForPhase := func(pm *PhaseManager, expectedPhase int) {
pm.mu.Lock()
defer pm.mu.Unlock()
// 等待指定阶段到来
for pm.currentPhase < expectedPhase {
pm.cond.Wait()
}
}
// 完成当前阶段
CompletePhase := func(pm *PhaseManager) {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.arrived++
fmt.Printf("参与者完成阶段 %d,已完成: %d/%d\n",
pm.currentPhase, pm.arrived, pm.participants)
if pm.arrived == pm.participants {
// 所有参与者完成当前阶段
pm.currentPhase++
pm.arrived = 0
fmt.Printf("所有参与者完成阶段 %d,进入阶段 %d\n",
pm.currentPhase-1, pm.currentPhase)
pm.cond.Broadcast()
}
}
GetCurrentPhase := func(pm *PhaseManager) int {
pm.mu.Lock()
defer pm.mu.Unlock()
return pm.currentPhase
}
// 演示使用
phaseManager := NewPhaseManager(3)
var wg sync.WaitGroup
phases := []string{"初始化", "数据处理", "结果汇总", "清理"}
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for phase := 0; phase < len(phases); phase++ {
// 等待进入阶段
WaitForPhase(phaseManager, phase)
// 执行阶段工作
fmt.Printf("Worker %d: 执行%s阶段\n", workerID, phases[phase])
workTime := time.Duration(rand.Intn(500)+200) * time.Millisecond
time.Sleep(workTime)
// 完成阶段
CompletePhase(phaseManager)
}
fmt.Printf("Worker %d: 所有阶段完成\n", workerID)
}(i)
}
// 监控进度
wg.Add(1)
go func() {
defer wg.Done()
for {
currentPhase := GetCurrentPhase(phaseManager)
if currentPhase >= len(phases) {
fmt.Println("监控器: 所有阶段完成")
break
}
fmt.Printf("监控器: 当前阶段 %d (%s)\n", currentPhase, phases[currentPhase])
time.Sleep(300 * time.Millisecond)
}
}()
wg.Wait()
}::: :::
3. 事件通知和状态管理
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateEventNotification() {
fmt.Println("\n=== 事件通知和状态管理 ===")
// 状态机实现
demonstrateStateMachine()
// 事件总线实现
demonstrateEventBus()
}
func demonstrateStateMachine() {
fmt.Println("\n--- 状态机实现 ---")
type State int
const (
StateIdle State = iota
StateRunning
StatePaused
StateStopped
)
func (s State) String() string {
switch s {
case StateIdle:
return "Idle"
case StateRunning:
return "Running"
case StatePaused:
return "Paused"
case StateStopped:
return "Stopped"
default:
return "Unknown"
}
}
type StateMachine struct {
currentState State
mu sync.Mutex
stateChanged *sync.Cond
}
NewStateMachine := func() *StateMachine {
sm := &StateMachine{
currentState: StateIdle,
}
sm.stateChanged = sync.NewCond(&sm.mu)
return sm
}
// 获取当前状态
GetState := func(sm *StateMachine) State {
sm.mu.Lock()
defer sm.mu.Unlock()
return sm.currentState
}
// 改变状态
ChangeState := func(sm *StateMachine, newState State) bool {
sm.mu.Lock()
defer sm.mu.Unlock()
// 检查状态转换是否合法
if !isValidTransition(sm.currentState, newState) {
return false
}
oldState := sm.currentState
sm.currentState = newState
fmt.Printf("状态转换: %s -> %s\n", oldState, newState)
sm.stateChanged.Broadcast()
return true
}
// 等待特定状态
WaitForState := func(sm *StateMachine, targetState State) {
sm.mu.Lock()
defer sm.mu.Unlock()
for sm.currentState != targetState {
sm.stateChanged.Wait()
}
}
// 等待任意状态变化
WaitForStateChange := func(sm *StateMachine, fromState State) State {
sm.mu.Lock()
defer sm.mu.Unlock()
for sm.currentState == fromState {
sm.stateChanged.Wait()
}
return sm.currentState
}
isValidTransition := func(from, to State) bool {
switch from {
case StateIdle:
return to == StateRunning
case StateRunning:
return to == StatePaused || to == StateStopped
case StatePaused:
return to == StateRunning || to == StateStopped
case StateStopped:
return to == StateIdle
default:
return false
}
}
// 演示使用
sm := NewStateMachine()
var wg sync.WaitGroup
// 状态监控器
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
currentState := GetState(sm)
fmt.Printf("监控器: 当前状态 %s\n", currentState)
newState := WaitForStateChange(sm, currentState)
fmt.Printf("监控器: 状态已变化为 %s\n", newState)
}
}()
// 等待特定状态的观察者
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("观察者: 等待Running状态")
WaitForState(sm, StateRunning)
fmt.Println("观察者: 检测到Running状态")
fmt.Println("观察者: 等待Stopped状态")
WaitForState(sm, StateStopped)
fmt.Println("观察者: 检测到Stopped状态")
}()
// 状态控制器
wg.Add(1)
go func() {
defer wg.Done()
transitions := []State{
StateRunning,
StatePaused,
StateRunning,
StateStopped,
StateIdle,
}
for _, newState := range transitions {
time.Sleep(300 * time.Millisecond)
if ChangeState(sm, newState) {
fmt.Printf("控制器: 成功转换到 %s\n", newState)
} else {
fmt.Printf("控制器: 无法转换到 %s\n", newState)
}
}
}()
wg.Wait()
}
func demonstrateEventBus() {
fmt.Println("\n--- 事件总线实现 ---")
type Event struct {
Type string
Data interface{}
}
type EventBus struct {
subscribers map[string][]*Subscriber
mu sync.Mutex
eventCond *sync.Cond
}
type Subscriber struct {
ID string
EventCh chan Event
Active bool
cond *sync.Cond
}
NewEventBus := func() *EventBus {
eb := &EventBus{
subscribers: make(map[string][]*Subscriber),
}
eb.eventCond = sync.NewCond(&eb.mu)
return eb
}
NewSubscriber := func(id string) *Subscriber {
s := &Subscriber{
ID: id,
EventCh: make(chan Event, 10),
Active: true,
}
s.cond = sync.NewCond(&sync.Mutex{})
return s
}
// 订阅事件
Subscribe := func(eb *EventBus, eventType string, subscriber *Subscriber) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.subscribers[eventType] = append(eb.subscribers[eventType], subscriber)
fmt.Printf("订阅者 %s 订阅了事件类型: %s\n", subscriber.ID, eventType)
}
// 发布事件
Publish := func(eb *EventBus, event Event) {
eb.mu.Lock()
subscribers := eb.subscribers[event.Type]
eb.mu.Unlock()
fmt.Printf("发布事件: %s, 数据: %v\n", event.Type, event.Data)
for _, subscriber := range subscribers {
if subscriber.Active {
select {
case subscriber.EventCh <- event:
fmt.Printf("事件已发送给订阅者 %s\n", subscriber.ID)
default:
fmt.Printf("订阅者 %s 的通道已满,丢弃事件\n", subscriber.ID)
}
}
}
eb.mu.Lock()
eb.eventCond.Broadcast()
eb.mu.Unlock()
}
// 等待特定事件
WaitForEvent := func(eb *EventBus, eventType string) {
eb.mu.Lock()
defer eb.mu.Unlock()
// 简化的实现:在实际使用中需要更复杂的事件匹配
eb.eventCond.Wait()
}
// 演示使用
eventBus := NewEventBus()
var wg sync.WaitGroup
// 创建订阅者
subscriber1 := NewSubscriber("Logger")
subscriber2 := NewSubscriber("MetricsCollector")
subscriber3 := NewSubscriber("Alerter")
// 订阅不同事件
Subscribe(eventBus, "user.login", subscriber1)
Subscribe(eventBus, "user.login", subscriber2)
Subscribe(eventBus, "system.error", subscriber1)
Subscribe(eventBus, "system.error", subscriber3)
// 启动订阅者处理器
for _, sub := range []*Subscriber{subscriber1, subscriber2, subscriber3} {
wg.Add(1)
go func(s *Subscriber) {
defer wg.Done()
defer close(s.EventCh)
fmt.Printf("订阅者 %s 开始处理事件\n", s.ID)
eventCount := 0
for event := range s.EventCh {
eventCount++
fmt.Printf("订阅者 %s 处理事件: %s, 数据: %v\n",
s.ID, event.Type, event.Data)
// 模拟处理时间
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
if eventCount >= 3 { // 处理3个事件后退出
s.Active = false
break
}
}
fmt.Printf("订阅者 %s 停止处理事件\n", s.ID)
}(sub)
}
// 发布事件
wg.Add(1)
go func() {
defer wg.Done()
events := []Event{
{Type: "user.login", Data: map[string]string{"user": "alice", "ip": "192.168.1.1"}},
{Type: "system.error", Data: map[string]string{"error": "database connection failed"}},
{Type: "user.login", Data: map[string]string{"user": "bob", "ip": "192.168.1.2"}},
{Type: "system.error", Data: map[string]string{"error": "memory usage high"}},
{Type: "user.login", Data: map[string]string{"user": "charlie", "ip": "192.168.1.3"}},
}
for _, event := range events {
time.Sleep(400 * time.Millisecond)
Publish(eventBus, event)
}
}()
wg.Wait()
}
func main() {
rand.Seed(time.Now().UnixNano())
demonstrateCondBasics()
demonstrateCondInternals()
demonstrateProducerConsumer()
demonstrateBarrierPattern()
demonstrateEventNotification()
}:::
🎯 核心知识点总结
Cond基础要点
- 等待-通知模式: Cond实现了经典的等待-通知同步模式
- 必须配合锁使用: Cond必须与Mutex或RWMutex配合使用
- 三个核心方法: Wait()、Signal()、Broadcast()
- 条件检查: 使用for循环而不是if检查条件,防止虚假唤醒
工作机制要点
- Wait()的三步操作: 释放锁、等待、重新获取锁
- Signal vs Broadcast: Signal唤醒一个,Broadcast唤醒所有
- 虚假唤醒: 系统可能在条件未满足时唤醒goroutine
- 原子性: 条件检查和Wait()调用需要在同一锁保护下
应用场景要点
- 生产者-消费者: 经典的缓冲区管理模式
- 同步屏障: 等待所有线程到达同步点
- 状态机: 等待状态变化的通知机制
- 事件通知: 实现事件驱动的架构模式
最佳实践要点
- 正确的条件检查: 始终使用for循环检查条件
- 避免死锁: 确保Wait()和Signal()在相同锁保护下
- 合理的粒度: 不要在持有锁时执行长时间操作
- 资源清理: 确保goroutine能够正确退出和清理
🔍 面试准备建议
- 理解设计原理: 深入理解条件变量的设计理念和工作机制
- 掌握经典模式: 熟练使用生产者-消费者、屏障等经典模式
- 避免常见陷阱: 了解虚假唤醒、死锁等常见问题
- 实际应用能力: 能够在实际场景中正确使用Cond
- 性能考虑: 理解Cond的性能特点和适用场景
