
Thread Pools

1. A Simple Goroutine Pool Implementation

go
import (
    "context"
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// 任务接口
type Task interface {
    Execute() error
}

// 函数任务
type FuncTask struct {
    fn func() error
}

func (ft *FuncTask) Execute() error {
    return ft.fn()
}

// NewFuncTask 创建函数任务
func NewFuncTask(fn func() error) Task {
    return &FuncTask{fn: fn}
}

// 简单的Goroutine池
type SimpleGoroutinePool struct {
    workers    int
    taskQueue  chan Task
    wg         sync.WaitGroup
    ctx        context.Context
    cancel     context.CancelFunc
    closed     int32
}

func NewSimpleGoroutinePool(workers, queueSize int) *SimpleGoroutinePool {
    ctx, cancel := context.WithCancel(context.Background())
    
    pool := &SimpleGoroutinePool{
        workers:   workers,
        taskQueue: make(chan Task, queueSize),
        ctx:       ctx,
        cancel:    cancel,
    }
    
    // 启动工作goroutine
    for i := 0; i < workers; i++ {
        pool.wg.Add(1)
        go pool.worker(i)
    }
    
    return pool
}

func (p *SimpleGoroutinePool) worker(id int) {
    defer p.wg.Done()
    
    fmt.Printf("工作者 %d 启动\n", id)
    defer fmt.Printf("工作者 %d 退出\n", id)
    
    for {
        select {
        case task, ok := <-p.taskQueue:
            if !ok {
                return // 任务队列关闭
            }
            
            if err := task.Execute(); err != nil {
                fmt.Printf("工作者 %d 执行任务失败: %v\n", id, err)
            }
            
        case <-p.ctx.Done():
            return // 池被关闭
        }
    }
}

func (p *SimpleGoroutinePool) Submit(task Task) error {
    if atomic.LoadInt32(&p.closed) == 1 {
        return errors.New("pool is closed")
    }
    
    select {
    case p.taskQueue <- task:
        return nil
    case <-p.ctx.Done():
        return errors.New("pool is closed")
    default:
        return errors.New("task queue is full")
    }
}

func (p *SimpleGoroutinePool) SubmitFunc(fn func() error) error {
    return p.Submit(NewFuncTask(fn))
}

func (p *SimpleGoroutinePool) Close() {
    if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
        return // 已经关闭
    }
    
    // Close the queue first and let the workers drain any queued tasks, then cancel
    // the context; cancelling first could make workers exit before the remaining
    // tasks are executed.
    close(p.taskQueue)
    p.wg.Wait()
    p.cancel()
}

func demonstrateSimplePool() {
    fmt.Println("\n=== 简单Goroutine池演示 ===")
    
    // 创建池
    pool := NewSimpleGoroutinePool(3, 10)
    defer pool.Close()
    
    // 提交任务
    for i := 0; i < 10; i++ {
        taskID := i
        err := pool.SubmitFunc(func() error {
            fmt.Printf("执行任务 %d\n", taskID)
            time.Sleep(time.Duration(taskID*100) * time.Millisecond)
            return nil
        })
        
        if err != nil {
            fmt.Printf("提交任务 %d 失败: %v\n", taskID, err)
        }
    }
    
    // 等待一段时间让任务完成
    time.Sleep(2 * time.Second)
}
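
// A note on shutdown (illustrative, based on the Close implementation above): instead of
// sleeping for a fixed two seconds, the demo could rely on Close alone, since Close first
// closes the task queue, waits for the workers to drain it, and only then cancels the context:
//
//     pool := NewSimpleGoroutinePool(3, 10)
//     for i := 0; i < 10; i++ {
//         _ = pool.SubmitFunc(someTask) // someTask is a placeholder func() error
//     }
//     pool.Close() // blocks until every queued task has been executed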


2. An Advanced Goroutine Pool Implementation

go
// 任务结果
type TaskResult struct {
    Result interface{}
    Error  error
}

// 带结果的任务
type TaskWithResult struct {
    fn     func() (interface{}, error)
    result chan TaskResult
}

func (t *TaskWithResult) Execute() error {
    result, err := t.fn()
    t.result <- TaskResult{Result: result, Error: err}
    return err
}

// 高级Goroutine池
type AdvancedGoroutinePool struct {
    workers     int
    maxWorkers  int
    taskQueue   chan Task
    workerQueue chan chan Task
    wg          sync.WaitGroup
    ctx         context.Context
    cancel      context.CancelFunc
    closed      int32
    
    // 统计信息
    submittedTasks int64
    completedTasks int64
    failedTasks    int64
    activeWorkers  int64
}

func NewAdvancedGoroutinePool(minWorkers, maxWorkers, queueSize int) *AdvancedGoroutinePool {
    ctx, cancel := context.WithCancel(context.Background())
    
    pool := &AdvancedGoroutinePool{
        workers:     minWorkers,
        maxWorkers:  maxWorkers,
        taskQueue:   make(chan Task, queueSize),
        workerQueue: make(chan chan Task, maxWorkers),
        ctx:         ctx,
        cancel:      cancel,
    }
    
    // 启动调度器
    go pool.dispatcher()
    
    // 启动最小数量的工作者
    // wg.Add is done before each goroutine starts, so Close's wg.Wait cannot race with it.
    for i := 0; i < minWorkers; i++ {
        pool.wg.Add(1)
        go pool.startWorker(i)
    }
    
    return pool
}

func (p *AdvancedGoroutinePool) dispatcher() {
    for {
        select {
        case task, ok := <-p.taskQueue:
            if !ok {
                return // task queue has been closed
            }
            
            // 尝试获取可用的工作者
            select {
            case workerChannel := <-p.workerQueue:
                // 有可用工作者,分配任务
                workerChannel <- task
                
            default:
                // 没有可用工作者,检查是否可以创建新的
                currentWorkers := atomic.LoadInt64(&p.activeWorkers)
                if int(currentWorkers) < p.maxWorkers {
                    // 创建新的工作者 (wg.Add happens before the goroutine starts)
                    p.wg.Add(1)
                    go p.startWorker(int(currentWorkers))
                }
                
                // 等待工作者可用 (the newly created one or one that becomes idle)
                workerChannel := <-p.workerQueue
                workerChannel <- task
            }
            
        case <-p.ctx.Done():
            return
        }
    }
}

func (p *AdvancedGoroutinePool) startWorker(id int) {
    // wg.Add(1) is done by the caller before launching this goroutine; only Done is deferred here.
    defer p.wg.Done()
    
    atomic.AddInt64(&p.activeWorkers, 1)
    defer atomic.AddInt64(&p.activeWorkers, -1)
    
    fmt.Printf("高级工作者 %d 启动\n", id)
    defer fmt.Printf("高级工作者 %d 退出\n", id)
    
    // 工作者的任务通道
    taskChannel := make(chan Task)
    
    for {
        // 注册到工作者队列
        select {
        case p.workerQueue <- taskChannel:
            // 成功注册,等待任务
            select {
            case task := <-taskChannel:
                // 执行任务
                if err := task.Execute(); err != nil {
                    atomic.AddInt64(&p.failedTasks, 1)
                    fmt.Printf("高级工作者 %d 任务失败: %v\n", id, err)
                } else {
                    atomic.AddInt64(&p.completedTasks, 1)
                }
                
            case <-p.ctx.Done():
                return
            }
            
        case <-p.ctx.Done():
            return
        }
    }
}

func (p *AdvancedGoroutinePool) Submit(task Task) error {
    if atomic.LoadInt32(&p.closed) == 1 {
        return errors.New("pool is closed")
    }
    
    atomic.AddInt64(&p.submittedTasks, 1)
    
    select {
    case p.taskQueue <- task:
        return nil
    case <-p.ctx.Done():
        return errors.New("pool is closed")
    default:
        return errors.New("task queue is full")
    }
}

// SubmitFunc wraps a plain function as a Task and submits it; the demo and benchmark code below rely on it.
func (p *AdvancedGoroutinePool) SubmitFunc(fn func() error) error {
    return p.Submit(NewFuncTask(fn))
}

func (p *AdvancedGoroutinePool) SubmitWithResult(fn func() (interface{}, error)) (interface{}, error) {
    task := &TaskWithResult{
        fn:     fn,
        result: make(chan TaskResult, 1),
    }
    
    if err := p.Submit(task); err != nil {
        return nil, err
    }
    
    result := <-task.result
    return result.Result, result.Error
}

func (p *AdvancedGoroutinePool) Stats() (submitted, completed, failed, active int64) {
    return atomic.LoadInt64(&p.submittedTasks),
           atomic.LoadInt64(&p.completedTasks),
           atomic.LoadInt64(&p.failedTasks),
           atomic.LoadInt64(&p.activeWorkers)
}

func (p *AdvancedGoroutinePool) Close() {
    if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
        return
    }
    
    close(p.taskQueue)
    p.cancel()
    p.wg.Wait()
}

func demonstrateAdvancedPool() {
    fmt.Println("\n=== 高级Goroutine池演示 ===")
    
    // 创建高级池
    pool := NewAdvancedGoroutinePool(2, 5, 20)
    defer pool.Close()
    
    // 提交不同类型的任务
    var wg sync.WaitGroup
    
    // 提交普通任务
    for i := 0; i < 15; i++ {
        taskID := i
        pool.SubmitFunc(func() error {
            fmt.Printf("执行普通任务 %d\n", taskID)
            time.Sleep(time.Duration(taskID%5*100) * time.Millisecond)
            return nil
        })
    }
    
    // 提交带结果的任务
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            result, err := pool.SubmitWithResult(func() (interface{}, error) {
                sum := 0
                for j := 0; j < id*10; j++ {
                    sum += j
                }
                return sum, nil
            })
            
            if err != nil {
                fmt.Printf("带结果任务 %d 失败: %v\n", id, err)
            } else {
                fmt.Printf("带结果任务 %d 结果: %v\n", id, result)
            }
        }(i)
    }
    
    // 定期打印统计信息
    go func() {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer ticker.Stop()
        
        for {
            select {
            case <-ticker.C:
                submitted, completed, failed, active := pool.Stats()
                fmt.Printf("池统计 - 提交: %d, 完成: %d, 失败: %d, 活跃工作者: %d\n",
                    submitted, completed, failed, active)
                
            case <-pool.ctx.Done():
                return
            }
        }
    }()
    
    wg.Wait()
    time.Sleep(2 * time.Second)
    
    // 最终统计
    submitted, completed, failed, active := pool.Stats()
    fmt.Printf("最终统计 - 提交: %d, 完成: %d, 失败: %d, 活跃工作者: %d\n",
        submitted, completed, failed, active)
}


3. Performance Testing and Comparison

go
func demonstratePoolPerformance() {
    fmt.Println("\n=== Goroutine池性能测试 ===")
    
    const numTasks = 10000
    
    // 测试1:直接创建goroutine
    start := time.Now()
    testDirectGoroutines(numTasks)
    directTime := time.Since(start)
    
    // 测试2:使用简单池
    start = time.Now()
    testSimplePool(numTasks)
    simplePoolTime := time.Since(start)
    
    // 测试3:使用高级池
    start = time.Now()
    testAdvancedPool(numTasks)
    advancedPoolTime := time.Since(start)
    
    fmt.Printf("\n性能对比 (%d 个任务):\n", numTasks)
    fmt.Printf("直接创建goroutine: %v\n", directTime)
    fmt.Printf("简单池: %v (%.2fx)\n", simplePoolTime, 
        float64(directTime)/float64(simplePoolTime))
    fmt.Printf("高级池: %v (%.2fx)\n", advancedPoolTime,
        float64(directTime)/float64(advancedPoolTime))
}

func testDirectGoroutines(numTasks int) {
    var wg sync.WaitGroup
    
    for i := 0; i < numTasks; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            // 简单的计算任务
            sum := 0
            for j := 0; j < 100; j++ {
                sum += j
            }
        }(i)
    }
    
    wg.Wait()
}

func testSimplePool(numTasks int) {
    pool := NewSimpleGoroutinePool(10, 100)
    defer pool.Close()
    
    var wg sync.WaitGroup
    
    for i := 0; i < numTasks; i++ {
        wg.Add(1)
        task := func() error {
            defer wg.Done()
            // 简单的计算任务
            sum := 0
            for j := 0; j < 100; j++ {
                sum += j
            }
            return nil
        }
        // Submit is non-blocking and fails when the queue is full; retry briefly, otherwise the
        // dropped tasks would never call wg.Done and wg.Wait below would block forever.
        for pool.SubmitFunc(task) != nil {
            time.Sleep(time.Millisecond)
        }
    }
    
    wg.Wait()
}

func testAdvancedPool(numTasks int) {
    pool := NewAdvancedGoroutinePool(5, 15, 200)
    defer pool.Close()
    
    var wg sync.WaitGroup
    
    for i := 0; i < numTasks; i++ {
        wg.Add(1)
        task := func() error {
            defer wg.Done()
            // 简单的计算任务
            sum := 0
            for j := 0; j < 100; j++ {
                sum += j
            }
            return nil
        }
        // Retry when the queue is full so that every submitted task eventually runs and wg.Wait returns.
        for pool.SubmitFunc(task) != nil {
            time.Sleep(time.Millisecond)
        }
    }
    
    wg.Wait()
}

func main() {
    demonstrateGoroutineBasics()
    demonstrateGoroutineCharacteristics()
    demonstrateGoroutineLifecycle()
    demonstrateStackManagement()
    demonstrateMemoryModel()
    demonstrateSimplePool()
    demonstrateAdvancedPool()
    demonstratePoolPerformance()
}
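
// For a more rigorous comparison (illustrative sketch, not part of the original program), the
// standard testing package is usually preferable to hand-rolled timing; the names below reuse
// the pool constructors defined above:
//
//     func BenchmarkSimplePool(b *testing.B) {
//         pool := NewSimpleGoroutinePool(10, 100)
//         defer pool.Close()
//         b.RunParallel(func(pb *testing.PB) {
//             for pb.Next() {
//                 _ = pool.SubmitFunc(func() error { return nil })
//             }
//         })
//     }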

2. Safe Channel Patterns

go
func demonstrateSafeChannelPatterns() {
    fmt.Println("\n=== 安全的Channel模式 ===")
    
    // 模式1:超时保护的channel操作
    demonstrateTimeoutProtectedChannels()
    
    // 模式2:带取消的channel操作
    demonstrateCancelableChannels()
    
    // 模式3:安全的生产者-消费者模式
    demonstrateSafeProducerConsumer()
    
    // 模式4:扇入扇出模式
    demonstrateFanInFanOut()
}

func demonstrateTimeoutProtectedChannels() {
    fmt.Println("\n--- 超时保护的Channel操作 ---")
    
    // 发送端超时保护
    ch := make(chan string, 1)
    
    // 安全的发送操作
    safeSend := func(ch chan<- string, msg string, timeout time.Duration) bool {
        timer := time.NewTimer(timeout)
        defer timer.Stop()
        
        select {
        case ch <- msg:
            fmt.Printf("成功发送: %s\n", msg)
            return true
        case <-timer.C:
            fmt.Printf("发送超时: %s\n", msg)
            return false
        }
    }
    
    // 安全的接收操作
    safeReceive := func(ch <-chan string, timeout time.Duration) (string, bool) {
        timer := time.NewTimer(timeout)
        defer timer.Stop()
        
        select {
        case msg := <-ch:
            fmt.Printf("成功接收: %s\n", msg)
            return msg, true
        case <-timer.C:
            fmt.Println("接收超时")
            return "", false
        }
    }
    
    // 测试场景
    go func() {
        time.Sleep(200 * time.Millisecond)
        safeSend(ch, "延迟消息", 100*time.Millisecond) // 这会超时
    }()
    
    safeSend(ch, "即时消息", 500*time.Millisecond)
    safeReceive(ch, 300*time.Millisecond)
    safeReceive(ch, 100*time.Millisecond) // 这会超时
    
    time.Sleep(300 * time.Millisecond)
}
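
// Alternative sketch (standard library only; illustrative): the same timeout protection can be
// expressed with context.WithTimeout, which composes better when one deadline must cover
// several channel operations:
//
//     ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
//     defer cancel()
//     select {
//     case ch <- "msg":
//         // sent in time
//     case <-ctx.Done():
//         // deadline exceeded or cancelled
//     }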

func demonstrateCancelableChannels() {
    fmt.Println("\n--- 带取消的Channel操作 ---")
    
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    data := make(chan int, 5)
    results := make(chan int, 5)
    
    // 可取消的生产者
    go func() {
        defer close(data)
        defer fmt.Println("生产者退出")
        
        for i := 0; i < 10; i++ {
            select {
            case data <- i:
                fmt.Printf("生产: %d\n", i)
                time.Sleep(100 * time.Millisecond)
                
            case <-ctx.Done():
                fmt.Println("生产者被取消")
                return
            }
        }
    }()
    
    // 可取消的消费者
    go func() {
        defer close(results)
        defer fmt.Println("消费者退出")
        
        for {
            select {
            case item, ok := <-data:
                if !ok {
                    fmt.Println("数据通道关闭,消费者正常退出")
                    return
                }
                
                // 处理数据
                processed := item * 2
                
                select {
                case results <- processed:
                    fmt.Printf("处理: %d -> %d\n", item, processed)
                case <-ctx.Done():
                    fmt.Println("消费者被取消")
                    return
                }
                
            case <-ctx.Done():
                fmt.Println("消费者被取消")
                return
            }
        }
    }()
    
    // 收集结果
    go func() {
        defer fmt.Println("结果收集器退出")
        
        for {
            select {
            case result, ok := <-results:
                if !ok {
                    fmt.Println("结果通道关闭,收集器正常退出")
                    return
                }
                fmt.Printf("收集结果: %d\n", result)
                
            case <-ctx.Done():
                fmt.Println("结果收集器被取消")
                return
            }
        }
    }()
    
    // 运行一段时间后取消
    time.Sleep(500 * time.Millisecond)
    fmt.Println("开始取消操作...")
    cancel()
    
    time.Sleep(200 * time.Millisecond)
}

func demonstrateSafeProducerConsumer() {
    fmt.Println("\n--- 安全的生产者-消费者模式 ---")
    
    type SafeQueue struct {
        items   chan interface{}
        ctx     context.Context
        cancel  context.CancelFunc
        wg      sync.WaitGroup
    }
    
    NewSafeQueue := func(bufferSize int) *SafeQueue {
        ctx, cancel := context.WithCancel(context.Background())
        return &SafeQueue{
            items:  make(chan interface{}, bufferSize),
            ctx:    ctx,
            cancel: cancel,
        }
    }
    
    StartProducer := func(sq *SafeQueue, producer func() interface{}) {
        sq.wg.Add(1)
        go func() {
            defer sq.wg.Done()
            defer fmt.Println("生产者goroutine退出")
            
            for {
                select {
                case <-sq.ctx.Done():
                    return
                default:
                    item := producer()
                    if item == nil {
                        return // 生产者完成
                    }
                    
                    select {
                    case sq.items <- item:
                        fmt.Printf("生产项目: %v\n", item)
                    case <-sq.ctx.Done():
                        return
                    }
                }
            }
        }()
    }
    
    StartConsumer := func(sq *SafeQueue, consumer func(interface{})) {
        sq.wg.Add(1)
        go func() {
            defer sq.wg.Done()
            defer fmt.Println("消费者goroutine退出")
            
            for {
                select {
                case item, ok := <-sq.items:
                    if !ok {
                        return // 队列关闭
                    }
                    consumer(item)
                    
                case <-sq.ctx.Done():
                    return
                }
            }
        }()
    }
    
    Close := func(sq *SafeQueue) {
        // Cancel first and wait for the producer/consumer goroutines to exit, then close the
        // channel; closing while a producer may still be sending would panic.
        sq.cancel()
        sq.wg.Wait()
        close(sq.items)
    }
    
    // 使用安全队列
    queue := NewSafeQueue(5)
    
    // 启动生产者
    counter := 0
    StartProducer(queue, func() interface{} {
        if counter >= 5 {
            return nil // 完成生产
        }
        counter++
        time.Sleep(100 * time.Millisecond)
        return fmt.Sprintf("item-%d", counter)
    })
    
    // 启动消费者
    StartConsumer(queue, func(item interface{}) {
        fmt.Printf("消费项目: %v\n", item)
        time.Sleep(150 * time.Millisecond)
    })
    
    // 等待一段时间后关闭
    time.Sleep(1 * time.Second)
    fmt.Println("关闭安全队列...")
    Close(queue)
}

func demonstrateFanInFanOut() {
    fmt.Println("\n--- 扇入扇出模式 ---")
    
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    // 扇出:将单个输入分发到多个worker
    fanOut := func(ctx context.Context, input <-chan int, numWorkers int) []<-chan int {
        outputs := make([]<-chan int, numWorkers)
        
        for i := 0; i < numWorkers; i++ {
            output := make(chan int)
            outputs[i] = output
            
            go func(workerID int, out chan<- int) {
                defer close(out)
                defer fmt.Printf("Worker %d 退出\n", workerID)
                
                for {
                    select {
                    case item, ok := <-input:
                        if !ok {
                            return
                        }
                        
                        // 处理项目
                        processed := item * (workerID + 1)
                        fmt.Printf("Worker %d 处理: %d -> %d\n", workerID, item, processed)
                        
                        select {
                        case out <- processed:
                        case <-ctx.Done():
                            return
                        }
                        
                    case <-ctx.Done():
                        return
                    }
                }
            }(i, output)
        }
        
        return outputs
    }
    
    // 扇入:将多个worker的输出合并到单个通道
    fanIn := func(ctx context.Context, inputs ...<-chan int) <-chan int {
        output := make(chan int)
        var wg sync.WaitGroup
        
        for i, input := range inputs {
            wg.Add(1)
            go func(inputID int, in <-chan int) {
                defer wg.Done()
                defer fmt.Printf("FanIn worker %d 退出\n", inputID)
                
                for {
                    select {
                    case item, ok := <-in:
                        if !ok {
                            return
                        }
                        
                        select {
                        case output <- item:
                            fmt.Printf("FanIn收集: %d (来自worker %d)\n", item, inputID)
                        case <-ctx.Done():
                            return
                        }
                        
                    case <-ctx.Done():
                        return
                    }
                }
            }(i, input)
        }
        
        go func() {
            wg.Wait()
            close(output)
            fmt.Println("FanIn输出通道关闭")
        }()
        
        return output
    }
    
    // 创建输入数据
    input := make(chan int, 5)
    go func() {
        defer close(input)
        for i := 1; i <= 6; i++ {
            select {
            case input <- i:
                fmt.Printf("输入: %d\n", i)
                time.Sleep(200 * time.Millisecond)
            case <-ctx.Done():
                return
            }
        }
    }()
    
    // 扇出到3个worker
    workerOutputs := fanOut(ctx, input, 3)
    
    // 扇入合并结果
    finalOutput := fanIn(ctx, workerOutputs...)
    
    // 收集最终结果
    go func() {
        defer fmt.Println("结果收集完成")
        
        for {
            select {
            case result, ok := <-finalOutput:
                if !ok {
                    return
                }
                fmt.Printf("最终结果: %d\n", result)
                
            case <-ctx.Done():
                fmt.Println("结果收集被取消")
                return
            }
        }
    }()
    
    // 等待context超时
    <-ctx.Done()
    time.Sleep(200 * time.Millisecond) // 等待清理
}

func main() {
    demonstrateCommonLeakScenarios()
    demonstrateLeakDetection()
    demonstrateContextDrivenLifecycle()
    demonstrateSafeChannelPatterns()
}


Producer-Consumer

go
func demonstrateProducerConsumer() {
    fmt.Println("\n=== 生产者-消费者模式 ===")
    
    // 有界缓冲区的实现
    demonstrateBoundedBuffer()
    
    // 多生产者多消费者模式
    demonstrateMultiProducerConsumer()
}

func demonstrateBoundedBuffer() {
    fmt.Println("\n--- 有界缓冲区实现 ---")
    
    type BoundedBuffer struct {
        buffer   []interface{}
        capacity int
        count    int
        in       int  // 生产者索引
        out      int  // 消费者索引
        mu       sync.Mutex
        notFull  *sync.Cond // 缓冲区不满的条件
        notEmpty *sync.Cond // 缓冲区不空的条件
    }
    
    NewBoundedBuffer := func(capacity int) *BoundedBuffer {
        bb := &BoundedBuffer{
            buffer:   make([]interface{}, capacity),
            capacity: capacity,
        }
        bb.notFull = sync.NewCond(&bb.mu)
        bb.notEmpty = sync.NewCond(&bb.mu)
        return bb
    }
    
    // 生产者操作
    Put := func(bb *BoundedBuffer, item interface{}) {
        bb.mu.Lock()
        defer bb.mu.Unlock()
        
        // 等待缓冲区不满
        for bb.count == bb.capacity {
            fmt.Printf("生产者: 缓冲区已满,等待空间\n")
            bb.notFull.Wait()
        }
        
        // 放入数据
        bb.buffer[bb.in] = item
        bb.in = (bb.in + 1) % bb.capacity
        bb.count++
        
        fmt.Printf("生产者: 放入 %v,缓冲区大小: %d/%d\n", 
            item, bb.count, bb.capacity)
        
        // 通知消费者
        bb.notEmpty.Signal()
    }
    
    // 消费者操作
    Get := func(bb *BoundedBuffer) interface{} {
        bb.mu.Lock()
        defer bb.mu.Unlock()
        
        // 等待缓冲区不空
        for bb.count == 0 {
            fmt.Printf("消费者: 缓冲区为空,等待数据\n")
            bb.notEmpty.Wait()
        }
        
        // 取出数据
        item := bb.buffer[bb.out]
        bb.buffer[bb.out] = nil // 清理引用
        bb.out = (bb.out + 1) % bb.capacity
        bb.count--
        
        fmt.Printf("消费者: 取出 %v,缓冲区大小: %d/%d\n", 
            item, bb.count, bb.capacity)
        
        // 通知生产者
        bb.notFull.Signal()
        
        return item
    }
    
    // 演示使用
    buffer := NewBoundedBuffer(3)
    var wg sync.WaitGroup
    
    // 启动生产者
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        for i := 1; i <= 6; i++ {
            Put(buffer, fmt.Sprintf("item-%d", i))
            time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
        }
    }()
    
    // 启动消费者
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        for i := 0; i < 6; i++ {
            item := Get(buffer)
            fmt.Printf("消费者处理: %v\n", item)
            time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
        }
    }()
    
    wg.Wait()
}
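
// For comparison (illustrative): a buffered channel already provides the same blocking Put/Get
// semantics as the sync.Cond-based bounded buffer above, which is why sync.Cond is rarely
// needed for a plain bounded buffer in Go:
//
//     buf := make(chan interface{}, 3) // capacity 3
//     buf <- "item-1"                  // Put: blocks while the buffer is full
//     item := <-buf                    // Get: blocks while the buffer is empty
//     _ = item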

func demonstrateMultiProducerConsumer() {
    fmt.Println("\n--- 多生产者多消费者模式 ---")
    
    type WorkQueue struct {
        jobs     []string
        mu       sync.Mutex
        notEmpty *sync.Cond
        closed   bool
    }
    
    NewWorkQueue := func() *WorkQueue {
        wq := &WorkQueue{}
        wq.notEmpty = sync.NewCond(&wq.mu)
        return wq
    }
    
    // 添加工作
    AddJob := func(wq *WorkQueue, job string) bool {
        wq.mu.Lock()
        defer wq.mu.Unlock()
        
        if wq.closed {
            return false
        }
        
        wq.jobs = append(wq.jobs, job)
        fmt.Printf("添加工作: %s,队列长度: %d\n", job, len(wq.jobs))
        
        wq.notEmpty.Signal() // 通知一个等待的worker
        return true
    }
    
    // 获取工作
    GetJob := func(wq *WorkQueue) (string, bool) {
        wq.mu.Lock()
        defer wq.mu.Unlock()
        
        // 等待工作或队列关闭
        for len(wq.jobs) == 0 && !wq.closed {
            wq.notEmpty.Wait()
        }
        
        if len(wq.jobs) == 0 && wq.closed {
            return "", false // 队列已关闭且无工作
        }
        
        job := wq.jobs[0]
        wq.jobs = wq.jobs[1:]
        
        fmt.Printf("获取工作: %s,队列长度: %d\n", job, len(wq.jobs))
        return job, true
    }
    
    // 关闭队列
    Close := func(wq *WorkQueue) {
        wq.mu.Lock()
        defer wq.mu.Unlock()
        
        wq.closed = true
        wq.notEmpty.Broadcast() // 唤醒所有等待的worker
        fmt.Println("工作队列已关闭")
    }
    
    // 演示使用
    queue := NewWorkQueue()
    var wg sync.WaitGroup
    
    // 启动多个生产者
    for i := 1; i <= 2; i++ {
        wg.Add(1)
        go func(producerID int) {
            defer wg.Done()
            
            for j := 1; j <= 3; j++ {
                job := fmt.Sprintf("P%d-Job%d", producerID, j)
                AddJob(queue, job)
                time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
            }
        }(i)
    }
    
    // 启动多个消费者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            for {
                job, ok := GetJob(queue)
                if !ok {
                    fmt.Printf("Worker %d: 队列已关闭,退出\n", workerID)
                    break
                }
                
                fmt.Printf("Worker %d: 处理 %s\n", workerID, job)
                time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
            }
        }(i)
    }
    
    // 等待所有生产者完成
    time.Sleep(2 * time.Second)
    Close(queue)
    
    wg.Wait()
}

Event Notification and State Management

go
func demonstrateEventNotification() {
    fmt.Println("\n=== 事件通知和状态管理 ===")
    
    // 状态机实现
    demonstrateStateMachine()
    
    // 事件总线实现
    demonstrateEventBus()
}

func demonstrateStateMachine() {
    fmt.Println("\n--- 状态机实现 ---")
    
    type State int
    
    const (
        StateIdle State = iota
        StateRunning
        StatePaused
        StateStopped
    )
    
    // Note: a method declaration cannot appear inside a function body; in compilable code,
    // State and this String method must be declared at package level (shown inline here for readability).
    func (s State) String() string {
        switch s {
        case StateIdle:
            return "Idle"
        case StateRunning:
            return "Running"
        case StatePaused:
            return "Paused"
        case StateStopped:
            return "Stopped"
        default:
            return "Unknown"
        }
    }
    
    type StateMachine struct {
        currentState State
        mu           sync.Mutex
        stateChanged *sync.Cond
    }
    
    NewStateMachine := func() *StateMachine {
        sm := &StateMachine{
            currentState: StateIdle,
        }
        sm.stateChanged = sync.NewCond(&sm.mu)
        return sm
    }
    
    // 获取当前状态
    GetState := func(sm *StateMachine) State {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        return sm.currentState
    }
    
    // isValidTransition is assigned further below; declare the variable first so that
    // the ChangeState closure can reference it.
    var isValidTransition func(from, to State) bool
    
    // 改变状态
    ChangeState := func(sm *StateMachine, newState State) bool {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        
        // 检查状态转换是否合法
        if !isValidTransition(sm.currentState, newState) {
            return false
        }
        
        oldState := sm.currentState
        sm.currentState = newState
        
        fmt.Printf("状态转换: %s -> %s\n", oldState, newState)
        sm.stateChanged.Broadcast()
        return true
    }
    
    // 等待特定状态
    WaitForState := func(sm *StateMachine, targetState State) {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        
        for sm.currentState != targetState {
            sm.stateChanged.Wait()
        }
    }
    
    // 等待任意状态变化
    WaitForStateChange := func(sm *StateMachine, fromState State) State {
        sm.mu.Lock()
        defer sm.mu.Unlock()
        
        for sm.currentState == fromState {
            sm.stateChanged.Wait()
        }
        
        return sm.currentState
    }
    
    isValidTransition = func(from, to State) bool {
        switch from {
        case StateIdle:
            return to == StateRunning
        case StateRunning:
            return to == StatePaused || to == StateStopped
        case StatePaused:
            return to == StateRunning || to == StateStopped
        case StateStopped:
            return to == StateIdle
        default:
            return false
        }
    }
    
    // 演示使用
    sm := NewStateMachine()
    var wg sync.WaitGroup
    
    // 状态监控器
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        for i := 0; i < 5; i++ {
            currentState := GetState(sm)
            fmt.Printf("监控器: 当前状态 %s\n", currentState)
            
            newState := WaitForStateChange(sm, currentState)
            fmt.Printf("监控器: 状态已变化为 %s\n", newState)
        }
    }()
    
    // 等待特定状态的观察者
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        fmt.Println("观察者: 等待Running状态")
        WaitForState(sm, StateRunning)
        fmt.Println("观察者: 检测到Running状态")
        
        fmt.Println("观察者: 等待Stopped状态")
        WaitForState(sm, StateStopped)
        fmt.Println("观察者: 检测到Stopped状态")
    }()
    
    // 状态控制器
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        transitions := []State{
            StateRunning,
            StatePaused,
            StateRunning,
            StateStopped,
            StateIdle,
        }
        
        for _, newState := range transitions {
            time.Sleep(300 * time.Millisecond)
            
            if ChangeState(sm, newState) {
                fmt.Printf("控制器: 成功转换到 %s\n", newState)
            } else {
                fmt.Printf("控制器: 无法转换到 %s\n", newState)
            }
        }
    }()
    
    wg.Wait()
}

func demonstrateEventBus() {
    fmt.Println("\n--- 事件总线实现 ---")
    
    type Event struct {
        Type string
        Data interface{}
    }
    
    type EventBus struct {
        subscribers map[string][]*Subscriber
        mu          sync.Mutex
        eventCond   *sync.Cond
    }
    
    type Subscriber struct {
        ID      string
        EventCh chan Event
        Active  bool
        cond    *sync.Cond
    }
    
    NewEventBus := func() *EventBus {
        eb := &EventBus{
            subscribers: make(map[string][]*Subscriber),
        }
        eb.eventCond = sync.NewCond(&eb.mu)
        return eb
    }
    
    NewSubscriber := func(id string) *Subscriber {
        s := &Subscriber{
            ID:      id,
            EventCh: make(chan Event, 10),
            Active:  true,
        }
        s.cond = sync.NewCond(&sync.Mutex{})
        return s
    }
    
    // 订阅事件
    Subscribe := func(eb *EventBus, eventType string, subscriber *Subscriber) {
        eb.mu.Lock()
        defer eb.mu.Unlock()
        
        eb.subscribers[eventType] = append(eb.subscribers[eventType], subscriber)
        fmt.Printf("订阅者 %s 订阅了事件类型: %s\n", subscriber.ID, eventType)
    }
    
    // 发布事件
    Publish := func(eb *EventBus, event Event) {
        eb.mu.Lock()
        subscribers := eb.subscribers[event.Type]
        eb.mu.Unlock()
        
        fmt.Printf("发布事件: %s, 数据: %v\n", event.Type, event.Data)
        
        for _, subscriber := range subscribers {
            if subscriber.Active {
                select {
                case subscriber.EventCh <- event:
                    fmt.Printf("事件已发送给订阅者 %s\n", subscriber.ID)
                default:
                    fmt.Printf("订阅者 %s 的通道已满,丢弃事件\n", subscriber.ID)
                }
            }
        }
        
        eb.mu.Lock()
        eb.eventCond.Broadcast()
        eb.mu.Unlock()
    }
    
    // 等待特定事件
    WaitForEvent := func(eb *EventBus, eventType string) {
        eb.mu.Lock()
        defer eb.mu.Unlock()
        
        // 简化的实现:在实际使用中需要更复杂的事件匹配
        eb.eventCond.Wait()
    }
    
    // 演示使用
    eventBus := NewEventBus()
    var wg sync.WaitGroup
    
    // 创建订阅者
    subscriber1 := NewSubscriber("Logger")
    subscriber2 := NewSubscriber("MetricsCollector")
    subscriber3 := NewSubscriber("Alerter")
    
    // 订阅不同事件
    Subscribe(eventBus, "user.login", subscriber1)
    Subscribe(eventBus, "user.login", subscriber2)
    Subscribe(eventBus, "system.error", subscriber1)
    Subscribe(eventBus, "system.error", subscriber3)
    
    // 启动订阅者处理器
    for _, sub := range []*Subscriber{subscriber1, subscriber2, subscriber3} {
        wg.Add(1)
        go func(s *Subscriber) {
            defer wg.Done()
            
            fmt.Printf("订阅者 %s 开始处理事件\n", s.ID)
            
            // The publisher closes s.EventCh after the last event, so this range exits on its own.
            // Closing the channel on the receiver side (or stopping after a fixed event count)
            // could panic the publisher or leave wg.Wait blocked forever.
            for event := range s.EventCh {
                fmt.Printf("订阅者 %s 处理事件: %s, 数据: %v\n",
                    s.ID, event.Type, event.Data)
                
                // 模拟处理时间
                time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
            }
            
            fmt.Printf("订阅者 %s 停止处理事件\n", s.ID)
        }(sub)
    }
    
    // 发布事件
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        events := []Event{
            {Type: "user.login", Data: map[string]string{"user": "alice", "ip": "192.168.1.1"}},
            {Type: "system.error", Data: map[string]string{"error": "database connection failed"}},
            {Type: "user.login", Data: map[string]string{"user": "bob", "ip": "192.168.1.2"}},
            {Type: "system.error", Data: map[string]string{"error": "memory usage high"}},
            {Type: "user.login", Data: map[string]string{"user": "charlie", "ip": "192.168.1.3"}},
        }
        
        for _, event := range events {
            time.Sleep(400 * time.Millisecond)
            Publish(eventBus, event)
        }
        
        // All events published: the sending side closes each subscriber channel so that the
        // subscribers' range loops terminate and wg.Wait can return.
        for _, sub := range []*Subscriber{subscriber1, subscriber2, subscriber3} {
            sub.Active = false
            close(sub.EventCh)
        }
    }()
    
    wg.Wait()
}

func main() {
    rand.Seed(time.Now().UnixNano())
    
    demonstrateCondBasics()
    demonstrateCondInternals()
    demonstrateProducerConsumer()
    demonstrateBarrierPattern()
    demonstrateEventNotification()
}


1. The Worker Pool Pattern

go
import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 工作任务定义
type Task struct {
    ID   int
    Data string
}

// Worker Pool使用WaitGroup
func workerPoolWithWaitGroup() {
    const numWorkers = 3
    const numTasks = 10
    
    tasks := make(chan Task, numTasks)
    var wg sync.WaitGroup
    
    // 启动worker goroutine
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            worker(workerID, tasks)
        }(i)
    }
    
    // 发送任务
    for i := 0; i < numTasks; i++ {
        tasks <- Task{ID: i, Data: fmt.Sprintf("task-%d", i)}
    }
    close(tasks) // 关闭任务通道,worker会自动退出
    
    wg.Wait() // 等待所有worker完成
    fmt.Println("所有任务处理完成")
}

func worker(id int, tasks <-chan Task) {
    for task := range tasks {
        fmt.Printf("Worker %d 处理任务 %d: %s\n", id, task.ID, task.Data)
        time.Sleep(200 * time.Millisecond) // 模拟处理时间
        fmt.Printf("Worker %d 完成任务 %d\n", id, task.ID)
    }
    fmt.Printf("Worker %d 退出\n", id)
}

Error Handling

Note: for readability, the demonstration functions below declare types, methods, and helper functions inline; in compilable Go these declarations belong at package level.

go
package main

import (
    "context"
    "fmt"
    "log"
    "runtime"
    "strconv"
    "strings"
    "sync"
    "time"
)

func demonstrateErrorHandlingStrategy() {
    fmt.Println("=== Go错误处理策略 ===")
    
    /*
    错误处理策略核心要素:
    
    1. 错误分类体系:
       - 业务错误:用户输入错误、权限错误等
       - 系统错误:网络错误、数据库错误等
       - 致命错误:内存不足、配置错误等
       - 临时错误:超时、限流等可重试错误
    
    2. 错误信息设计:
       - 错误码:唯一标识错误类型
       - 错误消息:用户友好的描述
       - 技术细节:开发者需要的详细信息
       - 上下文信息:请求ID、用户ID等
    
    3. 错误处理策略:
       - 快速失败:立即返回错误
       - 重试机制:自动重试临时错误
       - 降级处理:提供替代方案
       - 熔断保护:防止错误传播
    
    4. 错误监控告警:
       - 错误统计:错误率、错误分布
       - 异常检测:异常模式识别
       - 告警机制:及时通知相关人员
       - 错误追踪:完整的错误链路
    */
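
    // Supplementary sketch (standard library only; illustrative, not the implementation below):
    // the "retryable / non-retryable" classification above is often expressed with sentinel
    // errors plus errors.Is, which Go 1.13+ error wrapping makes possible:
    //
    //     var ErrTemporary = errors.New("temporary failure")
    //     err := fmt.Errorf("fetch config: %w", ErrTemporary)
    //     if errors.Is(err, ErrTemporary) {
    //         // safe to retry
    //     }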
    
    demonstrateErrorClassification()
    demonstrateCustomErrors()
    demonstrateErrorPropagation()
    demonstrateErrorRecovery()
}

func demonstrateErrorClassification() {
    fmt.Println("\n--- 错误分类体系 ---")
    
    /*
    错误分类要点:
    
    1. 按错误性质分类:业务、系统、配置
    2. 按严重程度分类:致命、错误、警告
    3. 按处理方式分类:可重试、不可重试
    4. 按影响范围分类:用户级、服务级、系统级
    */
    
    // 错误类型定义
    type ErrorType int
    
    const (
        ErrorTypeBusiness ErrorType = iota // 业务错误
        ErrorTypeSystem                    // 系统错误
        ErrorTypeNetwork                   // 网络错误
        ErrorTypeValidation                // 验证错误
        ErrorTypePermission                // 权限错误
        ErrorTypeConfiguration             // 配置错误
        ErrorTypeTimeout                   // 超时错误
        ErrorTypeResource                  // 资源错误
    )
    
    func (et ErrorType) String() string {
        switch et {
        case ErrorTypeBusiness:
            return "BUSINESS"
        case ErrorTypeSystem:
            return "SYSTEM"
        case ErrorTypeNetwork:
            return "NETWORK"
        case ErrorTypeValidation:
            return "VALIDATION"
        case ErrorTypePermission:
            return "PERMISSION"
        case ErrorTypeConfiguration:
            return "CONFIGURATION"
        case ErrorTypeTimeout:
            return "TIMEOUT"
        case ErrorTypeResource:
            return "RESOURCE"
        default:
            return "UNKNOWN"
        }
    }
    
    // 错误严重级别
    type ErrorSeverity int
    
    const (
        SeverityLow ErrorSeverity = iota
        SeverityMedium
        SeverityHigh
        SeverityCritical
    )
    
    func (es ErrorSeverity) String() string {
        switch es {
        case SeverityLow:
            return "LOW"
        case SeverityMedium:
            return "MEDIUM"
        case SeverityHigh:
            return "HIGH"
        case SeverityCritical:
            return "CRITICAL"
        default:
            return "UNKNOWN"
        }
    }
    
    // 分类错误接口
    type ClassifiedError interface {
        error
        Code() string
        Type() ErrorType
        Severity() ErrorSeverity
        IsRetryable() bool
        IsTemporary() bool
        Details() map[string]interface{}
        StackTrace() string
    }
    
    // 基础分类错误实现
    type BaseClassifiedError struct {
        code       string
        message    string
        errorType  ErrorType
        severity   ErrorSeverity
        retryable  bool
        temporary  bool
        details    map[string]interface{}
        stackTrace string
        cause      error
    }
    
    func NewClassifiedError(code, message string, errorType ErrorType, severity ErrorSeverity) *BaseClassifiedError {
        return &BaseClassifiedError{
            code:       code,
            message:    message,
            errorType:  errorType,
            severity:   severity,
            retryable:  false,
            temporary:  false,
            details:    make(map[string]interface{}),
            stackTrace: captureStackTrace(),
        }
    }
    
    func (bce *BaseClassifiedError) Error() string {
        return fmt.Sprintf("[%s] %s: %s", bce.code, bce.errorType.String(), bce.message)
    }
    
    func (bce *BaseClassifiedError) Code() string {
        return bce.code
    }
    
    func (bce *BaseClassifiedError) Type() ErrorType {
        return bce.errorType
    }
    
    func (bce *BaseClassifiedError) Severity() ErrorSeverity {
        return bce.severity
    }
    
    func (bce *BaseClassifiedError) IsRetryable() bool {
        return bce.retryable
    }
    
    func (bce *BaseClassifiedError) IsTemporary() bool {
        return bce.temporary
    }
    
    func (bce *BaseClassifiedError) Details() map[string]interface{} {
        return bce.details
    }
    
    func (bce *BaseClassifiedError) StackTrace() string {
        return bce.stackTrace
    }
    
    func (bce *BaseClassifiedError) WithCause(cause error) *BaseClassifiedError {
        bce.cause = cause
        return bce
    }
    
    func (bce *BaseClassifiedError) WithDetail(key string, value interface{}) *BaseClassifiedError {
        bce.details[key] = value
        return bce
    }
    
    func (bce *BaseClassifiedError) SetRetryable(retryable bool) *BaseClassifiedError {
        bce.retryable = retryable
        return bce
    }
    
    func (bce *BaseClassifiedError) SetTemporary(temporary bool) *BaseClassifiedError {
        bce.temporary = temporary
        return bce
    }
    
    func captureStackTrace() string {
        var buf strings.Builder
        for i := 2; i < 10; i++ { // 跳过当前函数和调用者
            pc, file, line, ok := runtime.Caller(i)
            if !ok {
                break
            }
            
            fn := runtime.FuncForPC(pc)
            if fn == nil {
                continue
            }
            
            funcName := fn.Name()
            if idx := strings.LastIndex(funcName, "/"); idx != -1 {
                funcName = funcName[idx+1:]
            }
            
            if idx := strings.LastIndex(file, "/"); idx != -1 {
                file = file[idx+1:]
            }
            
            buf.WriteString(fmt.Sprintf("  %s:%d %s\n", file, line, funcName))
        }
        return buf.String()
    }
    
    // 具体错误类型工厂
    type ErrorFactory struct{}
    
    func (ef *ErrorFactory) NewValidationError(message string) *BaseClassifiedError {
        return NewClassifiedError("VALIDATION_001", message, ErrorTypeValidation, SeverityMedium)
    }
    
    func (ef *ErrorFactory) NewPermissionError(message string) *BaseClassifiedError {
        return NewClassifiedError("PERMISSION_001", message, ErrorTypePermission, SeverityHigh)
    }
    
    func (ef *ErrorFactory) NewNetworkError(message string) *BaseClassifiedError {
        return NewClassifiedError("NETWORK_001", message, ErrorTypeNetwork, SeverityMedium).
            SetRetryable(true).
            SetTemporary(true)
    }
    
    func (ef *ErrorFactory) NewDatabaseError(message string) *BaseClassifiedError {
        return NewClassifiedError("DATABASE_001", message, ErrorTypeSystem, SeverityHigh).
            SetRetryable(true)
    }
    
    func (ef *ErrorFactory) NewTimeoutError(message string) *BaseClassifiedError {
        return NewClassifiedError("TIMEOUT_001", message, ErrorTypeTimeout, SeverityMedium).
            SetRetryable(true).
            SetTemporary(true)
    }
    
    func (ef *ErrorFactory) NewBusinessError(code, message string) *BaseClassifiedError {
        return NewClassifiedError(code, message, ErrorTypeBusiness, SeverityLow)
    }
    
    // 演示错误分类
    fmt.Printf("错误分类体系演示:\n")
    
    factory := &ErrorFactory{}
    
    // 创建不同类型的错误
    errors := []ClassifiedError{
        factory.NewValidationError("用户名不能为空"),
        factory.NewPermissionError("权限不足,无法访问该资源"),
        factory.NewNetworkError("网络连接超时").WithDetail("timeout", "5s"),
        factory.NewDatabaseError("数据库连接失败").WithDetail("host", "localhost:5432"),
        factory.NewTimeoutError("请求处理超时").WithDetail("duration", "30s"),
        factory.NewBusinessError("ORDER_001", "订单状态不允许取消"),
    }
    
    fmt.Printf("  📊 错误分类统计:\n")
    
    typeCount := make(map[ErrorType]int)
    severityCount := make(map[ErrorSeverity]int)
    retryableCount := 0
    temporaryCount := 0
    
    for i, err := range errors {
        fmt.Printf("    %d. %s\n", i+1, err.Error())
        fmt.Printf("       类型: %s, 严重程度: %s\n", err.Type().String(), err.Severity().String())
        fmt.Printf("       可重试: %t, 临时性: %t\n", err.IsRetryable(), err.IsTemporary())
        
        if len(err.Details()) > 0 {
            fmt.Printf("       详细信息: %v\n", err.Details())
        }
        
        // 统计
        typeCount[err.Type()]++
        severityCount[err.Severity()]++
        if err.IsRetryable() {
            retryableCount++
        }
        if err.IsTemporary() {
            temporaryCount++
        }
        
        fmt.Println()
    }
    
    fmt.Printf("  📈 统计摘要:\n")
    fmt.Printf("    按类型分布:\n")
    for errorType, count := range typeCount {
        fmt.Printf("      %s: %d\n", errorType.String(), count)
    }
    
    fmt.Printf("    按严重程度分布:\n")
    for severity, count := range severityCount {
        fmt.Printf("      %s: %d\n", severity.String(), count)
    }
    
    fmt.Printf("    可重试错误: %d/%d\n", retryableCount, len(errors))
    fmt.Printf("    临时性错误: %d/%d\n", temporaryCount, len(errors))
}

func demonstrateCustomErrors() {
    fmt.Println("\n--- 自定义错误设计 ---")
    
    /*
    自定义错误要点:
    
    1. 错误包装:保留原始错误信息
    2. 上下文传递:携带请求上下文
    3. 错误链:构建错误调用链
    4. 元数据:附加诊断信息
    */
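
    // Related standard-library note (Go 1.20+, illustrative): errors.Join aggregates several
    // errors in one value, much like the ErrorChain built below, and the result still works
    // with errors.Is / errors.As:
    //
    //     err := errors.Join(errName, errPassword, errEmail)
    //     if errors.Is(err, errName) { /* ... */ }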
    
    // 上下文错误
    type ContextualError struct {
        BaseError     error
        Context       context.Context
        RequestID     string
        UserID        string
        Operation     string
        Timestamp     time.Time
        AdditionalInfo map[string]interface{}
    }
    
    func NewContextualError(ctx context.Context, operation string, baseError error) *ContextualError {
        ce := &ContextualError{
            BaseError:      baseError,
            Context:        ctx,
            Operation:      operation,
            Timestamp:      time.Now(),
            AdditionalInfo: make(map[string]interface{}),
        }
        
        // 从上下文中提取信息
        if requestID := ctx.Value("request_id"); requestID != nil {
            if id, ok := requestID.(string); ok {
                ce.RequestID = id
            }
        }
        
        if userID := ctx.Value("user_id"); userID != nil {
            if id, ok := userID.(string); ok {
                ce.UserID = id
            }
        }
        
        return ce
    }
    
    func (ce *ContextualError) Error() string {
        return fmt.Sprintf("[%s] %s: %v", ce.Operation, ce.RequestID, ce.BaseError)
    }
    
    func (ce *ContextualError) Unwrap() error {
        return ce.BaseError
    }
    
    func (ce *ContextualError) WithInfo(key string, value interface{}) *ContextualError {
        ce.AdditionalInfo[key] = value
        return ce
    }
    
    // 错误链构建器
    type ErrorChain struct {
        errors []error
        mutex  sync.RWMutex
    }
    
    func NewErrorChain() *ErrorChain {
        return &ErrorChain{
            errors: make([]error, 0),
        }
    }
    
    func (ec *ErrorChain) Add(err error) *ErrorChain {
        if err == nil {
            return ec
        }
        
        ec.mutex.Lock()
        defer ec.mutex.Unlock()
        ec.errors = append(ec.errors, err)
        return ec
    }
    
    func (ec *ErrorChain) HasErrors() bool {
        ec.mutex.RLock()
        defer ec.mutex.RUnlock()
        return len(ec.errors) > 0
    }
    
    func (ec *ErrorChain) Error() string {
        ec.mutex.RLock()
        defer ec.mutex.RUnlock()
        
        if len(ec.errors) == 0 {
            return "no errors"
        }
        
        if len(ec.errors) == 1 {
            return ec.errors[0].Error()
        }
        
        var builder strings.Builder
        builder.WriteString("multiple errors occurred:\n")
        for i, err := range ec.errors {
            builder.WriteString(fmt.Sprintf("  %d. %s\n", i+1, err.Error()))
        }
        
        return builder.String()
    }
    
    func (ec *ErrorChain) Errors() []error {
        ec.mutex.RLock()
        defer ec.mutex.RUnlock()
        
        result := make([]error, len(ec.errors))
        copy(result, ec.errors)
        return result
    }
    
    func (ec *ErrorChain) First() error {
        ec.mutex.RLock()
        defer ec.mutex.RUnlock()
        
        if len(ec.errors) > 0 {
            return ec.errors[0]
        }
        return nil
    }
    
    func (ec *ErrorChain) Last() error {
        ec.mutex.RLock()
        defer ec.mutex.RUnlock()
        
        if len(ec.errors) > 0 {
            return ec.errors[len(ec.errors)-1]
        }
        return nil
    }
    
    // 业务错误包装器
    type BusinessErrorWrapper struct {
        UserMessage    string // 用户友好的消息
        TechnicalError error  // 技术错误详情
        ErrorCode      string // 业务错误码
        Suggestions    []string // 解决建议
    }
    
    func NewBusinessErrorWrapper(code, userMessage string, technicalError error) *BusinessErrorWrapper {
        return &BusinessErrorWrapper{
            UserMessage:    userMessage,
            TechnicalError: technicalError,
            ErrorCode:      code,
            Suggestions:    make([]string, 0),
        }
    }
    
    func (bew *BusinessErrorWrapper) Error() string {
        return fmt.Sprintf("[%s] %s", bew.ErrorCode, bew.UserMessage)
    }
    
    func (bew *BusinessErrorWrapper) Unwrap() error {
        return bew.TechnicalError
    }
    
    func (bew *BusinessErrorWrapper) AddSuggestion(suggestion string) *BusinessErrorWrapper {
        bew.Suggestions = append(bew.Suggestions, suggestion)
        return bew
    }
    
    func (bew *BusinessErrorWrapper) GetUserMessage() string {
        return bew.UserMessage
    }
    
    func (bew *BusinessErrorWrapper) GetTechnicalDetails() string {
        if bew.TechnicalError != nil {
            return bew.TechnicalError.Error()
        }
        return ""
    }
    
    func (bew *BusinessErrorWrapper) GetSuggestions() []string {
        return bew.Suggestions
    }
    
    // 演示自定义错误
    fmt.Printf("自定义错误设计演示:\n")
    
    // 模拟业务操作
    simulateBusinessOperation := func() error {
        // 创建带上下文的请求
        ctx := context.Background()
        ctx = context.WithValue(ctx, "request_id", "req-12345")
        ctx = context.WithValue(ctx, "user_id", "user-67890")
        
        // 模拟数据库错误
        dbError := fmt.Errorf("connection timeout after 5s")
        
        // 包装为上下文错误
        contextError := NewContextualError(ctx, "user_lookup", dbError).
            WithInfo("table", "users").
            WithInfo("query_duration", "5.2s")
        
        // 包装为业务错误
        businessError := NewBusinessErrorWrapper(
            "USER_LOOKUP_FAILED",
            "用户信息获取失败,请稍后重试",
            contextError,
        ).AddSuggestion("检查网络连接").
            AddSuggestion("联系技术支持")
        
        return businessError
    }
    
    // 模拟多个错误的聚合
    simulateMultipleErrors := func() error {
        chain := NewErrorChain()
        
        // 模拟验证错误
        chain.Add(fmt.Errorf("用户名不能为空"))
        chain.Add(fmt.Errorf("密码长度不足8位"))
        chain.Add(fmt.Errorf("邮箱格式不正确"))
        
        if chain.HasErrors() {
            return chain
        }
        return nil
    }
    
    fmt.Printf("  🔍 上下文错误示例:\n")
    if err := simulateBusinessOperation(); err != nil {
        fmt.Printf("    错误: %s\n", err.Error())
        
        // 类型断言获取详细信息
        if businessErr, ok := err.(*BusinessErrorWrapper); ok {
            fmt.Printf("    用户消息: %s\n", businessErr.GetUserMessage())
            fmt.Printf("    技术详情: %s\n", businessErr.GetTechnicalDetails())
            fmt.Printf("    解决建议: %v\n", businessErr.GetSuggestions())
            
            // 解包获取原始错误
            if contextErr, ok := businessErr.Unwrap().(*ContextualError); ok {
                fmt.Printf("    请求ID: %s\n", contextErr.RequestID)
                fmt.Printf("    用户ID: %s\n", contextErr.UserID)
                fmt.Printf("    操作: %s\n", contextErr.Operation)
                fmt.Printf("    时间戳: %s\n", contextErr.Timestamp.Format(time.RFC3339))
                fmt.Printf("    附加信息: %v\n", contextErr.AdditionalInfo)
            }
        }
    }
    
    fmt.Printf("\n  📝 错误链示例:\n")
    if err := simulateMultipleErrors(); err != nil {
        fmt.Printf("    %s", err.Error())
        
        if errorChain, ok := err.(*ErrorChain); ok {
            fmt.Printf("    错误总数: %d\n", len(errorChain.Errors()))
            fmt.Printf("    首个错误: %s\n", errorChain.First().Error())
            fmt.Printf("    最后错误: %s\n", errorChain.Last().Error())
        }
    }
}

func demonstrateErrorPropagation() {
    fmt.Println("\n--- 错误传播机制 ---")
    
    /*
    错误传播要点:
    
    1. 错误包装:保留调用栈信息
    2. 错误转换:适配不同层级的错误
    3. 错误过滤:决定哪些错误需要传播
    4. 错误增强:添加上下文信息
    */
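
    // Minimal propagation sketch (illustrative): each layer adds its own context while
    // preserving the underlying error via %w, so upper layers can still match it with
    // errors.Is / errors.As:
    //
    //     if err := store.GetUser(id); err != nil {             // store and id are placeholders
    //         return fmt.Errorf("user service: get %s: %w", id, err)
    //     }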
    
    // 错误传播器
    type ErrorPropagator struct {
        filters []ErrorFilter
        transformers []ErrorTransformer
        enhancers []ErrorEnhancer
    }
    
    type ErrorFilter interface {
        ShouldPropagate(err error) bool
    }
    
    type ErrorTransformer interface {
        Transform(err error) error
    }
    
    type ErrorEnhancer interface {
        Enhance(err error, context map[string]interface{}) error
    }
    
    // 严重性过滤器
    type SeverityFilter struct {
        MinSeverity ErrorSeverity
    }
    
    func (sf *SeverityFilter) ShouldPropagate(err error) bool {
        if classifiedErr, ok := err.(ClassifiedError); ok {
            return classifiedErr.Severity() >= sf.MinSeverity
        }
        return true // 未分类错误默认传播
    }
    
    // 错误类型转换器
    type ErrorTypeTransformer struct {
        FromType ErrorType
        ToType   ErrorType
    }
    
    func (ett *ErrorTypeTransformer) Transform(err error) error {
        if classifiedErr, ok := err.(*BaseClassifiedError); ok {
            if classifiedErr.Type() == ett.FromType {
                // 创建新的错误类型
                newErr := NewClassifiedError(
                    classifiedErr.Code(),
                    classifiedErr.message,
                    ett.ToType,
                    classifiedErr.Severity(),
                )
                newErr.details = classifiedErr.details
                return newErr
            }
        }
        return err
    }
    
    // 上下文增强器
    type ContextEnhancer struct {
        Service string
        Version string
    }
    
    func (ce *ContextEnhancer) Enhance(err error, context map[string]interface{}) error {
        // 创建增强的错误包装
        enhanced := &EnhancedError{
            OriginalError: err,
            Service:       ce.Service,
            Version:       ce.Version,
            Context:       context,
            Timestamp:     time.Now(),
        }
        return enhanced
    }
    
    type EnhancedError struct {
        OriginalError error
        Service       string
        Version       string
        Context       map[string]interface{}
        Timestamp     time.Time
    }
    
    func (ee *EnhancedError) Error() string {
        return fmt.Sprintf("[%s@%s] %s", ee.Service, ee.Version, ee.OriginalError.Error())
    }
    
    func (ee *EnhancedError) Unwrap() error {
        return ee.OriginalError
    }
    
    func NewErrorPropagator() *ErrorPropagator {
        return &ErrorPropagator{
            filters:      make([]ErrorFilter, 0),
            transformers: make([]ErrorTransformer, 0),
            enhancers:    make([]ErrorEnhancer, 0),
        }
    }
    
    func (ep *ErrorPropagator) AddFilter(filter ErrorFilter) {
        ep.filters = append(ep.filters, filter)
    }
    
    func (ep *ErrorPropagator) AddTransformer(transformer ErrorTransformer) {
        ep.transformers = append(ep.transformers, transformer)
    }
    
    func (ep *ErrorPropagator) AddEnhancer(enhancer ErrorEnhancer) {
        ep.enhancers = append(ep.enhancers, enhancer)
    }
    
    func (ep *ErrorPropagator) Propagate(err error, context map[string]interface{}) error {
        if err == nil {
            return nil
        }
        
        // 应用过滤器
        for _, filter := range ep.filters {
            if !filter.ShouldPropagate(err) {
                return nil // 不传播此错误
            }
        }
        
        // 应用转换器
        for _, transformer := range ep.transformers {
            err = transformer.Transform(err)
        }
        
        // 应用增强器
        for _, enhancer := range ep.enhancers {
            err = enhancer.Enhance(err, context)
        }
        
        return err
    }
    
    // 演示错误传播
    fmt.Printf("错误传播机制演示:\n")
    
    // 创建错误传播器
    propagator := NewErrorPropagator()
    
    // 添加严重性过滤器(只传播中等以上严重度的错误)
    propagator.AddFilter(&SeverityFilter{MinSeverity: SeverityMedium})
    
    // 添加错误类型转换器(将网络错误转换为系统错误)
    propagator.AddTransformer(&ErrorTypeTransformer{
        FromType: ErrorTypeNetwork,
        ToType:   ErrorTypeSystem,
    })
    
    // 添加上下文增强器
    propagator.AddEnhancer(&ContextEnhancer{
        Service: "user-service",
        Version: "v1.2.3",
    })
    
    // 测试不同类型的错误传播
    factory := &ErrorFactory{}
    testErrors := []error{
        factory.NewValidationError("输入验证失败"),                    // 中等严重度,应该传播
        NewClassifiedError("TRACE_001", "调试信息", ErrorTypeBusiness, SeverityLow), // 低严重度,应该被过滤
        factory.NewNetworkError("网络连接失败"),                      // 网络错误,应该被转换为系统错误
        factory.NewDatabaseError("数据库查询失败"),                    // 高严重度,应该传播
    }
    
    // 命名为 reqContext,避免遮蔽标准库的 context 包
    reqContext := map[string]interface{}{
        "request_id": "req-98765",
        "user_id":    "user-12345",
        "operation":  "get_user_profile",
    }
    
    fmt.Printf("  🚀 错误传播测试:\n")
    for i, originalErr := range testErrors {
        fmt.Printf("    测试 %d - 原始错误: %s\n", i+1, originalErr.Error())
        
        propagatedErr := propagator.Propagate(originalErr, reqContext)
        
        if propagatedErr == nil {
            fmt.Printf("      结果: 错误被过滤,未传播\n")
        } else {
            fmt.Printf("      结果: %s\n", propagatedErr.Error())
            
            // 检查是否被增强
            if enhanced, ok := propagatedErr.(*EnhancedError); ok {
                fmt.Printf("      服务: %s\n", enhanced.Service)
                fmt.Printf("      版本: %s\n", enhanced.Version)
                fmt.Printf("      时间戳: %s\n", enhanced.Timestamp.Format("15:04:05"))
                fmt.Printf("      上下文: %v\n", enhanced.Context)
            }
        }
        fmt.Println()
    }
}

func demonstrateErrorRecovery() {
    fmt.Println("\n--- 错误恢复策略 ---")
    
    /*
    错误恢复要点:
    
    1. 重试机制:指数退避、最大重试次数
    2. 熔断保护:防止级联失败
    3. 降级处理:提供备用方案
    4. 超时控制:避免无限等待
    */
    
    // 重试策略
    type RetryStrategy interface {
        ShouldRetry(attempt int, err error) bool
        NextDelay(attempt int) time.Duration
        MaxAttempts() int
    }
    
    // 指数退避重试策略
    type ExponentialBackoffStrategy struct {
        InitialDelay time.Duration
        MaxDelay     time.Duration
        Multiplier   float64
        MaxRetries   int
    }
    
    func (ebs *ExponentialBackoffStrategy) ShouldRetry(attempt int, err error) bool {
        if attempt >= ebs.MaxRetries {
            return false
        }
        
        // 检查错误是否可重试
        if classifiedErr, ok := err.(ClassifiedError); ok {
            return classifiedErr.IsRetryable()
        }
        
        return false
    }
    
    func (ebs *ExponentialBackoffStrategy) NextDelay(attempt int) time.Duration {
        delay := time.Duration(float64(ebs.InitialDelay) * 
            pow(ebs.Multiplier, float64(attempt)))
        
        if delay > ebs.MaxDelay {
            delay = ebs.MaxDelay
        }
        
        return delay
    }
    
    func (ebs *ExponentialBackoffStrategy) MaxAttempts() int {
        return ebs.MaxRetries
    }
    
    // 简单的幂运算实现(仅处理非负整数指数,用于计算退避延迟;
    // 实际项目中也可以直接使用标准库的 math.Pow)
    func pow(base, exp float64) float64 {
        if exp == 0 {
            return 1
        }
        result := base
        for i := 1; i < int(exp); i++ {
            result *= base
        }
        return result
    }
    
    // 重试执行器
    type RetryExecutor struct {
        strategy RetryStrategy
        logger   *log.Logger
    }
    
    func NewRetryExecutor(strategy RetryStrategy) *RetryExecutor {
        return &RetryExecutor{
            strategy: strategy,
            logger:   log.New(log.Writer(), "[RETRY] ", log.LstdFlags),
        }
    }
    
    func (re *RetryExecutor) Execute(ctx context.Context, operation func() error) error {
        var lastErr error
        
        for attempt := 0; attempt < re.strategy.MaxAttempts(); attempt++ {
            // 检查上下文是否已取消
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }
            
            // 执行操作
            err := operation()
            if err == nil {
                if attempt > 0 {
                    re.logger.Printf("操作在第 %d 次重试后成功", attempt+1)
                }
                return nil
            }
            
            lastErr = err
            
            // 检查是否应该重试
            if !re.strategy.ShouldRetry(attempt, err) {
                re.logger.Printf("错误不可重试或达到最大重试次数: %v", err)
                break
            }
            
            // 计算延迟时间
            delay := re.strategy.NextDelay(attempt)
            re.logger.Printf("第 %d 次重试失败: %v, %v 后重试", attempt+1, err, delay)
            
            // 等待重试
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(delay):
                // 继续重试
            }
        }
        
        return fmt.Errorf("重试 %d 次后仍然失败: %v", re.strategy.MaxAttempts(), lastErr)
    }
    
    // 熔断器(简化版)
    type CircuitBreaker struct {
        name          string
        maxFailures   int
        resetTimeout  time.Duration
        failures      int
        lastFailTime  time.Time
        state         string
        mutex         sync.RWMutex
    }
    
    func NewCircuitBreaker(name string, maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
        return &CircuitBreaker{
            name:         name,
            maxFailures:  maxFailures,
            resetTimeout: resetTimeout,
            state:        "CLOSED",
        }
    }
    
    func (cb *CircuitBreaker) Execute(operation func() error) error {
        cb.mutex.RLock()
        state := cb.state
        failures := cb.failures
        lastFailTime := cb.lastFailTime
        cb.mutex.RUnlock()
        
        // 检查熔断器状态
        if state == "OPEN" {
            if time.Since(lastFailTime) > cb.resetTimeout {
                cb.mutex.Lock()
                cb.state = "HALF_OPEN"
                cb.mutex.Unlock()
            } else {
                return fmt.Errorf("熔断器 %s 已开启", cb.name)
            }
        }
        
        // 执行操作
        err := operation()
        
        cb.mutex.Lock()
        defer cb.mutex.Unlock()
        
        if err != nil {
            cb.failures++
            cb.lastFailTime = time.Now()
            
            if cb.failures >= cb.maxFailures {
                cb.state = "OPEN"
                log.Printf("熔断器 %s 开启,失败次数: %d", cb.name, cb.failures)
            }
            
            return err
        }
        
        // 成功时重置
        if cb.state == "HALF_OPEN" {
            cb.state = "CLOSED"
            log.Printf("熔断器 %s 恢复正常", cb.name)
        }
        cb.failures = 0
        
        return nil
    }
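
    // 状态流转示意:CLOSED --连续失败达到 maxFailures--> OPEN
    //               OPEN --超过 resetTimeout 后的首次调用--> HALF_OPEN
    //               HALF_OPEN --调用成功--> CLOSED;调用失败--> 重新 OPEN 并重新计时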
    
    // 降级处理器
    type FallbackHandler struct {
        primary   func() (interface{}, error)
        fallback  func() (interface{}, error)
        threshold time.Duration
    }
    
    func NewFallbackHandler(primary, fallback func() (interface{}, error), threshold time.Duration) *FallbackHandler {
        return &FallbackHandler{
            primary:   primary,
            fallback:  fallback,
            threshold: threshold,
        }
    }
    
    func (fh *FallbackHandler) Execute() (interface{}, error) {
        // 设置超时
        ctx, cancel := context.WithTimeout(context.Background(), fh.threshold)
        defer cancel()
        
        resultCh := make(chan interface{}, 1)
        errorCh := make(chan error, 1)
        
        // 执行主要操作
        go func() {
            result, err := fh.primary()
            if err != nil {
                errorCh <- err
            } else {
                resultCh <- result
            }
        }()
        
        select {
        case result := <-resultCh:
            return result, nil
        case err := <-errorCh:
            log.Printf("主要操作失败,执行降级: %v", err)
            return fh.fallback()
        case <-ctx.Done():
            log.Printf("主要操作超时,执行降级")
            return fh.fallback()
        }
    }
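
    // 实现细节说明:resultCh 与 errorCh 均为容量 1 的带缓冲通道,
    // 即使主要操作在超时之后才返回,goroutine 的写入也不会阻塞,从而避免泄漏。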
    
    // 演示错误恢复
    fmt.Printf("错误恢复策略演示:\n")
    
    // 1. 重试机制演示
    fmt.Printf("  🔄 重试机制测试:\n")
    
    retryStrategy := &ExponentialBackoffStrategy{
        InitialDelay: 100 * time.Millisecond,
        MaxDelay:     1 * time.Second,
        Multiplier:   2.0,
        MaxRetries:   3,
    }
    
    executor := NewRetryExecutor(retryStrategy)
    
    // 模拟不稳定的操作
    callCount := 0
    unstableOperation := func() error {
        callCount++
        if callCount < 3 { // 前两次失败
            factory := &ErrorFactory{}
            return factory.NewNetworkError(fmt.Sprintf("网络错误 #%d", callCount))
        }
        return nil // 第三次成功
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    if err := executor.Execute(ctx, unstableOperation); err != nil {
        fmt.Printf("    ❌ 重试失败: %v\n", err)
    } else {
        fmt.Printf("    ✅ 重试成功,总调用次数: %d\n", callCount)
    }
    
    // 2. 熔断器演示
    fmt.Printf("\n  ⚡ 熔断器测试:\n")
    
    breaker := NewCircuitBreaker("test-service", 2, 2*time.Second)
    
    // 模拟失败操作
    failingOperation := func() error {
        return fmt.Errorf("服务不可用")
    }
    
    // 连续失败触发熔断
    for i := 1; i <= 5; i++ {
        err := breaker.Execute(failingOperation)
        if err != nil {
            fmt.Printf("    调用 %d: ❌ %v\n", i, err)
        }
    }
    
    // 3. 降级处理演示
    fmt.Printf("\n  📉 降级处理测试:\n")
    
    // 主要服务(模拟慢响应)
    primaryService := func() (interface{}, error) {
        time.Sleep(200 * time.Millisecond) // 模拟慢响应
        return "来自主要服务的数据", nil
    }
    
    // 降级服务(快速响应)
    fallbackService := func() (interface{}, error) {
        return "来自缓存的数据", nil
    }
    
    handler := NewFallbackHandler(primaryService, fallbackService, 100*time.Millisecond)
    
    result, err := handler.Execute()
    if err != nil {
        fmt.Printf("    ❌ 降级处理失败: %v\n", err)
    } else {
        fmt.Printf("    ✅ 降级处理成功: %v\n", result)
    }
    
    fmt.Printf("\n  📋 错误恢复最佳实践:\n")
    fmt.Printf("    1. 识别可重试的错误类型\n")
    fmt.Printf("    2. 实现指数退避重试策略\n")
    fmt.Printf("    3. 使用熔断器防止级联失败\n")
    fmt.Printf("    4. 提供降级和备用方案\n")
    fmt.Printf("    5. 设置合理的超时时间\n")
    fmt.Printf("    6. 监控错误率和恢复情况\n")
}
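
// 组合用法示意(本文补充的示例,假设 RetryExecutor 与 CircuitBreaker 的定义在包级可见):
// 由熔断器包裹底层调用,再交给重试执行器,可同时获得快速失败与自动重试两层保护。
func callWithResilience(ctx context.Context, executor *RetryExecutor, breaker *CircuitBreaker, op func() error) error {
    return executor.Execute(ctx, func() error {
        return breaker.Execute(op)
    })
}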

func main() {
    demonstrateErrorHandlingStrategy()
}

日志

点击查看完整代码实现
go
package main

import (
    "encoding/json"
    "fmt"
    "io"
    "os"
    "runtime"
    "strings"
    "sync"
    "time"
)

func demonstrateLoggingManagement() {
    fmt.Println("=== Go日志管理策略 ===")
    
    /*
    日志管理核心要素:
    
    1. 日志级别管理:
       - DEBUG: 详细的调试信息
       - INFO: 一般信息记录
       - WARN: 警告信息
       - ERROR: 错误信息
       - FATAL: 致命错误
    
    2. 结构化日志:
       - JSON格式输出
       - 字段标准化
       - 上下文信息
       - 链路追踪ID
    
    3. 日志输出管理:
       - 多目标输出
       - 异步写入
       - 缓冲机制
       - 轮转策略
    
    4. 性能优化:
       - 惰性求值
       - 批量写入
       - 压缩存储
       - 采样机制
    */
    
    demonstrateStructuredLogging()
    demonstrateLogLevels()
    demonstrateAsyncLogging()
    demonstrateLogAggregation()
}

func demonstrateStructuredLogging() {
    fmt.Println("\n--- 结构化日志设计 ---")
    
    /*
    结构化日志要点:
    
    1. 标准字段:时间戳、级别、消息、调用者信息
    2. 上下文字段:请求ID、用户ID、会话ID
    3. 业务字段:订单ID、产品ID等业务相关信息
    4. 元数据:服务名、版本号、环境信息
    */
    
    // 日志级别定义
    type LogLevel int
    
    const (
        DEBUG LogLevel = iota
        INFO
        WARN
        ERROR
        FATAL
    )
    
    func (l LogLevel) String() string {
        switch l {
        case DEBUG:
            return "DEBUG"
        case INFO:
            return "INFO"
        case WARN:
            return "WARN"
        case ERROR:
            return "ERROR"
        case FATAL:
            return "FATAL"
        default:
            return "UNKNOWN"
        }
    }
    
    // 日志条目结构
    type LogEntry struct {
        Timestamp   time.Time              `json:"timestamp"`
        Level       string                 `json:"level"`
        Message     string                 `json:"message"`
        Service     string                 `json:"service"`
        Version     string                 `json:"version"`
        Environment string                 `json:"environment"`
        RequestID   string                 `json:"request_id,omitempty"`
        UserID      string                 `json:"user_id,omitempty"`
        SessionID   string                 `json:"session_id,omitempty"`
        Caller      CallerInfo             `json:"caller"`
        Fields      map[string]interface{} `json:"fields,omitempty"`
        Error       *ErrorInfo             `json:"error,omitempty"`
    }
    
    type CallerInfo struct {
        File     string `json:"file"`
        Line     int    `json:"line"`
        Function string `json:"function"`
    }
    
    type ErrorInfo struct {
        Type       string `json:"type"`
        Message    string `json:"message"`
        StackTrace string `json:"stack_trace,omitempty"`
    }
    
    // 结构化日志器
    type StructuredLogger struct {
        service     string
        version     string
        environment string
        minLevel    LogLevel
        outputs     []io.Writer
        fields      map[string]interface{}
        mutex       sync.RWMutex
    }
    
    func NewStructuredLogger(service, version, environment string) *StructuredLogger {
        return &StructuredLogger{
            service:     service,
            version:     version,
            environment: environment,
            minLevel:    INFO,
            outputs:     []io.Writer{os.Stdout},
            fields:      make(map[string]interface{}),
        }
    }
    
    func (sl *StructuredLogger) SetLevel(level LogLevel) {
        sl.mutex.Lock()
        defer sl.mutex.Unlock()
        sl.minLevel = level
    }
    
    func (sl *StructuredLogger) AddOutput(output io.Writer) {
        sl.mutex.Lock()
        defer sl.mutex.Unlock()
        sl.outputs = append(sl.outputs, output)
    }
    
    func (sl *StructuredLogger) WithField(key string, value interface{}) *StructuredLogger {
        sl.mutex.RLock()
        fields := make(map[string]interface{})
        for k, v := range sl.fields {
            fields[k] = v
        }
        sl.mutex.RUnlock()
        
        fields[key] = value
        
        return &StructuredLogger{
            service:     sl.service,
            version:     sl.version,
            environment: sl.environment,
            minLevel:    sl.minLevel,
            outputs:     sl.outputs,
            fields:      fields,
        }
    }
    
    func (sl *StructuredLogger) WithFields(fields map[string]interface{}) *StructuredLogger {
        sl.mutex.RLock()
        newFields := make(map[string]interface{})
        for k, v := range sl.fields {
            newFields[k] = v
        }
        sl.mutex.RUnlock()
        
        for k, v := range fields {
            newFields[k] = v
        }
        
        return &StructuredLogger{
            service:     sl.service,
            version:     sl.version,
            environment: sl.environment,
            minLevel:    sl.minLevel,
            outputs:     sl.outputs,
            fields:      newFields,
        }
    }
    
    func (sl *StructuredLogger) log(level LogLevel, message string, err error) {
        sl.mutex.RLock()
        if level < sl.minLevel {
            sl.mutex.RUnlock()
            return
        }
        
        outputs := make([]io.Writer, len(sl.outputs))
        copy(outputs, sl.outputs)
        sl.mutex.RUnlock()
        
        // 获取调用者信息
        caller := sl.getCaller()
        
        // 构建日志条目
        entry := LogEntry{
            Timestamp:   time.Now(),
            Level:       level.String(),
            Message:     message,
            Service:     sl.service,
            Version:     sl.version,
            Environment: sl.environment,
            Caller:      caller,
        }
        
        // 添加上下文字段
        if len(sl.fields) > 0 {
            entry.Fields = make(map[string]interface{})
            for k, v := range sl.fields {
                entry.Fields[k] = v
            }
        }
        
        // 添加错误信息
        if err != nil {
            entry.Error = &ErrorInfo{
                Type:    fmt.Sprintf("%T", err),
                Message: err.Error(),
            }
        }
        
        // 序列化为JSON
        data, marshalErr := json.Marshal(entry)
        if marshalErr != nil {
            // 降级处理
            fallbackLog := fmt.Sprintf(`{"timestamp":"%s","level":"%s","message":"JSON marshal error: %v","service":"%s"}`,
                entry.Timestamp.Format(time.RFC3339),
                entry.Level,
                marshalErr,
                entry.Service)
            data = []byte(fallbackLog)
        }
        
        data = append(data, '\n')
        
        // 写入所有输出
        for _, output := range outputs {
            output.Write(data)
        }
    }
    
    func (sl *StructuredLogger) getCaller() CallerInfo {
        // skip=3:跳过 getCaller、log 以及 Debug/Info 等级别方法,定位到真正的业务调用方
        pc, file, line, ok := runtime.Caller(3)
        if !ok {
            return CallerInfo{}
        }
        
        // 获取函数名
        fn := runtime.FuncForPC(pc)
        funcName := "unknown"
        if fn != nil {
            funcName = fn.Name()
            // 简化函数名
            if idx := strings.LastIndex(funcName, "/"); idx != -1 {
                funcName = funcName[idx+1:]
            }
        }
        
        // 简化文件路径
        if idx := strings.LastIndex(file, "/"); idx != -1 {
            file = file[idx+1:]
        }
        
        return CallerInfo{
            File:     file,
            Line:     line,
            Function: funcName,
        }
    }
    
    func (sl *StructuredLogger) Debug(message string) {
        sl.log(DEBUG, message, nil)
    }
    
    func (sl *StructuredLogger) Info(message string) {
        sl.log(INFO, message, nil)
    }
    
    func (sl *StructuredLogger) Warn(message string) {
        sl.log(WARN, message, nil)
    }
    
    func (sl *StructuredLogger) Error(message string, err error) {
        sl.log(ERROR, message, err)
    }
    
    func (sl *StructuredLogger) Fatal(message string, err error) {
        sl.log(FATAL, message, err)
        os.Exit(1)
    }
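
    // 注意:Fatal 调用 os.Exit(1) 直接终止进程,已注册的 defer 不会执行,
    // 因此只应在不可恢复的启动类错误中使用。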
    
    // 演示结构化日志
    fmt.Printf("结构化日志演示:\n")
    
    logger := NewStructuredLogger("user-service", "v1.2.3", "production")
    logger.SetLevel(DEBUG)
    
    // 基础日志
    logger.Info("服务启动成功")
    logger.Debug("数据库连接池初始化")
    
    // 带字段的日志
    userLogger := logger.WithFields(map[string]interface{}{
        "request_id": "req-12345",
        "user_id":    "user-67890",
        "session_id": "sess-abcdef",
    })
    
    userLogger.Info("用户登录成功")
    
    // 带错误的日志
    err := fmt.Errorf("数据库连接超时")
    userLogger.Error("用户查询失败", err)
    
    // 业务操作日志
    orderLogger := userLogger.WithFields(map[string]interface{}{
        "order_id":    "order-123",
        "product_id":  "prod-456",
        "amount":      99.99,
        "currency":    "USD",
    })
    
    orderLogger.Info("订单创建成功")
    
    // 性能日志
    performanceLogger := logger.WithFields(map[string]interface{}{
        "operation":   "database_query",
        "duration_ms": 150,
        "rows":        42,
    })
    
    performanceLogger.Warn("数据库查询耗时较长")
}
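
// 输出格式示意(仅供参考,文件名、行号等取决于实际运行环境):
// {"timestamp":"2024-01-02T15:04:05Z","level":"INFO","message":"用户登录成功",
//  "service":"user-service","version":"v1.2.3","environment":"production",
//  "caller":{"file":"main.go","line":123,"function":"main.demonstrateStructuredLogging"},
//  "fields":{"request_id":"req-12345","user_id":"user-67890","session_id":"sess-abcdef"}}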

func demonstrateLogLevels() {
    fmt.Println("\n--- 日志级别管理 ---")
    
    /*
    日志级别管理要点:
    
    1. 动态级别调整:运行时调整日志级别
    2. 模块级别控制:不同模块设置不同级别
    3. 环境相关配置:开发、测试、生产环境
    4. 采样和限流:高频日志的控制机制
    */
    
    // 日志级别管理器
    type LogLevelManager struct {
        globalLevel  LogLevel
        moduleLevel  map[string]LogLevel
        sampling     map[LogLevel]int // 采样率:1表示记录所有,10表示每10条记录1条
        counters     map[string]int64 // 计数器
        mutex        sync.RWMutex
    }
    
    func NewLogLevelManager() *LogLevelManager {
        return &LogLevelManager{
            globalLevel: INFO,
            moduleLevel: make(map[string]LogLevel),
            sampling: map[LogLevel]int{
                DEBUG: 1,
                INFO:  1,
                WARN:  1,
                ERROR: 1,
                FATAL: 1,
            },
            counters: make(map[string]int64),
        }
    }
    
    func (llm *LogLevelManager) SetGlobalLevel(level LogLevel) {
        llm.mutex.Lock()
        defer llm.mutex.Unlock()
        llm.globalLevel = level
    }
    
    func (llm *LogLevelManager) SetModuleLevel(module string, level LogLevel) {
        llm.mutex.Lock()
        defer llm.mutex.Unlock()
        llm.moduleLevel[module] = level
    }
    
    func (llm *LogLevelManager) SetSampling(level LogLevel, rate int) {
        llm.mutex.Lock()
        defer llm.mutex.Unlock()
        if rate > 0 {
            llm.sampling[level] = rate
        }
    }
    
    func (llm *LogLevelManager) ShouldLog(module string, level LogLevel) bool {
        // 采样计数需要写入 counters,必须持有写锁,避免在读锁下修改 map 造成数据竞争
        llm.mutex.Lock()
        defer llm.mutex.Unlock()
        
        // 检查模块级别
        if moduleLevel, exists := llm.moduleLevel[module]; exists {
            if level < moduleLevel {
                return false
            }
        } else if level < llm.globalLevel {
            return false
        }
        
        // 检查采样率
        if samplingRate, exists := llm.sampling[level]; exists && samplingRate > 1 {
            counterKey := fmt.Sprintf("%s_%s", module, level.String())
            llm.counters[counterKey]++
            return llm.counters[counterKey]%int64(samplingRate) == 0
        }
        
        return true
    }
    
    func (llm *LogLevelManager) GetStats() map[string]interface{} {
        llm.mutex.RLock()
        defer llm.mutex.RUnlock()
        
        stats := map[string]interface{}{
            "global_level": llm.globalLevel.String(),
            "module_levels": make(map[string]string),
            "sampling_rates": make(map[string]int),
            "counters": make(map[string]int64),
        }
        
        for module, level := range llm.moduleLevel {
            stats["module_levels"].(map[string]string)[module] = level.String()
        }
        
        for level, rate := range llm.sampling {
            stats["sampling_rates"].(map[string]int)[level.String()] = rate
        }
        
        for key, count := range llm.counters {
            stats["counters"].(map[string]int64)[key] = count
        }
        
        return stats
    }
    
    // 支持级别管理的日志器
    type LevelManagedLogger struct {
        module      string
        levelMgr    *LogLevelManager
        baseLogger  *StructuredLogger
    }
    
    func NewLevelManagedLogger(module string, levelMgr *LogLevelManager, baseLogger *StructuredLogger) *LevelManagedLogger {
        return &LevelManagedLogger{
            module:     module,
            levelMgr:   levelMgr,
            baseLogger: baseLogger,
        }
    }
    
    func (lml *LevelManagedLogger) log(level LogLevel, message string, err error) {
        if !lml.levelMgr.ShouldLog(lml.module, level) {
            return
        }
        
        // 添加模块信息
        logger := lml.baseLogger.WithField("module", lml.module)
        
        switch level {
        case DEBUG:
            logger.Debug(message)
        case INFO:
            logger.Info(message)
        case WARN:
            logger.Warn(message)
        case ERROR:
            logger.Error(message, err)
        case FATAL:
            logger.Fatal(message, err)
        }
    }
    
    func (lml *LevelManagedLogger) Debug(message string) {
        lml.log(DEBUG, message, nil)
    }
    
    func (lml *LevelManagedLogger) Info(message string) {
        lml.log(INFO, message, nil)
    }
    
    func (lml *LevelManagedLogger) Warn(message string) {
        lml.log(WARN, message, nil)
    }
    
    func (lml *LevelManagedLogger) Error(message string, err error) {
        lml.log(ERROR, message, err)
    }
    
    // 演示日志级别管理
    fmt.Printf("日志级别管理演示:\n")
    
    levelManager := NewLogLevelManager()
    baseLogger := NewStructuredLogger("demo-service", "v1.0.0", "development")
    
    // 创建不同模块的日志器
    authLogger := NewLevelManagedLogger("auth", levelManager, baseLogger)
    dbLogger := NewLevelManagedLogger("database", levelManager, baseLogger)
    apiLogger := NewLevelManagedLogger("api", levelManager, baseLogger)
    
    fmt.Printf("  📊 默认配置测试:\n")
    
    // 默认级别测试(INFO级别)
    authLogger.Debug("认证模块调试信息") // 不会输出
    authLogger.Info("用户认证成功")      // 会输出
    dbLogger.Info("数据库连接建立")      // 会输出
    
    fmt.Printf("\n  🔧 动态级别调整:\n")
    
    // 调整全局级别为DEBUG
    levelManager.SetGlobalLevel(DEBUG)
    authLogger.Debug("现在可以看到调试信息了")
    
    // 设置特定模块级别
    levelManager.SetModuleLevel("database", WARN)
    dbLogger.Info("数据库信息日志")   // 不会输出(级别不够)
    dbLogger.Warn("数据库连接缓慢") // 会输出
    
    fmt.Printf("\n  📉 日志采样测试:\n")
    
    // 设置DEBUG级别采样(每5条记录1条)
    levelManager.SetSampling(DEBUG, 5)
    
    fmt.Printf("    发送10条DEBUG日志(应该只记录2条):\n")
    for i := 1; i <= 10; i++ {
        authLogger.Debug(fmt.Sprintf("调试信息 #%d", i))
    }
    
    // 显示统计信息
    fmt.Printf("\n  📈 日志统计信息:\n")
    stats := levelManager.GetStats()
    for key, value := range stats {
        switch v := value.(type) {
        case map[string]string:
            fmt.Printf("    %s:\n", key)
            for k, val := range v {
                fmt.Printf("      %s: %s\n", k, val)
            }
        case map[string]int:
            fmt.Printf("    %s:\n", key)
            for k, val := range v {
                fmt.Printf("      %s: %d\n", k, val)
            }
        case map[string]int64:
            fmt.Printf("    %s:\n", key)
            for k, val := range v {
                fmt.Printf("      %s: %d\n", k, val)
            }
        default:
            fmt.Printf("    %s: %v\n", key, v)
        }
    }
}
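
// 环境相关配置的简单示意(本文补充的辅助函数,假设 LogLevel 及各级别常量在包级可见,
// 通过 LOG_LEVEL 环境变量控制全局日志级别):
func parseLogLevel(value string) LogLevel {
    switch strings.ToUpper(value) {
    case "DEBUG":
        return DEBUG
    case "WARN":
        return WARN
    case "ERROR":
        return ERROR
    case "FATAL":
        return FATAL
    default:
        return INFO
    }
}

// 用法示意:levelManager.SetGlobalLevel(parseLogLevel(os.Getenv("LOG_LEVEL")))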

func demonstrateAsyncLogging() {
    fmt.Println("\n--- 异步日志处理 ---")
    
    /*
    异步日志要点:
    
    1. 缓冲机制:批量写入提高性能
    2. 背压处理:缓冲区满时的策略
    3. 优雅关闭:确保所有日志都被写入
    4. 错误处理:异步写入失败的处理
    */
    
    // 异步日志条目
    type AsyncLogEntry struct {
        Data      []byte
        Timestamp time.Time
        Retry     int
    }
    
    // 异步日志器
    type AsyncLogger struct {
        buffer     chan AsyncLogEntry
        writers    []io.Writer
        batchSize  int
        flushInterval time.Duration
        maxRetries int
        stopCh     chan struct{}
        doneCh     chan struct{}
        errorCh    chan error
        stats      *AsyncLoggerStats
        mutex      sync.RWMutex
    }
    
    // 统计信息(演示用途,未做并发保护;生产环境中这些计数应使用 sync/atomic 递增和读取)
    type AsyncLoggerStats struct {
        TotalLogs    int64
        DroppedLogs  int64
        FailedWrites int64
        BatchWrites  int64
    }
    
    func NewAsyncLogger(bufferSize, batchSize int, flushInterval time.Duration) *AsyncLogger {
        return &AsyncLogger{
            buffer:        make(chan AsyncLogEntry, bufferSize),
            batchSize:     batchSize,
            flushInterval: flushInterval,
            maxRetries:    3,
            stopCh:        make(chan struct{}),
            doneCh:        make(chan struct{}),
            errorCh:       make(chan error, 100),
            stats:         &AsyncLoggerStats{},
        }
    }
    
    func (al *AsyncLogger) AddWriter(writer io.Writer) {
        al.mutex.Lock()
        defer al.mutex.Unlock()
        al.writers = append(al.writers, writer)
    }
    
    func (al *AsyncLogger) Start() {
        go al.processLoop()
    }
    
    func (al *AsyncLogger) Stop() error {
        close(al.stopCh)
        
        // 等待处理完成
        select {
        case <-al.doneCh:
            return nil
        case <-time.After(5 * time.Second):
            return fmt.Errorf("异步日志器关闭超时")
        }
    }
    
    func (al *AsyncLogger) Log(data []byte) error {
        entry := AsyncLogEntry{
            Data:      data,
            Timestamp: time.Now(),
            Retry:     0,
        }
        
        select {
        case al.buffer <- entry:
            al.stats.TotalLogs++
            return nil
        default:
            // 缓冲区满,丢弃日志
            al.stats.DroppedLogs++
            return fmt.Errorf("日志缓冲区已满")
        }
    }
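
    // 背压策略说明:上面的 Log 在缓冲区满时直接丢弃并计数,适合日志这类可容忍少量丢失的场景。
    // 若不希望丢失,也可以改为带超时的阻塞写入,例如(示意):
    //
    //   select {
    //   case al.buffer <- entry:
    //   case <-time.After(10 * time.Millisecond):
    //       return fmt.Errorf("日志写入超时")
    //   }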
    
    func (al *AsyncLogger) processLoop() {
        defer close(al.doneCh)
        
        ticker := time.NewTicker(al.flushInterval)
        defer ticker.Stop()
        
        batch := make([]AsyncLogEntry, 0, al.batchSize)
        
        for {
            select {
            case <-al.stopCh:
                // 先写出当前已累积的批次,再清空缓冲区中剩余的日志
                if len(batch) > 0 {
                    al.writeBatch(batch)
                }
                al.flushRemaining()
                return
                
            case entry := <-al.buffer:
                batch = append(batch, entry)
                if len(batch) >= al.batchSize {
                    al.writeBatch(batch)
                    batch = batch[:0] // 重置批次
                }
                
            case <-ticker.C:
                if len(batch) > 0 {
                    al.writeBatch(batch)
                    batch = batch[:0]
                }
            }
        }
    }
    
    func (al *AsyncLogger) writeBatch(batch []AsyncLogEntry) {
        if len(batch) == 0 {
            return
        }
        
        al.mutex.RLock()
        writers := make([]io.Writer, len(al.writers))
        copy(writers, al.writers)
        al.mutex.RUnlock()
        
        // 合并批次数据
        var combinedData []byte
        for _, entry := range batch {
            combinedData = append(combinedData, entry.Data...)
        }
        
        // 写入所有目标
        hasError := false
        for _, writer := range writers {
            if _, err := writer.Write(combinedData); err != nil {
                al.errorCh <- fmt.Errorf("写入失败: %v", err)
                al.stats.FailedWrites++
                hasError = true
            }
        }
        
        if !hasError {
            al.stats.BatchWrites++
        }
    }
    
    func (al *AsyncLogger) flushRemaining() {
        // 处理缓冲区中剩余的所有日志
        remaining := make([]AsyncLogEntry, 0)
        
        for {
            select {
            case entry := <-al.buffer:
                remaining = append(remaining, entry)
            default:
                if len(remaining) > 0 {
                    al.writeBatch(remaining)
                }
                return
            }
        }
    }
    
    func (al *AsyncLogger) GetStats() AsyncLoggerStats {
        return *al.stats
    }
    
    func (al *AsyncLogger) GetErrors() []error {
        var errors []error
        for {
            select {
            case err := <-al.errorCh:
                errors = append(errors, err)
            default:
                return errors
            }
        }
    }
    
    // 演示异步日志
    fmt.Printf("异步日志处理演示:\n")
    
    // 创建异步日志器
    asyncLogger := NewAsyncLogger(1000, 10, 100*time.Millisecond)
    
    // 添加输出目标
    asyncLogger.AddWriter(os.Stdout)
    
    // 启动异步处理
    asyncLogger.Start()
    
    fmt.Printf("  📝 发送异步日志:\n")
    
    // 发送大量日志测试性能
    start := time.Now()
    for i := 1; i <= 50; i++ {
        logData := fmt.Sprintf(`{"timestamp":"%s","level":"INFO","message":"异步日志消息 #%d","service":"test"}%s`,
            time.Now().Format(time.RFC3339), i, "\n")
        
        if err := asyncLogger.Log([]byte(logData)); err != nil {
            fmt.Printf("    ❌ 日志发送失败: %v\n", err)
        }
        
        // 模拟一些处理时间
        if i%10 == 0 {
            time.Sleep(10 * time.Millisecond)
        }
    }
    
    duration := time.Since(start)
    
    // 等待一段时间让异步处理完成
    time.Sleep(500 * time.Millisecond)
    
    // 停止异步日志器
    if err := asyncLogger.Stop(); err != nil {
        fmt.Printf("  ❌ 异步日志器停止失败: %v\n", err)
    }
    
    // 显示统计信息
    stats := asyncLogger.GetStats()
    fmt.Printf("\n  📊 异步日志统计:\n")
    fmt.Printf("    发送耗时: %v\n", duration)
    fmt.Printf("    总日志数: %d\n", stats.TotalLogs)
    fmt.Printf("    丢弃日志数: %d\n", stats.DroppedLogs)
    fmt.Printf("    批次写入数: %d\n", stats.BatchWrites)
    fmt.Printf("    写入失败数: %d\n", stats.FailedWrites)
    
    // 检查错误
    errors := asyncLogger.GetErrors()
    if len(errors) > 0 {
        fmt.Printf("    错误数: %d\n", len(errors))
        for i, err := range errors {
            if i < 3 { // 只显示前3个错误
                fmt.Printf("      %d: %v\n", i+1, err)
            }
        }
    }
    
    fmt.Printf("  ✅ 异步日志处理完成\n")
}

func demonstrateLogAggregation() {
    fmt.Println("\n--- 日志聚合和分析 ---")
    
    /*
    日志聚合要点:
    
    1. 日志收集:从多个源收集日志
    2. 格式标准化:统一日志格式
    3. 索引和搜索:支持快速查询
    4. 监控和告警:基于日志的监控
    */
    
    // 日志聚合器
    type LogAggregator struct {
        entries    []LogEntry
        indices    map[string][]int // 字段索引
        stats      map[string]int64 // 统计信息
        mutex      sync.RWMutex
    }
    
    func NewLogAggregator() *LogAggregator {
        return &LogAggregator{
            entries: make([]LogEntry, 0),
            indices: make(map[string][]int),
            stats:   make(map[string]int64),
        }
    }
    
    func (la *LogAggregator) AddEntry(entry LogEntry) {
        la.mutex.Lock()
        defer la.mutex.Unlock()
        
        index := len(la.entries)
        la.entries = append(la.entries, entry)
        
        // 更新索引
        la.updateIndex("level", entry.Level, index)
        la.updateIndex("service", entry.Service, index)
        la.updateIndex("environment", entry.Environment, index)
        
        if entry.RequestID != "" {
            la.updateIndex("request_id", entry.RequestID, index)
        }
        
        if entry.UserID != "" {
            la.updateIndex("user_id", entry.UserID, index)
        }
        
        // 更新统计信息
        la.stats["total"]++
        la.stats["level_"+entry.Level]++
        la.stats["service_"+entry.Service]++
    }
    
    func (la *LogAggregator) updateIndex(field, value string, index int) {
        key := field + ":" + value
        if _, exists := la.indices[key]; !exists {
            la.indices[key] = make([]int, 0)
        }
        la.indices[key] = append(la.indices[key], index)
    }
    
    func (la *LogAggregator) Search(field, value string) []LogEntry {
        la.mutex.RLock()
        defer la.mutex.RUnlock()
        
        key := field + ":" + value
        indices, exists := la.indices[key]
        if !exists {
            return nil
        }
        
        results := make([]LogEntry, 0, len(indices))
        for _, index := range indices {
            if index < len(la.entries) {
                results = append(results, la.entries[index])
            }
        }
        
        return results
    }
    
    func (la *LogAggregator) GetTimeRange(start, end time.Time) []LogEntry {
        la.mutex.RLock()
        defer la.mutex.RUnlock()
        
        var results []LogEntry
        for _, entry := range la.entries {
            if (entry.Timestamp.Equal(start) || entry.Timestamp.After(start)) &&
               (entry.Timestamp.Equal(end) || entry.Timestamp.Before(end)) {
                results = append(results, entry)
            }
        }
        
        return results
    }
    
    func (la *LogAggregator) GetStats() map[string]int64 {
        la.mutex.RLock()
        defer la.mutex.RUnlock()
        
        stats := make(map[string]int64)
        for k, v := range la.stats {
            stats[k] = v
        }
        
        return stats
    }
    
    func (la *LogAggregator) AnalyzeErrorPatterns() map[string]int {
        la.mutex.RLock()
        defer la.mutex.RUnlock()
        
        patterns := make(map[string]int)
        
        for _, entry := range la.entries {
            if entry.Level == "ERROR" && entry.Error != nil {
                patterns[entry.Error.Type]++
            }
        }
        
        return patterns
    }
    
    func (la *LogAggregator) GetTopUsers(limit int) []struct {
        UserID string
        Count  int
    } {
        la.mutex.RLock()
        defer la.mutex.RUnlock()
        
        userCounts := make(map[string]int)
        
        for _, entry := range la.entries {
            if entry.UserID != "" {
                userCounts[entry.UserID]++
            }
        }
        
        // 简单排序(实际应用中应使用更高效的排序)
        type userCount struct {
            UserID string
            Count  int
        }
        
        var users []userCount
        for userID, count := range userCounts {
            users = append(users, userCount{UserID: userID, Count: count})
        }
        
        // 简单冒泡排序(仅用于演示)
        for i := 0; i < len(users)-1; i++ {
            for j := 0; j < len(users)-i-1; j++ {
                if users[j].Count < users[j+1].Count {
                    users[j], users[j+1] = users[j+1], users[j]
                }
            }
        }
        
        result := make([]struct {
            UserID string
            Count  int
        }, 0, limit)
        
        for i := 0; i < len(users) && i < limit; i++ {
            result = append(result, struct {
                UserID string
                Count  int
            }{UserID: users[i].UserID, Count: users[i].Count})
        }
        
        return result
    }
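
    // 说明:上面的冒泡排序仅用于演示,实际应用中可使用标准库 sort.Slice(示意,需导入 "sort"):
    //
    //   sort.Slice(users, func(i, j int) bool { return users[i].Count > users[j].Count })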
    
    // 演示日志聚合
    fmt.Printf("日志聚合和分析演示:\n")
    
    aggregator := NewLogAggregator()
    
    // 模拟添加各种日志
    baseTime := time.Now().Add(-time.Hour)
    
    logEntries := []LogEntry{
        {
            Timestamp: baseTime,
            Level:     "INFO",
            Message:   "用户登录成功",
            Service:   "auth-service",
            Environment: "production",
            RequestID: "req-001",
            UserID:    "user-123",
        },
        {
            Timestamp: baseTime.Add(5 * time.Minute),
            Level:     "ERROR",
            Message:   "数据库连接失败",
            Service:   "user-service",
            Environment: "production",
            RequestID: "req-002",
            Error: &ErrorInfo{
                Type:    "DatabaseError",
                Message: "connection timeout",
            },
        },
        {
            Timestamp: baseTime.Add(10 * time.Minute),
            Level:     "WARN",
            Message:   "API请求频率过高",
            Service:   "api-gateway",
            Environment: "production",
            RequestID: "req-003",
            UserID:    "user-123",
        },
        {
            Timestamp: baseTime.Add(15 * time.Minute),
            Level:     "INFO",
            Message:   "订单创建成功",
            Service:   "order-service",
            Environment: "production",
            RequestID: "req-004",
            UserID:    "user-456",
        },
        {
            Timestamp: baseTime.Add(20 * time.Minute),
            Level:     "ERROR",
            Message:   "支付处理失败",
            Service:   "payment-service",
            Environment: "production",
            RequestID: "req-005",
            UserID:    "user-789",
            Error: &ErrorInfo{
                Type:    "PaymentError",
                Message: "insufficient funds",
            },
        },
    }
    
    // 添加日志条目
    fmt.Printf("  📥 添加日志条目:\n")
    for _, entry := range logEntries {
        aggregator.AddEntry(entry)
        fmt.Printf("    添加: %s [%s] %s\n", entry.Timestamp.Format("15:04:05"), entry.Level, entry.Message)
    }
    
    // 搜索测试
    fmt.Printf("\n  🔍 搜索测试:\n")
    
    // 按级别搜索
    errorLogs := aggregator.Search("level", "ERROR")
    fmt.Printf("    ERROR级别日志: %d\n", len(errorLogs))
    for _, log := range errorLogs {
        fmt.Printf("      - %s: %s\n", log.Service, log.Message)
    }
    
    // 按用户搜索
    userLogs := aggregator.Search("user_id", "user-123")
    fmt.Printf("    用户user-123的日志: %d\n", len(userLogs))
    
    // 按服务搜索
    serviceLogs := aggregator.Search("service", "user-service")
    fmt.Printf("    user-service的日志: %d\n", len(serviceLogs))
    
    // 时间范围搜索
    fmt.Printf("\n  ⏰ 时间范围搜索:\n")
    recentLogs := aggregator.GetTimeRange(baseTime.Add(10*time.Minute), baseTime.Add(25*time.Minute))
    fmt.Printf("    最近15分钟的日志: %d\n", len(recentLogs))
    
    // 统计分析
    fmt.Printf("\n  📊 统计分析:\n")
    stats := aggregator.GetStats()
    for key, value := range stats {
        fmt.Printf("    %s: %d\n", key, value)
    }
    
    // 错误模式分析
    fmt.Printf("\n  🚨 错误模式分析:\n")
    errorPatterns := aggregator.AnalyzeErrorPatterns()
    for errorType, count := range errorPatterns {
        fmt.Printf("    %s: %d\n", errorType, count)
    }
    
    // 活跃用户分析
    fmt.Printf("\n  👥 活跃用户分析:\n")
    topUsers := aggregator.GetTopUsers(3)
    for i, user := range topUsers {
        fmt.Printf("    %d. %s: %d次活动\n", i+1, user.UserID, user.Count)
    }
    
    fmt.Printf("\n  📋 日志聚合最佳实践:\n")
    fmt.Printf("    1. 建立统一的日志格式和字段标准\n")
    fmt.Printf("    2. 实现高效的索引和搜索机制\n")
    fmt.Printf("    3. 提供丰富的查询和分析功能\n")
    fmt.Printf("    4. 建立基于日志的监控和告警\n")
    fmt.Printf("    5. 考虑日志的存储和归档策略\n")
}

func main() {
    demonstrateLoggingManagement()
}
