
Go并发设计模式详解 - Golang并发编程面试题

Go并发设计模式是构建高效并发程序的基础。本章深入探讨常见的并发模式、设计原则和实现技巧。

📋 重点面试题

面试题 1:经典并发设计模式

难度级别:⭐⭐⭐⭐⭐
考察范围:并发设计/设计模式
技术标签:concurrency patterns、design patterns、goroutine patterns、channel patterns

详细解答

1. 基础并发模式

::: details 点击查看完整代码实现

```go
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

func demonstrateConcurrencyPatterns() {
    fmt.Println("=== Go并发设计模式演示 ===")
    
    /*
    经典并发模式分类:
    
    1. 创建型模式:
       - Worker Pool:工作池模式
       - Pipeline:管道模式
       - Fan-in/Fan-out:扇入扇出模式
    
    2. 同步模式:
       - Producer-Consumer:生产者消费者
       - Publisher-Subscriber:发布订阅
       - Request-Response:请求响应
    
    3. 控制型模式:
       - Circuit Breaker:熔断器
       - Rate Limiter:限流器
       - Timeout:超时控制
    
    4. 数据流模式:
       - Stream Processing:流处理
       - Map-Reduce:映射归约
       - Scatter-Gather:分散聚集
    */
    
    // 演示各种并发模式
    demonstrateWorkerPool()
    demonstratePipeline()
    demonstrateFanInFanOut()
    demonstrateProducerConsumer()
}

func demonstrateWorkerPool() {
    fmt.Println("\n--- Worker Pool 模式 ---")
    
    /*
    Worker Pool模式:
    - 固定数量的worker goroutine
    - 通过channel分发任务
    - 控制并发度,避免goroutine泄漏
    */
    
    type Job struct {
        ID   int
        Data string
    }
    
    type Result struct {
        JobID  int
        Output string
        Error  error
    }
    
    type WorkerPool struct {
        workerCount int
        jobQueue    chan Job
        resultQueue chan Result
        quit        chan bool
        wg          sync.WaitGroup
    }
    
    func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
        return &WorkerPool{
            workerCount: workerCount,
            jobQueue:    make(chan Job, queueSize),
            resultQueue: make(chan Result, queueSize),
            quit:        make(chan bool),
        }
    }
    
    func (wp *WorkerPool) Start() {
        for i := 0; i < wp.workerCount; i++ {
            wp.wg.Add(1)
            go wp.worker(i)
        }
    }
    
    func (wp *WorkerPool) worker(workerID int) {
        defer wp.wg.Done()
        
        for {
            select {
            case job := <-wp.jobQueue:
                result := wp.processJob(workerID, job)
                wp.resultQueue <- result
                
            case <-wp.quit:
                fmt.Printf("Worker %d 退出\n", workerID)
                return
            }
        }
    }
    
    func (wp *WorkerPool) processJob(workerID int, job Job) Result {
        // 模拟工作处理
        processingTime := time.Duration(rand.Intn(100)) * time.Millisecond
        time.Sleep(processingTime)
        
        output := fmt.Sprintf("Worker %d 处理了 Job %d: %s", 
            workerID, job.ID, job.Data)
        
        return Result{
            JobID:  job.ID,
            Output: output,
            Error:  nil,
        }
    }
    
    func (wp *WorkerPool) Submit(job Job) {
        wp.jobQueue <- job
    }
    
    func (wp *WorkerPool) GetResult() Result {
        return <-wp.resultQueue
    }
    
    func (wp *WorkerPool) Stop() {
        close(wp.quit)
        wp.wg.Wait()
        close(wp.jobQueue)
        close(wp.resultQueue)
    }
    
    // 演示Worker Pool使用
    pool := NewWorkerPool(3, 10)
    pool.Start()
    
    // 提交任务
    go func() {
        for i := 0; i < 10; i++ {
            job := Job{
                ID:   i,
                Data: fmt.Sprintf("task-%d", i),
            }
            pool.Submit(job)
            fmt.Printf("提交任务 %d\n", i)
        }
    }()
    
    // 收集结果
    go func() {
        for i := 0; i < 10; i++ {
            result := pool.GetResult()
            fmt.Printf("收到结果: %s\n", result.Output)
        }
    }()
    
    time.Sleep(2 * time.Second)
    pool.Stop()
    fmt.Println("Worker Pool 演示完成")
}

func demonstratePipeline() {
    fmt.Println("\n--- Pipeline 模式 ---")
    
    /*
    Pipeline模式:
    - 将复杂处理分解为多个阶段
    - 每个阶段由独立的goroutine处理
    - 通过channel连接各个阶段
    */
    
    // 阶段1:数据生成
    generateNumbers := func(ctx context.Context) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for i := 1; i <= 20; i++ {
                select {
                case out <- i:
                    fmt.Printf("生成: %d\n", i)
                case <-ctx.Done():
                    return
                }
                time.Sleep(50 * time.Millisecond)
            }
        }()
        return out
    }
    
    // 阶段2:数据转换
    transform := func(ctx context.Context, in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for {
                select {
                case num, ok := <-in:
                    if !ok {
                        return
                    }
                    transformed := num * num
                    fmt.Printf("转换: %d -> %d\n", num, transformed)
                    
                    select {
                    case out <- transformed:
                    case <-ctx.Done():
                        return
                    }
                    
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    }
    
    // 阶段3:数据过滤
    filter := func(ctx context.Context, in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for {
                select {
                case num, ok := <-in:
                    if !ok {
                        return
                    }
                    if num%2 == 0 { // 只保留偶数
                        fmt.Printf("过滤保留: %d\n", num)
                        select {
                        case out <- num:
                        case <-ctx.Done():
                            return
                        }
                    } else {
                        fmt.Printf("过滤丢弃: %d\n", num)
                    }
                    
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    }
    
    // 阶段4:结果收集
    collect := func(ctx context.Context, in <-chan int) {
        results := make([]int, 0)
        for {
            select {
            case num, ok := <-in:
                if !ok {
                    fmt.Printf("收集完成,结果: %v\n", results)
                    return
                }
                results = append(results, num)
                fmt.Printf("收集: %d\n", num)
                
            case <-ctx.Done():
                fmt.Printf("收集被取消,当前结果: %v\n", results)
                return
            }
        }
    }
    
    // 构建并运行管道
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // 连接各个阶段
    stage1 := generateNumbers(ctx)
    stage2 := transform(ctx, stage1)
    stage3 := filter(ctx, stage2)
    
    // 收集最终结果
    collect(ctx, stage3)
    
    fmt.Println("Pipeline 演示完成")
}

func demonstrateFanInFanOut() {
    fmt.Println("\n--- Fan-in/Fan-out 模式 ---")
    
    /*
    Fan-out模式:一个数据源分发给多个处理器
    Fan-in模式:多个数据源合并到一个处理器
    */
    
    // Fan-out:分发任务到多个worker
    fanOut := func(ctx context.Context, in <-chan int, numWorkers int) []<-chan int {
        outputs := make([]<-chan int, numWorkers)
        
        for i := 0; i < numWorkers; i++ {
            output := make(chan int)
            outputs[i] = output
            
            go func(workerID int, out chan<- int) {
                defer close(out)
                
                for {
                    select {
                    case data, ok := <-in:
                        if !ok {
                            fmt.Printf("Fan-out Worker %d 结束\n", workerID)
                            return
                        }
                        
                        // 模拟处理
                        processed := data * (workerID + 1)
                        fmt.Printf("Worker %d 处理: %d -> %d\n", 
                            workerID, data, processed)
                        
                        select {
                        case out <- processed:
                        case <-ctx.Done():
                            return
                        }
                        
                    case <-ctx.Done():
                        return
                    }
                }
            }(i, output)
        }
        
        return outputs
    }
    
    // Fan-in:合并多个channel的输出
    fanIn := func(ctx context.Context, inputs ...<-chan int) <-chan int {
        output := make(chan int)
        var wg sync.WaitGroup
        
        // 为每个输入channel启动一个goroutine
        for i, input := range inputs {
            wg.Add(1)
            go func(id int, in <-chan int) {
                defer wg.Done()
                
                for {
                    select {
                    case data, ok := <-in:
                        if !ok {
                            fmt.Printf("Fan-in 输入 %d 结束\n", id)
                            return
                        }
                        
                        select {
                        case output <- data:
                            fmt.Printf("Fan-in 合并来自输入 %d 的数据: %d\n", 
                                id, data)
                        case <-ctx.Done():
                            return
                        }
                        
                    case <-ctx.Done():
                        return
                    }
                }
            }(i, input)
        }
        
        // 等待所有输入完成后关闭输出
        go func() {
            wg.Wait()
            close(output)
            fmt.Println("Fan-in 合并完成")
        }()
        
        return output
    }
    
    // 数据源
    source := func(ctx context.Context) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for i := 1; i <= 10; i++ {
                select {
                case out <- i:
                    fmt.Printf("源数据: %d\n", i)
                case <-ctx.Done():
                    return
                }
                time.Sleep(100 * time.Millisecond)
            }
        }()
        return out
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    
    // 构建Fan-out/Fan-in管道
    input := source(ctx)
    outputs := fanOut(ctx, input, 3)        // 分发给3个worker
    merged := fanIn(ctx, outputs...)        // 合并结果
    
    // 收集最终结果
    results := make([]int, 0)
    for result := range merged {
        results = append(results, result)
    }
    
    fmt.Printf("Fan-in/Fan-out 最终结果: %v\n", results)
}

func demonstrateProducerConsumer() {
    fmt.Println("\n--- Producer-Consumer 模式 ---")
    
    /*
    Producer-Consumer模式:
    - 生产者生成数据
    - 消费者消费数据
    - 通过缓冲区解耦
    */
    
    type Message struct {
        ID        int
        Content   string
        Timestamp time.Time
    }
    
    // 生产者
    producer := func(ctx context.Context, id int, output chan<- Message) {
        defer func() {
            fmt.Printf("生产者 %d 退出\n", id)
        }()
        
        messageID := 0
        ticker := time.NewTicker(200 * time.Millisecond)
        defer ticker.Stop()
        
        for {
            select {
            case <-ticker.C:
                message := Message{
                    ID:      messageID,
                    Content: fmt.Sprintf("来自生产者 %d 的消息 %d", id, messageID),
                    Timestamp: time.Now(),
                }
                
                select {
                case output <- message:
                    fmt.Printf("生产者 %d 发送: %s\n", id, message.Content)
                    messageID++
                case <-ctx.Done():
                    return
                }
                
            case <-ctx.Done():
                return
            }
        }
    }
    
    // 消费者
    consumer := func(ctx context.Context, id int, input <-chan Message) {
        defer func() {
            fmt.Printf("消费者 %d 退出\n", id)
        }()
        
        processed := 0
        
        for {
            select {
            case message, ok := <-input:
                if !ok {
                    fmt.Printf("消费者 %d 处理了 %d 条消息\n", id, processed)
                    return
                }
                
                // 模拟处理时间
                processingTime := time.Duration(rand.Intn(300)) * time.Millisecond
                time.Sleep(processingTime)
                
                fmt.Printf("消费者 %d 处理: %s (耗时: %v)\n", 
                    id, message.Content, processingTime)
                processed++
                
            case <-ctx.Done():
                fmt.Printf("消费者 %d 被取消,已处理 %d 条消息\n", id, processed)
                return
            }
        }
    }
    
    // 创建带缓冲的channel作为消息队列
    messageQueue := make(chan Message, 5)
    
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    
    var producerWg, consumerWg sync.WaitGroup
    
    // 启动2个生产者
    for i := 0; i < 2; i++ {
        producerWg.Add(1)
        go func(id int) {
            defer producerWg.Done()
            producer(ctx, id, messageQueue)
        }(i)
    }
    
    // 启动3个消费者
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        go func(id int) {
            defer consumerWg.Done()
            consumer(ctx, id, messageQueue)
        }(i)
    }
    
    // 等待超时:先等所有生产者退出,再关闭队列,避免向已关闭的channel发送
    <-ctx.Done()
    producerWg.Wait()
    close(messageQueue)
    consumerWg.Wait()
    
    fmt.Println("Producer-Consumer 演示完成")
}

```
:::

面试题 2:高级并发控制模式

难度级别:⭐⭐⭐⭐⭐
考察范围:并发控制/可靠性设计
技术标签:circuit breaker、rate limiter、bulkhead、retry patterns

详细解答

1. 并发控制和稳定性模式

::: details 点击查看完整代码实现

```go
func demonstrateAdvancedPatterns() {
    fmt.Println("\n=== 高级并发控制模式 ===")
    
    /*
    高级并发控制模式:
    
    1. 稳定性模式:
       - Circuit Breaker:熔断器
       - Bulkhead:舱壁隔离
       - Timeout:超时控制
    
    2. 流控模式:
       - Rate Limiter:限流器
       - Semaphore:信号量
       - Throttling:节流
    
    3. 重试模式:
       - Exponential Backoff:指数退避
       - Jitter:抖动
       - Dead Letter Queue:死信队列
    */
    
    demonstrateCircuitBreaker()
    demonstrateRateLimiter()
    demonstrateBulkhead()
    demonstrateRetryPattern()
}

func demonstrateCircuitBreaker() {
    fmt.Println("\n--- Circuit Breaker 模式 ---")
    
    /*
    Circuit Breaker模式:
    - 监控服务调用失败率
    - 达到阈值时"开路",快速失败
    - 定期尝试恢复"闭路"状态
    */
    
    type CircuitBreakerState int
    
    const (
        Closed CircuitBreakerState = iota
        Open
        HalfOpen
    )
    
    func (s CircuitBreakerState) String() string {
        switch s {
        case Closed:
            return "CLOSED"
        case Open:
            return "OPEN"
        case HalfOpen:
            return "HALF-OPEN"
        default:
            return "UNKNOWN"
        }
    }
    
    type CircuitBreaker struct {
        maxFailures     int
        timeout         time.Duration
        state           CircuitBreakerState
        failures        int
        lastFailureTime time.Time
        mutex           sync.RWMutex
    }
    
    func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
        return &CircuitBreaker{
            maxFailures: maxFailures,
            timeout:     timeout,
            state:       Closed,
        }
    }
    
    func (cb *CircuitBreaker) Call(fn func() error) error {
        cb.mutex.Lock()
        defer cb.mutex.Unlock()
        
        // 检查是否可以调用
        if !cb.canCall() {
            return fmt.Errorf("circuit breaker is %s", cb.state)
        }
        
        // 执行调用
        err := fn()
        
        // 记录结果
        cb.recordResult(err)
        
        return err
    }
    
    func (cb *CircuitBreaker) canCall() bool {
        switch cb.state {
        case Closed:
            return true
        case Open:
            // 检查是否可以转到半开状态
            if time.Since(cb.lastFailureTime) > cb.timeout {
                cb.state = HalfOpen
                fmt.Printf("熔断器状态: %s -> %s\n", Open, HalfOpen)
                return true
            }
            return false
        case HalfOpen:
            return true
        default:
            return false
        }
    }
    
    func (cb *CircuitBreaker) recordResult(err error) {
        if err != nil {
            cb.failures++
            cb.lastFailureTime = time.Now()
            
            if cb.state == HalfOpen || cb.failures >= cb.maxFailures {
                cb.state = Open
                fmt.Printf("熔断器打开: 失败次数 %d, 状态: %s\n", 
                    cb.failures, cb.state)
            }
        } else {
            // 成功调用
            if cb.state == HalfOpen {
                cb.state = Closed
                cb.failures = 0
                fmt.Printf("熔断器关闭: 状态 %s, 失败计数重置\n", cb.state)
            }
        }
    }
    
    func (cb *CircuitBreaker) GetState() CircuitBreakerState {
        cb.mutex.RLock()
        defer cb.mutex.RUnlock()
        return cb.state
    }
    
    // 模拟不稳定的服务
    unreliableService := func() error {
        // 70%的失败率
        if rand.Float32() < 0.7 {
            return fmt.Errorf("service failure")
        }
        return nil
    }
    
    // 演示熔断器使用
    cb := NewCircuitBreaker(3, 2*time.Second)
    
    for i := 0; i < 15; i++ {
        err := cb.Call(unreliableService)
        
        if err != nil {
            fmt.Printf("调用 %d: 失败 - %v (状态: %s)\n", 
                i+1, err, cb.GetState())
        } else {
            fmt.Printf("调用 %d: 成功 (状态: %s)\n", 
                i+1, cb.GetState())
        }
        
        time.Sleep(300 * time.Millisecond)
    }
    
    fmt.Println("Circuit Breaker 演示完成")
}

func demonstrateRateLimiter() {
    fmt.Println("\n--- Rate Limiter 模式 ---")
    
    /*
    Rate Limiter模式:
    - 控制请求速率
    - 防止系统过载
    - 支持突发流量
    */
    
    // Token Bucket算法实现
    type TokenBucket struct {
        capacity   int
        tokens     int
        refillRate time.Duration
        lastRefill time.Time
        mutex      sync.Mutex
    }
    
    func NewTokenBucket(capacity int, refillRate time.Duration) *TokenBucket {
        return &TokenBucket{
            capacity:   capacity,
            tokens:     capacity,
            refillRate: refillRate,
            lastRefill: time.Now(),
        }
    }
    
    func (tb *TokenBucket) TryConsume() bool {
        tb.mutex.Lock()
        defer tb.mutex.Unlock()
        
        tb.refill()
        
        if tb.tokens > 0 {
            tb.tokens--
            return true
        }
        
        return false
    }
    
    func (tb *TokenBucket) refill() {
        now := time.Now()
        elapsed := now.Sub(tb.lastRefill)
        
        tokensToAdd := int(elapsed / tb.refillRate)
        if tokensToAdd > 0 {
            tb.tokens += tokensToAdd
            if tb.tokens > tb.capacity {
                tb.tokens = tb.capacity
            }
            tb.lastRefill = now
        }
    }
    
    func (tb *TokenBucket) GetTokens() int {
        tb.mutex.Lock()
        defer tb.mutex.Unlock()
        tb.refill()
        return tb.tokens
    }
    
    // Sliding Window算法实现
    type SlidingWindowLimiter struct {
        maxRequests int
        window      time.Duration
        requests    []time.Time
        mutex       sync.Mutex
    }
    
    func NewSlidingWindowLimiter(maxRequests int, window time.Duration) *SlidingWindowLimiter {
        return &SlidingWindowLimiter{
            maxRequests: maxRequests,
            window:      window,
            requests:    make([]time.Time, 0),
        }
    }
    
    func (swl *SlidingWindowLimiter) TryConsume() bool {
        swl.mutex.Lock()
        defer swl.mutex.Unlock()
        
        now := time.Now()
        cutoff := now.Add(-swl.window)
        
        // 移除窗口外的请求
        var validRequests []time.Time
        for _, req := range swl.requests {
            if req.After(cutoff) {
                validRequests = append(validRequests, req)
            }
        }
        swl.requests = validRequests
        
        // 检查是否可以添加新请求
        if len(swl.requests) < swl.maxRequests {
            swl.requests = append(swl.requests, now)
            return true
        }
        
        return false
    }
    
    func (swl *SlidingWindowLimiter) GetCurrentRequests() int {
        swl.mutex.Lock()
        defer swl.mutex.Unlock()
        
        now := time.Now()
        cutoff := now.Add(-swl.window)
        
        count := 0
        for _, req := range swl.requests {
            if req.After(cutoff) {
                count++
            }
        }
        
        return count
    }
    
    // 演示限流器
    fmt.Println("Token Bucket 限流器测试:")
    tokenBucket := NewTokenBucket(5, 200*time.Millisecond)
    
    for i := 0; i < 10; i++ {
        if tokenBucket.TryConsume() {
            fmt.Printf("请求 %d: 通过 (剩余令牌: %d)\n", 
                i+1, tokenBucket.GetTokens())
        } else {
            fmt.Printf("请求 %d: 被限流 (剩余令牌: %d)\n", 
                i+1, tokenBucket.GetTokens())
        }
        time.Sleep(100 * time.Millisecond)
    }
    
    fmt.Println("\nSliding Window 限流器测试:")
    slidingWindow := NewSlidingWindowLimiter(3, time.Second)
    
    for i := 0; i < 8; i++ {
        if slidingWindow.TryConsume() {
            fmt.Printf("请求 %d: 通过 (窗口内请求: %d)\n", 
                i+1, slidingWindow.GetCurrentRequests())
        } else {
            fmt.Printf("请求 %d: 被限流 (窗口内请求: %d)\n", 
                i+1, slidingWindow.GetCurrentRequests())
        }
        time.Sleep(200 * time.Millisecond)
    }
}

func demonstrateBulkhead() {
    fmt.Println("\n--- Bulkhead 隔离模式 ---")
    
    /*
    Bulkhead模式:
    - 资源隔离,防止级联失败
    - 为不同服务分配独立资源池
    - 一个服务故障不影响其他服务
    */
    
    type ResourcePool struct {
        name      string
        resources chan struct{}
        timeout   time.Duration
    }
    
    func NewResourcePool(name string, size int, timeout time.Duration) *ResourcePool {
        pool := &ResourcePool{
            name:      name,
            resources: make(chan struct{}, size),
            timeout:   timeout,
        }
        
        // 填充资源池
        for i := 0; i < size; i++ {
            pool.resources <- struct{}{}
        }
        
        return pool
    }
    
    func (rp *ResourcePool) Acquire() error {
        select {
        case <-rp.resources:
            return nil
        case <-time.After(rp.timeout):
            return fmt.Errorf("acquire timeout for %s", rp.name)
        }
    }
    
    func (rp *ResourcePool) Release() {
        select {
        case rp.resources <- struct{}{}:
        default:
            // 池满,可能资源泄漏
            fmt.Printf("警告: %s 资源池已满\n", rp.name)
        }
    }
    
    func (rp *ResourcePool) Available() int {
        return len(rp.resources)
    }
    
    // 服务隔离管理器
    type BulkheadManager struct {
        pools map[string]*ResourcePool
        mutex sync.RWMutex
    }
    
    func NewBulkheadManager() *BulkheadManager {
        return &BulkheadManager{
            pools: make(map[string]*ResourcePool),
        }
    }
    
    func (bm *BulkheadManager) AddPool(name string, size int, timeout time.Duration) {
        bm.mutex.Lock()
        defer bm.mutex.Unlock()
        
        bm.pools[name] = NewResourcePool(name, size, timeout)
    }
    
    func (bm *BulkheadManager) Execute(poolName string, fn func() error) error {
        bm.mutex.RLock()
        pool, exists := bm.pools[poolName]
        bm.mutex.RUnlock()
        
        if !exists {
            return fmt.Errorf("pool %s not found", poolName)
        }
        
        // 获取资源
        if err := pool.Acquire(); err != nil {
            return err
        }
        defer pool.Release()
        
        // 执行任务
        return fn()
    }
    
    func (bm *BulkheadManager) GetPoolStatus() map[string]int {
        bm.mutex.RLock()
        defer bm.mutex.RUnlock()
        
        status := make(map[string]int)
        for name, pool := range bm.pools {
            status[name] = pool.Available()
        }
        return status
    }
    
    // 演示舱壁隔离
    manager := NewBulkheadManager()
    
    // 为不同服务创建隔离的资源池
    manager.AddPool("user-service", 3, time.Second)
    manager.AddPool("payment-service", 2, time.Second)
    manager.AddPool("notification-service", 5, time.Second)
    
    var wg sync.WaitGroup
    
    // 模拟不同服务的并发请求
    services := []string{"user-service", "payment-service", "notification-service"}
    
    for _, service := range services {
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func(svc string, reqID int) {
                defer wg.Done()
                
                err := manager.Execute(svc, func() error {
                    // 模拟服务处理时间
                    delay := time.Duration(rand.Intn(500)) * time.Millisecond
                    time.Sleep(delay)
                    
                    fmt.Printf("%s 请求 %d 完成 (耗时: %v)\n", 
                        svc, reqID, delay)
                    return nil
                })
                
                if err != nil {
                    fmt.Printf("%s 请求 %d 失败: %v\n", svc, reqID, err)
                }
            }(service, i)
            
            time.Sleep(50 * time.Millisecond)
        }
    }
    
    // 监控资源池状态
    go func() {
        for i := 0; i < 10; i++ {
            status := manager.GetPoolStatus()
            fmt.Printf("资源池状态: %v\n", status)
            time.Sleep(200 * time.Millisecond)
        }
    }()
    
    wg.Wait()
    fmt.Println("Bulkhead 隔离演示完成")
}

func demonstrateRetryPattern() {
    fmt.Println("\n--- Retry 重试模式 ---")
    
    /*
    Retry模式:
    - 指数退避重试
    - 抖动避免惊群
    - 最大重试次数限制
    */
    
    type RetryConfig struct {
        MaxAttempts int
        BaseDelay   time.Duration
        MaxDelay    time.Duration
        Multiplier  float64
        Jitter      bool
    }
    
    func RetryWithBackoff(config RetryConfig, fn func() error) error {
        var lastErr error
        delay := config.BaseDelay
        
        for attempt := 1; attempt <= config.MaxAttempts; attempt++ {
            err := fn()
            if err == nil {
                if attempt > 1 {
                    fmt.Printf("重试成功,尝试次数: %d\n", attempt)
                }
                return nil
            }
            
            lastErr = err
            fmt.Printf("尝试 %d/%d 失败: %v\n", 
                attempt, config.MaxAttempts, err)
            
            if attempt == config.MaxAttempts {
                break
            }
            
            // 计算下次重试延迟
            actualDelay := delay
            if config.Jitter {
                // 添加±25%的抖动
                jitter := time.Duration(rand.Float64() * 0.5 * float64(delay))
                if rand.Float64() < 0.5 {
                    actualDelay -= jitter
                } else {
                    actualDelay += jitter
                }
            }
            
            fmt.Printf("等待 %v 后重试...\n", actualDelay)
            time.Sleep(actualDelay)
            
            // 指数退避
            delay = time.Duration(float64(delay) * config.Multiplier)
            if delay > config.MaxDelay {
                delay = config.MaxDelay
            }
        }
        
        return fmt.Errorf("重试 %d 次后仍然失败: %v", 
            config.MaxAttempts, lastErr)
    }
    
    // 模拟不稳定的操作
    attempt := 0
    unstableOperation := func() error {
        attempt++
        // 前3次必定失败,第4次成功
        if attempt < 4 {
            return fmt.Errorf("operation failed (attempt %d)", attempt)
        }
        return nil
    }
    
    // 测试重试机制
    config := RetryConfig{
        MaxAttempts: 5,
        BaseDelay:   100 * time.Millisecond,
        MaxDelay:    2 * time.Second,
        Multiplier:  2.0,
        Jitter:      true,
    }
    
    fmt.Println("开始重试操作:")
    err := RetryWithBackoff(config, unstableOperation)
    
    if err != nil {
        fmt.Printf("最终失败: %v\n", err)
    } else {
        fmt.Println("操作最终成功")
    }
}

func main() {
    demonstrateConcurrencyPatterns()
    demonstrateAdvancedPatterns()
}

```
:::

面试题 3:响应式编程模式

难度级别:⭐⭐⭐⭐⭐
考察范围:异步编程/事件驱动
技术标签:reactive programming、event-driven、stream processing、async patterns

详细解答

1. 响应式和事件驱动模式

::: details 点击查看完整代码实现

```go
func demonstrateReactivePatterns() {
    fmt.Println("\n=== 响应式编程模式 ===")
    
    /*
    响应式编程模式:
    
    1. 事件流处理:
       - Event Stream:事件流
       - Observer:观察者模式
       - Subject:主题模式
    
    2. 异步组合:
       - Future/Promise:异步结果
       - Compose:组合操作
       - Transform:转换操作
    
    3. 背压控制:
       - Backpressure:背压处理
       - Buffer:缓冲策略
       - Drop:丢弃策略
    */
    
    demonstrateEventStream()
    demonstrateObserverPattern()
    demonstrateAsyncComposition()
    demonstrateBackpressure()
}

func demonstrateEventStream() {
    fmt.Println("\n--- Event Stream 事件流 ---")
    
    type Event struct {
        Type      string
        Data      interface{}
        Timestamp time.Time
    }
    
    type EventStream struct {
        subscribers []chan Event
        buffer      []Event
        maxBuffer   int
        mutex       sync.RWMutex
        closed      bool
    }
    
    func NewEventStream(bufferSize int) *EventStream {
        return &EventStream{
            subscribers: make([]chan Event, 0),
            buffer:      make([]Event, 0, bufferSize),
            maxBuffer:   bufferSize,
        }
    }
    
    func (es *EventStream) Subscribe() <-chan Event {
        es.mutex.Lock()
        defer es.mutex.Unlock()
        
        if es.closed {
            ch := make(chan Event)
            close(ch)
            return ch
        }
        
        subscriber := make(chan Event, 10)
        es.subscribers = append(es.subscribers, subscriber)
        
        // 发送缓冲的事件
        for _, event := range es.buffer {
            select {
            case subscriber <- event:
            default:
                // 订阅者满了,跳过
            }
        }
        
        return subscriber
    }
    
    func (es *EventStream) Publish(event Event) {
        // Publish 会修改 buffer,必须使用写锁而非读锁
        es.mutex.Lock()
        defer es.mutex.Unlock()
        
        if es.closed {
            return
        }
        
        event.Timestamp = time.Now()
        
        // 添加到缓冲区
        if len(es.buffer) < es.maxBuffer {
            es.buffer = append(es.buffer, event)
        } else {
            // 缓冲区满,移除最老的事件
            es.buffer = append(es.buffer[1:], event)
        }
        
        // 发送给所有订阅者
        for i, subscriber := range es.subscribers {
            select {
            case subscriber <- event:
            default:
                // 订阅者阻塞,考虑移除
                fmt.Printf("订阅者 %d 阻塞,跳过事件\n", i)
            }
        }
    }
    
    func (es *EventStream) Close() {
        es.mutex.Lock()
        defer es.mutex.Unlock()
        
        es.closed = true
        
        for _, subscriber := range es.subscribers {
            close(subscriber)
        }
        
        es.subscribers = nil
    }
    
    // 事件过滤器
    func (es *EventStream) Filter(predicate func(Event) bool) *EventStream {
        filtered := NewEventStream(es.maxBuffer)
        
        go func() {
            subscriber := es.Subscribe()
            for event := range subscriber {
                if predicate(event) {
                    filtered.Publish(event)
                }
            }
            filtered.Close()
        }()
        
        return filtered
    }
    
    // 事件转换器
    func (es *EventStream) Map(transform func(Event) Event) *EventStream {
        mapped := NewEventStream(es.maxBuffer)
        
        go func() {
            subscriber := es.Subscribe()
            for event := range subscriber {
                transformed := transform(event)
                mapped.Publish(transformed)
            }
            mapped.Close()
        }()
        
        return mapped
    }
    
    // 演示事件流
    stream := NewEventStream(5)
    
    // 创建过滤流:只处理"user"类型事件
    userStream := stream.Filter(func(e Event) bool {
        return e.Type == "user"
    })
    
    // 创建转换流:添加处理时间戳
    processedStream := userStream.Map(func(e Event) Event {
        return Event{
            Type: e.Type + "_processed",
            Data: map[string]interface{}{
                "original": e.Data,
                "processed_at": time.Now(),
            },
            Timestamp: e.Timestamp,
        }
    })
    
    // 订阅处理后的事件
    go func() {
        subscriber := processedStream.Subscribe()
        for event := range subscriber {
            fmt.Printf("处理后事件: %s, 数据: %v\n", 
                event.Type, event.Data)
        }
    }()
    
    // 发布事件
    events := []Event{
        {Type: "user", Data: "用户登录"},
        {Type: "system", Data: "系统启动"},
        {Type: "user", Data: "用户注销"},
        {Type: "order", Data: "新订单"},
        {Type: "user", Data: "用户注册"},
    }
    
    for _, event := range events {
        stream.Publish(event)
        fmt.Printf("发布事件: %s\n", event.Type)
        time.Sleep(200 * time.Millisecond)
    }
    
    time.Sleep(500 * time.Millisecond)
    stream.Close()
}

func demonstrateObserverPattern() {
    fmt.Println("\n--- Observer 观察者模式 ---")
    
    type Observer interface {
        Update(event interface{})
        ID() string
    }
    
    type Subject interface {
        Attach(observer Observer)
        Detach(observer Observer)
        Notify(event interface{})
    }
    
    // 具体主题实现
    type EventSubject struct {
        observers []Observer
        mutex     sync.RWMutex
    }
    
    func NewEventSubject() *EventSubject {
        return &EventSubject{
            observers: make([]Observer, 0),
        }
    }
    
    func (es *EventSubject) Attach(observer Observer) {
        es.mutex.Lock()
        defer es.mutex.Unlock()
        
        es.observers = append(es.observers, observer)
        fmt.Printf("观察者 %s 已添加\n", observer.ID())
    }
    
    func (es *EventSubject) Detach(observer Observer) {
        es.mutex.Lock()
        defer es.mutex.Unlock()
        
        for i, obs := range es.observers {
            if obs.ID() == observer.ID() {
                es.observers = append(es.observers[:i], es.observers[i+1:]...)
                fmt.Printf("观察者 %s 已移除\n", observer.ID())
                break
            }
        }
    }
    
    func (es *EventSubject) Notify(event interface{}) {
        es.mutex.RLock()
        observers := make([]Observer, len(es.observers))
        copy(observers, es.observers)
        es.mutex.RUnlock()
        
        // 并发通知所有观察者
        var wg sync.WaitGroup
        for _, observer := range observers {
            wg.Add(1)
            go func(obs Observer) {
                defer wg.Done()
                defer func() {
                    if r := recover(); r != nil {
                        fmt.Printf("观察者 %s 处理事件时出现异常: %v\n", 
                            obs.ID(), r)
                    }
                }()
                
                obs.Update(event)
            }(observer)
        }
        wg.Wait()
    }
    
    // 具体观察者实现
    type LogObserver struct {
        id string
    }
    
    func (lo *LogObserver) Update(event interface{}) {
        fmt.Printf("[LOG] %s: %v\n", lo.id, event)
        time.Sleep(50 * time.Millisecond) // 模拟处理时间
    }
    
    func (lo *LogObserver) ID() string {
        return lo.id
    }
    
    type EmailObserver struct {
        id string
    }
    
    func (eo *EmailObserver) Update(event interface{}) {
        fmt.Printf("[EMAIL] %s: 发送邮件通知 - %v\n", eo.id, event)
        time.Sleep(100 * time.Millisecond) // 模拟发送时间
    }
    
    func (eo *EmailObserver) ID() string {
        return eo.id
    }
    
    type MetricsObserver struct {
        id      string
        counter int64
    }
    
    func (mo *MetricsObserver) Update(event interface{}) {
        atomic.AddInt64(&mo.counter, 1)
        fmt.Printf("[METRICS] %s: 事件计数 - %d\n", 
            mo.id, atomic.LoadInt64(&mo.counter))
    }
    
    func (mo *MetricsObserver) ID() string {
        return mo.id
    }
    
    // 演示观察者模式
    subject := NewEventSubject()
    
    // 创建观察者
    logObs := &LogObserver{id: "logger"}
    emailObs := &EmailObserver{id: "emailer"}
    metricsObs := &MetricsObserver{id: "metrics"}
    
    // 注册观察者
    subject.Attach(logObs)
    subject.Attach(emailObs)
    subject.Attach(metricsObs)
    
    // 发送事件
    events := []interface{}{
        "用户登录事件",
        "订单创建事件",
        "支付完成事件",
        "系统警告事件",
    }
    
    for i, event := range events {
        fmt.Printf("\n--- 事件 %d ---\n", i+1)
        subject.Notify(event)
        time.Sleep(200 * time.Millisecond)
    }
    
    // 移除一个观察者
    subject.Detach(emailObs)
    
    fmt.Printf("\n--- 移除邮件观察者后 ---\n")
    subject.Notify("最终事件")
}

func demonstrateAsyncComposition() {
    fmt.Println("\n--- Async Composition 异步组合 ---")
    
    // Future/Promise实现
    type Future struct {
        result chan interface{}
        err    chan error
        done   chan bool
    }
    
    func NewFuture() *Future {
        return &Future{
            result: make(chan interface{}, 1),
            err:    make(chan error, 1),
            done:   make(chan bool, 1),
        }
    }
    
    func (f *Future) Complete(value interface{}) {
        select {
        case f.result <- value:
            close(f.done)
        default:
        }
    }
    
    func (f *Future) Fail(err error) {
        select {
        case f.err <- err:
            close(f.done)
        default:
        }
    }
    
    func (f *Future) Get(timeout time.Duration) (interface{}, error) {
        timer := time.NewTimer(timeout)
        defer timer.Stop()
        
        select {
        case result := <-f.result:
            return result, nil
        case err := <-f.err:
            return nil, err
        case <-timer.C:
            return nil, fmt.Errorf("timeout after %v", timeout)
        }
    }
    
    func (f *Future) Then(transform func(interface{}) interface{}) *Future {
        nextFuture := NewFuture()
        
        go func() {
            result, err := f.Get(10 * time.Second)
            if err != nil {
                nextFuture.Fail(err)
                return
            }
            
            transformed := transform(result)
            nextFuture.Complete(transformed)
        }()
        
        return nextFuture
    }
    
    // 异步操作函数
    asyncOperation := func(id int, delay time.Duration) *Future {
        future := NewFuture()
        
        go func() {
            time.Sleep(delay)
            
            // 模拟随机成功/失败
            if rand.Float32() < 0.8 {
                result := fmt.Sprintf("操作 %d 的结果", id)
                future.Complete(result)
            } else {
                future.Fail(fmt.Errorf("操作 %d 失败", id))
            }
        }()
        
        return future
    }
    
    // 组合多个异步操作
    combineAsync := func(futures ...*Future) *Future {
        combinedFuture := NewFuture()
        
        go func() {
            results := make([]interface{}, len(futures))
            
            for i, future := range futures {
                result, err := future.Get(5 * time.Second)
                if err != nil {
                    combinedFuture.Fail(fmt.Errorf("组合操作失败: %v", err))
                    return
                }
                results[i] = result
            }
            
            combinedFuture.Complete(results)
        }()
        
        return combinedFuture
    }
    
    // 演示异步组合
    fmt.Println("启动多个异步操作:")
    
    future1 := asyncOperation(1, 200*time.Millisecond)
    future2 := asyncOperation(2, 300*time.Millisecond)
    future3 := asyncOperation(3, 150*time.Millisecond)
    
    // 链式组合
    transformedFuture := future1.Then(func(result interface{}) interface{} {
        return fmt.Sprintf("转换后: %s", result)
    })
    
    // 并行组合
    combinedFuture := combineAsync(future2, future3)
    
    // 获取结果
    if result, err := transformedFuture.Get(time.Second); err != nil {
        fmt.Printf("转换操作失败: %v\n", err)
    } else {
        fmt.Printf("转换操作成功: %v\n", result)
    }
    
    if result, err := combinedFuture.Get(time.Second); err != nil {
        fmt.Printf("组合操作失败: %v\n", err)
    } else {
        fmt.Printf("组合操作成功: %v\n", result)
    }
}

func demonstrateBackpressure() {
    fmt.Println("\n--- Backpressure 背压控制 ---")
    
    /*
    背压控制:
    - 当消费者处理速度跟不上生产者时的处理策略
    - Buffer:缓冲策略
    - Drop:丢弃策略  
    - Block:阻塞策略
    */
    
    type BackpressureStrategy int
    
    const (
        Buffer BackpressureStrategy = iota
        DropOldest
        DropNewest
        Block
    )
    
    type BackpressureStream struct {
        input    chan interface{}
        output   chan interface{}
        strategy BackpressureStrategy
        buffer   []interface{}
        maxSize  int
        mutex    sync.Mutex
        stats    struct {
            produced int64
            consumed int64
            dropped  int64
        }
    }
    
    func NewBackpressureStream(strategy BackpressureStrategy, bufferSize int) *BackpressureStream {
        bs := &BackpressureStream{
            input:    make(chan interface{}),
            output:   make(chan interface{}),
            strategy: strategy,
            buffer:   make([]interface{}, 0, bufferSize),
            maxSize:  bufferSize,
        }
        
        go bs.run()
        return bs
    }
    
    func (bs *BackpressureStream) Send(item interface{}) bool {
        select {
        case bs.input <- item:
            atomic.AddInt64(&bs.stats.produced, 1)
            return true
        default:
            // 输入通道满
            if bs.strategy == Block {
                bs.input <- item
                atomic.AddInt64(&bs.stats.produced, 1)
                return true
            }
            return false
        }
    }
    
    func (bs *BackpressureStream) Receive() <-chan interface{} {
        return bs.output
    }
    
    func (bs *BackpressureStream) run() {
        defer close(bs.output)
        
        for item := range bs.input {
            bs.mutex.Lock()
            
            switch bs.strategy {
            case Buffer:
                if len(bs.buffer) < bs.maxSize {
                    bs.buffer = append(bs.buffer, item)
                } else {
                    // 缓冲区满,阻塞
                    bs.mutex.Unlock()
                    bs.output <- item
                    atomic.AddInt64(&bs.stats.consumed, 1)
                    continue
                }
                
            case DropOldest:
                if len(bs.buffer) < bs.maxSize {
                    bs.buffer = append(bs.buffer, item)
                } else {
                    // 丢弃最老的
                    bs.buffer = append(bs.buffer[1:], item)
                    atomic.AddInt64(&bs.stats.dropped, 1)
                }
                
            case DropNewest:
                if len(bs.buffer) < bs.maxSize {
                    bs.buffer = append(bs.buffer, item)
                } else {
                    // 丢弃最新的
                    atomic.AddInt64(&bs.stats.dropped, 1)
                }
                
            case Block:
                bs.buffer = append(bs.buffer, item)
            }
            
            // 尝试发送缓冲的数据,输出通道满则保留到下一轮
        flush:
            for len(bs.buffer) > 0 {
                select {
                case bs.output <- bs.buffer[0]:
                    bs.buffer = bs.buffer[1:]
                    atomic.AddInt64(&bs.stats.consumed, 1)
                default:
                    break flush
                }
            }
            
            bs.mutex.Unlock()
        }
    }
    
    func (bs *BackpressureStream) GetStats() (int64, int64, int64) {
        return atomic.LoadInt64(&bs.stats.produced),
               atomic.LoadInt64(&bs.stats.consumed),
               atomic.LoadInt64(&bs.stats.dropped)
    }
    
    // 测试不同的背压策略
    strategies := []struct {
        name     string
        strategy BackpressureStrategy
    }{
        {"Buffer", Buffer},
        {"DropOldest", DropOldest},
        {"DropNewest", DropNewest},
    }
    
    for _, s := range strategies {
        fmt.Printf("\n测试 %s 策略:\n", s.name)
        
        stream := NewBackpressureStream(s.strategy, 5)
        
        // 快速生产者
        go func() {
            for i := 0; i < 20; i++ {
                if stream.Send(fmt.Sprintf("item-%d", i)) {
                    fmt.Printf("发送: item-%d\n", i)
                } else {
                    fmt.Printf("发送失败: item-%d\n", i)
                }
                time.Sleep(10 * time.Millisecond)
            }
            close(stream.input)
        }()
        
        // 慢速消费者
        go func() {
            for item := range stream.Receive() {
                fmt.Printf("  接收: %v\n", item)
                time.Sleep(50 * time.Millisecond) // 消费比生产慢
            }
        }()
        
        time.Sleep(2 * time.Second)
        
        produced, consumed, dropped := stream.GetStats()
        fmt.Printf("统计 - 生产: %d, 消费: %d, 丢弃: %d\n", 
            produced, consumed, dropped)
    }
}

func main() {
    demonstrateConcurrencyPatterns()
    demonstrateAdvancedPatterns()
    demonstrateReactivePatterns()
}

```
:::

🎯 核心知识点总结

基础并发模式要点

  1. Worker Pool: 固定数量的worker goroutine处理任务队列(精简参考写法见本列表后的示例)
  2. Pipeline: 将复杂处理分解为多个阶段的流水线
  3. Fan-in/Fan-out: 分发任务和聚合结果的扇形模式
  4. Producer-Consumer: 通过缓冲区解耦生产者和消费者
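
下面补充一个精简的 Worker Pool 参考写法(示意代码,与上文实现并非逐行对应):通过关闭任务 channel 通知 worker 退出,并在全部 worker 结束后统一关闭结果 channel,这是 Go 中更常见的惯用做法,也能保证已提交的任务都被处理完。

```go
package main

import (
    "fmt"
    "sync"
)

// runPool 启动固定数量的 worker,从 jobs 读取任务并把结果写入返回的 channel。
// jobs 被关闭且取空后 worker 自然退出;全部退出后统一关闭 results。
func runPool(workerCount int, jobs <-chan int) <-chan string {
    results := make(chan string)
    var wg sync.WaitGroup

    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                results <- fmt.Sprintf("worker %d 处理了 job %d", id, job)
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(results)
    }()
    return results
}

func main() {
    jobs := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            jobs <- i
        }
        close(jobs) // 任务提交完毕,通知 worker 收尾
    }()

    for r := range runPool(3, jobs) {
        fmt.Println(r)
    }
}
```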

高级控制模式要点

  1. Circuit Breaker: 监控失败率,达到阈值时快速失败
  2. Rate Limiter: 控制请求速率,防止系统过载(基于 golang.org/x/time/rate 的用法见本列表后的示例)
  3. Bulkhead: 资源隔离,防止级联失败
  4. Retry Pattern: 指数退避重试,抖动避免惊群
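
实际项目中通常不必从零手写限流器,可以直接使用官方扩展库 golang.org/x/time/rate(令牌桶实现)。下面是一个示意用法(假设项目已引入该依赖):

```go
package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    // 每 200ms 生成一个令牌(约 5 QPS),桶容量 10,允许一定突发
    limiter := rate.NewLimiter(rate.Every(200*time.Millisecond), 10)

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    for i := 0; i < 20; i++ {
        // Wait 阻塞直到拿到令牌;ctx 超时或取消时返回错误
        if err := limiter.Wait(ctx); err != nil {
            fmt.Printf("请求 %d 放弃: %v\n", i, err)
            return
        }
        fmt.Printf("请求 %d 通过 @ %s\n", i, time.Now().Format("15:04:05.000"))
    }
}
```

若希望非阻塞地直接拒绝超限请求,可以改用 limiter.Allow()。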

响应式模式要点

  1. Event Stream: 事件流处理,支持过滤和转换
  2. Observer Pattern: 观察者模式,一对多的依赖关系
  3. Async Composition: 异步操作的组合和链式调用(基于泛型的 Future 示意见本列表后的示例)
  4. Backpressure: 背压控制,处理生产消费速度不匹配
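
上文的 Future 基于 interface{};在 Go 1.18+ 中可以用泛型写出类型安全的最小版本。以下为示意代码(只演示单次结果与超时控制,未包含错误传递):

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// Future 的最小泛型示意:用容量为 1 的 channel 承载一次性结果
type Future[T any] struct {
    ch chan T
}

// Async 启动 fn 并立即返回 Future
func Async[T any](fn func() T) *Future[T] {
    f := &Future[T]{ch: make(chan T, 1)}
    go func() { f.ch <- fn() }()
    return f
}

// Await 等待结果,ctx 超时或取消时返回零值和错误
func (f *Future[T]) Await(ctx context.Context) (T, error) {
    select {
    case v := <-f.ch:
        return v, nil
    case <-ctx.Done():
        var zero T
        return zero, ctx.Err()
    }
}

func main() {
    fut := Async(func() int {
        time.Sleep(100 * time.Millisecond)
        return 42
    })

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    v, err := fut.Await(ctx)
    fmt.Println(v, err) // 输出: 42 <nil>
}
```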

设计原则要点

  1. 解耦: 通过channel和interface解耦组件
  2. 并发安全: 正确使用同步原语保证线程安全
  3. 资源管理: 避免goroutine泄漏和资源泄漏(使用 errgroup 统一管理 goroutine 生命周期的示例见本列表后)
  4. 可观测性: 提供监控指标和错误处理
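
针对"资源管理"这一点,官方扩展库 golang.org/x/sync/errgroup 提供了统一管理一组 goroutine 生命周期并传播错误的惯用方式。下面是一个示意(假设项目已引入该依赖):

```go
package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    // errgroup 管理一组 goroutine:任一任务出错会取消共享的 ctx,
    // Wait 返回第一个非 nil 错误,避免 goroutine 泄漏和错误被吞掉
    g, ctx := errgroup.WithContext(context.Background())

    for i := 0; i < 3; i++ {
        id := i
        g.Go(func() error {
            select {
            case <-time.After(time.Duration((id+1)*100) * time.Millisecond):
                if id == 1 {
                    return fmt.Errorf("task %d failed", id)
                }
                fmt.Printf("task %d done\n", id)
                return nil
            case <-ctx.Done():
                fmt.Printf("task %d canceled: %v\n", id, ctx.Err())
                return ctx.Err()
            }
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Println("group error:", err)
    }
}
```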

🔍 面试准备建议

  1. 掌握基础模式: 熟练实现和应用常见的并发设计模式
  2. 理解设计原则: 深入理解并发设计的核心原则和最佳实践
  3. 实践项目应用: 在实际项目中应用这些模式解决具体问题
  4. 性能分析: 理解不同模式的性能特征和适用场景(基准测试写法见本列表后的示例)
  5. 组合使用: 学会组合多种模式构建复杂的并发系统
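
关于"性能分析",可以借助标准库的基准测试量化不同模式或参数的开销。下面是一个示意的 _test.go 文件(对比无缓冲与带缓冲 channel 的生产消费吞吐,运行 go test -bench=. -benchmem 查看结果):

```go
package patterns

import (
    "sync"
    "testing"
)

// 在单生产者/单消费者场景下,对比无缓冲与带缓冲 channel 的发送开销
func benchProduceConsume(b *testing.B, buf int) {
    ch := make(chan int, buf)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for range ch { // 消费者只负责取空 channel
        }
    }()

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        ch <- i
    }
    close(ch)
    wg.Wait()
}

func BenchmarkUnbuffered(b *testing.B) { benchProduceConsume(b, 0) }
func BenchmarkBuffered(b *testing.B)   { benchProduceConsume(b, 128) }
```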
