Go并发设计模式详解 - Golang并发编程面试题
Go并发设计模式是构建高效并发程序的基础。本章深入探讨常见的并发模式、设计原则和实现技巧。
📋 重点面试题
面试题 1:经典并发设计模式
难度级别:⭐⭐⭐⭐⭐
考察范围:并发设计/设计模式
技术标签:concurrency patterns design patterns goroutine patterns channel patterns
详细解答
1. 基础并发模式
点击查看完整代码实现
点击查看完整代码实现
go
package main
import (
"context"
"fmt"
"math/rand"
"runtime"
"sync"
"time"
)
func demonstrateConcurrencyPatterns() {
fmt.Println("=== Go并发设计模式演示 ===")
/*
经典并发模式分类:
1. 创建型模式:
- Worker Pool:工作池模式
- Pipeline:管道模式
- Fan-in/Fan-out:扇入扇出模式
2. 同步模式:
- Producer-Consumer:生产者消费者
- Publisher-Subscriber:发布订阅
- Request-Response:请求响应
3. 控制型模式:
- Circuit Breaker:熔断器
- Rate Limiter:限流器
- Timeout:超时控制
4. 数据流模式:
- Stream Processing:流处理
- Map-Reduce:映射归约
- Scatter-Gather:分散聚集
*/
// 演示各种并发模式
demonstrateWorkerPool()
demonstratePipeline()
demonstrateFanInFanOut()
demonstrateProducerConsumer()
}
func demonstrateWorkerPool() {
fmt.Println("\n--- Worker Pool 模式 ---")
/*
Worker Pool模式:
- 固定数量的worker goroutine
- 通过channel分发任务
- 控制并发度,避免goroutine泄漏
*/
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
Error error
}
type WorkerPool struct {
workerCount int
jobQueue chan Job
resultQueue chan Result
quit chan bool
wg sync.WaitGroup
}
func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
jobQueue: make(chan Job, queueSize),
resultQueue: make(chan Result, queueSize),
quit: make(chan bool),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(workerID int) {
defer wp.wg.Done()
for {
select {
case job := <-wp.jobQueue:
result := wp.processJob(workerID, job)
wp.resultQueue <- result
case <-wp.quit:
fmt.Printf("Worker %d 退出\n", workerID)
return
}
}
}
func (wp *WorkerPool) processJob(workerID int, job Job) Result {
// 模拟工作处理
processingTime := time.Duration(rand.Intn(100)) * time.Millisecond
time.Sleep(processingTime)
output := fmt.Sprintf("Worker %d 处理了 Job %d: %s",
workerID, job.ID, job.Data)
return Result{
JobID: job.ID,
Output: output,
Error: nil,
}
}
func (wp *WorkerPool) Submit(job Job) {
wp.jobQueue <- job
}
func (wp *WorkerPool) GetResult() Result {
return <-wp.resultQueue
}
func (wp *WorkerPool) Stop() {
close(wp.quit)
wp.wg.Wait()
close(wp.jobQueue)
close(wp.resultQueue)
}
// 演示Worker Pool使用
pool := NewWorkerPool(3, 10)
pool.Start()
// 提交任务
go func() {
for i := 0; i < 10; i++ {
job := Job{
ID: i,
Data: fmt.Sprintf("task-%d", i),
}
pool.Submit(job)
fmt.Printf("提交任务 %d\n", i)
}
}()
// 收集结果
go func() {
for i := 0; i < 10; i++ {
result := pool.GetResult()
fmt.Printf("收到结果: %s\n", result.Output)
}
}()
time.Sleep(2 * time.Second)
pool.Stop()
fmt.Println("Worker Pool 演示完成")
}
func demonstratePipeline() {
fmt.Println("\n--- Pipeline 模式 ---")
/*
Pipeline模式:
- 将复杂处理分解为多个阶段
- 每个阶段由独立的goroutine处理
- 通过channel连接各个阶段
*/
// 阶段1:数据生成
generateNumbers := func(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= 20; i++ {
select {
case out <- i:
fmt.Printf("生成: %d\n", i)
case <-ctx.Done():
return
}
time.Sleep(50 * time.Millisecond)
}
}()
return out
}
// 阶段2:数据转换
transform := func(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case num, ok := <-in:
if !ok {
return
}
transformed := num * num
fmt.Printf("转换: %d -> %d\n", num, transformed)
select {
case out <- transformed:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
// 阶段3:数据过滤
filter := func(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case num, ok := <-in:
if !ok {
return
}
if num%2 == 0 { // 只保留偶数
fmt.Printf("过滤保留: %d\n", num)
select {
case out <- num:
case <-ctx.Done():
return
}
} else {
fmt.Printf("过滤丢弃: %d\n", num)
}
case <-ctx.Done():
return
}
}
}()
return out
}
// 阶段4:结果收集
collect := func(ctx context.Context, in <-chan int) {
results := make([]int, 0)
for {
select {
case num, ok := <-in:
if !ok {
fmt.Printf("收集完成,结果: %v\n", results)
return
}
results = append(results, num)
fmt.Printf("收集: %d\n", num)
case <-ctx.Done():
fmt.Printf("收集被取消,当前结果: %v\n", results)
return
}
}
}
// 构建并运行管道
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 连接各个阶段
stage1 := generateNumbers(ctx)
stage2 := transform(ctx, stage1)
stage3 := filter(ctx, stage2)
// 收集最终结果
collect(ctx, stage3)
fmt.Println("Pipeline 演示完成")
}
func demonstrateFanInFanOut() {
fmt.Println("\n--- Fan-in/Fan-out 模式 ---")
/*
Fan-out模式:一个数据源分发给多个处理器
Fan-in模式:多个数据源合并到一个处理器
*/
// Fan-out:分发任务到多个worker
fanOut := func(ctx context.Context, in <-chan int, numWorkers int) []<-chan int {
outputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan int)
outputs[i] = output
go func(workerID int, out chan<- int) {
defer close(out)
for {
select {
case data, ok := <-in:
if !ok {
fmt.Printf("Fan-out Worker %d 结束\n", workerID)
return
}
// 模拟处理
processed := data * (workerID + 1)
fmt.Printf("Worker %d 处理: %d -> %d\n",
workerID, data, processed)
select {
case out <- processed:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(i, output)
}
return outputs
}
// Fan-in:合并多个channel的输出
fanIn := func(ctx context.Context, inputs ...<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
// 为每个输入channel启动一个goroutine
for i, input := range inputs {
wg.Add(1)
go func(id int, in <-chan int) {
defer wg.Done()
for {
select {
case data, ok := <-in:
if !ok {
fmt.Printf("Fan-in 输入 %d 结束\n", id)
return
}
select {
case output <- data:
fmt.Printf("Fan-in 合并来自输入 %d 的数据: %d\n",
id, data)
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(i, input)
}
// 等待所有输入完成后关闭输出
go func() {
wg.Wait()
close(output)
fmt.Println("Fan-in 合并完成")
}()
return output
}
// 数据源
source := func(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 1; i <= 10; i++ {
select {
case out <- i:
fmt.Printf("源数据: %d\n", i)
case <-ctx.Done():
return
}
time.Sleep(100 * time.Millisecond)
}
}()
return out
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// 构建Fan-out/Fan-in管道
input := source(ctx)
outputs := fanOut(ctx, input, 3) // 分发给3个worker
merged := fanIn(ctx, outputs...) // 合并结果
// 收集最终结果
results := make([]int, 0)
for result := range merged {
results = append(results, result)
}
fmt.Printf("Fan-in/Fan-out 最终结果: %v\n", results)
}
func demonstrateProducerConsumer() {
fmt.Println("\n--- Producer-Consumer 模式 ---")
/*
Producer-Consumer模式:
- 生产者生成数据
- 消费者消费数据
- 通过缓冲区解耦
*/
type Message struct {
ID int
Content string
Timestamp time.Time
}
// 生产者
producer := func(ctx context.Context, id int, output chan<- Message) {
defer func() {
fmt.Printf("生产者 %d 退出\n", id)
}()
messageID := 0
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
message := Message{
ID: messageID,
Content: fmt.Sprintf("来自生产者 %d 的消息 %d", id, messageID),
Timestamp: time.Now(),
}
select {
case output <- message:
fmt.Printf("生产者 %d 发送: %s\n", id, message.Content)
messageID++
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
// 消费者
consumer := func(ctx context.Context, id int, input <-chan Message) {
defer func() {
fmt.Printf("消费者 %d 退出\n", id)
}()
processed := 0
for {
select {
case message, ok := <-input:
if !ok {
fmt.Printf("消费者 %d 处理了 %d 条消息\n", id, processed)
return
}
// 模拟处理时间
processingTime := time.Duration(rand.Intn(300)) * time.Millisecond
time.Sleep(processingTime)
fmt.Printf("消费者 %d 处理: %s (耗时: %v)\n",
id, message.Content, processingTime)
processed++
case <-ctx.Done():
fmt.Printf("消费者 %d 被取消,已处理 %d 条消息\n", id, processed)
return
}
}
}
// 创建带缓冲的channel作为消息队列
messageQueue := make(chan Message, 5)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var wg sync.WaitGroup
// 启动2个生产者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
producer(ctx, id, messageQueue)
}(i)
}
// 启动3个消费者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
consumer(ctx, id, messageQueue)
}(i)
}
// 等待超时
<-ctx.Done()
close(messageQueue)
wg.Wait()
fmt.Println("Producer-Consumer 演示完成")
}:::
面试题 2:高级并发控制模式
难度级别:⭐⭐⭐⭐⭐
考察范围:并发控制/可靠性设计
技术标签:circuit breaker rate limiter bulkhead retry patterns
详细解答
1. 并发控制和稳定性模式
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateAdvancedPatterns() {
fmt.Println("\n=== 高级并发控制模式 ===")
/*
高级并发控制模式:
1. 稳定性模式:
- Circuit Breaker:熔断器
- Bulkhead:舱壁隔离
- Timeout:超时控制
2. 流控模式:
- Rate Limiter:限流器
- Semaphore:信号量
- Throttling:节流
3. 重试模式:
- Exponential Backoff:指数退避
- Jitter:抖动
- Dead Letter Queue:死信队列
*/
demonstrateCircuitBreaker()
demonstrateRateLimiter()
demonstrateBulkhead()
demonstrateRetryPattern()
}
func demonstrateCircuitBreaker() {
fmt.Println("\n--- Circuit Breaker 模式 ---")
/*
Circuit Breaker模式:
- 监控服务调用失败率
- 达到阈值时"开路",快速失败
- 定期尝试恢复"闭路"状态
*/
type CircuitBreakerState int
const (
Closed CircuitBreakerState = iota
Open
HalfOpen
)
func (s CircuitBreakerState) String() string {
switch s {
case Closed:
return "CLOSED"
case Open:
return "OPEN"
case HalfOpen:
return "HALF-OPEN"
default:
return "UNKNOWN"
}
}
type CircuitBreaker struct {
maxFailures int
timeout time.Duration
state CircuitBreakerState
failures int
lastFailureTime time.Time
mutex sync.RWMutex
}
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
timeout: timeout,
state: Closed,
}
}
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mutex.Lock()
defer cb.mutex.Unlock()
// 检查是否可以调用
if !cb.canCall() {
return fmt.Errorf("circuit breaker is %s", cb.state)
}
// 执行调用
err := fn()
// 记录结果
cb.recordResult(err)
return err
}
func (cb *CircuitBreaker) canCall() bool {
switch cb.state {
case Closed:
return true
case Open:
// 检查是否可以转到半开状态
if time.Since(cb.lastFailureTime) > cb.timeout {
cb.state = HalfOpen
fmt.Printf("熔断器状态: %s -> %s\n", Open, HalfOpen)
return true
}
return false
case HalfOpen:
return true
default:
return false
}
}
func (cb *CircuitBreaker) recordResult(err error) {
if err != nil {
cb.failures++
cb.lastFailureTime = time.Now()
if cb.state == HalfOpen || cb.failures >= cb.maxFailures {
cb.state = Open
fmt.Printf("熔断器打开: 失败次数 %d, 状态: %s\n",
cb.failures, cb.state)
}
} else {
// 成功调用
if cb.state == HalfOpen {
cb.state = Closed
cb.failures = 0
fmt.Printf("熔断器关闭: 状态 %s, 失败计数重置\n", cb.state)
}
}
}
func (cb *CircuitBreaker) GetState() CircuitBreakerState {
cb.mutex.RLock()
defer cb.mutex.RUnlock()
return cb.state
}
// 模拟不稳定的服务
unreliableService := func() error {
// 70%的失败率
if rand.Float32() < 0.7 {
return fmt.Errorf("service failure")
}
return nil
}
// 演示熔断器使用
cb := NewCircuitBreaker(3, 2*time.Second)
for i := 0; i < 15; i++ {
err := cb.Call(unreliableService)
if err != nil {
fmt.Printf("调用 %d: 失败 - %v (状态: %s)\n",
i+1, err, cb.GetState())
} else {
fmt.Printf("调用 %d: 成功 (状态: %s)\n",
i+1, cb.GetState())
}
time.Sleep(300 * time.Millisecond)
}
fmt.Println("Circuit Breaker 演示完成")
}
func demonstrateRateLimiter() {
fmt.Println("\n--- Rate Limiter 模式 ---")
/*
Rate Limiter模式:
- 控制请求速率
- 防止系统过载
- 支持突发流量
*/
// Token Bucket算法实现
type TokenBucket struct {
capacity int
tokens int
refillRate time.Duration
lastRefill time.Time
mutex sync.Mutex
}
func NewTokenBucket(capacity int, refillRate time.Duration) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
refillRate: refillRate,
lastRefill: time.Now(),
}
}
func (tb *TokenBucket) TryConsume() bool {
tb.mutex.Lock()
defer tb.mutex.Unlock()
tb.refill()
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefill)
tokensToAdd := int(elapsed / tb.refillRate)
if tokensToAdd > 0 {
tb.tokens += tokensToAdd
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastRefill = now
}
}
func (tb *TokenBucket) GetTokens() int {
tb.mutex.Lock()
defer tb.mutex.Unlock()
tb.refill()
return tb.tokens
}
// Sliding Window算法实现
type SlidingWindowLimiter struct {
maxRequests int
window time.Duration
requests []time.Time
mutex sync.Mutex
}
func NewSlidingWindowLimiter(maxRequests int, window time.Duration) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
maxRequests: maxRequests,
window: window,
requests: make([]time.Time, 0),
}
}
func (swl *SlidingWindowLimiter) TryConsume() bool {
swl.mutex.Lock()
defer swl.mutex.Unlock()
now := time.Now()
cutoff := now.Add(-swl.window)
// 移除窗口外的请求
var validRequests []time.Time
for _, req := range swl.requests {
if req.After(cutoff) {
validRequests = append(validRequests, req)
}
}
swl.requests = validRequests
// 检查是否可以添加新请求
if len(swl.requests) < swl.maxRequests {
swl.requests = append(swl.requests, now)
return true
}
return false
}
func (swl *SlidingWindowLimiter) GetCurrentRequests() int {
swl.mutex.Lock()
defer swl.mutex.Unlock()
now := time.Now()
cutoff := now.Add(-swl.window)
count := 0
for _, req := range swl.requests {
if req.After(cutoff) {
count++
}
}
return count
}
// 演示限流器
fmt.Println("Token Bucket 限流器测试:")
tokenBucket := NewTokenBucket(5, 200*time.Millisecond)
for i := 0; i < 10; i++ {
if tokenBucket.TryConsume() {
fmt.Printf("请求 %d: 通过 (剩余令牌: %d)\n",
i+1, tokenBucket.GetTokens())
} else {
fmt.Printf("请求 %d: 被限流 (剩余令牌: %d)\n",
i+1, tokenBucket.GetTokens())
}
time.Sleep(100 * time.Millisecond)
}
fmt.Println("\nSliding Window 限流器测试:")
slidingWindow := NewSlidingWindowLimiter(3, time.Second)
for i := 0; i < 8; i++ {
if slidingWindow.TryConsume() {
fmt.Printf("请求 %d: 通过 (窗口内请求: %d)\n",
i+1, slidingWindow.GetCurrentRequests())
} else {
fmt.Printf("请求 %d: 被限流 (窗口内请求: %d)\n",
i+1, slidingWindow.GetCurrentRequests())
}
time.Sleep(200 * time.Millisecond)
}
}
func demonstrateBulkhead() {
fmt.Println("\n--- Bulkhead 隔离模式 ---")
/*
Bulkhead模式:
- 资源隔离,防止级联失败
- 为不同服务分配独立资源池
- 一个服务故障不影响其他服务
*/
type ResourcePool struct {
name string
resources chan struct{}
timeout time.Duration
}
func NewResourcePool(name string, size int, timeout time.Duration) *ResourcePool {
pool := &ResourcePool{
name: name,
resources: make(chan struct{}, size),
timeout: timeout,
}
// 填充资源池
for i := 0; i < size; i++ {
pool.resources <- struct{}{}
}
return pool
}
func (rp *ResourcePool) Acquire() error {
select {
case <-rp.resources:
return nil
case <-time.After(rp.timeout):
return fmt.Errorf("acquire timeout for %s", rp.name)
}
}
func (rp *ResourcePool) Release() {
select {
case rp.resources <- struct{}{}:
default:
// 池满,可能资源泄漏
fmt.Printf("警告: %s 资源池已满\n", rp.name)
}
}
func (rp *ResourcePool) Available() int {
return len(rp.resources)
}
// 服务隔离管理器
type BulkheadManager struct {
pools map[string]*ResourcePool
mutex sync.RWMutex
}
func NewBulkheadManager() *BulkheadManager {
return &BulkheadManager{
pools: make(map[string]*ResourcePool),
}
}
func (bm *BulkheadManager) AddPool(name string, size int, timeout time.Duration) {
bm.mutex.Lock()
defer bm.mutex.Unlock()
bm.pools[name] = NewResourcePool(name, size, timeout)
}
func (bm *BulkheadManager) Execute(poolName string, fn func() error) error {
bm.mutex.RLock()
pool, exists := bm.pools[poolName]
bm.mutex.RUnlock()
if !exists {
return fmt.Errorf("pool %s not found", poolName)
}
// 获取资源
if err := pool.Acquire(); err != nil {
return err
}
defer pool.Release()
// 执行任务
return fn()
}
func (bm *BulkheadManager) GetPoolStatus() map[string]int {
bm.mutex.RLock()
defer bm.mutex.RUnlock()
status := make(map[string]int)
for name, pool := range bm.pools {
status[name] = pool.Available()
}
return status
}
// 演示舱壁隔离
manager := NewBulkheadManager()
// 为不同服务创建隔离的资源池
manager.AddPool("user-service", 3, time.Second)
manager.AddPool("payment-service", 2, time.Second)
manager.AddPool("notification-service", 5, time.Second)
var wg sync.WaitGroup
// 模拟不同服务的并发请求
services := []string{"user-service", "payment-service", "notification-service"}
for _, service := range services {
for i := 0; i < 5; i++ {
wg.Add(1)
go func(svc string, reqID int) {
defer wg.Done()
err := manager.Execute(svc, func() error {
// 模拟服务处理时间
delay := time.Duration(rand.Intn(500)) * time.Millisecond
time.Sleep(delay)
fmt.Printf("%s 请求 %d 完成 (耗时: %v)\n",
svc, reqID, delay)
return nil
})
if err != nil {
fmt.Printf("%s 请求 %d 失败: %v\n", svc, reqID, err)
}
}(service, i)
time.Sleep(50 * time.Millisecond)
}
}
// 监控资源池状态
go func() {
for i := 0; i < 10; i++ {
status := manager.GetPoolStatus()
fmt.Printf("资源池状态: %v\n", status)
time.Sleep(200 * time.Millisecond)
}
}()
wg.Wait()
fmt.Println("Bulkhead 隔离演示完成")
}
func demonstrateRetryPattern() {
fmt.Println("\n--- Retry 重试模式 ---")
/*
Retry模式:
- 指数退避重试
- 抖动避免惊群
- 最大重试次数限制
*/
type RetryConfig struct {
MaxAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
Multiplier float64
Jitter bool
}
func RetryWithBackoff(config RetryConfig, fn func() error) error {
var lastErr error
delay := config.BaseDelay
for attempt := 1; attempt <= config.MaxAttempts; attempt++ {
err := fn()
if err == nil {
if attempt > 1 {
fmt.Printf("重试成功,尝试次数: %d\n", attempt)
}
return nil
}
lastErr = err
fmt.Printf("尝试 %d/%d 失败: %v\n",
attempt, config.MaxAttempts, err)
if attempt == config.MaxAttempts {
break
}
// 计算下次重试延迟
actualDelay := delay
if config.Jitter {
// 添加±25%的抖动
jitter := time.Duration(rand.Float64() * 0.5 * float64(delay))
if rand.Float64() < 0.5 {
actualDelay -= jitter
} else {
actualDelay += jitter
}
}
fmt.Printf("等待 %v 后重试...\n", actualDelay)
time.Sleep(actualDelay)
// 指数退避
delay = time.Duration(float64(delay) * config.Multiplier)
if delay > config.MaxDelay {
delay = config.MaxDelay
}
}
return fmt.Errorf("重试 %d 次后仍然失败: %v",
config.MaxAttempts, lastErr)
}
// 模拟不稳定的操作
attempt := 0
unstableOperation := func() error {
attempt++
// 前3次必定失败,第4次成功
if attempt < 4 {
return fmt.Errorf("operation failed (attempt %d)", attempt)
}
return nil
}
// 测试重试机制
config := RetryConfig{
MaxAttempts: 5,
BaseDelay: 100 * time.Millisecond,
MaxDelay: 2 * time.Second,
Multiplier: 2.0,
Jitter: true,
}
fmt.Println("开始重试操作:")
err := RetryWithBackoff(config, unstableOperation)
if err != nil {
fmt.Printf("最终失败: %v\n", err)
} else {
fmt.Println("操作最终成功")
}
}
func main() {
demonstrateConcurrencyPatterns()
demonstrateAdvancedPatterns()
}:::
面试题 3:响应式编程模式
难度级别:⭐⭐⭐⭐⭐
考察范围:异步编程/事件驱动
技术标签:reactive programming event-driven stream processing async patterns
详细解答
1. 响应式和事件驱动模式
点击查看完整代码实现
点击查看完整代码实现
go
func demonstrateReactivePatterns() {
fmt.Println("\n=== 响应式编程模式 ===")
/*
响应式编程模式:
1. 事件流处理:
- Event Stream:事件流
- Observer:观察者模式
- Subject:主题模式
2. 异步组合:
- Future/Promise:异步结果
- Compose:组合操作
- Transform:转换操作
3. 背压控制:
- Backpressure:背压处理
- Buffer:缓冲策略
- Drop:丢弃策略
*/
demonstrateEventStream()
demonstrateObserverPattern()
demonstrateAsyncComposition()
demonstrateBackpressure()
}
func demonstrateEventStream() {
fmt.Println("\n--- Event Stream 事件流 ---")
type Event struct {
Type string
Data interface{}
Timestamp time.Time
}
type EventStream struct {
subscribers []chan Event
buffer []Event
maxBuffer int
mutex sync.RWMutex
closed bool
}
func NewEventStream(bufferSize int) *EventStream {
return &EventStream{
subscribers: make([]chan Event, 0),
buffer: make([]Event, 0, bufferSize),
maxBuffer: bufferSize,
}
}
func (es *EventStream) Subscribe() <-chan Event {
es.mutex.Lock()
defer es.mutex.Unlock()
if es.closed {
ch := make(chan Event)
close(ch)
return ch
}
subscriber := make(chan Event, 10)
es.subscribers = append(es.subscribers, subscriber)
// 发送缓冲的事件
for _, event := range es.buffer {
select {
case subscriber <- event:
default:
// 订阅者满了,跳过
}
}
return subscriber
}
func (es *EventStream) Publish(event Event) {
es.mutex.RLock()
defer es.mutex.RUnlock()
if es.closed {
return
}
event.Timestamp = time.Now()
// 添加到缓冲区
if len(es.buffer) < es.maxBuffer {
es.buffer = append(es.buffer, event)
} else {
// 缓冲区满,移除最老的事件
es.buffer = append(es.buffer[1:], event)
}
// 发送给所有订阅者
for i, subscriber := range es.subscribers {
select {
case subscriber <- event:
default:
// 订阅者阻塞,考虑移除
fmt.Printf("订阅者 %d 阻塞,跳过事件\n", i)
}
}
}
func (es *EventStream) Close() {
es.mutex.Lock()
defer es.mutex.Unlock()
es.closed = true
for _, subscriber := range es.subscribers {
close(subscriber)
}
es.subscribers = nil
}
// 事件过滤器
func (es *EventStream) Filter(predicate func(Event) bool) *EventStream {
filtered := NewEventStream(es.maxBuffer)
go func() {
subscriber := es.Subscribe()
for event := range subscriber {
if predicate(event) {
filtered.Publish(event)
}
}
filtered.Close()
}()
return filtered
}
// 事件转换器
func (es *EventStream) Map(transform func(Event) Event) *EventStream {
mapped := NewEventStream(es.maxBuffer)
go func() {
subscriber := es.Subscribe()
for event := range subscriber {
transformed := transform(event)
mapped.Publish(transformed)
}
mapped.Close()
}()
return mapped
}
// 演示事件流
stream := NewEventStream(5)
// 创建过滤流:只处理"user"类型事件
userStream := stream.Filter(func(e Event) bool {
return e.Type == "user"
})
// 创建转换流:添加处理时间戳
processedStream := userStream.Map(func(e Event) Event {
return Event{
Type: e.Type + "_processed",
Data: map[string]interface{}{
"original": e.Data,
"processed_at": time.Now(),
},
Timestamp: e.Timestamp,
}
})
// 订阅处理后的事件
go func() {
subscriber := processedStream.Subscribe()
for event := range subscriber {
fmt.Printf("处理后事件: %s, 数据: %v\n",
event.Type, event.Data)
}
}()
// 发布事件
events := []Event{
{Type: "user", Data: "用户登录"},
{Type: "system", Data: "系统启动"},
{Type: "user", Data: "用户注销"},
{Type: "order", Data: "新订单"},
{Type: "user", Data: "用户注册"},
}
for _, event := range events {
stream.Publish(event)
fmt.Printf("发布事件: %s\n", event.Type)
time.Sleep(200 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
stream.Close()
}
func demonstrateObserverPattern() {
fmt.Println("\n--- Observer 观察者模式 ---")
type Observer interface {
Update(event interface{})
ID() string
}
type Subject interface {
Attach(observer Observer)
Detach(observer Observer)
Notify(event interface{})
}
// 具体主题实现
type EventSubject struct {
observers []Observer
mutex sync.RWMutex
}
func NewEventSubject() *EventSubject {
return &EventSubject{
observers: make([]Observer, 0),
}
}
func (es *EventSubject) Attach(observer Observer) {
es.mutex.Lock()
defer es.mutex.Unlock()
es.observers = append(es.observers, observer)
fmt.Printf("观察者 %s 已添加\n", observer.ID())
}
func (es *EventSubject) Detach(observer Observer) {
es.mutex.Lock()
defer es.mutex.Unlock()
for i, obs := range es.observers {
if obs.ID() == observer.ID() {
es.observers = append(es.observers[:i], es.observers[i+1:]...)
fmt.Printf("观察者 %s 已移除\n", observer.ID())
break
}
}
}
func (es *EventSubject) Notify(event interface{}) {
es.mutex.RLock()
observers := make([]Observer, len(es.observers))
copy(observers, es.observers)
es.mutex.RUnlock()
// 并发通知所有观察者
var wg sync.WaitGroup
for _, observer := range observers {
wg.Add(1)
go func(obs Observer) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
fmt.Printf("观察者 %s 处理事件时出现异常: %v\n",
obs.ID(), r)
}
}()
obs.Update(event)
}(observer)
}
wg.Wait()
}
// 具体观察者实现
type LogObserver struct {
id string
}
func (lo *LogObserver) Update(event interface{}) {
fmt.Printf("[LOG] %s: %v\n", lo.id, event)
time.Sleep(50 * time.Millisecond) // 模拟处理时间
}
func (lo *LogObserver) ID() string {
return lo.id
}
type EmailObserver struct {
id string
}
func (eo *EmailObserver) Update(event interface{}) {
fmt.Printf("[EMAIL] %s: 发送邮件通知 - %v\n", eo.id, event)
time.Sleep(100 * time.Millisecond) // 模拟发送时间
}
func (eo *EmailObserver) ID() string {
return eo.id
}
type MetricsObserver struct {
id string
counter int64
}
func (mo *MetricsObserver) Update(event interface{}) {
atomic.AddInt64(&mo.counter, 1)
fmt.Printf("[METRICS] %s: 事件计数 - %d\n",
mo.id, atomic.LoadInt64(&mo.counter))
}
func (mo *MetricsObserver) ID() string {
return mo.id
}
// 演示观察者模式
subject := NewEventSubject()
// 创建观察者
logObs := &LogObserver{id: "logger"}
emailObs := &EmailObserver{id: "emailer"}
metricsObs := &MetricsObserver{id: "metrics"}
// 注册观察者
subject.Attach(logObs)
subject.Attach(emailObs)
subject.Attach(metricsObs)
// 发送事件
events := []interface{}{
"用户登录事件",
"订单创建事件",
"支付完成事件",
"系统警告事件",
}
for i, event := range events {
fmt.Printf("\n--- 事件 %d ---\n", i+1)
subject.Notify(event)
time.Sleep(200 * time.Millisecond)
}
// 移除一个观察者
subject.Detach(emailObs)
fmt.Printf("\n--- 移除邮件观察者后 ---\n")
subject.Notify("最终事件")
}
func demonstrateAsyncComposition() {
fmt.Println("\n--- Async Composition 异步组合 ---")
// Future/Promise实现
type Future struct {
result chan interface{}
err chan error
done chan bool
}
func NewFuture() *Future {
return &Future{
result: make(chan interface{}, 1),
err: make(chan error, 1),
done: make(chan bool, 1),
}
}
func (f *Future) Complete(value interface{}) {
select {
case f.result <- value:
close(f.done)
default:
}
}
func (f *Future) Fail(err error) {
select {
case f.err <- err:
close(f.done)
default:
}
}
func (f *Future) Get(timeout time.Duration) (interface{}, error) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case result := <-f.result:
return result, nil
case err := <-f.err:
return nil, err
case <-timer.C:
return nil, fmt.Errorf("timeout after %v", timeout)
}
}
func (f *Future) Then(transform func(interface{}) interface{}) *Future {
nextFuture := NewFuture()
go func() {
result, err := f.Get(10 * time.Second)
if err != nil {
nextFuture.Fail(err)
return
}
transformed := transform(result)
nextFuture.Complete(transformed)
}()
return nextFuture
}
// 异步操作函数
asyncOperation := func(id int, delay time.Duration) *Future {
future := NewFuture()
go func() {
time.Sleep(delay)
// 模拟随机成功/失败
if rand.Float32() < 0.8 {
result := fmt.Sprintf("操作 %d 的结果", id)
future.Complete(result)
} else {
future.Fail(fmt.Errorf("操作 %d 失败", id))
}
}()
return future
}
// 组合多个异步操作
combineAsync := func(futures ...*Future) *Future {
combinedFuture := NewFuture()
go func() {
results := make([]interface{}, len(futures))
for i, future := range futures {
result, err := future.Get(5 * time.Second)
if err != nil {
combinedFuture.Fail(fmt.Errorf("组合操作失败: %v", err))
return
}
results[i] = result
}
combinedFuture.Complete(results)
}()
return combinedFuture
}
// 演示异步组合
fmt.Println("启动多个异步操作:")
future1 := asyncOperation(1, 200*time.Millisecond)
future2 := asyncOperation(2, 300*time.Millisecond)
future3 := asyncOperation(3, 150*time.Millisecond)
// 链式组合
transformedFuture := future1.Then(func(result interface{}) interface{} {
return fmt.Sprintf("转换后: %s", result)
})
// 并行组合
combinedFuture := combineAsync(future2, future3)
// 获取结果
if result, err := transformedFuture.Get(time.Second); err != nil {
fmt.Printf("转换操作失败: %v\n", err)
} else {
fmt.Printf("转换操作成功: %v\n", result)
}
if result, err := combinedFuture.Get(time.Second); err != nil {
fmt.Printf("组合操作失败: %v\n", err)
} else {
fmt.Printf("组合操作成功: %v\n", result)
}
}
func demonstrateBackpressure() {
fmt.Println("\n--- Backpressure 背压控制 ---")
/*
背压控制:
- 当消费者处理速度跟不上生产者时的处理策略
- Buffer:缓冲策略
- Drop:丢弃策略
- Block:阻塞策略
*/
type BackpressureStrategy int
const (
Buffer BackpressureStrategy = iota
DropOldest
DropNewest
Block
)
type BackpressureStream struct {
input chan interface{}
output chan interface{}
strategy BackpressureStrategy
buffer []interface{}
maxSize int
mutex sync.Mutex
stats struct {
produced int64
consumed int64
dropped int64
}
}
func NewBackpressureStream(strategy BackpressureStrategy, bufferSize int) *BackpressureStream {
bs := &BackpressureStream{
input: make(chan interface{}),
output: make(chan interface{}),
strategy: strategy,
buffer: make([]interface{}, 0, bufferSize),
maxSize: bufferSize,
}
go bs.run()
return bs
}
func (bs *BackpressureStream) Send(item interface{}) bool {
select {
case bs.input <- item:
atomic.AddInt64(&bs.stats.produced, 1)
return true
default:
// 输入通道满
if bs.strategy == Block {
bs.input <- item
atomic.AddInt64(&bs.stats.produced, 1)
return true
}
return false
}
}
func (bs *BackpressureStream) Receive() <-chan interface{} {
return bs.output
}
func (bs *BackpressureStream) run() {
defer close(bs.output)
for item := range bs.input {
bs.mutex.Lock()
switch bs.strategy {
case Buffer:
if len(bs.buffer) < bs.maxSize {
bs.buffer = append(bs.buffer, item)
} else {
// 缓冲区满,阻塞
bs.mutex.Unlock()
bs.output <- item
atomic.AddInt64(&bs.stats.consumed, 1)
continue
}
case DropOldest:
if len(bs.buffer) < bs.maxSize {
bs.buffer = append(bs.buffer, item)
} else {
// 丢弃最老的
bs.buffer = append(bs.buffer[1:], item)
atomic.AddInt64(&bs.stats.dropped, 1)
}
case DropNewest:
if len(bs.buffer) < bs.maxSize {
bs.buffer = append(bs.buffer, item)
} else {
// 丢弃最新的
atomic.AddInt64(&bs.stats.dropped, 1)
}
case Block:
bs.buffer = append(bs.buffer, item)
}
// 尝试发送缓冲的数据
for len(bs.buffer) > 0 {
select {
case bs.output <- bs.buffer[0]:
bs.buffer = bs.buffer[1:]
atomic.AddInt64(&bs.stats.consumed, 1)
default:
// 输出通道满,等待下次
bs.mutex.Unlock()
goto next
}
}
bs.mutex.Unlock()
next:
}
}
func (bs *BackpressureStream) GetStats() (int64, int64, int64) {
return atomic.LoadInt64(&bs.stats.produced),
atomic.LoadInt64(&bs.stats.consumed),
atomic.LoadInt64(&bs.stats.dropped)
}
// 测试不同的背压策略
strategies := []struct {
name string
strategy BackpressureStrategy
}{
{"Buffer", Buffer},
{"DropOldest", DropOldest},
{"DropNewest", DropNewest},
}
for _, s := range strategies {
fmt.Printf("\n测试 %s 策略:\n", s.name)
stream := NewBackpressureStream(s.strategy, 5)
// 快速生产者
go func() {
for i := 0; i < 20; i++ {
if stream.Send(fmt.Sprintf("item-%d", i)) {
fmt.Printf("发送: item-%d\n", i)
} else {
fmt.Printf("发送失败: item-%d\n", i)
}
time.Sleep(10 * time.Millisecond)
}
close(stream.input)
}()
// 慢速消费者
go func() {
for item := range stream.Receive() {
fmt.Printf(" 接收: %v\n", item)
time.Sleep(50 * time.Millisecond) // 消费比生产慢
}
}()
time.Sleep(2 * time.Second)
produced, consumed, dropped := stream.GetStats()
fmt.Printf("统计 - 生产: %d, 消费: %d, 丢弃: %d\n",
produced, consumed, dropped)
}
}
func main() {
demonstrateConcurrencyPatterns()
demonstrateAdvancedPatterns()
demonstrateReactivePatterns()
}:::
🎯 核心知识点总结
基础并发模式要点
- Worker Pool: 固定数量的worker goroutine处理任务队列
- Pipeline: 将复杂处理分解为多个阶段的流水线
- Fan-in/Fan-out: 分发任务和聚合结果的扇形模式
- Producer-Consumer: 通过缓冲区解耦生产者和消费者
高级控制模式要点
- Circuit Breaker: 监控失败率,达到阈值时快速失败
- Rate Limiter: 控制请求速率,防止系统过载
- Bulkhead: 资源隔离,防止级联失败
- Retry Pattern: 指数退避重试,抖动避免惊群
响应式模式要点
- Event Stream: 事件流处理,支持过滤和转换
- Observer Pattern: 观察者模式,一对多的依赖关系
- Async Composition: 异步操作的组合和链式调用
- Backpressure: 背压控制,处理生产消费速度不匹配
设计原则要点
- 解耦: 通过channel和interface解耦组件
- 并发安全: 正确使用同步原语保证线程安全
- 资源管理: 避免goroutine泄漏和资源泄漏
- 可观测性: 提供监控指标和错误处理
🔍 面试准备建议
- 掌握基础模式: 熟练实现和应用常见的并发设计模式
- 理解设计原则: 深入理解并发设计的核心原则和最佳实践
- 实践项目应用: 在实际项目中应用这些模式解决具体问题
- 性能分析: 理解不同模式的性能特征和适用场景
- 组合使用: 学会组合多种模式构建复杂的并发系统
