Skip to content

Goroutine泄漏检测详解 - Golang并发编程面试题

Goroutine泄漏是Go并发编程中的常见问题,会导致内存泄漏和性能下降。本章深入探讨Goroutine泄漏的原因、检测方法和预防策略。

📋 重点面试题

面试题 1:Goroutine泄漏的常见原因和识别

难度级别:⭐⭐⭐⭐
考察范围:内存管理/调试技巧
技术标签goroutine leak memory leak channel blocking infinite loop resource management

问题分析

理解Goroutine泄漏的根本原因对于编写可靠的Go程序至关重要,这涉及正确的资源管理和并发控制。

详细解答

1. 常见的Goroutine泄漏场景

点击查看完整代码实现
点击查看完整代码实现
go
package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"
    "time"
)

func demonstrateCommonLeakScenarios() {
    fmt.Println("=== 常见Goroutine泄漏场景 ===")
    
    // 场景1:永远阻塞的channel操作
    demonstrateChannelBlockingLeak()
    
    // 场景2:无限循环
    demonstrateInfiniteLoopLeak()
    
    // 场景3:未正确关闭的资源
    demonstrateResourceLeak()
    
    // 场景4:context未正确传播
    demonstrateContextLeak()
}

func demonstrateChannelBlockingLeak() {
    fmt.Println("\n--- 场景1:Channel阻塞导致的泄漏 ---")
    
    initialGoroutines := runtime.NumGoroutine()
    fmt.Printf("初始Goroutine数量: %d\n", initialGoroutines)
    
    // 错误示例:发送到无缓冲channel但没有接收者
    func() {
        ch := make(chan int)
        
        // 这个goroutine会永远阻塞
        go func() {
            fmt.Println("尝试发送数据...")
            ch <- 42 // 永远阻塞,因为没有接收者
            fmt.Println("发送完成") // 永远不会执行
        }()
        
        // 主函数退出,但goroutine仍然阻塞
        time.Sleep(100 * time.Millisecond)
    }()
    
    afterLeakGoroutines := runtime.NumGoroutine()
    fmt.Printf("创建泄漏后Goroutine数量: %d\n", afterLeakGoroutines)
    fmt.Printf("泄漏的Goroutine数量: %d\n", afterLeakGoroutines-initialGoroutines)
    
    // 正确示例:使用context控制goroutine生命周期
    fmt.Println("\n正确的处理方式:")
    func() {
        ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
        defer cancel()
        
        ch := make(chan int)
        
        go func() {
            select {
            case ch <- 42:
                fmt.Println("发送成功")
            case <-ctx.Done():
                fmt.Println("发送被取消")
                return
            }
        }()
        
        time.Sleep(100 * time.Millisecond)
        // context超时会自动取消goroutine
    }()
    
    time.Sleep(300 * time.Millisecond) // 等待context超时
    finalGoroutines := runtime.NumGoroutine()
    fmt.Printf("修复后Goroutine数量: %d\n", finalGoroutines)
}

func demonstrateInfiniteLoopLeak() {
    fmt.Println("\n--- 场景2:无限循环导致的泄漏 ---")
    
    initialGoroutines := runtime.NumGoroutine()
    fmt.Printf("初始Goroutine数量: %d\n", initialGoroutines)
    
    // 错误示例:没有退出条件的无限循环
    func() {
        running := true
        
        go func() {
            for running { // 问题:外部变量可能永远不变
                // 执行一些工作
                time.Sleep(10 * time.Millisecond)
                fmt.Print(".")
            }
            fmt.Println("\n无限循环goroutine退出")
        }()
        
        time.Sleep(100 * time.Millisecond)
        // 忘记设置running = false,goroutine永远不会退出
    }()
    
    afterLeakGoroutines := runtime.NumGoroutine()
    fmt.Printf("\n无限循环后Goroutine数量: %d\n", afterLeakGoroutines)
    
    // 正确示例:使用channel或context控制循环
    fmt.Println("\n正确的处理方式:")
    func() {
        done := make(chan struct{})
        
        go func() {
            defer fmt.Println("受控循环goroutine退出")
            
            for {
                select {
                case <-done:
                    return
                default:
                    // 执行工作
                    time.Sleep(10 * time.Millisecond)
                    fmt.Print("+")
                }
            }
        }()
        
        time.Sleep(100 * time.Millisecond)
        close(done) // 正确关闭
        time.Sleep(50 * time.Millisecond) // 等待goroutine退出
    }()
    
    fixedGoroutines := runtime.NumGoroutine()
    fmt.Printf("\n修复后Goroutine数量: %d\n", fixedGoroutines)
}

func demonstrateResourceLeak() {
    fmt.Println("\n--- 场景3:资源未正确关闭导致的泄漏 ---")
    
    initialGoroutines := runtime.NumGoroutine()
    fmt.Printf("初始Goroutine数量: %d\n", initialGoroutines)
    
    // 错误示例:忘记关闭ticker
    func() {
        ticker := time.NewTicker(50 * time.Millisecond)
        // 忘记调用 ticker.Stop()
        
        go func() {
            for range ticker.C {
                fmt.Print("T")
            }
            fmt.Println("\nTicker goroutine退出")
        }()
        
        time.Sleep(200 * time.Millisecond)
        // ticker没有被停止,goroutine会继续运行
    }()
    
    afterLeakGoroutines := runtime.NumGoroutine()
    fmt.Printf("\nTicker泄漏后Goroutine数量: %d\n", afterLeakGoroutines)
    
    // 正确示例:properly close resources
    fmt.Println("\n正确的处理方式:")
    func() {
        ticker := time.NewTicker(50 * time.Millisecond)
        defer ticker.Stop() // 确保资源被释放
        
        done := make(chan struct{})
        
        go func() {
            defer fmt.Println("受控Ticker goroutine退出")
            
            for {
                select {
                case <-ticker.C:
                    fmt.Print("t")
                case <-done:
                    return
                }
            }
        }()
        
        time.Sleep(200 * time.Millisecond)
        close(done)
        time.Sleep(100 * time.Millisecond) // 等待清理
    }()
    
    fixedGoroutines := runtime.NumGoroutine()
    fmt.Printf("\n修复后Goroutine数量: %d\n", fixedGoroutines)
}

func demonstrateContextLeak() {
    fmt.Println("\n--- 场景4:Context未正确传播导致的泄漏 ---")
    
    initialGoroutines := runtime.NumGoroutine()
    fmt.Printf("初始Goroutine数量: %d\n", initialGoroutines)
    
    // 错误示例:不传播context
    func() {
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()
        
        // 启动工作,但没有传递context
        startWorkerWithoutContext()
        
        // 即使主context被取消,worker仍然运行
        <-ctx.Done()
        fmt.Println("主context超时")
    }()
    
    time.Sleep(200 * time.Millisecond)
    afterLeakGoroutines := runtime.NumGoroutine()
    fmt.Printf("Context泄漏后Goroutine数量: %d\n", afterLeakGoroutines)
    
    // 正确示例:传播context
    fmt.Println("\n正确的处理方式:")
    func() {
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()
        
        // 正确传递context
        startWorkerWithContext(ctx)
        
        <-ctx.Done()
        fmt.Println("主context超时,所有worker应该停止")
    }()
    
    time.Sleep(200 * time.Millisecond)
    fixedGoroutines := runtime.NumGoroutine()
    fmt.Printf("修复后Goroutine数量: %d\n", fixedGoroutines)
}

func startWorkerWithoutContext() {
    go func() {
        for {
            // 没有退出机制的工作循环
            time.Sleep(50 * time.Millisecond)
            fmt.Print("W")
        }
    }()
}

func startWorkerWithContext(ctx context.Context) {
    go func() {
        defer fmt.Println("\nWorker with context 退出")
        
        for {
            select {
            case <-ctx.Done():
                return
            default:
                time.Sleep(50 * time.Millisecond)
                fmt.Print("w")
            }
        }
    }()
}

:::

2. Goroutine泄漏检测工具和方法

点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateLeakDetection() {
    fmt.Println("\n=== Goroutine泄漏检测方法 ===")
    
    // 方法1:基本计数检测
    demonstrateBasicCounting()
    
    // 方法2:详细堆栈分析
    demonstrateStackAnalysis()
    
    // 方法3:自动化检测工具
    demonstrateAutomatedDetection()
}

func demonstrateBasicCounting() {
    fmt.Println("\n--- 基本计数检测 ---")
    
    type GoroutineChecker struct {
        initialCount int
        name         string
    }
    
    NewGoroutineChecker := func(name string) *GoroutineChecker {
        return &GoroutineChecker{
            initialCount: runtime.NumGoroutine(),
            name:         name,
        }
    }
    
    Check := func(gc *GoroutineChecker) {
        currentCount := runtime.NumGoroutine()
        diff := currentCount - gc.initialCount
        
        if diff > 0 {
            fmt.Printf("⚠️  %s: 检测到 %d 个可能的泄漏goroutine\n", gc.name, diff)
        } else {
            fmt.Printf("✅ %s: 没有检测到goroutine泄漏\n", gc.name)
        }
    }
    
    // 测试正常情况
    checker1 := NewGoroutineChecker("正常测试")
    func() {
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(50 * time.Millisecond)
        }()
        wg.Wait()
    }()
    Check(checker1)
    
    // 测试泄漏情况
    checker2 := NewGoroutineChecker("泄漏测试")
    func() {
        ch := make(chan int)
        go func() {
            <-ch // 永远阻塞
        }()
        time.Sleep(50 * time.Millisecond)
    }()
    Check(checker2)
}

func demonstrateStackAnalysis() {
    fmt.Println("\n--- 堆栈分析检测 ---")
    
    // 创建一些不同类型的goroutine
    blockingCh := make(chan int)
    
    // 阻塞的goroutine
    go func() {
        <-blockingCh
    }()
    
    // 无限循环的goroutine
    go func() {
        for {
            time.Sleep(time.Second)
        }
    }()
    
    // 正常的goroutine
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(100 * time.Millisecond)
    }()
    
    time.Sleep(200 * time.Millisecond)
    
    // 获取所有goroutine的堆栈信息
    buf := make([]byte, 1024*1024)
    stackSize := runtime.Stack(buf, true)
    
    fmt.Printf("当前Goroutine数量: %d\n", runtime.NumGoroutine())
    fmt.Println("Goroutine堆栈信息(部分):")
    
    // 打印前1000字符的堆栈信息
    stackStr := string(buf[:stackSize])
    if len(stackStr) > 1000 {
        fmt.Printf("%s...\n", stackStr[:1000])
    } else {
        fmt.Println(stackStr)
    }
    
    wg.Wait()
}

func demonstrateAutomatedDetection() {
    fmt.Println("\n--- 自动化检测工具 ---")
    
    // 实现一个简单的goroutine监控器
    type GoroutineMonitor struct {
        baseline    int
        threshold   int
        checkPeriod time.Duration
        stopCh      chan struct{}
    }
    
    NewMonitor := func(threshold int, checkPeriod time.Duration) *GoroutineMonitor {
        return &GoroutineMonitor{
            baseline:    runtime.NumGoroutine(),
            threshold:   threshold,
            checkPeriod: checkPeriod,
            stopCh:      make(chan struct{}),
        }
    }
    
    Start := func(m *GoroutineMonitor) {
        go func() {
            ticker := time.NewTicker(m.checkPeriod)
            defer ticker.Stop()
            
            for {
                select {
                case <-ticker.C:
                    current := runtime.NumGoroutine()
                    diff := current - m.baseline
                    
                    if diff > m.threshold {
                        fmt.Printf("🚨 检测到可能的goroutine泄漏: 基线 %d, 当前 %d, 差异 %d\n",
                            m.baseline, current, diff)
                    } else {
                        fmt.Printf("📊 Goroutine监控: 基线 %d, 当前 %d, 差异 %d\n",
                            m.baseline, current, diff)
                    }
                    
                case <-m.stopCh:
                    fmt.Println("Goroutine监控器停止")
                    return
                }
            }
        }()
    }
    
    Stop := func(m *GoroutineMonitor) {
        close(m.stopCh)
    }
    
    // 启动监控器
    monitor := NewMonitor(2, 500*time.Millisecond)
    Start(monitor)
    
    // 模拟一些goroutine活动
    time.Sleep(600 * time.Millisecond)
    
    // 创建一些短生命周期的goroutine
    for i := 0; i < 3; i++ {
        go func(id int) {
            time.Sleep(200 * time.Millisecond)
            fmt.Printf("短期goroutine %d 完成\n", id)
        }(i)
    }
    
    time.Sleep(1 * time.Second)
    
    // 创建泄漏的goroutine
    for i := 0; i < 5; i++ {
        ch := make(chan int)
        go func(id int) {
            <-ch // 永远阻塞
        }(i)
    }
    
    time.Sleep(1 * time.Second)
    Stop(monitor)
    time.Sleep(100 * time.Millisecond)
}

::: :::

面试题 2:预防Goroutine泄漏的最佳实践

难度级别:⭐⭐⭐⭐⭐
考察范围:最佳实践/架构设计
技术标签leak prevention context management resource cleanup graceful shutdown timeout handling

问题分析

预防胜于检测,掌握预防Goroutine泄漏的最佳实践是编写高质量Go代码的关键。

详细解答

1. Context驱动的生命周期管理

点击查看完整代码实现
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateContextDrivenLifecycle() {
    fmt.Println("\n=== Context驱动的生命周期管理 ===")
    
    // 模式1:使用context.WithCancel
    demonstrateCancelContext()
    
    // 模式2:使用context.WithTimeout
    demonstrateTimeoutContext()
    
    // 模式3:使用context.WithDeadline
    demonstrateDeadlineContext()
    
    // 模式4:Context链式传播
    demonstrateContextChaining()
}

func demonstrateCancelContext() {
    fmt.Println("\n--- Cancel Context模式 ---")
    
    ctx, cancel := context.WithCancel(context.Background())
    
    // 启动多个受控的goroutine
    var wg sync.WaitGroup
    
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go controlledWorker(ctx, &wg, fmt.Sprintf("worker-%d", i))
    }
    
    // 让worker运行一段时间
    time.Sleep(500 * time.Millisecond)
    
    // 取消所有worker
    fmt.Println("取消所有worker...")
    cancel()
    
    // 等待所有worker优雅退出
    wg.Wait()
    fmt.Println("所有worker已退出")
}

func controlledWorker(ctx context.Context, wg *sync.WaitGroup, name string) {
    defer wg.Done()
    defer fmt.Printf("%s 退出\n", name)
    
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("%s 收到取消信号: %v\n", name, ctx.Err())
            return
            
        case <-ticker.C:
            fmt.Printf("%s 工作中...\n", name)
        }
    }
}

func demonstrateTimeoutContext() {
    fmt.Println("\n--- Timeout Context模式 ---")
    
    // 为每个任务设置超时
    tasks := []struct {
        name    string
        timeout time.Duration
        work    time.Duration
    }{
        {"快速任务", 200 * time.Millisecond, 100 * time.Millisecond},
        {"慢速任务", 200 * time.Millisecond, 300 * time.Millisecond},
        {"中等任务", 500 * time.Millisecond, 250 * time.Millisecond},
    }
    
    var wg sync.WaitGroup
    
    for _, task := range tasks {
        wg.Add(1)
        go func(t struct {
            name    string
            timeout time.Duration
            work    time.Duration
        }) {
            defer wg.Done()
            
            ctx, cancel := context.WithTimeout(context.Background(), t.timeout)
            defer cancel()
            
            executeTaskWithTimeout(ctx, t.name, t.work)
        }(task)
    }
    
    wg.Wait()
}

func executeTaskWithTimeout(ctx context.Context, name string, workDuration time.Duration) {
    fmt.Printf("%s 开始执行 (预计耗时: %v)\n", name, workDuration)
    
    done := make(chan struct{})
    
    go func() {
        defer close(done)
        // 模拟工作
        time.Sleep(workDuration)
    }()
    
    select {
    case <-done:
        fmt.Printf("%s 成功完成\n", name)
    case <-ctx.Done():
        fmt.Printf("%s 超时取消: %v\n", name, ctx.Err())
    }
}

func demonstrateDeadlineContext() {
    fmt.Println("\n--- Deadline Context模式 ---")
    
    // 设置绝对截止时间
    deadline := time.Now().Add(1 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()
    
    var wg sync.WaitGroup
    
    // 启动多个任务,但都受同一截止时间约束
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            workDuration := time.Duration(id*200+300) * time.Millisecond
            executeTaskWithDeadline(ctx, fmt.Sprintf("任务-%d", id), workDuration)
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("所有任务完成,距离截止时间还有: %v\n", time.Until(deadline))
}

func executeTaskWithDeadline(ctx context.Context, name string, workDuration time.Duration) {
    fmt.Printf("%s 开始 (工作时长: %v)\n", name, workDuration)
    
    timer := time.NewTimer(workDuration)
    defer timer.Stop()
    
    select {
    case <-timer.C:
        fmt.Printf("%s 完成\n", name)
    case <-ctx.Done():
        fmt.Printf("%s 被截止时间取消: %v\n", name, ctx.Err())
    }
}

func demonstrateContextChaining() {
    fmt.Println("\n--- Context链式传播 ---")
    
    // 根context
    rootCtx, rootCancel := context.WithCancel(context.Background())
    defer rootCancel()
    
    // 启动服务管理器
    var wg sync.WaitGroup
    wg.Add(1)
    go serviceManager(rootCtx, &wg)
    
    // 运行一段时间后关闭
    time.Sleep(1 * time.Second)
    fmt.Println("开始优雅关闭...")
    rootCancel()
    
    wg.Wait()
    fmt.Println("服务管理器已完全关闭")
}

func serviceManager(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    defer fmt.Println("服务管理器退出")
    
    // 为每个服务创建子context
    var serviceWg sync.WaitGroup
    
    services := []string{"web-server", "db-connector", "message-processor"}
    
    for _, serviceName := range services {
        serviceCtx, serviceCancel := context.WithCancel(ctx)
        
        serviceWg.Add(1)
        go func(name string, sCtx context.Context, cancel context.CancelFunc) {
            defer serviceWg.Done()
            defer cancel()
            
            runService(sCtx, name)
        }(serviceName, serviceCtx, serviceCancel)
    }
    
    // 等待根context取消
    <-ctx.Done()
    fmt.Println("服务管理器收到关闭信号,等待所有服务停止...")
    
    // 等待所有服务停止
    serviceWg.Wait()
}

func runService(ctx context.Context, name string) {
    defer fmt.Printf("服务 %s 已停止\n", name)
    
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("服务 %s 收到停止信号\n", name)
            return
            
        case <-ticker.C:
            fmt.Printf("服务 %s 运行中...\n", name)
        }
    }
}

::: :::

2. 安全的Channel模式

点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateSafeChannelPatterns() {
    fmt.Println("\n=== 安全的Channel模式 ===")
    
    // 模式1:超时保护的channel操作
    demonstrateTimeoutProtectedChannels()
    
    // 模式2:带取消的channel操作
    demonstrateCancelableChannels()
    
    // 模式3:安全的生产者-消费者模式
    demonstrateSafeProducerConsumer()
    
    // 模式4:扇入扇出模式
    demonstrateFanInFanOut()
}

func demonstrateTimeoutProtectedChannels() {
    fmt.Println("\n--- 超时保护的Channel操作 ---")
    
    // 发送端超时保护
    ch := make(chan string, 1)
    
    // 安全的发送操作
    safeSend := func(ch chan<- string, msg string, timeout time.Duration) bool {
        timer := time.NewTimer(timeout)
        defer timer.Stop()
        
        select {
        case ch <- msg:
            fmt.Printf("成功发送: %s\n", msg)
            return true
        case <-timer.C:
            fmt.Printf("发送超时: %s\n", msg)
            return false
        }
    }
    
    // 安全的接收操作
    safeReceive := func(ch <-chan string, timeout time.Duration) (string, bool) {
        timer := time.NewTimer(timeout)
        defer timer.Stop()
        
        select {
        case msg := <-ch:
            fmt.Printf("成功接收: %s\n", msg)
            return msg, true
        case <-timer.C:
            fmt.Println("接收超时")
            return "", false
        }
    }
    
    // 测试场景
    go func() {
        time.Sleep(200 * time.Millisecond)
        safeSend(ch, "延迟消息", 100*time.Millisecond) // 这会超时
    }()
    
    safeSend(ch, "即时消息", 500*time.Millisecond)
    safeReceive(ch, 300*time.Millisecond)
    safeReceive(ch, 100*time.Millisecond) // 这会超时
    
    time.Sleep(300 * time.Millisecond)
}

func demonstrateCancelableChannels() {
    fmt.Println("\n--- 带取消的Channel操作 ---")
    
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    data := make(chan int, 5)
    results := make(chan int, 5)
    
    // 可取消的生产者
    go func() {
        defer close(data)
        defer fmt.Println("生产者退出")
        
        for i := 0; i < 10; i++ {
            select {
            case data <- i:
                fmt.Printf("生产: %d\n", i)
                time.Sleep(100 * time.Millisecond)
                
            case <-ctx.Done():
                fmt.Println("生产者被取消")
                return
            }
        }
    }()
    
    // 可取消的消费者
    go func() {
        defer close(results)
        defer fmt.Println("消费者退出")
        
        for {
            select {
            case item, ok := <-data:
                if !ok {
                    fmt.Println("数据通道关闭,消费者正常退出")
                    return
                }
                
                // 处理数据
                processed := item * 2
                
                select {
                case results <- processed:
                    fmt.Printf("处理: %d -> %d\n", item, processed)
                case <-ctx.Done():
                    fmt.Println("消费者被取消")
                    return
                }
                
            case <-ctx.Done():
                fmt.Println("消费者被取消")
                return
            }
        }
    }()
    
    // 收集结果
    go func() {
        defer fmt.Println("结果收集器退出")
        
        for {
            select {
            case result, ok := <-results:
                if !ok {
                    fmt.Println("结果通道关闭,收集器正常退出")
                    return
                }
                fmt.Printf("收集结果: %d\n", result)
                
            case <-ctx.Done():
                fmt.Println("结果收集器被取消")
                return
            }
        }
    }()
    
    // 运行一段时间后取消
    time.Sleep(500 * time.Millisecond)
    fmt.Println("开始取消操作...")
    cancel()
    
    time.Sleep(200 * time.Millisecond)
}

func demonstrateSafeProducerConsumer() {
    fmt.Println("\n--- 安全的生产者-消费者模式 ---")
    
    type SafeQueue struct {
        items   chan interface{}
        ctx     context.Context
        cancel  context.CancelFunc
        wg      sync.WaitGroup
    }
    
    NewSafeQueue := func(bufferSize int) *SafeQueue {
        ctx, cancel := context.WithCancel(context.Background())
        return &SafeQueue{
            items:  make(chan interface{}, bufferSize),
            ctx:    ctx,
            cancel: cancel,
        }
    }
    
    StartProducer := func(sq *SafeQueue, producer func() interface{}) {
        sq.wg.Add(1)
        go func() {
            defer sq.wg.Done()
            defer fmt.Println("生产者goroutine退出")
            
            for {
                select {
                case <-sq.ctx.Done():
                    return
                default:
                    item := producer()
                    if item == nil {
                        return // 生产者完成
                    }
                    
                    select {
                    case sq.items <- item:
                        fmt.Printf("生产项目: %v\n", item)
                    case <-sq.ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    
    StartConsumer := func(sq *SafeQueue, consumer func(interface{})) {
        sq.wg.Add(1)
        go func() {
            defer sq.wg.Done()
            defer fmt.Println("消费者goroutine退出")
            
            for {
                select {
                case item, ok := <-sq.items:
                    if !ok {
                        return // 队列关闭
                    }
                    consumer(item)
                    
                case <-sq.ctx.Done():
                    return
                }
            }
        }()
    }
    
    Close := func(sq *SafeQueue) {
        sq.cancel()
        close(sq.items)
        sq.wg.Wait()
    }
    
    // 使用安全队列
    queue := NewSafeQueue(5)
    
    // 启动生产者
    counter := 0
    StartProducer(queue, func() interface{} {
        if counter >= 5 {
            return nil // 完成生产
        }
        counter++
        time.Sleep(100 * time.Millisecond)
        return fmt.Sprintf("item-%d", counter)
    })
    
    // 启动消费者
    StartConsumer(queue, func(item interface{}) {
        fmt.Printf("消费项目: %v\n", item)
        time.Sleep(150 * time.Millisecond)
    })
    
    // 等待一段时间后关闭
    time.Sleep(1 * time.Second)
    fmt.Println("关闭安全队列...")
    Close(queue)
}

func demonstrateFanInFanOut() {
    fmt.Println("\n--- 扇入扇出模式 ---")
    
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    // 扇出:将单个输入分发到多个worker
    fanOut := func(ctx context.Context, input <-chan int, numWorkers int) []<-chan int {
        outputs := make([]<-chan int, numWorkers)
        
        for i := 0; i < numWorkers; i++ {
            output := make(chan int)
            outputs[i] = output
            
            go func(workerID int, out chan<- int) {
                defer close(out)
                defer fmt.Printf("Worker %d 退出\n", workerID)
                
                for {
                    select {
                    case item, ok := <-input:
                        if !ok {
                            return
                        }
                        
                        // 处理项目
                        processed := item * (workerID + 1)
                        fmt.Printf("Worker %d 处理: %d -> %d\n", workerID, item, processed)
                        
                        select {
                        case out <- processed:
                        case <-ctx.Done():
                            return
                        }
                        
                    case <-ctx.Done():
                        return
                    }
                }
            }(i, output)
        }
        
        return outputs
    }
    
    // 扇入:将多个worker的输出合并到单个通道
    fanIn := func(ctx context.Context, inputs ...<-chan int) <-chan int {
        output := make(chan int)
        var wg sync.WaitGroup
        
        for i, input := range inputs {
            wg.Add(1)
            go func(inputID int, in <-chan int) {
                defer wg.Done()
                defer fmt.Printf("FanIn worker %d 退出\n", inputID)
                
                for {
                    select {
                    case item, ok := <-in:
                        if !ok {
                            return
                        }
                        
                        select {
                        case output <- item:
                            fmt.Printf("FanIn收集: %d (来自worker %d)\n", item, inputID)
                        case <-ctx.Done():
                            return
                        }
                        
                    case <-ctx.Done():
                        return
                    }
                }
            }(i, input)
        }
        
        go func() {
            wg.Wait()
            close(output)
            fmt.Println("FanIn输出通道关闭")
        }()
        
        return output
    }
    
    // 创建输入数据
    input := make(chan int, 5)
    go func() {
        defer close(input)
        for i := 1; i <= 6; i++ {
            select {
            case input <- i:
                fmt.Printf("输入: %d\n", i)
                time.Sleep(200 * time.Millisecond)
            case <-ctx.Done():
                return
            }
        }
    }()
    
    // 扇出到3个worker
    workerOutputs := fanOut(ctx, input, 3)
    
    // 扇入合并结果
    finalOutput := fanIn(ctx, workerOutputs...)
    
    // 收集最终结果
    go func() {
        defer fmt.Println("结果收集完成")
        
        for {
            select {
            case result, ok := <-finalOutput:
                if !ok {
                    return
                }
                fmt.Printf("最终结果: %d\n", result)
                
            case <-ctx.Done():
                fmt.Println("结果收集被取消")
                return
            }
        }
    }()
    
    // 等待context超时
    <-ctx.Done()
    time.Sleep(200 * time.Millisecond) // 等待清理
}

func main() {
    demonstrateCommonLeakScenarios()
    demonstrateLeakDetection()
    demonstrateContextDrivenLifecycle()
    demonstrateSafeChannelPatterns()
}

:::

🎯 核心知识点总结

泄漏原因要点

  1. Channel阻塞: 发送或接收操作永远阻塞
  2. 无限循环: 没有正确的退出条件
  3. 资源未释放: Ticker、Timer等资源未正确关闭
  4. Context未传播: 取消信号无法传递到子goroutine

检测方法要点

  1. 基本计数: 监控goroutine数量变化
  2. 堆栈分析: 分析阻塞的goroutine堆栈
  3. 自动化工具: 使用监控器持续检测
  4. 性能分析: 使用pprof等工具深度分析

预防策略要点

  1. Context管理: 使用context控制goroutine生命周期
  2. 超时保护: 为所有阻塞操作设置超时
  3. 优雅关闭: 实现正确的资源清理机制
  4. 安全模式: 使用proven的并发模式和最佳实践

最佳实践要点

  1. 总是有退出机制: 每个goroutine都应该有明确的退出路径
  2. 传播取消信号: 使用context在调用链中传播取消
  3. 资源清理: 使用defer确保资源被正确释放
  4. 监控和测试: 定期检查goroutine数量和资源使用

🔍 面试准备建议

  1. 理解泄漏原因: 深入理解各种导致goroutine泄漏的场景
  2. 掌握检测方法: 学会使用各种工具和方法检测泄漏
  3. 实践预防措施: 在实际项目中应用泄漏预防最佳实践
  4. 熟悉调试工具: 掌握pprof、trace等Go调试工具的使用
  5. 代码审查经验: 能够在代码审查中识别潜在的泄漏风险

正在精进