线程池
1. 简单的Goroutine池实现
go
import (
"context"
"errors"
"sync"
"sync/atomic"
)
// Task is a unit of work that a pool worker can run.
type Task interface {
	// Execute runs the task and returns a non-nil error on failure.
	Execute() error
}

// FuncTask adapts a plain func() error to the Task interface.
type FuncTask struct {
	fn func() error
}

// Execute calls the wrapped function and returns whatever it returns.
func (ft *FuncTask) Execute() error {
	return ft.fn()
}

// NewFuncTask wraps fn in a Task.
func NewFuncTask(fn func() error) Task {
	return &FuncTask{fn: fn}
}

// SimpleGoroutinePool runs submitted tasks on a fixed number of worker
// goroutines that are fed from a bounded queue.
type SimpleGoroutinePool struct {
	workers   int
	taskQueue chan Task
	wg        sync.WaitGroup
	ctx       context.Context
	cancel    context.CancelFunc
	closed    int32 // set to 1 by Close; accessed atomically

	// mu orders Submit's channel send against Close's close(taskQueue):
	// Submit sends while holding the read lock and Close closes while holding
	// the write lock, so a send can never hit an already closed channel.
	mu sync.RWMutex
}

// NewSimpleGoroutinePool creates a pool, starts `workers` worker goroutines
// and gives the task queue room for queueSize pending tasks.
func NewSimpleGoroutinePool(workers, queueSize int) *SimpleGoroutinePool {
	ctx, cancel := context.WithCancel(context.Background())
	pool := &SimpleGoroutinePool{
		workers:   workers,
		taskQueue: make(chan Task, queueSize),
		ctx:       ctx,
		cancel:    cancel,
	}
	// Add before `go` so Close's Wait can never miss a worker.
	for i := 0; i < workers; i++ {
		pool.wg.Add(1)
		go pool.worker(i)
	}
	return pool
}

// worker executes tasks until the queue has been closed and fully drained.
func (p *SimpleGoroutinePool) worker(id int) {
	defer p.wg.Done()
	fmt.Printf("工作者 %d 启动\n", id)
	defer fmt.Printf("工作者 %d 退出\n", id)
	// The range loop ends only after Close has closed the queue and every
	// queued task has been received, so accepted tasks are never dropped.
	for task := range p.taskQueue {
		if err := task.Execute(); err != nil {
			fmt.Printf("工作者 %d 执行任务失败: %v\n", id, err)
		}
	}
}

// Submit enqueues task without blocking. It returns an error when task is
// nil, when the pool has been closed, or when the queue is full.
func (p *SimpleGoroutinePool) Submit(task Task) error {
	if task == nil {
		return errors.New("task is nil")
	}
	// Hold the read lock across the check and the send: Close cannot close
	// the channel in between, which would make the send panic.
	p.mu.RLock()
	defer p.mu.RUnlock()
	if atomic.LoadInt32(&p.closed) == 1 {
		return errors.New("pool is closed")
	}
	select {
	case p.taskQueue <- task:
		return nil
	default:
		return errors.New("task queue is full")
	}
}

// SubmitFunc is a convenience wrapper that submits fn as a Task.
func (p *SimpleGoroutinePool) SubmitFunc(fn func() error) error {
	return p.Submit(NewFuncTask(fn))
}

// Close stops accepting tasks, lets the workers finish everything that was
// already queued, and returns once all workers have exited. Calling Close
// more than once is a no-op.
func (p *SimpleGoroutinePool) Close() {
	if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
		return // already closed
	}
	// Wait for in-flight Submit calls, then close the queue exactly once.
	p.mu.Lock()
	close(p.taskQueue)
	p.mu.Unlock()

	p.wg.Wait()
	// Cancel only after the workers are gone; this just releases the context.
	p.cancel()
}
func demonstrateSimplePool() {
fmt.Println("\n=== 简单Goroutine池演示 ===")
// 创建池
pool := NewSimpleGoroutinePool(3, 10)
defer pool.Close()
// 提交任务
for i := 0; i < 10; i++ {
taskID := i
err := pool.SubmitFunc(func() error {
fmt.Printf("执行任务 %d\n", taskID)
time.Sleep(time.Duration(taskID*100) * time.Millisecond)
return nil
})
if err != nil {
fmt.Printf("提交任务 %d 失败: %v\n", taskID, err)
}
}
// 等待一段时间让任务完成
time.Sleep(2 * time.Second)
}::: :::
2. 高级Goroutine池实现
点击查看完整代码实现
点击查看完整代码实现
go
// TaskResult carries the outcome of a task submitted with SubmitWithResult.
type TaskResult struct {
	Result interface{}
	Error  error
}

// TaskWithResult is a Task that publishes its return value on a channel.
type TaskWithResult struct {
	fn     func() (interface{}, error)
	result chan TaskResult
}

// Execute runs fn, sends the outcome on the result channel and returns fn's
// error so the pool can count the failure. The send blocks unless the channel
// has free buffer space or a receiver is waiting.
func (t *TaskWithResult) Execute() error {
	result, err := t.fn()
	t.result <- TaskResult{Result: result, Error: err}
	return err
}

// poolFuncTask adapts func() error to Task for SubmitFunc.
type poolFuncTask func() error

// Execute calls the adapted function.
func (f poolFuncTask) Execute() error { return f() }

// AdvancedGoroutinePool is a pool that starts with a minimum number of
// workers and lets its dispatcher grow the worker set up to maxWorkers when
// no worker is idle. It also keeps simple counters.
type AdvancedGoroutinePool struct {
	workers     int
	maxWorkers  int
	taskQueue   chan Task      // never closed; shutdown is signalled via ctx
	workerQueue chan chan Task // idle workers park their task channel here
	wg          sync.WaitGroup // tracks the dispatcher and every worker
	ctx         context.Context
	cancel      context.CancelFunc
	closed      int32 // set to 1 by Close; accessed atomically
	// Counters, all accessed atomically.
	submittedTasks int64 // tasks accepted into the queue
	completedTasks int64 // tasks whose Execute returned nil
	failedTasks    int64 // tasks whose Execute returned an error
	activeWorkers  int64 // workers started and not yet exited
}

// NewAdvancedGoroutinePool creates the pool, starts its dispatcher and
// minWorkers workers. The dispatcher may start more workers, up to maxWorkers.
func NewAdvancedGoroutinePool(minWorkers, maxWorkers, queueSize int) *AdvancedGoroutinePool {
	ctx, cancel := context.WithCancel(context.Background())
	pool := &AdvancedGoroutinePool{
		workers:     minWorkers,
		maxWorkers:  maxWorkers,
		taskQueue:   make(chan Task, queueSize),
		workerQueue: make(chan chan Task, maxWorkers),
		ctx:         ctx,
		cancel:      cancel,
	}
	// The dispatcher is part of the WaitGroup. While it runs the counter is
	// non-zero, which makes the wg.Add calls it performs for new workers
	// safe against a concurrent Wait in Close.
	pool.wg.Add(1)
	go pool.dispatcher()
	for i := 0; i < minWorkers; i++ {
		pool.spawnWorker(i)
	}
	return pool
}

// spawnWorker registers a new worker with the WaitGroup and the active
// counter *before* starting its goroutine, so neither Close nor the
// dispatcher's limit check can observe a started-but-uncounted worker.
func (p *AdvancedGoroutinePool) spawnWorker(id int) {
	p.wg.Add(1)
	atomic.AddInt64(&p.activeWorkers, 1)
	go p.startWorker(id)
}

// dispatcher moves tasks from the queue to idle workers until the pool is
// closed.
func (p *AdvancedGoroutinePool) dispatcher() {
	defer p.wg.Done()
	for {
		select {
		case task := <-p.taskQueue:
			if !p.assign(task) {
				return // pool closed while the task was waiting for a worker
			}
		case <-p.ctx.Done():
			return
		}
	}
}

// assign hands task to an idle worker, starting an extra worker first when
// none is idle and the limit allows it. It reports false if the pool was
// closed before the task could be handed over.
func (p *AdvancedGoroutinePool) assign(task Task) bool {
	var idle chan Task
	select {
	case idle = <-p.workerQueue:
	default:
		// Nobody idle: grow if allowed, then wait for whichever worker
		// registers first. Every wait also watches ctx so shutdown cannot
		// strand the dispatcher.
		if active := atomic.LoadInt64(&p.activeWorkers); int(active) < p.maxWorkers {
			p.spawnWorker(int(active))
		}
		select {
		case idle = <-p.workerQueue:
		case <-p.ctx.Done():
			return false
		}
	}
	select {
	case idle <- task:
		return true
	case <-p.ctx.Done():
		return false
	}
}

// startWorker is the worker loop. It must be started through spawnWorker,
// which performs the WaitGroup and counter bookkeeping this function undoes.
func (p *AdvancedGoroutinePool) startWorker(id int) {
	defer p.wg.Done()
	// Deferred calls run last-in-first-out: the counter is decremented
	// before Done, so Stats reads 0 active workers once Close has returned.
	defer atomic.AddInt64(&p.activeWorkers, -1)
	fmt.Printf("高级工作者 %d 启动\n", id)
	defer fmt.Printf("高级工作者 %d 退出\n", id)

	taskChannel := make(chan Task) // this worker's private hand-off channel
	for {
		// Announce availability, then wait for a task or for shutdown.
		select {
		case p.workerQueue <- taskChannel:
			select {
			case task := <-taskChannel:
				if err := task.Execute(); err != nil {
					atomic.AddInt64(&p.failedTasks, 1)
					fmt.Printf("高级工作者 %d 任务失败: %v\n", id, err)
				} else {
					atomic.AddInt64(&p.completedTasks, 1)
				}
			case <-p.ctx.Done():
				return
			}
		case <-p.ctx.Done():
			return
		}
	}
}

// Submit enqueues task without blocking. It returns an error when task is
// nil, when the pool is closed, or when the queue is full. Only accepted
// tasks are counted as submitted.
func (p *AdvancedGoroutinePool) Submit(task Task) error {
	if task == nil {
		return errors.New("task is nil")
	}
	if atomic.LoadInt32(&p.closed) == 1 {
		return errors.New("pool is closed")
	}
	// taskQueue is never closed, so this send cannot panic even if Close
	// runs concurrently. A task accepted in that window is simply not run.
	select {
	case p.taskQueue <- task:
		atomic.AddInt64(&p.submittedTasks, 1)
		return nil
	default:
		return errors.New("task queue is full")
	}
}

// SubmitFunc is a convenience wrapper that submits fn as a Task.
func (p *AdvancedGoroutinePool) SubmitFunc(fn func() error) error {
	return p.Submit(poolFuncTask(fn))
}

// SubmitWithResult submits fn and blocks until it has produced a result or
// the pool is closed, whichever happens first.
func (p *AdvancedGoroutinePool) SubmitWithResult(fn func() (interface{}, error)) (interface{}, error) {
	task := &TaskWithResult{
		fn:     fn,
		result: make(chan TaskResult, 1), // buffered: the worker never blocks on delivery
	}
	if err := p.Submit(task); err != nil {
		return nil, err
	}
	select {
	case result := <-task.result:
		return result.Result, result.Error
	case <-p.ctx.Done():
		// Prefer a result that was delivered just before shutdown.
		select {
		case result := <-task.result:
			return result.Result, result.Error
		default:
			return nil, errors.New("pool is closed")
		}
	}
}

// Stats returns the accepted, successful and failed task counts and the
// number of live workers.
func (p *AdvancedGoroutinePool) Stats() (submitted, completed, failed, active int64) {
	return atomic.LoadInt64(&p.submittedTasks),
		atomic.LoadInt64(&p.completedTasks),
		atomic.LoadInt64(&p.failedTasks),
		atomic.LoadInt64(&p.activeWorkers)
}

// Close rejects further submissions, signals shutdown and waits for the
// dispatcher and all workers to exit. Tasks still sitting in the queue are
// not executed. Calling Close more than once is a no-op.
func (p *AdvancedGoroutinePool) Close() {
	if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
		return
	}
	// The queue is intentionally left open: closing it would make racing
	// Submit calls panic and would feed nil tasks to the dispatcher.
	p.cancel()
	p.wg.Wait()
}
func demonstrateAdvancedPool() {
fmt.Println("\n=== 高级Goroutine池演示 ===")
// 创建高级池
pool := NewAdvancedGoroutinePool(2, 5, 20)
defer pool.Close()
// 提交不同类型的任务
var wg sync.WaitGroup
// 提交普通任务
for i := 0; i < 15; i++ {
taskID := i
pool.SubmitFunc(func() error {
fmt.Printf("执行普通任务 %d\n", taskID)
time.Sleep(time.Duration(taskID%5*100) * time.Millisecond)
return nil
})
}
// 提交带结果的任务
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
result, err := pool.SubmitWithResult(func() (interface{}, error) {
sum := 0
for j := 0; j < id*10; j++ {
sum += j
}
return sum, nil
})
if err != nil {
fmt.Printf("带结果任务 %d 失败: %v\n", id, err)
} else {
fmt.Printf("带结果任务 %d 结果: %v\n", id, result)
}
}(i)
}
// 定期打印统计信息
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
submitted, completed, failed, active := pool.Stats()
fmt.Printf("池统计 - 提交: %d, 完成: %d, 失败: %d, 活跃工作者: %d\n",
submitted, completed, failed, active)
case <-pool.ctx.Done():
return
}
}
}()
wg.Wait()
time.Sleep(2 * time.Second)
// 最终统计
submitted, completed, failed, active := pool.Stats()
fmt.Printf("最终统计 - 提交: %d, 完成: %d, 失败: %d, 活跃工作者: %d\n",
submitted, completed, failed, active)
}:::
3. 性能测试和对比
点击查看完整代码实现
go
// demonstratePoolPerformance times the same 10000 tiny tasks run as bare
// goroutines, through the simple pool and through the advanced pool, then
// prints each duration and its ratio to the bare-goroutine time.
func demonstratePoolPerformance() {
	fmt.Println("\n=== Goroutine池性能测试 ===")
	const numTasks = 10000

	// measure returns how long run takes for numTasks tasks.
	measure := func(run func(int)) time.Duration {
		begin := time.Now()
		run(numTasks)
		return time.Since(begin)
	}

	directTime := measure(testDirectGoroutines)
	simplePoolTime := measure(testSimplePool)
	advancedPoolTime := measure(testAdvancedPool)

	// ratio expresses d relative to the bare-goroutine baseline.
	ratio := func(d time.Duration) float64 {
		return float64(directTime) / float64(d)
	}

	fmt.Printf("\n性能对比 (%d 个任务):\n", numTasks)
	fmt.Printf("直接创建goroutine: %v\n", directTime)
	fmt.Printf("简单池: %v (%.2fx)\n", simplePoolTime, ratio(simplePoolTime))
	fmt.Printf("高级池: %v (%.2fx)\n", advancedPoolTime, ratio(advancedPoolTime))
}
// testDirectGoroutines starts one goroutine per task, each summing 0..99,
// and returns after all of them have finished.
func testDirectGoroutines(numTasks int) {
	var pending sync.WaitGroup
	// compute is the per-task workload; the id argument is accepted but unused.
	compute := func(id int) {
		defer pending.Done()
		total := 0
		for k := 0; k < 100; k++ {
			total += k
		}
	}
	for n := 0; n < numTasks; n++ {
		pending.Add(1)
		go compute(n)
	}
	pending.Wait()
}
// testSimplePool pushes numTasks tiny tasks through a 10-worker simple pool
// and returns once every task has run.
//
// SubmitFunc is non-blocking and fails when the 100-slot queue is full. A
// rejected task is therefore executed on the calling goroutine; otherwise its
// wg.Done would never run and wg.Wait would block forever.
func testSimplePool(numTasks int) {
	pool := NewSimpleGoroutinePool(10, 100)
	defer pool.Close()

	var wg sync.WaitGroup
	task := func() error {
		defer wg.Done()
		// Small CPU-only workload.
		sum := 0
		for j := 0; j < 100; j++ {
			sum += j
		}
		return nil
	}

	for i := 0; i < numTasks; i++ {
		wg.Add(1)
		if err := pool.SubmitFunc(task); err != nil {
			_ = task() // caller-runs fallback; the task itself never fails
		}
	}
	wg.Wait()
}
// testAdvancedPool pushes numTasks tiny tasks through an advanced pool
// (5 to 15 workers) and returns once every task has run.
//
// SubmitFunc is non-blocking and fails when the 200-slot queue is full. A
// rejected task is therefore executed on the calling goroutine; otherwise its
// wg.Done would never run and wg.Wait would block forever.
func testAdvancedPool(numTasks int) {
	pool := NewAdvancedGoroutinePool(5, 15, 200)
	defer pool.Close()

	var wg sync.WaitGroup
	task := func() error {
		defer wg.Done()
		// Small CPU-only workload.
		sum := 0
		for j := 0; j < 100; j++ {
			sum += j
		}
		return nil
	}

	for i := 0; i < numTasks; i++ {
		wg.Add(1)
		if err := pool.SubmitFunc(task); err != nil {
			_ = task() // caller-runs fallback; the task itself never fails
		}
	}
	wg.Wait()
}
func main() {
demonstrateGoroutineBasics()
demonstrateGoroutineCharacteristics()
demonstrateGoroutineLifecycle()
demonstrateStackManagement()
demonstrateMemoryModel()
demonstrateSimplePool()
demonstrateAdvancedPool()
demonstratePoolPerformance()
}
2. 安全的Channel模式
点击查看完整代码实现
点击查看完整代码实现
go
// demonstrateSafeChannelPatterns prints the section banner and runs the four
// channel-pattern demos in order.
func demonstrateSafeChannelPatterns() {
	fmt.Println("\n=== 安全的Channel模式 ===")
	demos := []func(){
		demonstrateTimeoutProtectedChannels, // pattern 1: timeout-protected operations
		demonstrateCancelableChannels,       // pattern 2: cancellable operations
		demonstrateSafeProducerConsumer,     // pattern 3: safe producer/consumer
		demonstrateFanInFanOut,              // pattern 4: fan-out / fan-in
	}
	for _, run := range demos {
		run()
	}
}
// demonstrateTimeoutProtectedChannels shows channel sends and receives that
// give up after a timeout instead of blocking forever.
func demonstrateTimeoutProtectedChannels() {
fmt.Println("\n--- 超时保护的Channel操作 ---")
// Channel with a one-element buffer shared by the scenario below.
ch := make(chan string, 1)
// safeSend tries to send msg on ch and reports whether it succeeded before
// the timeout elapsed.
safeSend := func(ch chan<- string, msg string, timeout time.Duration) bool {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case ch <- msg:
fmt.Printf("成功发送: %s\n", msg)
return true
case <-timer.C:
fmt.Printf("发送超时: %s\n", msg)
return false
}
}
// safeReceive waits up to timeout for a value; the bool result is false on
// timeout.
safeReceive := func(ch <-chan string, timeout time.Duration) (string, bool) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case msg := <-ch:
fmt.Printf("成功接收: %s\n", msg)
return msg, true
case <-timer.C:
fmt.Println("接收超时")
return "", false
}
}
// Scenario
go func() {
time.Sleep(200 * time.Millisecond)
safeSend(ch, "延迟消息", 100*time.Millisecond) // NOTE(review): the buffer has been drained by the first receive below, so this send is expected to succeed rather than time out
}()
safeSend(ch, "即时消息", 500*time.Millisecond)
safeReceive(ch, 300*time.Millisecond)
safeReceive(ch, 100*time.Millisecond) // times out: the delayed message is only sent after 200ms
time.Sleep(300 * time.Millisecond)
}
// demonstrateCancelableChannels runs a producer, a consumer and a result
// collector that all stop when a shared context is cancelled after 500ms.
func demonstrateCancelableChannels() {
fmt.Println("\n--- 带取消的Channel操作 ---")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
data := make(chan int, 5)
results := make(chan int, 5)
// Cancellable producer: sends 0..9, one every 100ms, and closes data on exit.
go func() {
defer close(data)
defer fmt.Println("生产者退出")
for i := 0; i < 10; i++ {
select {
case data <- i:
fmt.Printf("生产: %d\n", i)
time.Sleep(100 * time.Millisecond)
case <-ctx.Done():
fmt.Println("生产者被取消")
return
}
}
}()
// Cancellable consumer: doubles each item and closes results on exit.
go func() {
defer close(results)
defer fmt.Println("消费者退出")
for {
select {
case item, ok := <-data:
if !ok {
fmt.Println("数据通道关闭,消费者正常退出")
return
}
// Process the item.
processed := item * 2
// The send also watches ctx so a full results channel cannot block
// the consumer after cancellation.
select {
case results <- processed:
fmt.Printf("处理: %d -> %d\n", item, processed)
case <-ctx.Done():
fmt.Println("消费者被取消")
return
}
case <-ctx.Done():
fmt.Println("消费者被取消")
return
}
}
}()
// Result collector.
go func() {
defer fmt.Println("结果收集器退出")
for {
select {
case result, ok := <-results:
if !ok {
fmt.Println("结果通道关闭,收集器正常退出")
return
}
fmt.Printf("收集结果: %d\n", result)
case <-ctx.Done():
fmt.Println("结果收集器被取消")
return
}
}
}()
// Let the pipeline run for a while, then cancel it and give the goroutines
// time to print their exit messages.
time.Sleep(500 * time.Millisecond)
fmt.Println("开始取消操作...")
cancel()
time.Sleep(200 * time.Millisecond)
}
// demonstrateSafeProducerConsumer builds a small context-controlled queue,
// runs one producer and one consumer against it for a second, and then shuts
// it down.
func demonstrateSafeProducerConsumer() {
	fmt.Println("\n--- 安全的生产者-消费者模式 ---")

	// SafeQueue couples a buffered channel with a cancellation context and a
	// WaitGroup that tracks every producer and consumer goroutine.
	type SafeQueue struct {
		items  chan interface{}
		ctx    context.Context
		cancel context.CancelFunc
		wg     sync.WaitGroup
	}

	NewSafeQueue := func(bufferSize int) *SafeQueue {
		ctx, cancel := context.WithCancel(context.Background())
		return &SafeQueue{
			items:  make(chan interface{}, bufferSize),
			ctx:    ctx,
			cancel: cancel,
		}
	}

	// StartProducer calls producer repeatedly and enqueues what it returns
	// until producer returns nil or the queue is cancelled.
	StartProducer := func(sq *SafeQueue, producer func() interface{}) {
		sq.wg.Add(1)
		go func() {
			defer sq.wg.Done()
			defer fmt.Println("生产者goroutine退出")
			for {
				select {
				case <-sq.ctx.Done():
					return
				default:
					item := producer()
					if item == nil {
						return // producer is finished
					}
					select {
					case sq.items <- item:
						fmt.Printf("生产项目: %v\n", item)
					case <-sq.ctx.Done():
						return
					}
				}
			}
		}()
	}

	// StartConsumer passes every dequeued item to consumer until the queue
	// is closed or cancelled.
	StartConsumer := func(sq *SafeQueue, consumer func(interface{})) {
		sq.wg.Add(1)
		go func() {
			defer sq.wg.Done()
			defer fmt.Println("消费者goroutine退出")
			for {
				select {
				case item, ok := <-sq.items:
					if !ok {
						return // queue closed
					}
					consumer(item)
				case <-sq.ctx.Done():
					return
				}
			}
		}()
	}

	// Close cancels the queue, waits for every producer and consumer to
	// exit, and only then closes the channel. Closing before the producers
	// have stopped could make an in-flight send panic on the closed channel.
	Close := func(sq *SafeQueue) {
		sq.cancel()
		sq.wg.Wait()
		close(sq.items)
	}

	queue := NewSafeQueue(5)

	// Producer: five items, one every 100ms. counter is touched only by the
	// producer goroutine.
	counter := 0
	StartProducer(queue, func() interface{} {
		if counter >= 5 {
			return nil // done producing
		}
		counter++
		time.Sleep(100 * time.Millisecond)
		return fmt.Sprintf("item-%d", counter)
	})

	// Consumer: 150ms per item.
	StartConsumer(queue, func(item interface{}) {
		fmt.Printf("消费项目: %v\n", item)
		time.Sleep(150 * time.Millisecond)
	})

	// Let the demo run, then shut the queue down.
	time.Sleep(1 * time.Second)
	fmt.Println("关闭安全队列...")
	Close(queue)
}
// demonstrateFanInFanOut feeds six integers to three workers (fan-out),
// merges the workers' outputs into one channel (fan-in) and prints the merged
// results. Everything is bounded by a two-second context timeout.
func demonstrateFanInFanOut() {
fmt.Println("\n--- 扇入扇出模式 ---")
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// fanOut starts numWorkers goroutines that all read from input; each one
// writes to its own output channel, which it closes when it exits.
fanOut := func(ctx context.Context, input <-chan int, numWorkers int) []<-chan int {
outputs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan int)
outputs[i] = output
go func(workerID int, out chan<- int) {
defer close(out)
defer fmt.Printf("Worker %d 退出\n", workerID)
for {
select {
case item, ok := <-input:
if !ok {
return
}
// Process the item: multiply by the 1-based worker number.
processed := item * (workerID + 1)
fmt.Printf("Worker %d 处理: %d -> %d\n", workerID, item, processed)
select {
case out <- processed:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(i, output)
}
return outputs
}
// fanIn forwards every value from inputs to a single output channel, which
// is closed once all forwarding goroutines have exited.
fanIn := func(ctx context.Context, inputs ...<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
for i, input := range inputs {
wg.Add(1)
go func(inputID int, in <-chan int) {
defer wg.Done()
defer fmt.Printf("FanIn worker %d 退出\n", inputID)
for {
select {
case item, ok := <-in:
if !ok {
return
}
select {
case output <- item:
fmt.Printf("FanIn收集: %d (来自worker %d)\n", item, inputID)
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(i, input)
}
// Close output only after every forwarder is done, so none of them can send
// on a closed channel.
go func() {
wg.Wait()
close(output)
fmt.Println("FanIn输出通道关闭")
}()
return output
}
// Input source: 1..6, one value every 200ms.
input := make(chan int, 5)
go func() {
defer close(input)
for i := 1; i <= 6; i++ {
select {
case input <- i:
fmt.Printf("输入: %d\n", i)
time.Sleep(200 * time.Millisecond)
case <-ctx.Done():
return
}
}
}()
// Fan out to three workers.
workerOutputs := fanOut(ctx, input, 3)
// Fan the worker outputs back into one channel.
finalOutput := fanIn(ctx, workerOutputs...)
// Print the merged results.
go func() {
defer fmt.Println("结果收集完成")
for {
select {
case result, ok := <-finalOutput:
if !ok {
return
}
fmt.Printf("最终结果: %d\n", result)
case <-ctx.Done():
fmt.Println("结果收集被取消")
return
}
}
}()
// Block until the context times out, then allow a moment for cleanup output.
<-ctx.Done()
time.Sleep(200 * time.Millisecond)
}
func main() {
demonstrateCommonLeakScenarios()
demonstrateLeakDetection()
demonstrateContextDrivenLifecycle()
demonstrateSafeChannelPatterns()
}
:::
生产者消费者
点击查看完整代码实现
go
// demonstrateProducerConsumer prints the section banner and runs the
// bounded-buffer demo followed by the multi-producer/multi-consumer demo.
func demonstrateProducerConsumer() {
	fmt.Println("\n=== 生产者-消费者模式 ===")
	for _, run := range []func(){
		demonstrateBoundedBuffer,         // bounded buffer implementation
		demonstrateMultiProducerConsumer, // several producers and consumers
	} {
		run()
	}
}
// demonstrateBoundedBuffer implements a fixed-capacity ring buffer guarded by
// one mutex and two condition variables, then runs one producer and one
// consumer that exchange six items through it.
func demonstrateBoundedBuffer() {
fmt.Println("\n--- 有界缓冲区实现 ---")
type BoundedBuffer struct {
buffer []interface{}
capacity int
count int
in int // index where the producer writes next
out int // index where the consumer reads next
mu sync.Mutex
notFull *sync.Cond // signalled when a slot has been freed
notEmpty *sync.Cond // signalled when an item has been added
}
// NewBoundedBuffer allocates the ring and binds both conditions to bb.mu.
NewBoundedBuffer := func(capacity int) *BoundedBuffer {
bb := &BoundedBuffer{
buffer: make([]interface{}, capacity),
capacity: capacity,
}
bb.notFull = sync.NewCond(&bb.mu)
bb.notEmpty = sync.NewCond(&bb.mu)
return bb
}
// Put blocks while the buffer is full, then stores item.
Put := func(bb *BoundedBuffer, item interface{}) {
bb.mu.Lock()
defer bb.mu.Unlock()
// Wait in a loop: the condition is re-checked after every wake-up.
for bb.count == bb.capacity {
fmt.Printf("生产者: 缓冲区已满,等待空间\n")
bb.notFull.Wait()
}
// Store the item and advance the write index around the ring.
bb.buffer[bb.in] = item
bb.in = (bb.in + 1) % bb.capacity
bb.count++
fmt.Printf("生产者: 放入 %v,缓冲区大小: %d/%d\n",
item, bb.count, bb.capacity)
// Wake one waiting consumer.
bb.notEmpty.Signal()
}
// Get blocks while the buffer is empty, then removes and returns the oldest
// item.
Get := func(bb *BoundedBuffer) interface{} {
bb.mu.Lock()
defer bb.mu.Unlock()
// Wait in a loop: the condition is re-checked after every wake-up.
for bb.count == 0 {
fmt.Printf("消费者: 缓冲区为空,等待数据\n")
bb.notEmpty.Wait()
}
// Take the item and advance the read index around the ring.
item := bb.buffer[bb.out]
bb.buffer[bb.out] = nil // drop the reference held by the ring slot
bb.out = (bb.out + 1) % bb.capacity
bb.count--
fmt.Printf("消费者: 取出 %v,缓冲区大小: %d/%d\n",
item, bb.count, bb.capacity)
// Wake one waiting producer.
bb.notFull.Signal()
return item
}
// Demo: capacity 3, six items.
buffer := NewBoundedBuffer(3)
var wg sync.WaitGroup
// Producer
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; i <= 6; i++ {
Put(buffer, fmt.Sprintf("item-%d", i))
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
}
}()
// Consumer
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 6; i++ {
item := Get(buffer)
fmt.Printf("消费者处理: %v\n", item)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}()
wg.Wait()
}
func demonstrateMultiProducerConsumer() {
fmt.Println("\n--- 多生产者多消费者模式 ---")
type WorkQueue struct {
jobs []string
mu sync.Mutex
notEmpty *sync.Cond
closed bool
}
NewWorkQueue := func() *WorkQueue {
wq := &WorkQueue{}
wq.notEmpty = sync.NewCond(&wq.mu)
return wq
}
// 添加工作
AddJob := func(wq *WorkQueue, job string) bool {
wq.mu.Lock()
defer wq.mu.Unlock()
if wq.closed {
return false
}
wq.jobs = append(wq.jobs, job)
fmt.Printf("添加工作: %s,队列长度: %d\n", job, len(wq.jobs))
wq.notEmpty.Signal() // 通知一个等待的worker
return true
}
// 获取工作
GetJob := func(wq *WorkQueue) (string, bool) {
wq.mu.Lock()
defer wq.mu.Unlock()
// 等待工作或队列关闭
for len(wq.jobs) == 0 && !wq.closed {
wq.notEmpty.Wait()
}
if len(wq.jobs) == 0 && wq.closed {
return "", false // 队列已关闭且无工作
}
job := wq.jobs[0]
wq.jobs = wq.jobs[1:]
fmt.Printf("获取工作: %s,队列长度: %d\n", job, len(wq.jobs))
return job, true
}
// 关闭队列
Close := func(wq *WorkQueue) {
wq.mu.Lock()
defer wq.mu.Unlock()
wq.closed = true
wq.notEmpty.Broadcast() // 唤醒所有等待的worker
fmt.Println("工作队列已关闭")
}
// 演示使用
queue := NewWorkQueue()
var wg sync.WaitGroup
// 启动多个生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go func(producerID int) {
defer wg.Done()
for j := 1; j <= 3; j++ {
job := fmt.Sprintf("P%d-Job%d", producerID, j)
AddJob(queue, job)
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
}
}(i)
}
// 启动多个消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for {
job, ok := GetJob(queue)
if !ok {
fmt.Printf("Worker %d: 队列已关闭,退出\n", workerID)
break
}
fmt.Printf("Worker %d: 处理 %s\n", workerID, job)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}(i)
}
// 等待所有生产者完成
time.Sleep(2 * time.Second)
Close(queue)
wg.Wait()
}
事件通知和状态管理
点击查看完整代码实现
点击查看完整代码实现
go
// demonstrateEventNotification prints the section banner and runs the state
// machine demo followed by the event bus demo.
func demonstrateEventNotification() {
	fmt.Println("\n=== 事件通知和状态管理 ===")
	for _, run := range []func(){
		demonstrateStateMachine, // state machine implementation
		demonstrateEventBus,     // event bus implementation
	} {
		run()
	}
}
// demonstrateStateMachine builds a small mutex-and-condition-variable state
// machine and drives it through five transitions while a monitor and an
// observer goroutine wait for the changes.
func demonstrateStateMachine() {
	fmt.Println("\n--- 状态机实现 ---")

	type State int
	const (
		StateIdle State = iota
		StateRunning
		StatePaused
		StateStopped
	)

	// stateName returns the display name of s. Go does not allow method
	// declarations inside a function body, so a closure stands in for a
	// String method on the local State type.
	stateName := func(s State) string {
		switch s {
		case StateIdle:
			return "Idle"
		case StateRunning:
			return "Running"
		case StatePaused:
			return "Paused"
		case StateStopped:
			return "Stopped"
		default:
			return "Unknown"
		}
	}

	type StateMachine struct {
		currentState State
		mu           sync.Mutex
		stateChanged *sync.Cond // broadcast on every successful transition
	}

	NewStateMachine := func() *StateMachine {
		sm := &StateMachine{
			currentState: StateIdle,
		}
		sm.stateChanged = sync.NewCond(&sm.mu)
		return sm
	}

	// isValidTransition encodes the allowed edges of the state graph. It is
	// declared before ChangeState because a closure can only refer to locals
	// that already exist.
	isValidTransition := func(from, to State) bool {
		switch from {
		case StateIdle:
			return to == StateRunning
		case StateRunning:
			return to == StatePaused || to == StateStopped
		case StatePaused:
			return to == StateRunning || to == StateStopped
		case StateStopped:
			return to == StateIdle
		default:
			return false
		}
	}

	// GetState returns the current state.
	GetState := func(sm *StateMachine) State {
		sm.mu.Lock()
		defer sm.mu.Unlock()
		return sm.currentState
	}

	// ChangeState performs the transition if it is legal, wakes all waiters
	// and reports whether the state changed.
	ChangeState := func(sm *StateMachine, newState State) bool {
		sm.mu.Lock()
		defer sm.mu.Unlock()
		if !isValidTransition(sm.currentState, newState) {
			return false
		}
		oldState := sm.currentState
		sm.currentState = newState
		fmt.Printf("状态转换: %s -> %s\n", stateName(oldState), stateName(newState))
		sm.stateChanged.Broadcast()
		return true
	}

	// WaitForState blocks until the machine is in targetState.
	WaitForState := func(sm *StateMachine, targetState State) {
		sm.mu.Lock()
		defer sm.mu.Unlock()
		for sm.currentState != targetState {
			sm.stateChanged.Wait()
		}
	}

	// WaitForStateChange blocks until the state differs from fromState and
	// returns the new state.
	WaitForStateChange := func(sm *StateMachine, fromState State) State {
		sm.mu.Lock()
		defer sm.mu.Unlock()
		for sm.currentState == fromState {
			sm.stateChanged.Wait()
		}
		return sm.currentState
	}

	sm := NewStateMachine()
	var wg sync.WaitGroup

	// Monitor: reports five consecutive state changes.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			currentState := GetState(sm)
			fmt.Printf("监控器: 当前状态 %s\n", stateName(currentState))
			newState := WaitForStateChange(sm, currentState)
			fmt.Printf("监控器: 状态已变化为 %s\n", stateName(newState))
		}
	}()

	// Observer: waits for two specific states.
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("观察者: 等待Running状态")
		WaitForState(sm, StateRunning)
		fmt.Println("观察者: 检测到Running状态")
		fmt.Println("观察者: 等待Stopped状态")
		WaitForState(sm, StateStopped)
		fmt.Println("观察者: 检测到Stopped状态")
	}()

	// Controller: applies one transition every 300ms.
	wg.Add(1)
	go func() {
		defer wg.Done()
		transitions := []State{
			StateRunning,
			StatePaused,
			StateRunning,
			StateStopped,
			StateIdle,
		}
		for _, newState := range transitions {
			time.Sleep(300 * time.Millisecond)
			if ChangeState(sm, newState) {
				fmt.Printf("控制器: 成功转换到 %s\n", stateName(newState))
			} else {
				fmt.Printf("控制器: 无法转换到 %s\n", stateName(newState))
			}
		}
	}()

	wg.Wait()
}
// demonstrateEventBus wires three subscribers to a small in-process event
// bus, publishes five events and waits for every handler to finish.
//
// Channel ownership follows the sender-closes rule: the publisher closes all
// subscriber channels after the last event. A subscriber that has handled
// three events marks itself inactive and stops reading, but it never closes
// its own channel, so the publisher can never send on a closed channel and a
// subscriber that receives fewer than three events still terminates.
func demonstrateEventBus() {
	fmt.Println("\n--- 事件总线实现 ---")

	type Event struct {
		Type string
		Data interface{}
	}
	// Subscriber is declared before EventBus because local types must be
	// declared before they are referenced.
	type Subscriber struct {
		ID      string
		EventCh chan Event
		Active  bool // guarded by cond.L
		cond    *sync.Cond
	}
	type EventBus struct {
		subscribers map[string][]*Subscriber
		mu          sync.Mutex
		eventCond   *sync.Cond // broadcast after every Publish
	}

	NewEventBus := func() *EventBus {
		eb := &EventBus{
			subscribers: make(map[string][]*Subscriber),
		}
		eb.eventCond = sync.NewCond(&eb.mu)
		return eb
	}
	NewSubscriber := func(id string) *Subscriber {
		s := &Subscriber{
			ID:      id,
			EventCh: make(chan Event, 10),
			Active:  true,
		}
		s.cond = sync.NewCond(&sync.Mutex{})
		return s
	}

	// isActive and deactivate access Active under the subscriber's lock; the
	// flag is written by the handler goroutine and read by the publisher.
	isActive := func(s *Subscriber) bool {
		s.cond.L.Lock()
		defer s.cond.L.Unlock()
		return s.Active
	}
	deactivate := func(s *Subscriber) {
		s.cond.L.Lock()
		s.Active = false
		s.cond.L.Unlock()
	}

	// Subscribe registers subscriber for eventType.
	Subscribe := func(eb *EventBus, eventType string, subscriber *Subscriber) {
		eb.mu.Lock()
		defer eb.mu.Unlock()
		eb.subscribers[eventType] = append(eb.subscribers[eventType], subscriber)
		fmt.Printf("订阅者 %s 订阅了事件类型: %s\n", subscriber.ID, eventType)
	}

	// Publish delivers event to every active subscriber of its type without
	// blocking; an event is dropped for a subscriber whose channel is full.
	Publish := func(eb *EventBus, event Event) {
		eb.mu.Lock()
		subscribers := eb.subscribers[event.Type]
		eb.mu.Unlock()
		fmt.Printf("发布事件: %s, 数据: %v\n", event.Type, event.Data)
		for _, subscriber := range subscribers {
			if !isActive(subscriber) {
				continue
			}
			select {
			case subscriber.EventCh <- event:
				fmt.Printf("事件已发送给订阅者 %s\n", subscriber.ID)
			default:
				fmt.Printf("订阅者 %s 的通道已满,丢弃事件\n", subscriber.ID)
			}
		}
		eb.mu.Lock()
		eb.eventCond.Broadcast()
		eb.mu.Unlock()
	}

	// WaitForEvent blocks until the next Publish broadcast. It is a
	// simplified helper (eventType is not matched) and this demo does not
	// call it; the blank assignment keeps the unused local legal Go.
	WaitForEvent := func(eb *EventBus, eventType string) {
		eb.mu.Lock()
		defer eb.mu.Unlock()
		eb.eventCond.Wait()
	}
	_ = WaitForEvent

	eventBus := NewEventBus()
	var wg sync.WaitGroup

	subscriber1 := NewSubscriber("Logger")
	subscriber2 := NewSubscriber("MetricsCollector")
	subscriber3 := NewSubscriber("Alerter")
	allSubscribers := []*Subscriber{subscriber1, subscriber2, subscriber3}

	Subscribe(eventBus, "user.login", subscriber1)
	Subscribe(eventBus, "user.login", subscriber2)
	Subscribe(eventBus, "system.error", subscriber1)
	Subscribe(eventBus, "system.error", subscriber3)

	// Handlers: each stops after three events or when its channel is closed.
	for _, sub := range allSubscribers {
		wg.Add(1)
		go func(s *Subscriber) {
			defer wg.Done()
			fmt.Printf("订阅者 %s 开始处理事件\n", s.ID)
			eventCount := 0
			for event := range s.EventCh {
				eventCount++
				fmt.Printf("订阅者 %s 处理事件: %s, 数据: %v\n",
					s.ID, event.Type, event.Data)
				// Simulated processing time.
				time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
				if eventCount >= 3 {
					deactivate(s)
					break
				}
			}
			fmt.Printf("订阅者 %s 停止处理事件\n", s.ID)
		}(sub)
	}

	// Publisher: one event every 400ms, then closes every subscriber channel
	// so handlers still ranging over their channel can exit.
	wg.Add(1)
	go func() {
		defer wg.Done()
		events := []Event{
			{Type: "user.login", Data: map[string]string{"user": "alice", "ip": "192.168.1.1"}},
			{Type: "system.error", Data: map[string]string{"error": "database connection failed"}},
			{Type: "user.login", Data: map[string]string{"user": "bob", "ip": "192.168.1.2"}},
			{Type: "system.error", Data: map[string]string{"error": "memory usage high"}},
			{Type: "user.login", Data: map[string]string{"user": "charlie", "ip": "192.168.1.3"}},
		}
		for _, event := range events {
			time.Sleep(400 * time.Millisecond)
			Publish(eventBus, event)
		}
		for _, s := range allSubscribers {
			close(s.EventCh)
		}
	}()

	wg.Wait()
}
func main() {
rand.Seed(time.Now().UnixNano())
demonstrateCondBasics()
demonstrateCondInternals()
demonstrateProducerConsumer()
demonstrateBarrierPattern()
demonstrateEventNotification()
}
:::
1. Worker Pool模式
点击查看完整代码实现
go
import (
"context"
"fmt"
"sync"
"time"
)
// Task is one job handed to a worker through the tasks channel.
type Task struct {
	ID   int    // identifier shown in the worker's log lines
	Data string // payload shown in the worker's log lines
}
// workerPoolWithWaitGroup runs three workers over a channel of ten tasks and
// uses a WaitGroup to wait until every worker has drained the channel.
func workerPoolWithWaitGroup() {
	const (
		numWorkers = 3
		numTasks   = 10
	)

	var wg sync.WaitGroup
	tasks := make(chan Task, numTasks)

	// Start the workers; all of them are registered before any is started.
	wg.Add(numWorkers)
	for id := 0; id < numWorkers; id++ {
		go func(workerID int) {
			defer wg.Done()
			worker(workerID, tasks)
		}(id)
	}

	// Queue the tasks. The buffer holds all of them, so this never blocks.
	for n := 0; n < numTasks; n++ {
		tasks <- Task{ID: n, Data: fmt.Sprintf("task-%d", n)}
	}

	close(tasks) // closing the channel ends each worker's range loop
	wg.Wait()    // wait for all workers to return
	fmt.Println("所有任务处理完成")
}
func worker(id int, tasks <-chan Task) {
for task := range tasks {
fmt.Printf("Worker %d 处理任务 %d: %s\n", id, task.ID, task.Data)
time.Sleep(200 * time.Millisecond) // 模拟处理时间
fmt.Printf("Worker %d 完成任务 %d\n", id, task.ID)
}
fmt.Printf("Worker %d 退出\n", id)
}
错误
点击查看完整代码实现
go
package main
import (
"context"
"fmt"
"log"
"runtime"
"strconv"
"strings"
"sync"
"time"
)
// demonstrateErrorHandlingStrategy prints the section banner and runs the
// four error-handling demos in order.
func demonstrateErrorHandlingStrategy() {
	fmt.Println("=== Go错误处理策略 ===")
	/*
	   Core elements of an error-handling strategy:

	   1. Error taxonomy:
	      - business errors: bad user input, missing permission, ...
	      - system errors: network failures, database failures, ...
	      - fatal errors: out of memory, broken configuration, ...
	      - transient errors: timeouts, rate limiting and other retryable errors
	   2. Error information design:
	      - error code: uniquely identifies the kind of error
	      - error message: user-friendly description
	      - technical detail: what a developer needs for diagnosis
	      - context: request ID, user ID, ...
	   3. Handling strategies:
	      - fail fast: return the error immediately
	      - retry: automatically retry transient errors
	      - degrade: offer a fallback
	      - circuit breaking: stop errors from spreading
	   4. Monitoring and alerting:
	      - statistics: error rate and distribution
	      - anomaly detection: recognise unusual patterns
	      - alerting: notify the right people in time
	      - tracing: the complete error chain
	*/
	steps := []func(){
		demonstrateErrorClassification,
		demonstrateCustomErrors,
		demonstrateErrorPropagation,
		demonstrateErrorRecovery,
	}
	for _, step := range steps {
		step()
	}
}
// The classification types below live at package level because Go does not
// allow method or named-function declarations inside a function body.

// ErrorType is the category of a classified error.
type ErrorType int

const (
	ErrorTypeBusiness      ErrorType = iota // business rule violation
	ErrorTypeSystem                         // system failure
	ErrorTypeNetwork                        // network failure
	ErrorTypeValidation                     // input validation failure
	ErrorTypePermission                     // permission failure
	ErrorTypeConfiguration                  // configuration problem
	ErrorTypeTimeout                        // timeout
	ErrorTypeResource                       // resource problem
)

// String returns the upper-case name of the error type, or "UNKNOWN".
func (et ErrorType) String() string {
	switch et {
	case ErrorTypeBusiness:
		return "BUSINESS"
	case ErrorTypeSystem:
		return "SYSTEM"
	case ErrorTypeNetwork:
		return "NETWORK"
	case ErrorTypeValidation:
		return "VALIDATION"
	case ErrorTypePermission:
		return "PERMISSION"
	case ErrorTypeConfiguration:
		return "CONFIGURATION"
	case ErrorTypeTimeout:
		return "TIMEOUT"
	case ErrorTypeResource:
		return "RESOURCE"
	default:
		return "UNKNOWN"
	}
}

// ErrorSeverity ranks how serious a classified error is.
type ErrorSeverity int

const (
	SeverityLow ErrorSeverity = iota
	SeverityMedium
	SeverityHigh
	SeverityCritical
)

// String returns the upper-case name of the severity, or "UNKNOWN".
func (es ErrorSeverity) String() string {
	switch es {
	case SeverityLow:
		return "LOW"
	case SeverityMedium:
		return "MEDIUM"
	case SeverityHigh:
		return "HIGH"
	case SeverityCritical:
		return "CRITICAL"
	default:
		return "UNKNOWN"
	}
}

// ClassifiedError is an error that carries classification metadata.
type ClassifiedError interface {
	error
	Code() string
	Type() ErrorType
	Severity() ErrorSeverity
	IsRetryable() bool
	IsTemporary() bool
	Details() map[string]interface{}
	StackTrace() string
}

// BaseClassifiedError is the basic ClassifiedError implementation.
type BaseClassifiedError struct {
	code       string
	message    string
	errorType  ErrorType
	severity   ErrorSeverity
	retryable  bool
	temporary  bool
	details    map[string]interface{}
	stackTrace string
	cause      error
}

// NewClassifiedError builds a non-retryable, non-temporary error with an
// empty detail map and a stack trace captured at the call site.
func NewClassifiedError(code, message string, errorType ErrorType, severity ErrorSeverity) *BaseClassifiedError {
	return &BaseClassifiedError{
		code:       code,
		message:    message,
		errorType:  errorType,
		severity:   severity,
		retryable:  false,
		temporary:  false,
		details:    make(map[string]interface{}),
		stackTrace: captureStackTrace(),
	}
}

// Error formats the error as "[code] TYPE: message".
func (bce *BaseClassifiedError) Error() string {
	return fmt.Sprintf("[%s] %s: %s", bce.code, bce.errorType.String(), bce.message)
}

// Code returns the error code.
func (bce *BaseClassifiedError) Code() string { return bce.code }

// Type returns the error category.
func (bce *BaseClassifiedError) Type() ErrorType { return bce.errorType }

// Severity returns the error severity.
func (bce *BaseClassifiedError) Severity() ErrorSeverity { return bce.severity }

// IsRetryable reports whether the operation may be retried.
func (bce *BaseClassifiedError) IsRetryable() bool { return bce.retryable }

// IsTemporary reports whether the error is marked as temporary.
func (bce *BaseClassifiedError) IsTemporary() bool { return bce.temporary }

// Details returns the error's detail map (the internal map, not a copy).
func (bce *BaseClassifiedError) Details() map[string]interface{} { return bce.details }

// StackTrace returns the trace captured when the error was created.
func (bce *BaseClassifiedError) StackTrace() string { return bce.stackTrace }

// Unwrap returns the cause set with WithCause, or nil, so that errors.Is and
// errors.As can see through a classified error.
func (bce *BaseClassifiedError) Unwrap() error { return bce.cause }

// WithCause records the underlying error and returns the receiver.
func (bce *BaseClassifiedError) WithCause(cause error) *BaseClassifiedError {
	bce.cause = cause
	return bce
}

// WithDetail stores one key/value detail and returns the receiver.
func (bce *BaseClassifiedError) WithDetail(key string, value interface{}) *BaseClassifiedError {
	bce.details[key] = value
	return bce
}

// SetRetryable sets the retryable flag and returns the receiver.
func (bce *BaseClassifiedError) SetRetryable(retryable bool) *BaseClassifiedError {
	bce.retryable = retryable
	return bce
}

// SetTemporary sets the temporary flag and returns the receiver.
func (bce *BaseClassifiedError) SetTemporary(temporary bool) *BaseClassifiedError {
	bce.temporary = temporary
	return bce
}

// captureStackTrace renders up to eight frames as "file:line func" lines,
// keeping only the last path element of each file and function name.
func captureStackTrace() string {
	var buf strings.Builder
	// Start at 2 to skip captureStackTrace itself and its direct caller.
	for i := 2; i < 10; i++ {
		pc, file, line, ok := runtime.Caller(i)
		if !ok {
			break
		}
		fn := runtime.FuncForPC(pc)
		if fn == nil {
			continue
		}
		funcName := fn.Name()
		if idx := strings.LastIndex(funcName, "/"); idx != -1 {
			funcName = funcName[idx+1:]
		}
		if idx := strings.LastIndex(file, "/"); idx != -1 {
			file = file[idx+1:]
		}
		fmt.Fprintf(&buf, " %s:%d %s\n", file, line, funcName)
	}
	return buf.String()
}

// ErrorFactory creates pre-classified errors for common situations.
type ErrorFactory struct{}

// NewValidationError returns a medium-severity validation error.
func (ef *ErrorFactory) NewValidationError(message string) *BaseClassifiedError {
	return NewClassifiedError("VALIDATION_001", message, ErrorTypeValidation, SeverityMedium)
}

// NewPermissionError returns a high-severity permission error.
func (ef *ErrorFactory) NewPermissionError(message string) *BaseClassifiedError {
	return NewClassifiedError("PERMISSION_001", message, ErrorTypePermission, SeverityHigh)
}

// NewNetworkError returns a retryable, temporary network error.
func (ef *ErrorFactory) NewNetworkError(message string) *BaseClassifiedError {
	return NewClassifiedError("NETWORK_001", message, ErrorTypeNetwork, SeverityMedium).
		SetRetryable(true).
		SetTemporary(true)
}

// NewDatabaseError returns a retryable, high-severity system error.
func (ef *ErrorFactory) NewDatabaseError(message string) *BaseClassifiedError {
	return NewClassifiedError("DATABASE_001", message, ErrorTypeSystem, SeverityHigh).
		SetRetryable(true)
}

// NewTimeoutError returns a retryable, temporary timeout error.
func (ef *ErrorFactory) NewTimeoutError(message string) *BaseClassifiedError {
	return NewClassifiedError("TIMEOUT_001", message, ErrorTypeTimeout, SeverityMedium).
		SetRetryable(true).
		SetTemporary(true)
}

// NewBusinessError returns a low-severity business error with a custom code.
func (ef *ErrorFactory) NewBusinessError(code, message string) *BaseClassifiedError {
	return NewClassifiedError(code, message, ErrorTypeBusiness, SeverityLow)
}

// demonstrateErrorClassification creates one error of each common kind,
// prints its classification and then prints summary statistics.
//
// Classification axes illustrated here:
//  1. by nature: business, system, configuration
//  2. by severity: fatal, error, warning
//  3. by handling: retryable or not
//  4. by blast radius: user, service, system
func demonstrateErrorClassification() {
	fmt.Println("\n--- 错误分类体系 ---")
	fmt.Printf("错误分类体系演示:\n")

	factory := &ErrorFactory{}
	// Named errs rather than errors so the standard package name is not shadowed.
	errs := []ClassifiedError{
		factory.NewValidationError("用户名不能为空"),
		factory.NewPermissionError("权限不足,无法访问该资源"),
		factory.NewNetworkError("网络连接超时").WithDetail("timeout", "5s"),
		factory.NewDatabaseError("数据库连接失败").WithDetail("host", "localhost:5432"),
		factory.NewTimeoutError("请求处理超时").WithDetail("duration", "30s"),
		factory.NewBusinessError("ORDER_001", "订单状态不允许取消"),
	}

	fmt.Printf(" 📊 错误分类统计:\n")
	typeCount := make(map[ErrorType]int)
	severityCount := make(map[ErrorSeverity]int)
	retryableCount := 0
	temporaryCount := 0
	for i, err := range errs {
		fmt.Printf(" %d. %s\n", i+1, err.Error())
		fmt.Printf(" 类型: %s, 严重程度: %s\n", err.Type().String(), err.Severity().String())
		fmt.Printf(" 可重试: %t, 临时性: %t\n", err.IsRetryable(), err.IsTemporary())
		if len(err.Details()) > 0 {
			fmt.Printf(" 详细信息: %v\n", err.Details())
		}
		// Tally for the summary.
		typeCount[err.Type()]++
		severityCount[err.Severity()]++
		if err.IsRetryable() {
			retryableCount++
		}
		if err.IsTemporary() {
			temporaryCount++
		}
		fmt.Println()
	}

	// Walk the enums in declaration order so the summary is printed in a
	// stable order; ranging over the maps would randomise it.
	fmt.Printf(" 📈 统计摘要:\n")
	fmt.Printf(" 按类型分布:\n")
	for errorType := ErrorTypeBusiness; errorType <= ErrorTypeResource; errorType++ {
		if count := typeCount[errorType]; count > 0 {
			fmt.Printf(" %s: %d\n", errorType.String(), count)
		}
	}
	fmt.Printf(" 按严重程度分布:\n")
	for severity := SeverityLow; severity <= SeverityCritical; severity++ {
		if count := severityCount[severity]; count > 0 {
			fmt.Printf(" %s: %d\n", severity.String(), count)
		}
	}
	fmt.Printf(" 可重试错误: %d/%d\n", retryableCount, len(errs))
	fmt.Printf(" 临时性错误: %d/%d\n", temporaryCount, len(errs))
}
func demonstrateCustomErrors() {
fmt.Println("\n--- 自定义错误设计 ---")
/*
自定义错误要点:
1. 错误包装:保留原始错误信息
2. 上下文传递:携带请求上下文
3. 错误链:构建错误调用链
4. 元数据:附加诊断信息
*/
// ContextualError wraps a base error together with request-scoped metadata
// captured when the error was created.
type ContextualError struct {
	BaseError      error                  // underlying cause, returned by Unwrap
	Context        context.Context        // context passed to NewContextualError
	RequestID      string                 // ctx value "request_id" when it is a string
	UserID         string                 // ctx value "user_id" when it is a string
	Operation      string                 // name of the failing operation
	Timestamp      time.Time              // creation time
	AdditionalInfo map[string]interface{} // free-form diagnostic key/values
}

// NewContextualError creates a ContextualError for operation wrapping
// baseError. RequestID and UserID are copied from the string values stored in
// ctx under "request_id" and "user_id"; values of any other type are ignored.
// A nil ctx is tolerated and simply leaves both identifiers empty.
func NewContextualError(ctx context.Context, operation string, baseError error) *ContextualError {
	ce := &ContextualError{
		BaseError:      baseError,
		Context:        ctx,
		Operation:      operation,
		Timestamp:      time.Now(),
		AdditionalInfo: make(map[string]interface{}),
	}
	if ctx == nil {
		// Nothing to extract; calling Value on a nil context would panic.
		return ce
	}
	// A type assertion on a nil interface yields ok == false, so a missing
	// key and a non-string value are handled by the same branch.
	if id, ok := ctx.Value("request_id").(string); ok {
		ce.RequestID = id
	}
	if id, ok := ctx.Value("user_id").(string); ok {
		ce.UserID = id
	}
	return ce
}

// Error formats the error as "[operation] requestID: cause".
func (ce *ContextualError) Error() string {
	return fmt.Sprintf("[%s] %s: %v", ce.Operation, ce.RequestID, ce.BaseError)
}

// Unwrap returns the wrapped base error.
func (ce *ContextualError) Unwrap() error {
	return ce.BaseError
}

// WithInfo stores value under key in AdditionalInfo and returns the receiver
// so calls can be chained.
func (ce *ContextualError) WithInfo(key string, value interface{}) *ContextualError {
	if ce.AdditionalInfo == nil {
		// Values built with a struct literal have a nil map; writing to it
		// would panic.
		ce.AdditionalInfo = make(map[string]interface{})
	}
	ce.AdditionalInfo[key] = value
	return ce
}
// ErrorChain collects multiple errors and presents them as a single error.
// All methods lock the internal mutex, so a chain may be shared between
// goroutines.
type ErrorChain struct {
	errors []error
	mutex  sync.RWMutex
}

// NewErrorChain returns an empty chain.
func NewErrorChain() *ErrorChain {
	return &ErrorChain{
		errors: make([]error, 0),
	}
}

// Add appends err to the chain and returns the receiver for chaining.
// A nil err is ignored.
func (ec *ErrorChain) Add(err error) *ErrorChain {
	if err == nil {
		return ec
	}
	ec.mutex.Lock()
	defer ec.mutex.Unlock()
	ec.errors = append(ec.errors, err)
	return ec
}

// HasErrors reports whether at least one error has been added.
func (ec *ErrorChain) HasErrors() bool {
	ec.mutex.RLock()
	defer ec.mutex.RUnlock()
	return len(ec.errors) > 0
}

// Error returns "no errors" for an empty chain, the sole error's message for
// a single-element chain, and a numbered multi-line listing otherwise.
func (ec *ErrorChain) Error() string {
	ec.mutex.RLock()
	defer ec.mutex.RUnlock()
	switch len(ec.errors) {
	case 0:
		return "no errors"
	case 1:
		return ec.errors[0].Error()
	}
	var builder strings.Builder
	builder.WriteString("multiple errors occurred:\n")
	for i, err := range ec.errors {
		// Fprintf writes straight into the builder, avoiding the
		// intermediate string that Sprintf would allocate.
		fmt.Fprintf(&builder, " %d. %s\n", i+1, err.Error())
	}
	return builder.String()
}

// Unwrap exposes the collected errors to errors.Is and errors.As (Go 1.20+),
// so callers can match any member of the chain. The returned slice is a copy.
func (ec *ErrorChain) Unwrap() []error {
	return ec.Errors()
}

// Errors returns a copy of the collected errors in insertion order.
func (ec *ErrorChain) Errors() []error {
	ec.mutex.RLock()
	defer ec.mutex.RUnlock()
	result := make([]error, len(ec.errors))
	copy(result, ec.errors)
	return result
}

// First returns the earliest added error, or nil when the chain is empty.
func (ec *ErrorChain) First() error {
	ec.mutex.RLock()
	defer ec.mutex.RUnlock()
	if len(ec.errors) == 0 {
		return nil
	}
	return ec.errors[0]
}

// Last returns the most recently added error, or nil when the chain is empty.
func (ec *ErrorChain) Last() error {
	ec.mutex.RLock()
	defer ec.mutex.RUnlock()
	if len(ec.errors) == 0 {
		return nil
	}
	return ec.errors[len(ec.errors)-1]
}
// BusinessErrorWrapper pairs a user-facing message and business error code
// with the technical error that caused it, plus optional remediation hints.
type BusinessErrorWrapper struct {
	UserMessage    string   // message suitable for end users
	TechnicalError error    // underlying technical cause
	ErrorCode      string   // business error code
	Suggestions    []string // remediation suggestions
}

// NewBusinessErrorWrapper creates a wrapper with an empty suggestion list.
func NewBusinessErrorWrapper(code, userMessage string, technicalError error) *BusinessErrorWrapper {
	wrapper := new(BusinessErrorWrapper)
	wrapper.ErrorCode = code
	wrapper.UserMessage = userMessage
	wrapper.TechnicalError = technicalError
	wrapper.Suggestions = []string{}
	return wrapper
}

// Error renders the error as "[code] user message".
func (bew *BusinessErrorWrapper) Error() string {
	code, msg := bew.ErrorCode, bew.UserMessage
	return fmt.Sprintf("[%s] %s", code, msg)
}

// Unwrap returns the technical error so the standard errors helpers can
// traverse the chain.
func (bew *BusinessErrorWrapper) Unwrap() error {
	return bew.TechnicalError
}

// AddSuggestion appends a remediation hint and returns the receiver for
// chaining.
func (bew *BusinessErrorWrapper) AddSuggestion(suggestion string) *BusinessErrorWrapper {
	bew.Suggestions = append(bew.Suggestions, suggestion)
	return bew
}

// GetUserMessage returns the user-facing message.
func (bew *BusinessErrorWrapper) GetUserMessage() string {
	return bew.UserMessage
}

// GetTechnicalDetails returns the technical error's message, or "" when no
// technical error is attached.
func (bew *BusinessErrorWrapper) GetTechnicalDetails() string {
	cause := bew.TechnicalError
	if cause == nil {
		return ""
	}
	return cause.Error()
}

// GetSuggestions returns the suggestion slice held by the wrapper (not a
// copy).
func (bew *BusinessErrorWrapper) GetSuggestions() []string {
	return bew.Suggestions
}
// Demonstrate the custom error types.
fmt.Printf("自定义错误设计演示:\n")
// simulateBusinessOperation returns a three-layer error: a plain database
// error wrapped in a ContextualError, wrapped in a BusinessErrorWrapper.
simulateBusinessOperation := func() error {
	// Build a request context carrying request and user identifiers.
	ctx := context.Background()
	ctx = context.WithValue(ctx, "request_id", "req-12345")
	ctx = context.WithValue(ctx, "user_id", "user-67890")
	// Simulated database failure.
	dbError := fmt.Errorf("connection timeout after 5s")
	// Wrap it as a contextual error.
	contextError := NewContextualError(ctx, "user_lookup", dbError).
		WithInfo("table", "users").
		WithInfo("query_duration", "5.2s")
	// Wrap it as a business error.
	businessError := NewBusinessErrorWrapper(
		"USER_LOOKUP_FAILED",
		"用户信息获取失败,请稍后重试",
		contextError,
	).AddSuggestion("检查网络连接").
		AddSuggestion("联系技术支持")
	return businessError
}
// simulateMultipleErrors aggregates several validation failures into one
// ErrorChain.
simulateMultipleErrors := func() error {
	chain := NewErrorChain()
	// Simulated validation errors.
	chain.Add(fmt.Errorf("用户名不能为空"))
	chain.Add(fmt.Errorf("密码长度不足8位"))
	chain.Add(fmt.Errorf("邮箱格式不正确"))
	// Return the chain only when it holds errors, so callers never receive a
	// non-nil error interface wrapping an empty chain.
	if chain.HasErrors() {
		return chain
	}
	return nil
}
fmt.Printf(" 🔍 上下文错误示例:\n")
if err := simulateBusinessOperation(); err != nil {
	fmt.Printf(" 错误: %s\n", err.Error())
	// Type-assert to reach the wrapper's extra accessors.
	if businessErr, ok := err.(*BusinessErrorWrapper); ok {
		fmt.Printf(" 用户消息: %s\n", businessErr.GetUserMessage())
		fmt.Printf(" 技术详情: %s\n", businessErr.GetTechnicalDetails())
		fmt.Printf(" 解决建议: %v\n", businessErr.GetSuggestions())
		// Unwrap one level to reach the contextual error.
		if contextErr, ok := businessErr.Unwrap().(*ContextualError); ok {
			fmt.Printf(" 请求ID: %s\n", contextErr.RequestID)
			fmt.Printf(" 用户ID: %s\n", contextErr.UserID)
			fmt.Printf(" 操作: %s\n", contextErr.Operation)
			fmt.Printf(" 时间戳: %s\n", contextErr.Timestamp.Format(time.RFC3339))
			fmt.Printf(" 附加信息: %v\n", contextErr.AdditionalInfo)
		}
	}
}
fmt.Printf("\n 📝 错误链示例:\n")
if err := simulateMultipleErrors(); err != nil {
	// No trailing newline here: the multi-error message already ends in one.
	fmt.Printf(" %s", err.Error())
	if errorChain, ok := err.(*ErrorChain); ok {
		fmt.Printf(" 错误总数: %d\n", len(errorChain.Errors()))
		fmt.Printf(" 首个错误: %s\n", errorChain.First().Error())
		fmt.Printf(" 最后错误: %s\n", errorChain.Last().Error())
	}
}
}
func demonstrateErrorPropagation() {
fmt.Println("\n--- 错误传播机制 ---")
/*
错误传播要点:
1. 错误包装:保留调用栈信息
2. 错误转换:适配不同层级的错误
3. 错误过滤:决定哪些错误需要传播
4. 错误增强:添加上下文信息
*/
// ErrorPropagator holds the three kinds of stages an error is passed through
// before it is handed to the next layer.
type ErrorPropagator struct {
	filters      []ErrorFilter      // decide whether an error is propagated at all
	transformers []ErrorTransformer // may replace the error with another one
	enhancers    []ErrorEnhancer    // wrap the error with extra context
}

// ErrorFilter decides whether an error should be propagated.
type ErrorFilter interface {
	ShouldPropagate(err error) bool
}

// ErrorTransformer maps an error to the error that should be propagated in
// its place.
type ErrorTransformer interface {
	Transform(err error) error
}

// ErrorEnhancer attaches context to an error and returns the result.
type ErrorEnhancer interface {
	Enhance(err error, context map[string]interface{}) error
}
// SeverityFilter lets through classified errors whose severity is at least
// MinSeverity.
type SeverityFilter struct {
	MinSeverity ErrorSeverity
}

// ShouldPropagate reports whether err passes the severity threshold. Errors
// that do not implement ClassifiedError are always propagated.
func (sf *SeverityFilter) ShouldPropagate(err error) bool {
	classified, isClassified := err.(ClassifiedError)
	if !isClassified {
		return true
	}
	return classified.Severity() >= sf.MinSeverity
}
// ErrorTypeTransformer re-classifies errors of type FromType as ToType.
type ErrorTypeTransformer struct {
	FromType ErrorType
	ToType   ErrorType
}

// Transform returns a new classified error of type ToType when err is a
// *BaseClassifiedError whose type equals FromType; any other error is
// returned unchanged. Code, message and severity are carried over.
func (ett *ErrorTypeTransformer) Transform(err error) error {
	source, ok := err.(*BaseClassifiedError)
	if !ok || source.Type() != ett.FromType {
		return err
	}
	converted := NewClassifiedError(
		source.Code(),
		source.message,
		ett.ToType,
		source.Severity(),
	)
	// NOTE(review): details is assigned, not copied; if it is a map the two
	// errors share it. Confirm whether that is intended.
	converted.details = source.details
	return converted
}
// ContextEnhancer stamps errors with the service name and version.
type ContextEnhancer struct {
	Service string
	Version string
}

// Enhance wraps err in an EnhancedError carrying the enhancer's service and
// version, the supplied context map, and the current time.
func (ce *ContextEnhancer) Enhance(err error, context map[string]interface{}) error {
	return &EnhancedError{
		OriginalError: err,
		Service:       ce.Service,
		Version:       ce.Version,
		Context:       context,
		Timestamp:     time.Now(),
	}
}

// EnhancedError is an error decorated with service metadata and a context
// map.
type EnhancedError struct {
	OriginalError error
	Service       string
	Version       string
	Context       map[string]interface{}
	Timestamp     time.Time
}

// Error renders "[service@version] cause". A nil OriginalError is printed as
// "<nil>" instead of panicking.
func (ee *EnhancedError) Error() string {
	// %v calls Error() on a non-nil error and tolerates nil, unlike a direct
	// ee.OriginalError.Error() call.
	return fmt.Sprintf("[%s@%s] %v", ee.Service, ee.Version, ee.OriginalError)
}

// Unwrap returns the wrapped error.
func (ee *EnhancedError) Unwrap() error {
	return ee.OriginalError
}
// NewErrorPropagator returns a propagator with no stages registered.
func NewErrorPropagator() *ErrorPropagator {
	return &ErrorPropagator{
		filters:      make([]ErrorFilter, 0),
		transformers: make([]ErrorTransformer, 0),
		enhancers:    make([]ErrorEnhancer, 0),
	}
}

// AddFilter registers a filter; filters run in registration order.
func (ep *ErrorPropagator) AddFilter(filter ErrorFilter) {
	ep.filters = append(ep.filters, filter)
}

// AddTransformer registers a transformer; transformers run in registration
// order.
func (ep *ErrorPropagator) AddTransformer(transformer ErrorTransformer) {
	ep.transformers = append(ep.transformers, transformer)
}

// AddEnhancer registers an enhancer; enhancers run in registration order.
func (ep *ErrorPropagator) AddEnhancer(enhancer ErrorEnhancer) {
	ep.enhancers = append(ep.enhancers, enhancer)
}

// Propagate runs err through the filters, then the transformers, then the
// enhancers, and returns the resulting error.
//
// It returns nil when err is nil, when any filter rejects the error, or when
// a transformer maps the error to nil. context is passed to every enhancer.
func (ep *ErrorPropagator) Propagate(err error, context map[string]interface{}) error {
	if err == nil {
		return nil
	}
	// Any single rejecting filter suppresses the error.
	for _, filter := range ep.filters {
		if !filter.ShouldPropagate(err) {
			return nil
		}
	}
	for _, transformer := range ep.transformers {
		err = transformer.Transform(err)
		if err == nil {
			// The transformer swallowed the error. Stop here so enhancers do
			// not wrap nil into a non-nil error value.
			return nil
		}
	}
	for _, enhancer := range ep.enhancers {
		err = enhancer.Enhance(err, context)
	}
	return err
}
// Demonstrate error propagation.
fmt.Printf("错误传播机制演示:\n")
// Create the propagator.
propagator := NewErrorPropagator()
// Severity filter: only errors of medium severity or above are propagated.
propagator.AddFilter(&SeverityFilter{MinSeverity: SeverityMedium})
// Type transformer: network errors are re-classified as system errors.
propagator.AddTransformer(&ErrorTypeTransformer{
	FromType: ErrorTypeNetwork,
	ToType:   ErrorTypeSystem,
})
// Context enhancer: stamp every propagated error with service metadata.
propagator.AddEnhancer(&ContextEnhancer{
	Service: "user-service",
	Version: "v1.2.3",
})
// Exercise the pipeline with several kinds of errors.
factory := &ErrorFactory{}
testErrors := []error{
	factory.NewValidationError("输入验证失败"), // medium severity: expected to propagate
	NewClassifiedError("TRACE_001", "调试信息", ErrorTypeBusiness, SeverityLow), // low severity: expected to be filtered
	factory.NewNetworkError("网络连接失败"), // network error: expected to become a system error
	factory.NewDatabaseError("数据库查询失败"), // high severity: expected to propagate
}
// NOTE: this local shadows the context package for the rest of the function.
context := map[string]interface{}{
	"request_id": "req-98765",
	"user_id":    "user-12345",
	"operation":  "get_user_profile",
}
fmt.Printf(" 🚀 错误传播测试:\n")
for i, originalErr := range testErrors {
	fmt.Printf(" 测试 %d - 原始错误: %s\n", i+1, originalErr.Error())
	propagatedErr := propagator.Propagate(originalErr, context)
	if propagatedErr == nil {
		fmt.Printf(" 结果: 错误被过滤,未传播\n")
	} else {
		fmt.Printf(" 结果: %s\n", propagatedErr.Error())
		// Show the metadata added by the enhancer, if any.
		if enhanced, ok := propagatedErr.(*EnhancedError); ok {
			fmt.Printf(" 服务: %s\n", enhanced.Service)
			fmt.Printf(" 版本: %s\n", enhanced.Version)
			fmt.Printf(" 时间戳: %s\n", enhanced.Timestamp.Format("15:04:05"))
			fmt.Printf(" 上下文: %v\n", enhanced.Context)
		}
	}
	fmt.Println()
}
}
func demonstrateErrorRecovery() {
fmt.Println("\n--- 错误恢复策略 ---")
/*
错误恢复要点:
1. 重试机制:指数退避、最大重试次数
2. 熔断保护:防止级联失败
3. 降级处理:提供备用方案
4. 超时控制:避免无限等待
*/
// RetryStrategy tells a retry loop whether and when to try again.
type RetryStrategy interface {
	// ShouldRetry reports whether another attempt should follow the given
	// zero-based attempt, which failed with err.
	ShouldRetry(attempt int, err error) bool
	// NextDelay returns how long to wait after the given zero-based attempt.
	NextDelay(attempt int) time.Duration
	// MaxAttempts returns the upper bound on the number of attempts.
	MaxAttempts() int
}
// ExponentialBackoffStrategy retries with a delay that grows by Multiplier on
// every attempt, capped at MaxDelay, for at most MaxRetries attempts.
type ExponentialBackoffStrategy struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
	MaxRetries   int
}

// ShouldRetry reports whether the failed attempt may be retried: the attempt
// index must be below MaxRetries and err must be a ClassifiedError that marks
// itself retryable. Unclassified errors are never retried.
func (ebs *ExponentialBackoffStrategy) ShouldRetry(attempt int, err error) bool {
	if attempt >= ebs.MaxRetries {
		return false
	}
	classified, isClassified := err.(ClassifiedError)
	if !isClassified {
		return false
	}
	return classified.IsRetryable()
}
// NextDelay returns InitialDelay * Multiplier^attempt, capped at MaxDelay.
//
// The cap is applied in floating point before converting to time.Duration.
// Converting first would overflow int64 for large attempts and produce a
// bogus (typically negative) delay that slips past the cap.
func (ebs *ExponentialBackoffStrategy) NextDelay(attempt int) time.Duration {
	scaled := float64(ebs.InitialDelay) * pow(ebs.Multiplier, float64(attempt))
	// Written as a negated "<" so that NaN and +Inf also fall back to the cap.
	if !(scaled < float64(ebs.MaxDelay)) {
		return ebs.MaxDelay
	}
	return time.Duration(scaled)
}

// MaxAttempts returns MaxRetries.
func (ebs *ExponentialBackoffStrategy) MaxAttempts() int {
	return ebs.MaxRetries
}
// pow returns base raised to the integer part of exp.
//
// exp is truncated toward zero, so a fractional exponent such as 0.5 is
// treated as 0. Negative exponents yield the reciprocal of the positive
// power. For non-negative whole exponents the result equals repeated
// multiplication. Use math.Pow when real-valued exponents are needed.
func pow(base, exp float64) float64 {
	n := int(exp)
	if n < 0 {
		return 1 / pow(base, float64(-n))
	}
	result := 1.0
	for i := 0; i < n; i++ {
		result *= base
	}
	return result
}
// RetryExecutor runs an operation repeatedly according to a RetryStrategy.
type RetryExecutor struct {
	strategy RetryStrategy
	logger   *log.Logger
}

// NewRetryExecutor returns an executor that follows strategy and logs with a
// "[RETRY] " prefix to the standard logger's writer.
func NewRetryExecutor(strategy RetryStrategy) *RetryExecutor {
	return &RetryExecutor{
		strategy: strategy,
		logger:   log.New(log.Writer(), "[RETRY] ", log.LstdFlags),
	}
}

// Execute calls operation until it succeeds, the strategy declines a retry,
// the attempt budget is exhausted, or ctx is done.
//
// It returns nil on success and ctx.Err() when ctx is cancelled before an
// attempt or during a back-off wait. Otherwise it returns an error that wraps
// the last failure, so errors.Is and errors.As can still reach it.
func (re *RetryExecutor) Execute(ctx context.Context, operation func() error) error {
	maxAttempts := re.strategy.MaxAttempts()
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Bail out before doing any work if the caller already gave up.
		if err := ctx.Err(); err != nil {
			return err
		}
		err := operation()
		if err == nil {
			if attempt > 0 {
				re.logger.Printf("操作在第 %d 次重试后成功", attempt+1)
			}
			return nil
		}
		lastErr = err
		if !re.strategy.ShouldRetry(attempt, err) {
			re.logger.Printf("错误不可重试或达到最大重试次数: %v", err)
			break
		}
		if attempt == maxAttempts-1 {
			// This was the final attempt; waiting would only delay the
			// failure report.
			break
		}
		delay := re.strategy.NextDelay(attempt)
		re.logger.Printf("第 %d 次重试失败: %v, %v 后重试", attempt+1, err, delay)
		// An explicit timer can be stopped when ctx wins the race, unlike
		// time.After.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	if lastErr == nil {
		// No attempt was made (MaxAttempts <= 0); there is nothing to wrap.
		return fmt.Errorf("重试 %d 次后仍然失败: %v", maxAttempts, lastErr)
	}
	return fmt.Errorf("重试 %d 次后仍然失败: %w", maxAttempts, lastErr)
}
// CircuitBreaker is a simplified circuit breaker with three states:
// "CLOSED" (calls pass), "OPEN" (calls are rejected) and "HALF_OPEN" (a
// trial call is allowed after resetTimeout has elapsed).
type CircuitBreaker struct {
	name         string
	maxFailures  int           // consecutive failures that open the breaker
	resetTimeout time.Duration // how long the breaker stays open before a trial call
	failures     int           // consecutive failures so far
	lastFailTime time.Time     // time of the most recent failure
	state        string
	mutex        sync.RWMutex // guards failures, lastFailTime and state
}

// NewCircuitBreaker returns a closed breaker.
func NewCircuitBreaker(name string, maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		name:         name,
		maxFailures:  maxFailures,
		resetTimeout: resetTimeout,
		state:        "CLOSED",
	}
}

// Execute runs operation unless the breaker is open.
//
// While open and within resetTimeout of the last failure it returns an error
// without calling operation. Otherwise it calls operation and returns its
// error unchanged. A failure increments the failure count and opens the
// breaker once maxFailures is reached. A success resets the count and closes
// a half-open breaker.
func (cb *CircuitBreaker) Execute(operation func() error) error {
	// Check and transition under one write lock so the OPEN -> HALF_OPEN
	// decision cannot interleave with another caller's state update.
	cb.mutex.Lock()
	if cb.state == "OPEN" {
		if time.Since(cb.lastFailTime) <= cb.resetTimeout {
			cb.mutex.Unlock()
			return fmt.Errorf("熔断器 %s 已开启", cb.name)
		}
		cb.state = "HALF_OPEN"
	}
	cb.mutex.Unlock()

	// The lock is released while the operation runs so slow calls do not
	// serialise every other caller.
	err := operation()

	cb.mutex.Lock()
	defer cb.mutex.Unlock()
	if err != nil {
		cb.failures++
		cb.lastFailTime = time.Now()
		if cb.failures >= cb.maxFailures {
			cb.state = "OPEN"
			log.Printf("熔断器 %s 开启,失败次数: %d", cb.name, cb.failures)
		}
		return err
	}
	// Success: a half-open breaker closes again.
	if cb.state == "HALF_OPEN" {
		cb.state = "CLOSED"
		log.Printf("熔断器 %s 恢复正常", cb.name)
	}
	cb.failures = 0
	return nil
}
// FallbackHandler runs a primary operation and switches to a fallback when
// the primary fails or does not finish within threshold.
type FallbackHandler struct {
	primary   func() (interface{}, error)
	fallback  func() (interface{}, error)
	threshold time.Duration
}

// NewFallbackHandler bundles the two operations with the timeout threshold.
func NewFallbackHandler(primary, fallback func() (interface{}, error), threshold time.Duration) *FallbackHandler {
	handler := new(FallbackHandler)
	handler.primary = primary
	handler.fallback = fallback
	handler.threshold = threshold
	return handler
}

// Execute starts the primary operation in its own goroutine and waits for
// either its outcome or the threshold timeout. A successful primary result is
// returned as-is; on primary error or timeout the fallback's result is
// returned instead. The primary goroutine is not interrupted on timeout; the
// buffered channel lets it finish without blocking.
func (fh *FallbackHandler) Execute() (interface{}, error) {
	type outcome struct {
		value interface{}
		err   error
	}

	deadline, release := context.WithTimeout(context.Background(), fh.threshold)
	defer release()

	done := make(chan outcome, 1)
	go func() {
		value, err := fh.primary()
		done <- outcome{value: value, err: err}
	}()

	select {
	case got := <-done:
		if got.err == nil {
			return got.value, nil
		}
		log.Printf("主要操作失败,执行降级: %v", got.err)
		return fh.fallback()
	case <-deadline.Done():
		log.Printf("主要操作超时,执行降级")
		return fh.fallback()
	}
}
// Demonstrate the recovery strategies.
fmt.Printf("错误恢复策略演示:\n")
// 1. Retry.
fmt.Printf(" 🔄 重试机制测试:\n")
retryStrategy := &ExponentialBackoffStrategy{
	InitialDelay: 100 * time.Millisecond,
	MaxDelay:     1 * time.Second,
	Multiplier:   2.0,
	MaxRetries:   3,
}
executor := NewRetryExecutor(retryStrategy)
// Simulated flaky operation.
callCount := 0
unstableOperation := func() error {
	callCount++
	if callCount < 3 { // the first two calls fail
		factory := &ErrorFactory{}
		return factory.NewNetworkError(fmt.Sprintf("网络错误 #%d", callCount))
	}
	return nil // the third call succeeds
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := executor.Execute(ctx, unstableOperation); err != nil {
	fmt.Printf(" ❌ 重试失败: %v\n", err)
} else {
	fmt.Printf(" ✅ 重试成功,总调用次数: %d\n", callCount)
}
// 2. Circuit breaker.
fmt.Printf("\n ⚡ 熔断器测试:\n")
breaker := NewCircuitBreaker("test-service", 2, 2*time.Second)
// Operation that always fails.
failingOperation := func() error {
	return fmt.Errorf("服务不可用")
}
// Consecutive failures trip the breaker; with a threshold of 2 the later
// calls are rejected without running the operation.
for i := 1; i <= 5; i++ {
	err := breaker.Execute(failingOperation)
	if err != nil {
		fmt.Printf(" 调用 %d: ❌ %v\n", i, err)
	}
}
// 3. Fallback.
fmt.Printf("\n 📉 降级处理测试:\n")
// Primary service: simulates a slow response.
primaryService := func() (interface{}, error) {
	time.Sleep(200 * time.Millisecond) // slower than the 100ms threshold below
	return "来自主要服务的数据", nil
}
// Fallback service: responds immediately.
fallbackService := func() (interface{}, error) {
	return "来自缓存的数据", nil
}
handler := NewFallbackHandler(primaryService, fallbackService, 100*time.Millisecond)
result, err := handler.Execute()
if err != nil {
	fmt.Printf(" ❌ 降级处理失败: %v\n", err)
} else {
	fmt.Printf(" ✅ 降级处理成功: %v\n", result)
}
fmt.Printf("\n 📋 错误恢复最佳实践:\n")
fmt.Printf(" 1. 识别可重试的错误类型\n")
fmt.Printf(" 2. 实现指数退避重试策略\n")
fmt.Printf(" 3. 使用熔断器防止级联失败\n")
fmt.Printf(" 4. 提供降级和备用方案\n")
fmt.Printf(" 5. 设置合理的超时时间\n")
fmt.Printf(" 6. 监控错误率和恢复情况\n")
}
func main() {
demonstrateErrorHandlingStrategy()
}

日志
点击查看完整代码实现
go
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"runtime"
"strings"
"sync"
"time"
)
// demonstrateLoggingManagement prints the section banner and runs the four
// logging demonstrations in order.
func demonstrateLoggingManagement() {
	fmt.Println("=== Go日志管理策略 ===")
	/*
	   Core elements of log management:
	   1. Log levels:
	      - DEBUG: detailed debugging information
	      - INFO:  general informational records
	      - WARN:  warnings
	      - ERROR: errors
	      - FATAL: fatal errors
	   2. Structured logging:
	      - JSON output
	      - standardised fields
	      - contextual information
	      - trace IDs
	   3. Output management:
	      - multiple destinations
	      - asynchronous writes
	      - buffering
	      - rotation
	   4. Performance:
	      - lazy evaluation
	      - batched writes
	      - compressed storage
	      - sampling
	*/
	demos := []func(){
		demonstrateStructuredLogging,
		demonstrateLogLevels,
		demonstrateAsyncLogging,
		demonstrateLogAggregation,
	}
	for _, runDemo := range demos {
		runDemo()
	}
}
func demonstrateStructuredLogging() {
fmt.Println("\n--- 结构化日志设计 ---")
/*
结构化日志要点:
1. 标准字段:时间戳、级别、消息、调用者信息
2. 上下文字段:请求ID、用户ID、会话ID
3. 业务字段:订单ID、产品ID等业务相关信息
4. 元数据:服务名、版本号、环境信息
*/
// LogLevel is the severity of a log record; higher values are more severe.
type LogLevel int

// Log levels in ascending order of severity.
const (
	DEBUG LogLevel = iota
	INFO
	WARN
	ERROR
	FATAL
)

// String returns the upper-case name of the level, or "UNKNOWN" for values
// outside the declared range.
func (l LogLevel) String() string {
	names := [...]string{
		DEBUG: "DEBUG",
		INFO:  "INFO",
		WARN:  "WARN",
		ERROR: "ERROR",
		FATAL: "FATAL",
	}
	if l < DEBUG || int(l) >= len(names) {
		return "UNKNOWN"
	}
	return names[l]
}
// LogEntry is the JSON shape of a single log record.
type LogEntry struct {
	Timestamp   time.Time              `json:"timestamp"`
	Level       string                 `json:"level"`
	Message     string                 `json:"message"`
	Service     string                 `json:"service"`
	Version     string                 `json:"version"`
	Environment string                 `json:"environment"`
	RequestID   string                 `json:"request_id,omitempty"`
	UserID      string                 `json:"user_id,omitempty"`
	SessionID   string                 `json:"session_id,omitempty"`
	Caller      CallerInfo             `json:"caller"`
	Fields      map[string]interface{} `json:"fields,omitempty"`
	Error       *ErrorInfo             `json:"error,omitempty"`
}

// CallerInfo identifies the source location that produced a log record.
type CallerInfo struct {
	File     string `json:"file"`
	Line     int    `json:"line"`
	Function string `json:"function"`
}

// ErrorInfo is the serialised form of an error attached to a log record.
type ErrorInfo struct {
	Type       string `json:"type"`
	Message    string `json:"message"`
	StackTrace string `json:"stack_trace,omitempty"`
}
// StructuredLogger writes one JSON-encoded LogEntry per call to every
// configured output. Loggers derived via WithField/WithFields carry extra
// fields that are attached to each record they emit.
type StructuredLogger struct {
	service     string
	version     string
	environment string
	minLevel    LogLevel
	outputs     []io.Writer
	fields      map[string]interface{}
	mutex       sync.RWMutex // guards minLevel and outputs
}

// NewStructuredLogger returns a logger that writes to os.Stdout at INFO level
// and above.
func NewStructuredLogger(service, version, environment string) *StructuredLogger {
	return &StructuredLogger{
		service:     service,
		version:     version,
		environment: environment,
		minLevel:    INFO,
		outputs:     []io.Writer{os.Stdout},
		fields:      make(map[string]interface{}),
	}
}

// SetLevel sets the minimum level that is written.
func (sl *StructuredLogger) SetLevel(level LogLevel) {
	sl.mutex.Lock()
	defer sl.mutex.Unlock()
	sl.minLevel = level
}

// AddOutput adds another destination for this logger's records.
func (sl *StructuredLogger) AddOutput(output io.Writer) {
	sl.mutex.Lock()
	defer sl.mutex.Unlock()
	sl.outputs = append(sl.outputs, output)
}

// WithField returns a derived logger whose records additionally carry
// key=value. The receiver is not modified.
func (sl *StructuredLogger) WithField(key string, value interface{}) *StructuredLogger {
	return sl.WithFields(map[string]interface{}{key: value})
}

// WithFields returns a derived logger whose records carry the receiver's
// fields plus the given ones; on key collisions the given value wins. The
// derived logger gets a snapshot of the receiver's level and its own copy of
// the output list.
func (sl *StructuredLogger) WithFields(fields map[string]interface{}) *StructuredLogger {
	// minLevel and outputs are read under the lock. The outputs slice is
	// copied so a later AddOutput on either logger cannot write into a
	// backing array shared with the other.
	sl.mutex.RLock()
	defer sl.mutex.RUnlock()

	merged := make(map[string]interface{}, len(sl.fields)+len(fields))
	for k, v := range sl.fields {
		merged[k] = v
	}
	for k, v := range fields {
		merged[k] = v
	}
	outputs := make([]io.Writer, len(sl.outputs))
	copy(outputs, sl.outputs)

	return &StructuredLogger{
		service:     sl.service,
		version:     sl.version,
		environment: sl.environment,
		minLevel:    sl.minLevel,
		outputs:     outputs,
		fields:      merged,
	}
}

// log builds a LogEntry, serialises it as one JSON line and writes it to all
// outputs. Records below the minimum level are dropped. err, when non-nil, is
// recorded under "error".
func (sl *StructuredLogger) log(level LogLevel, message string, err error) {
	sl.mutex.RLock()
	if level < sl.minLevel {
		sl.mutex.RUnlock()
		return
	}
	outputs := make([]io.Writer, len(sl.outputs))
	copy(outputs, sl.outputs)
	sl.mutex.RUnlock()

	entry := LogEntry{
		Timestamp:   time.Now(),
		Level:       level.String(),
		Message:     message,
		Service:     sl.service,
		Version:     sl.version,
		Environment: sl.environment,
		Caller:      sl.getCaller(),
	}
	if len(sl.fields) > 0 {
		entry.Fields = make(map[string]interface{}, len(sl.fields))
		for k, v := range sl.fields {
			entry.Fields[k] = v
		}
	}
	if err != nil {
		entry.Error = &ErrorInfo{
			Type:    fmt.Sprintf("%T", err),
			Message: err.Error(),
		}
	}

	data, marshalErr := json.Marshal(entry)
	if marshalErr != nil {
		// Only Fields can hold values JSON cannot encode. Drop them and
		// retry, so the record keeps its message and stays valid JSON.
		entry.Fields = nil
		entry.Message = fmt.Sprintf("%s (JSON marshal error: %v)", message, marshalErr)
		data, marshalErr = json.Marshal(entry)
	}
	if marshalErr != nil {
		// Last resort; %q keeps the values quoted and escaped.
		data = []byte(fmt.Sprintf(`{"timestamp":%q,"level":%q,"message":%q,"service":%q}`,
			entry.Timestamp.Format(time.RFC3339),
			entry.Level,
			"JSON marshal error: "+marshalErr.Error(),
			entry.Service))
	}
	data = append(data, '\n')

	for _, output := range outputs {
		// Logging is best effort: a failing sink must not break the caller.
		_, _ = output.Write(data)
	}
}

// getCaller returns the file base name, line and shortened function name of
// the code that called one of the public level methods.
func (sl *StructuredLogger) getCaller() CallerInfo {
	// Skip getCaller, log and the level method (Debug/Info/...).
	pc, file, line, ok := runtime.Caller(3)
	if !ok {
		return CallerInfo{}
	}
	funcName := "unknown"
	if fn := runtime.FuncForPC(pc); fn != nil {
		funcName = fn.Name()
		// Keep only the part after the last path separator.
		if idx := strings.LastIndex(funcName, "/"); idx != -1 {
			funcName = funcName[idx+1:]
		}
	}
	// Keep only the part of file after the last "/".
	if idx := strings.LastIndex(file, "/"); idx != -1 {
		file = file[idx+1:]
	}
	return CallerInfo{
		File:     file,
		Line:     line,
		Function: funcName,
	}
}

// Debug logs message at DEBUG level.
func (sl *StructuredLogger) Debug(message string) {
	sl.log(DEBUG, message, nil)
}

// Info logs message at INFO level.
func (sl *StructuredLogger) Info(message string) {
	sl.log(INFO, message, nil)
}

// Warn logs message at WARN level.
func (sl *StructuredLogger) Warn(message string) {
	sl.log(WARN, message, nil)
}

// Error logs message and err at ERROR level.
func (sl *StructuredLogger) Error(message string, err error) {
	sl.log(ERROR, message, err)
}

// Fatal logs message and err at FATAL level and then terminates the process
// with exit status 1.
func (sl *StructuredLogger) Fatal(message string, err error) {
	sl.log(FATAL, message, err)
	os.Exit(1)
}
// Demonstrate structured logging.
fmt.Printf("结构化日志演示:\n")
logger := NewStructuredLogger("user-service", "v1.2.3", "production")
logger.SetLevel(DEBUG)
// Plain records.
logger.Info("服务启动成功")
logger.Debug("数据库连接池初始化")
// Records carrying request-scoped fields.
userLogger := logger.WithFields(map[string]interface{}{
	"request_id": "req-12345",
	"user_id":    "user-67890",
	"session_id": "sess-abcdef",
})
userLogger.Info("用户登录成功")
// Record with an attached error.
err := fmt.Errorf("数据库连接超时")
userLogger.Error("用户查询失败", err)
// Business-operation record: inherits userLogger's fields and adds
// order-specific ones.
orderLogger := userLogger.WithFields(map[string]interface{}{
	"order_id":   "order-123",
	"product_id": "prod-456",
	"amount":     99.99,
	"currency":   "USD",
})
orderLogger.Info("订单创建成功")
// Performance record.
performanceLogger := logger.WithFields(map[string]interface{}{
	"operation":   "database_query",
	"duration_ms": 150,
	"rows":        42,
})
performanceLogger.Warn("数据库查询耗时较长")
}
func demonstrateLogLevels() {
fmt.Println("\n--- 日志级别管理 ---")
/*
日志级别管理要点:
1. 动态级别调整:运行时调整日志级别
2. 模块级别控制:不同模块设置不同级别
3. 环境相关配置:开发、测试、生产环境
4. 采样和限流:高频日志的控制机制
*/
// LogLevelManager decides whether a record should be written, based on a
// global level, optional per-module levels and per-level sampling.
type LogLevelManager struct {
	globalLevel LogLevel
	moduleLevel map[string]LogLevel
	sampling    map[LogLevel]int // sampling rate: 1 keeps every record, 10 keeps one in ten
	counters    map[string]int64 // records seen per "module_LEVEL", sampled levels only
	mutex       sync.RWMutex
}

// NewLogLevelManager returns a manager with global level INFO and no
// sampling (rate 1 for every level).
func NewLogLevelManager() *LogLevelManager {
	return &LogLevelManager{
		globalLevel: INFO,
		moduleLevel: make(map[string]LogLevel),
		sampling: map[LogLevel]int{
			DEBUG: 1,
			INFO:  1,
			WARN:  1,
			ERROR: 1,
			FATAL: 1,
		},
		counters: make(map[string]int64),
	}
}

// SetGlobalLevel sets the level used by modules without their own level.
func (llm *LogLevelManager) SetGlobalLevel(level LogLevel) {
	llm.mutex.Lock()
	defer llm.mutex.Unlock()
	llm.globalLevel = level
}

// SetModuleLevel overrides the minimum level for one module.
func (llm *LogLevelManager) SetModuleLevel(module string, level LogLevel) {
	llm.mutex.Lock()
	defer llm.mutex.Unlock()
	llm.moduleLevel[module] = level
}

// SetSampling sets the sampling rate for a level. Rates <= 0 are ignored.
func (llm *LogLevelManager) SetSampling(level LogLevel, rate int) {
	llm.mutex.Lock()
	defer llm.mutex.Unlock()
	if rate > 0 {
		llm.sampling[level] = rate
	}
}

// ShouldLog reports whether a record of the given level from module should
// be written. The module's level (or the global level when the module has
// none) is checked first. For levels with a sampling rate above 1 only every
// rate-th record per module/level pair is accepted.
func (llm *LogLevelManager) ShouldLog(module string, level LogLevel) bool {
	// The sampling branch writes to counters, so the exclusive lock is
	// required: a read lock would permit concurrent map writes.
	llm.mutex.Lock()
	defer llm.mutex.Unlock()

	threshold, hasModuleLevel := llm.moduleLevel[module]
	if !hasModuleLevel {
		threshold = llm.globalLevel
	}
	if level < threshold {
		return false
	}

	rate := llm.sampling[level] // zero when absent, i.e. no sampling
	if rate <= 1 {
		return true
	}
	counterKey := fmt.Sprintf("%s_%s", module, level.String())
	llm.counters[counterKey]++
	return llm.counters[counterKey]%int64(rate) == 0
}

// GetStats returns a snapshot with four keys: "global_level" (string),
// "module_levels" (map[string]string), "sampling_rates" (map[string]int) and
// "counters" (map[string]int64).
func (llm *LogLevelManager) GetStats() map[string]interface{} {
	llm.mutex.RLock()
	defer llm.mutex.RUnlock()

	moduleLevels := make(map[string]string, len(llm.moduleLevel))
	for module, level := range llm.moduleLevel {
		moduleLevels[module] = level.String()
	}
	samplingRates := make(map[string]int, len(llm.sampling))
	for level, rate := range llm.sampling {
		samplingRates[level.String()] = rate
	}
	counters := make(map[string]int64, len(llm.counters))
	for key, count := range llm.counters {
		counters[key] = count
	}
	return map[string]interface{}{
		"global_level":   llm.globalLevel.String(),
		"module_levels":  moduleLevels,
		"sampling_rates": samplingRates,
		"counters":       counters,
	}
}
// LevelManagedLogger is a per-module logger that consults a LogLevelManager
// before forwarding records to a StructuredLogger.
type LevelManagedLogger struct {
	module     string
	levelMgr   *LogLevelManager
	baseLogger *StructuredLogger
}

// NewLevelManagedLogger binds module to the given level manager and base
// logger.
func NewLevelManagedLogger(module string, levelMgr *LogLevelManager, baseLogger *StructuredLogger) *LevelManagedLogger {
	managed := new(LevelManagedLogger)
	managed.module = module
	managed.levelMgr = levelMgr
	managed.baseLogger = baseLogger
	return managed
}

// log forwards the record to the base logger, tagged with a "module" field,
// when the level manager accepts it. Levels other than the five declared ones
// are dropped silently. The base logger still applies its own minimum level.
func (lml *LevelManagedLogger) log(level LogLevel, message string, err error) {
	if allowed := lml.levelMgr.ShouldLog(lml.module, level); !allowed {
		return
	}
	// Derive a child logger that carries the module name.
	tagged := lml.baseLogger.WithField("module", lml.module)
	switch {
	case level == DEBUG:
		tagged.Debug(message)
	case level == INFO:
		tagged.Info(message)
	case level == WARN:
		tagged.Warn(message)
	case level == ERROR:
		tagged.Error(message, err)
	case level == FATAL:
		tagged.Fatal(message, err)
	}
}

// Debug logs message at DEBUG level, subject to the level manager.
func (lml *LevelManagedLogger) Debug(message string) {
	lml.log(DEBUG, message, nil)
}

// Info logs message at INFO level, subject to the level manager.
func (lml *LevelManagedLogger) Info(message string) {
	lml.log(INFO, message, nil)
}

// Warn logs message at WARN level, subject to the level manager.
func (lml *LevelManagedLogger) Warn(message string) {
	lml.log(WARN, message, nil)
}

// Error logs message and err at ERROR level, subject to the level manager.
func (lml *LevelManagedLogger) Error(message string, err error) {
	lml.log(ERROR, message, err)
}
// Demonstrate log-level management.
fmt.Printf("日志级别管理演示:\n")
levelManager := NewLogLevelManager()
baseLogger := NewStructuredLogger("demo-service", "v1.0.0", "development")
// Let the level manager be the only gate. The base logger defaults to INFO
// and would otherwise silently drop every DEBUG record the manager accepts
// below, including the sampled ones.
baseLogger.SetLevel(DEBUG)
// One managed logger per module.
authLogger := NewLevelManagedLogger("auth", levelManager, baseLogger)
dbLogger := NewLevelManagedLogger("database", levelManager, baseLogger)
apiLogger := NewLevelManagedLogger("api", levelManager, baseLogger)
fmt.Printf(" 📊 默认配置测试:\n")
// Default configuration: the manager's global level is INFO.
authLogger.Debug("认证模块调试信息") // not written
authLogger.Info("用户认证成功")    // written
dbLogger.Info("数据库连接建立")     // written
fmt.Printf("\n 🔧 动态级别调整:\n")
// Lower the global level to DEBUG at run time.
levelManager.SetGlobalLevel(DEBUG)
authLogger.Debug("现在可以看到调试信息了")
// Raise the level for one module only.
levelManager.SetModuleLevel("database", WARN)
dbLogger.Info("数据库信息日志") // not written: below the module level
dbLogger.Warn("数据库连接缓慢") // written
fmt.Printf("\n 📉 日志采样测试:\n")
// Sample DEBUG records: keep one in five.
levelManager.SetSampling(DEBUG, 5)
fmt.Printf(" 发送10条DEBUG日志(应该只记录2条):\n")
for i := 1; i <= 10; i++ {
	authLogger.Debug(fmt.Sprintf("调试信息 #%d", i))
}
// Print the manager's statistics; map iteration order is unspecified.
fmt.Printf("\n 📈 日志统计信息:\n")
stats := levelManager.GetStats()
for key, value := range stats {
	switch v := value.(type) {
	case map[string]string:
		fmt.Printf(" %s:\n", key)
		for k, val := range v {
			fmt.Printf(" %s: %s\n", k, val)
		}
	case map[string]int:
		fmt.Printf(" %s:\n", key)
		for k, val := range v {
			fmt.Printf(" %s: %d\n", k, val)
		}
	case map[string]int64:
		fmt.Printf(" %s:\n", key)
		for k, val := range v {
			fmt.Printf(" %s: %d\n", k, val)
		}
	default:
		fmt.Printf(" %s: %v\n", key, v)
	}
}
}
func demonstrateAsyncLogging() {
fmt.Println("\n--- 异步日志处理 ---")
/*
异步日志要点:
1. 缓冲机制:批量写入提高性能
2. 背压处理:缓冲区满时的策略
3. 优雅关闭:确保所有日志都被写入
4. 错误处理:异步写入失败的处理
*/
// AsyncLogEntry is one queued log record.
type AsyncLogEntry struct {
	Data      []byte    // serialised record, written verbatim
	Timestamp time.Time // time the record was queued
	Retry     int       // set to 0 by Log; not read by AsyncLogger itself
}

// AsyncLogger queues records in a buffered channel and writes them in batches
// from a background goroutine.
type AsyncLogger struct {
	buffer        chan AsyncLogEntry
	writers       []io.Writer
	batchSize     int
	flushInterval time.Duration
	maxRetries    int // set by NewAsyncLogger; not read by AsyncLogger itself
	stopCh        chan struct{}
	doneCh        chan struct{}
	errorCh       chan error
	stats         *AsyncLoggerStats
	mutex         sync.RWMutex // guards writers
	statsMu       sync.Mutex   // guards *stats
	stopOnce      sync.Once    // makes Stop safe to call repeatedly
}

// AsyncLoggerStats holds the logger's counters.
type AsyncLoggerStats struct {
	TotalLogs    int64 // records accepted by Log
	DroppedLogs  int64 // records rejected because the buffer was full
	FailedWrites int64 // individual writer failures
	BatchWrites  int64 // batches written to all writers without error
}

// NewAsyncLogger creates a logger whose queue holds bufferSize records, that
// writes as soon as batchSize records are pending, and that flushes any
// pending records every flushInterval. Call Start to begin processing.
func NewAsyncLogger(bufferSize, batchSize int, flushInterval time.Duration) *AsyncLogger {
	return &AsyncLogger{
		buffer:        make(chan AsyncLogEntry, bufferSize),
		batchSize:     batchSize,
		flushInterval: flushInterval,
		maxRetries:    3,
		stopCh:        make(chan struct{}),
		doneCh:        make(chan struct{}),
		errorCh:       make(chan error, 100),
		stats:         &AsyncLoggerStats{},
	}
}

// AddWriter adds a destination for written batches.
func (al *AsyncLogger) AddWriter(writer io.Writer) {
	al.mutex.Lock()
	defer al.mutex.Unlock()
	al.writers = append(al.writers, writer)
}

// Start launches the background processing goroutine.
func (al *AsyncLogger) Start() {
	go al.processLoop()
}

// Stop signals the processing goroutine to flush and exit, then waits up to
// five seconds for it. It returns an error when the wait times out. Stop may
// be called more than once.
func (al *AsyncLogger) Stop() error {
	// Closing an already closed channel panics, hence the Once.
	al.stopOnce.Do(func() { close(al.stopCh) })
	select {
	case <-al.doneCh:
		return nil
	case <-time.After(5 * time.Second):
		return fmt.Errorf("异步日志器关闭超时")
	}
}

// Log queues data for asynchronous writing. It never blocks: when the queue
// is full the record is dropped and an error is returned.
func (al *AsyncLogger) Log(data []byte) error {
	entry := AsyncLogEntry{
		Data:      data,
		Timestamp: time.Now(),
		Retry:     0,
	}
	select {
	case al.buffer <- entry:
		al.statsMu.Lock()
		al.stats.TotalLogs++
		al.statsMu.Unlock()
		return nil
	default:
		al.statsMu.Lock()
		al.stats.DroppedLogs++
		al.statsMu.Unlock()
		return fmt.Errorf("日志缓冲区已满")
	}
}

// processLoop collects queued records into a batch and writes it when the
// batch is full or the flush ticker fires. On stop it writes the pending
// batch and everything still queued, then closes doneCh.
func (al *AsyncLogger) processLoop() {
	defer close(al.doneCh)
	ticker := time.NewTicker(al.flushInterval)
	defer ticker.Stop()
	batch := make([]AsyncLogEntry, 0, al.batchSize)
	for {
		select {
		case <-al.stopCh:
			// Records already taken from the queue live only in batch; write
			// them first so they are not lost and ordering is kept.
			al.writeBatch(batch)
			al.flushRemaining()
			return
		case entry := <-al.buffer:
			batch = append(batch, entry)
			if len(batch) >= al.batchSize {
				al.writeBatch(batch)
				batch = batch[:0] // reuse the backing array
			}
		case <-ticker.C:
			if len(batch) > 0 {
				al.writeBatch(batch)
				batch = batch[:0]
			}
		}
	}
}

// writeBatch concatenates the batch's data and writes it to every writer.
// Each failing writer is counted in FailedWrites and reported on the error
// channel; the batch counts as written only when no writer failed.
func (al *AsyncLogger) writeBatch(batch []AsyncLogEntry) {
	if len(batch) == 0 {
		return
	}
	al.mutex.RLock()
	writers := make([]io.Writer, len(al.writers))
	copy(writers, al.writers)
	al.mutex.RUnlock()

	var combinedData []byte
	for _, entry := range batch {
		combinedData = append(combinedData, entry.Data...)
	}

	failed := int64(0)
	for _, writer := range writers {
		if _, err := writer.Write(combinedData); err != nil {
			failed++
			// Non-blocking send: if nobody drains errorCh, a blocking send
			// would stall the processing goroutine once the channel fills.
			select {
			case al.errorCh <- fmt.Errorf("写入失败: %v", err):
			default:
			}
		}
	}

	al.statsMu.Lock()
	al.stats.FailedWrites += failed
	if failed == 0 {
		al.stats.BatchWrites++
	}
	al.statsMu.Unlock()
}

// flushRemaining drains everything currently queued and writes it as one
// batch.
func (al *AsyncLogger) flushRemaining() {
	remaining := make([]AsyncLogEntry, 0, len(al.buffer))
	for {
		select {
		case entry := <-al.buffer:
			remaining = append(remaining, entry)
		default:
			al.writeBatch(remaining)
			return
		}
	}
}

// GetStats returns a snapshot of the counters.
func (al *AsyncLogger) GetStats() AsyncLoggerStats {
	al.statsMu.Lock()
	defer al.statsMu.Unlock()
	return *al.stats
}

// GetErrors drains and returns the write errors reported so far. It does not
// block.
func (al *AsyncLogger) GetErrors() []error {
	var errors []error
	for {
		select {
		case err := <-al.errorCh:
			errors = append(errors, err)
		default:
			return errors
		}
	}
}
// Demonstrate asynchronous logging.
fmt.Printf("异步日志处理演示:\n")
// Create the logger: queue of 1000, batches of 10, flush every 100ms.
asyncLogger := NewAsyncLogger(1000, 10, 100*time.Millisecond)
// Add an output destination.
asyncLogger.AddWriter(os.Stdout)
// Start background processing.
asyncLogger.Start()
fmt.Printf(" 📝 发送异步日志:\n")
// Send a burst of records and time how long queuing takes.
start := time.Now()
for i := 1; i <= 50; i++ {
	logData := fmt.Sprintf(`{"timestamp":"%s","level":"INFO","message":"异步日志消息 #%d","service":"test"}%s`,
		time.Now().Format(time.RFC3339), i, "\n")
	if err := asyncLogger.Log([]byte(logData)); err != nil {
		fmt.Printf(" ❌ 日志发送失败: %v\n", err)
	}
	// Pause briefly after every tenth record to simulate other work.
	if i%10 == 0 {
		time.Sleep(10 * time.Millisecond)
	}
}
duration := time.Since(start)
// Give the background goroutine time to write the queued records.
time.Sleep(500 * time.Millisecond)
// Stop the logger.
if err := asyncLogger.Stop(); err != nil {
	fmt.Printf(" ❌ 异步日志器停止失败: %v\n", err)
}
// Print the counters.
stats := asyncLogger.GetStats()
fmt.Printf("\n 📊 异步日志统计:\n")
fmt.Printf(" 发送耗时: %v\n", duration)
fmt.Printf(" 总日志数: %d\n", stats.TotalLogs)
fmt.Printf(" 丢弃日志数: %d\n", stats.DroppedLogs)
fmt.Printf(" 批次写入数: %d\n", stats.BatchWrites)
fmt.Printf(" 写入失败数: %d\n", stats.FailedWrites)
// Report write errors, if any.
errors := asyncLogger.GetErrors()
if len(errors) > 0 {
	fmt.Printf(" 错误数: %d\n", len(errors))
	for i, err := range errors {
		if i < 3 { // show only the first three errors
			fmt.Printf(" %d: %v\n", i+1, err)
		}
	}
}
fmt.Printf(" ✅ 异步日志处理完成\n")
}
func demonstrateLogAggregation() {
fmt.Println("\n--- 日志聚合和分析 ---")
/*
日志聚合要点:
1. 日志收集:从多个源收集日志
2. 格式标准化:统一日志格式
3. 索引和搜索:支持快速查询
4. 监控和告警:基于日志的监控
*/
// LogAggregator stores log entries in memory, indexes them by selected fields
// and keeps simple counters. All methods are guarded by an internal mutex.
type LogAggregator struct {
	entries []LogEntry
	indices map[string][]int // "field:value" -> positions in entries
	stats   map[string]int64 // counters keyed by "total", "level_X", "service_X"
	mutex   sync.RWMutex
}

// NewLogAggregator returns an empty aggregator.
func NewLogAggregator() *LogAggregator {
	return &LogAggregator{
		entries: make([]LogEntry, 0),
		indices: make(map[string][]int),
		stats:   make(map[string]int64),
	}
}

// AddEntry stores entry, indexes it by level, service, environment and, when
// non-empty, request ID and user ID, and updates the counters.
func (la *LogAggregator) AddEntry(entry LogEntry) {
	la.mutex.Lock()
	defer la.mutex.Unlock()
	index := len(la.entries)
	la.entries = append(la.entries, entry)

	la.updateIndex("level", entry.Level, index)
	la.updateIndex("service", entry.Service, index)
	la.updateIndex("environment", entry.Environment, index)
	if entry.RequestID != "" {
		la.updateIndex("request_id", entry.RequestID, index)
	}
	if entry.UserID != "" {
		la.updateIndex("user_id", entry.UserID, index)
	}

	la.stats["total"]++
	la.stats["level_"+entry.Level]++
	la.stats["service_"+entry.Service]++
}

// updateIndex records that the entry at index has the given field value.
// The caller must hold the write lock.
func (la *LogAggregator) updateIndex(field, value string, index int) {
	key := field + ":" + value
	// append on a missing key starts from a nil slice, so no explicit
	// initialisation is needed.
	la.indices[key] = append(la.indices[key], index)
}

// Search returns copies of the entries whose indexed field equals value, in
// insertion order, or nil when nothing is indexed under that pair.
func (la *LogAggregator) Search(field, value string) []LogEntry {
	la.mutex.RLock()
	defer la.mutex.RUnlock()
	indices, exists := la.indices[field+":"+value]
	if !exists {
		return nil
	}
	results := make([]LogEntry, 0, len(indices))
	for _, index := range indices {
		if index < len(la.entries) {
			results = append(results, la.entries[index])
		}
	}
	return results
}

// GetTimeRange returns the entries whose timestamp lies in [start, end],
// both bounds inclusive.
func (la *LogAggregator) GetTimeRange(start, end time.Time) []LogEntry {
	la.mutex.RLock()
	defer la.mutex.RUnlock()
	var results []LogEntry
	for i := range la.entries {
		ts := la.entries[i].Timestamp
		// "not before start and not after end" is the inclusive range test.
		if !ts.Before(start) && !ts.After(end) {
			results = append(results, la.entries[i])
		}
	}
	return results
}

// GetStats returns a copy of the counters.
func (la *LogAggregator) GetStats() map[string]int64 {
	la.mutex.RLock()
	defer la.mutex.RUnlock()
	stats := make(map[string]int64, len(la.stats))
	for k, v := range la.stats {
		stats[k] = v
	}
	return stats
}

// AnalyzeErrorPatterns counts ERROR-level entries that carry error details,
// grouped by error type.
func (la *LogAggregator) AnalyzeErrorPatterns() map[string]int {
	la.mutex.RLock()
	defer la.mutex.RUnlock()
	patterns := make(map[string]int)
	for i := range la.entries {
		entry := &la.entries[i]
		if entry.Level == "ERROR" && entry.Error != nil {
			patterns[entry.Error.Type]++
		}
	}
	return patterns
}

// GetTopUsers returns up to limit users ordered by descending entry count.
// Ties are ordered by ascending user ID so the result is deterministic.
// A limit below zero is treated as zero.
func (la *LogAggregator) GetTopUsers(limit int) []struct {
	UserID string
	Count  int
} {
	// Alias for the anonymous result type, so the slice can be returned
	// without conversion.
	type userCount = struct {
		UserID string
		Count  int
	}

	la.mutex.RLock()
	defer la.mutex.RUnlock()

	counts := make(map[string]int)
	for i := range la.entries {
		if id := la.entries[i].UserID; id != "" {
			counts[id]++
		}
	}
	ranked := make([]userCount, 0, len(counts))
	for id, n := range counts {
		ranked = append(ranked, userCount{UserID: id, Count: n})
	}

	// Insertion sort; adequate for the small user sets of this demo.
	for i := 1; i < len(ranked); i++ {
		for j := i; j > 0; j-- {
			cur, prev := ranked[j], ranked[j-1]
			outranks := cur.Count > prev.Count ||
				(cur.Count == prev.Count && cur.UserID < prev.UserID)
			if !outranks {
				break
			}
			ranked[j], ranked[j-1] = prev, cur
		}
	}

	// Clamp the limit: a negative value would otherwise panic when used as a
	// slice bound or capacity.
	if limit < 0 {
		limit = 0
	}
	if limit > len(ranked) {
		limit = len(ranked)
	}
	return ranked[:limit]
}
// 演示日志聚合
fmt.Printf("日志聚合和分析演示:\n")
aggregator := NewLogAggregator()
// 模拟添加各种日志
baseTime := time.Now().Add(-time.Hour)
logEntries := []LogEntry{
{
Timestamp: baseTime,
Level: "INFO",
Message: "用户登录成功",
Service: "auth-service",
Environment: "production",
RequestID: "req-001",
UserID: "user-123",
},
{
Timestamp: baseTime.Add(5 * time.Minute),
Level: "ERROR",
Message: "数据库连接失败",
Service: "user-service",
Environment: "production",
RequestID: "req-002",
Error: &ErrorInfo{
Type: "DatabaseError",
Message: "connection timeout",
},
},
{
Timestamp: baseTime.Add(10 * time.Minute),
Level: "WARN",
Message: "API请求频率过高",
Service: "api-gateway",
Environment: "production",
RequestID: "req-003",
UserID: "user-123",
},
{
Timestamp: baseTime.Add(15 * time.Minute),
Level: "INFO",
Message: "订单创建成功",
Service: "order-service",
Environment: "production",
RequestID: "req-004",
UserID: "user-456",
},
{
Timestamp: baseTime.Add(20 * time.Minute),
Level: "ERROR",
Message: "支付处理失败",
Service: "payment-service",
Environment: "production",
RequestID: "req-005",
UserID: "user-789",
Error: &ErrorInfo{
Type: "PaymentError",
Message: "insufficient funds",
},
},
}
// 添加日志条目
fmt.Printf(" 📥 添加日志条目:\n")
for _, entry := range logEntries {
aggregator.AddEntry(entry)
fmt.Printf(" 添加: %s [%s] %s\n", entry.Timestamp.Format("15:04:05"), entry.Level, entry.Message)
}
// 搜索测试
fmt.Printf("\n 🔍 搜索测试:\n")
// 按级别搜索
errorLogs := aggregator.Search("level", "ERROR")
fmt.Printf(" ERROR级别日志: %d条\n", len(errorLogs))
for _, log := range errorLogs {
fmt.Printf(" - %s: %s\n", log.Service, log.Message)
}
// 按用户搜索
userLogs := aggregator.Search("user_id", "user-123")
fmt.Printf(" 用户user-123的日志: %d条\n", len(userLogs))
// 按服务搜索
serviceLogs := aggregator.Search("service", "user-service")
fmt.Printf(" user-service的日志: %d条\n", len(serviceLogs))
// 时间范围搜索
fmt.Printf("\n ⏰ 时间范围搜索:\n")
recentLogs := aggregator.GetTimeRange(baseTime.Add(10*time.Minute), baseTime.Add(25*time.Minute))
fmt.Printf(" 最近15分钟的日志: %d条\n", len(recentLogs))
// 统计分析
fmt.Printf("\n 📊 统计分析:\n")
stats := aggregator.GetStats()
for key, value := range stats {
fmt.Printf(" %s: %d\n", key, value)
}
// 错误模式分析
fmt.Printf("\n 🚨 错误模式分析:\n")
errorPatterns := aggregator.AnalyzeErrorPatterns()
for errorType, count := range errorPatterns {
fmt.Printf(" %s: %d次\n", errorType, count)
}
// 活跃用户分析
fmt.Printf("\n 👥 活跃用户分析:\n")
topUsers := aggregator.GetTopUsers(3)
for i, user := range topUsers {
fmt.Printf(" %d. %s: %d次活动\n", i+1, user.UserID, user.Count)
}
fmt.Printf("\n 📋 日志聚合最佳实践:\n")
fmt.Printf(" 1. 建立统一的日志格式和字段标准\n")
fmt.Printf(" 2. 实现高效的索引和搜索机制\n")
fmt.Printf(" 3. 提供丰富的查询和分析功能\n")
fmt.Printf(" 4. 建立基于日志的监控和告警\n")
fmt.Printf(" 5. 考虑日志的存储和归档策略\n")
}
// main is the program entry point for this section. Its only action is to
// call demonstrateLoggingManagement, which is defined elsewhere in the file.
func main() {
demonstrateLoggingManagement()
}