Go原子操作详解 - Golang并发编程面试题
Go原子操作提供了比互斥锁更轻量级的并发同步机制,通过CPU硬件指令保证操作的原子性。掌握原子操作对于编写高性能并发程序至关重要。
📋 重点面试题
面试题 1:原子操作的基础概念和使用
难度级别:⭐⭐⭐⭐⭐
考察范围:并发编程/原子操作
技术标签:atomic lock-free CAS memory ordering performance optimization
详细解答
1. 原子操作基础概念
go
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
)
func demonstrateAtomicOperations() {
fmt.Println("=== Go原子操作详解演示 ===")
/*
原子操作关键特性:
1. 原子性保证:
- 操作要么完全执行,要么完全不执行
- 不会被其他goroutine中断
- 比互斥锁开销更小
2. 内存可见性:
- 提供内存屏障功能
- 保证内存操作的顺序性
- 防止编译器和CPU重排序
3. 支持的类型:
- int32, int64, uint32, uint64
- uintptr, Pointer
- 自定义类型通过unsafe.Pointer
4. 基本操作:
- Load/Store: 原子读写
- Add: 原子加法
- CompareAndSwap: 比较并交换
- Swap: 原子交换
*/
demonstrateBasicAtomicOps()
demonstrateCASOperations()
demonstrateAtomicPointer()
demonstratePerformanceComparison()
}
func demonstrateBasicAtomicOps() {
fmt.Println("\n--- 基础原子操作 ---")
/*
基础原子操作类型:
1. Load/Store: 原子读写操作
2. Add: 原子加法运算
3. Swap: 原子值交换
4. CompareAndSwap: 条件性原子更新
*/
// 原子计数器示例
demonstrateAtomicCounter := func() {
fmt.Println("原子计数器:")
var counter int64
var wg sync.WaitGroup
numGoroutines := 10
incrementsPerGoroutine := 10000
start := time.Now()
// 启动多个goroutine并发递增计数器
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < incrementsPerGoroutine; j++ {
// 原子递增
newValue := atomic.AddInt64(&counter, 1)
// 偶尔输出当前值
if j%2000 == 0 {
fmt.Printf(" Goroutine %d: 计数器值 %d\n", id, newValue)
}
}
}(i)
}
wg.Wait()
duration := time.Since(start)
finalValue := atomic.LoadInt64(&counter)
expectedValue := int64(numGoroutines * incrementsPerGoroutine)
fmt.Printf(" 最终计数器值: %d\n", finalValue)
fmt.Printf(" 期望值: %d\n", expectedValue)
fmt.Printf(" 结果正确: %t\n", finalValue == expectedValue)
fmt.Printf(" 执行时间: %v\n", duration)
}
// 原子加法操作
demonstrateAtomicAdd := func() {
fmt.Println("\n原子加法操作:")
var sum int64
// 正数加法
result1 := atomic.AddInt64(&sum, 100)
fmt.Printf(" 加100后: %d (返回值: %d)\n", atomic.LoadInt64(&sum), result1)
// 负数加法 (实际是减法)
result2 := atomic.AddInt64(&sum, -30)
fmt.Printf(" 减30后: %d (返回值: %d)\n", atomic.LoadInt64(&sum), result2)
// 演示不同数据类型的原子操作
var uint32Val uint32 = 42
var uint64Val uint64 = 1000
atomic.AddUint32(&uint32Val, 8)
atomic.AddUint64(&uint64Val, 500)
fmt.Printf(" uint32值: %d\n", atomic.LoadUint32(&uint32Val))
fmt.Printf(" uint64值: %d\n", atomic.LoadUint64(&uint64Val))
}
// 原子存储和加载
demonstrateAtomicLoadStore := func() {
fmt.Println("\n原子加载和存储:")
var sharedValue int64
// 原子存储
atomic.StoreInt64(&sharedValue, 12345)
fmt.Printf(" 存储值: 12345\n")
// 原子加载
loadedValue := atomic.LoadInt64(&sharedValue)
fmt.Printf(" 加载值: %d\n", loadedValue)
// 演示多goroutine场景下的原子读写
var data int64
done := make(chan bool)
// 写入者
go func() {
for i := 0; i < 5; i++ {
atomic.StoreInt64(&data, int64(i*10))
fmt.Printf(" 写入: %d\n", i*10)
time.Sleep(100 * time.Millisecond)
}
done <- true
}()
// 读取者
go func() {
for i := 0; i < 10; i++ {
value := atomic.LoadInt64(&data)
fmt.Printf(" 读取: %d\n", value)
time.Sleep(50 * time.Millisecond)
}
}()
<-done
time.Sleep(100 * time.Millisecond)
}
// 原子交换操作
demonstrateAtomicSwap := func() {
fmt.Println("\n原子交换操作:")
var value int64 = 100
// 原子交换并返回旧值
oldValue := atomic.SwapInt64(&value, 200)
newValue := atomic.LoadInt64(&value)
fmt.Printf(" 交换前值: %d\n", oldValue)
fmt.Printf(" 交换后值: %d\n", newValue)
// 使用交换实现原子重置
var flag int32 = 1
for i := 0; i < 3; i++ {
if atomic.SwapInt32(&flag, 0) == 1 {
fmt.Printf(" 第%d次重置成功\n", i+1)
} else {
fmt.Printf(" 第%d次重置失败 (已经是0)\n", i+1)
}
}
}
demonstrateAtomicCounter()
demonstrateAtomicAdd()
demonstrateAtomicLoadStore()
demonstrateAtomicSwap()
}
func demonstrateCASOperations() {
fmt.Println("\n--- 比较并交换(CAS)操作 ---")
/*
CAS操作特点:
1. 条件性更新:只在当前值等于期望值时更新
2. 原子性:整个比较和交换过程是原子的
3. 返回值:返回是否成功交换的布尔值
4. 无锁编程:CAS是无锁编程的基础
*/
// 基础CAS操作
demonstrateBasicCAS := func() {
fmt.Println("基础CAS操作:")
var value int64 = 100
// 成功的CAS操作
success1 := atomic.CompareAndSwapInt64(&value, 100, 200)
fmt.Printf(" CAS(100->200): 成功=%t, 当前值=%d\n",
success1, atomic.LoadInt64(&value))
// 失败的CAS操作
success2 := atomic.CompareAndSwapInt64(&value, 100, 300)
fmt.Printf(" CAS(100->300): 成功=%t, 当前值=%d\n",
success2, atomic.LoadInt64(&value))
// 再次成功的CAS操作
success3 := atomic.CompareAndSwapInt64(&value, 200, 300)
fmt.Printf(" CAS(200->300): 成功=%t, 当前值=%d\n",
success3, atomic.LoadInt64(&value))
}
// CAS实现无锁栈
demonstrateLockFreeStack := func() {
fmt.Println("\n无锁栈实现:")
type LockFreeStack struct {
head unsafe.Pointer // *node
}
type node struct {
data interface{}
next unsafe.Pointer // *node
}
func NewLockFreeStack() *LockFreeStack {
return &LockFreeStack{}
}
func (s *LockFreeStack) Push(data interface{}) {
newNode := &node{data: data}
for {
head := atomic.LoadPointer(&s.head)
newNode.next = head
if atomic.CompareAndSwapPointer(&s.head, head, unsafe.Pointer(newNode)) {
break
}
// CAS失败,重试
}
}
func (s *LockFreeStack) Pop() interface{} {
for {
head := atomic.LoadPointer(&s.head)
if head == nil {
return nil
}
headNode := (*node)(head)
next := atomic.LoadPointer(&headNode.next)
if atomic.CompareAndSwapPointer(&s.head, head, next) {
return headNode.data
}
// CAS失败,重试
}
}
func (s *LockFreeStack) IsEmpty() bool {
return atomic.LoadPointer(&s.head) == nil
}
// 测试无锁栈
stack := NewLockFreeStack()
var wg sync.WaitGroup
// 启动多个生产者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
data := fmt.Sprintf("数据_%d_%d", id, j)
stack.Push(data)
fmt.Printf(" 生产者%d推入: %s\n", id, data)
time.Sleep(10 * time.Millisecond)
}
}(i)
}
// 启动多个消费者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 7; j++ {
if data := stack.Pop(); data != nil {
fmt.Printf(" 消费者%d弹出: %s\n", id, data)
} else {
fmt.Printf(" 消费者%d弹出: 栈为空\n", id)
}
time.Sleep(15 * time.Millisecond)
}
}(i)
}
wg.Wait()
// 检查剩余元素
fmt.Printf(" 栈是否为空: %t\n", stack.IsEmpty())
}
// CAS实现原子布尔值
demonstrateAtomicBool := func() {
fmt.Println("\n原子布尔值:")
type AtomicBool struct {
flag int32
}
func (ab *AtomicBool) Set() bool {
return atomic.SwapInt32(&ab.flag, 1) == 0
}
func (ab *AtomicBool) Unset() bool {
return atomic.SwapInt32(&ab.flag, 0) == 1
}
func (ab *AtomicBool) IsSet() bool {
return atomic.LoadInt32(&ab.flag) == 1
}
func (ab *AtomicBool) CompareAndSet(old, new bool) bool {
oldVal := int32(0)
if old {
oldVal = 1
}
newVal := int32(0)
if new {
newVal = 1
}
return atomic.CompareAndSwapInt32(&ab.flag, oldVal, newVal)
}
// 测试原子布尔值
var flag AtomicBool
fmt.Printf(" 初始状态: %t\n", flag.IsSet())
success1 := flag.Set()
fmt.Printf(" 设置标志: 成功=%t, 当前值=%t\n", success1, flag.IsSet())
success2 := flag.Set()
fmt.Printf(" 再次设置: 成功=%t, 当前值=%t\n", success2, flag.IsSet())
success3 := flag.CompareAndSet(true, false)
fmt.Printf(" 条件重置: 成功=%t, 当前值=%t\n", success3, flag.IsSet())
}
demonstrateBasicCAS()
demonstrateLockFreeStack()
demonstrateAtomicBool()
}
func demonstrateAtomicPointer() {
fmt.Println("\n--- 原子指针操作 ---")
/*
原子指针操作:
1. unsafe.Pointer: 通用指针类型
2. 原子操作: LoadPointer, StorePointer, SwapPointer, CompareAndSwapPointer
3. 应用场景: 配置更新、缓存实现、无锁数据结构
4. 安全考虑: 内存管理、垃圾回收影响
*/
// 原子配置更新
demonstrateAtomicConfig := func() {
fmt.Println("原子配置更新:")
type Config struct {
Host string
Port int
Timeout time.Duration
Features map[string]bool
}
type AtomicConfig struct {
ptr unsafe.Pointer // *Config
}
func NewAtomicConfig(config *Config) *AtomicConfig {
ac := &AtomicConfig{}
atomic.StorePointer(&ac.ptr, unsafe.Pointer(config))
return ac
}
func (ac *AtomicConfig) Load() *Config {
return (*Config)(atomic.LoadPointer(&ac.ptr))
}
func (ac *AtomicConfig) Store(config *Config) {
atomic.StorePointer(&ac.ptr, unsafe.Pointer(config))
}
func (ac *AtomicConfig) CompareAndSwap(old, new *Config) bool {
return atomic.CompareAndSwapPointer(&ac.ptr,
unsafe.Pointer(old), unsafe.Pointer(new))
}
// 测试原子配置
initialConfig := &Config{
Host: "localhost",
Port: 8080,
Timeout: 30 * time.Second,
Features: map[string]bool{
"cache": true,
"logging": false,
},
}
atomicConfig := NewAtomicConfig(initialConfig)
fmt.Printf(" 初始配置: %+v\n", atomicConfig.Load())
// 模拟配置热更新
go func() {
time.Sleep(100 * time.Millisecond)
newConfig := &Config{
Host: "0.0.0.0",
Port: 9090,
Timeout: 60 * time.Second,
Features: map[string]bool{
"cache": true,
"logging": true,
"metrics": true,
},
}
atomicConfig.Store(newConfig)
fmt.Printf(" 更新配置: %+v\n", newConfig)
}()
// 模拟配置读取
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
config := atomicConfig.Load()
fmt.Printf(" 读取器%d: Host=%s, Port=%d\n",
id, config.Host, config.Port)
time.Sleep(50 * time.Millisecond)
}
}(i)
}
wg.Wait()
}
// 原子缓存实现
demonstrateAtomicCache := func() {
fmt.Println("\n原子缓存实现:")
type CacheEntry struct {
key string
value interface{}
timestamp time.Time
}
type AtomicCache struct {
entries unsafe.Pointer // *[]CacheEntry
mutex sync.RWMutex // 用于写操作的保护
}
func NewAtomicCache() *AtomicCache {
initialEntries := make([]CacheEntry, 0)
ac := &AtomicCache{}
atomic.StorePointer(&ac.entries, unsafe.Pointer(&initialEntries))
return ac
}
func (ac *AtomicCache) Get(key string) (interface{}, bool) {
entries := *(*[]CacheEntry)(atomic.LoadPointer(&ac.entries))
for _, entry := range entries {
if entry.key == key {
return entry.value, true
}
}
return nil, false
}
func (ac *AtomicCache) Set(key string, value interface{}) {
ac.mutex.Lock()
defer ac.mutex.Unlock()
oldEntries := *(*[]CacheEntry)(atomic.LoadPointer(&ac.entries))
newEntries := make([]CacheEntry, 0, len(oldEntries)+1)
// 复制现有条目,跳过同key的条目
for _, entry := range oldEntries {
if entry.key != key {
newEntries = append(newEntries, entry)
}
}
// 添加新条目
newEntries = append(newEntries, CacheEntry{
key: key,
value: value,
timestamp: time.Now(),
})
atomic.StorePointer(&ac.entries, unsafe.Pointer(&newEntries))
}
func (ac *AtomicCache) Len() int {
entries := *(*[]CacheEntry)(atomic.LoadPointer(&ac.entries))
return len(entries)
}
// 测试原子缓存
cache := NewAtomicCache()
// 并发写入
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ {
key := fmt.Sprintf("key_%d_%d", id, j)
value := fmt.Sprintf("value_%d_%d", id, j)
cache.Set(key, value)
fmt.Printf(" 写入: %s = %s\n", key, value)
time.Sleep(10 * time.Millisecond)
}
}(i)
}
// 并发读取
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(50 * time.Millisecond) // 等待一些数据写入
for j := 0; j < 5; j++ {
key := fmt.Sprintf("key_%d_%d", (id+j)%3, j%3)
if value, found := cache.Get(key); found {
fmt.Printf(" 读取%d: %s = %s\n", id, key, value)
} else {
fmt.Printf(" 读取%d: %s 未找到\n", id, key)
}
time.Sleep(20 * time.Millisecond)
}
}(i)
}
wg.Wait()
fmt.Printf(" 最终缓存大小: %d\n", cache.Len())
}
demonstrateAtomicConfig()
demonstrateAtomicCache()
}
func demonstratePerformanceComparison() {
fmt.Println("\n--- 性能比较:原子操作 vs 互斥锁 ---")
/*
性能比较要点:
1. 原子操作优势:
- 无锁,避免goroutine阻塞
- 更低的延迟
- 更高的并发性能
2. 互斥锁优势:
- 支持复杂的临界区
- 更容易理解和使用
- 支持读写锁等变种
3. 选择考虑:
- 操作复杂度
- 并发程度
- 性能要求
*/
const iterations = 1000000
const numGoroutines = 10
// 原子操作性能测试
testAtomicPerformance := func() time.Duration {
var counter int64
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations/numGoroutines; j++ {
atomic.AddInt64(&counter, 1)
}
}()
}
wg.Wait()
duration := time.Since(start)
fmt.Printf(" 原子操作结果: 计数器=%d, 耗时=%v\n",
atomic.LoadInt64(&counter), duration)
return duration
}
// 互斥锁性能测试
testMutexPerformance := func() time.Duration {
var counter int64
var mutex sync.Mutex
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations/numGoroutines; j++ {
mutex.Lock()
counter++
mutex.Unlock()
}
}()
}
wg.Wait()
duration := time.Since(start)
fmt.Printf(" 互斥锁结果: 计数器=%d, 耗时=%v\n", counter, duration)
return duration
}
// 读写锁性能测试 (读多写少场景)
testRWMutexPerformance := func() time.Duration {
var counter int64
var rwMutex sync.RWMutex
var wg sync.WaitGroup
start := time.Now()
// 写操作goroutine
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations/100; j++ { // 少量写操作
rwMutex.Lock()
counter++
rwMutex.Unlock()
}
}()
// 读操作goroutine
for i := 0; i < numGoroutines-1; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations*99/100/(numGoroutines-1); j++ { // 大量读操作
rwMutex.RLock()
_ = counter
rwMutex.RUnlock()
}
}()
}
wg.Wait()
duration := time.Since(start)
fmt.Printf(" 读写锁结果: 计数器=%d, 耗时=%v\n", counter, duration)
return duration
}
// 原子指针性能测试
testAtomicPointerPerformance := func() time.Duration {
type Data struct {
value int64
}
var ptr unsafe.Pointer
initialData := &Data{value: 0}
atomic.StorePointer(&ptr, unsafe.Pointer(initialData))
var wg sync.WaitGroup
start := time.Now()
// 写操作
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations/1000; j++ {
newData := &Data{value: int64(j)}
atomic.StorePointer(&ptr, unsafe.Pointer(newData))
}
}()
// 读操作
for i := 0; i < numGoroutines-1; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < iterations*999/1000/(numGoroutines-1); j++ {
data := (*Data)(atomic.LoadPointer(&ptr))
_ = data.value
}
}()
}
wg.Wait()
duration := time.Since(start)
finalData := (*Data)(atomic.LoadPointer(&ptr))
fmt.Printf(" 原子指针结果: 最终值=%d, 耗时=%v\n", finalData.value, duration)
return duration
}
fmt.Printf("性能测试 (%d次操作, %d个goroutine):\n", iterations, numGoroutines)
atomicTime := testAtomicPerformance()
mutexTime := testMutexPerformance()
rwMutexTime := testRWMutexPerformance()
atomicPtrTime := testAtomicPointerPerformance()
fmt.Printf("\n性能比较:\n")
fmt.Printf(" 原子操作 vs 互斥锁: %.2fx 更快\n",
float64(mutexTime)/float64(atomicTime))
fmt.Printf(" 原子操作 vs 读写锁: %.2fx 更快\n",
float64(rwMutexTime)/float64(atomicTime))
fmt.Printf(" 原子指针 vs 原子操作: %.2fx\n",
float64(atomicPtrTime)/float64(atomicTime))
// 内存使用比较
demonstrateMemoryUsage := func() {
fmt.Printf("\n内存使用对比:\n")
var m1, m2 runtime.MemStats
// 原子操作内存使用
runtime.ReadMemStats(&m1)
var atomicVars [1000]int64
for i := range atomicVars {
atomic.StoreInt64(&atomicVars[i], int64(i))
}
runtime.ReadMemStats(&m2)
atomicMem := m2.Alloc - m1.Alloc
// 互斥锁内存使用
runtime.ReadMemStats(&m1)
type MutexCounter struct {
value int64
mutex sync.Mutex
}
mutexVars := make([]MutexCounter, 1000)
for i := range mutexVars {
mutexVars[i].mutex.Lock()
mutexVars[i].value = int64(i)
mutexVars[i].mutex.Unlock()
}
runtime.ReadMemStats(&m2)
mutexMem := m2.Alloc - m1.Alloc
fmt.Printf(" 原子变量内存: %d bytes\n", atomicMem)
fmt.Printf(" 互斥锁内存: %d bytes\n", mutexMem)
fmt.Printf(" 内存比率: %.2fx\n", float64(mutexMem)/float64(atomicMem))
}
demonstrateMemoryUsage()
}go
func demonstrateAdvancedAtomicPatterns() {
fmt.Println("\n=== 高级原子操作模式 ===")
/*
高级原子操作模式:
1. ABA问题解决:
- 版本号/时间戳机制
- 指针标记技术
- 双字CAS操作
2. 内存序模型:
- 获取-释放语义
- 顺序一致性
- 内存屏障效果
3. 无锁数据结构:
- 无锁队列
- 无锁哈希表
- 无锁链表
4. 等待无关算法:
- 无阻塞操作
- 进度保证
- 公平性考虑
*/
demonstrateABAProblem()
demonstrateLockFreeQueue()
demonstrateAtomicReference()
demonstrateWaitFreeAlgorithms()
}
func demonstrateABAProblem() {
fmt.Println("\n--- ABA问题和解决方案 ---")
/*
ABA问题:
1. 问题描述:
- 值从A变为B再变回A
- CAS操作可能错误地认为值未改变
- 导致逻辑错误
2. 解决方案:
- 版本号机制
- 指针标记
- 双字CAS
*/
// 演示ABA问题
demonstrateABAProblemScenario := func() {
fmt.Println("ABA问题演示:")
type Node struct {
value int
next unsafe.Pointer // *Node
}
// 创建链表: A -> B -> C
nodeC := &Node{value: 3, next: nil}
nodeB := &Node{value: 2, next: unsafe.Pointer(nodeC)}
nodeA := &Node{value: 1, next: unsafe.Pointer(nodeB)}
var head unsafe.Pointer
atomic.StorePointer(&head, unsafe.Pointer(nodeA))
fmt.Printf(" 初始链表: %d -> %d -> %d\n",
nodeA.value, nodeB.value, nodeC.value)
// 模拟ABA场景
var wg sync.WaitGroup
// Goroutine 1: 尝试删除A
wg.Add(1)
go func() {
defer wg.Done()
// 读取当前头节点
currentHead := atomic.LoadPointer(&head)
currentNode := (*Node)(currentHead)
if currentNode != nil {
nextNode := (*Node)(atomic.LoadPointer(¤tNode.next))
fmt.Printf(" G1: 准备将头节点从 %d 改为 %d\n",
currentNode.value, nextNode.value)
// 模拟延迟
time.Sleep(100 * time.Millisecond)
// 尝试CAS操作
if atomic.CompareAndSwapPointer(&head, currentHead, currentNode.next) {
fmt.Printf(" G1: 成功删除节点 %d\n", currentNode.value)
} else {
fmt.Printf(" G1: 删除失败,头节点已改变\n")
}
}
}()
// Goroutine 2: 删除A,然后重新插入A
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(50 * time.Millisecond)
// 删除A
currentHead := atomic.LoadPointer(&head)
if currentHead != nil {
currentNode := (*Node)(currentHead)
next := atomic.LoadPointer(¤tNode.next)
if atomic.CompareAndSwapPointer(&head, currentHead, next) {
fmt.Printf(" G2: 删除了节点 %d\n", currentNode.value)
// 重新插入A (制造ABA)
time.Sleep(20 * time.Millisecond)
newHead := atomic.LoadPointer(&head)
currentNode.next = newHead
atomic.StorePointer(&head, unsafe.Pointer(currentNode))
fmt.Printf(" G2: 重新插入了节点 %d\n", currentNode.value)
}
}
}()
wg.Wait()
// 检查最终状态
finalHead := (*Node)(atomic.LoadPointer(&head))
if finalHead != nil {
fmt.Printf(" 最终头节点: %d\n", finalHead.value)
}
}
// 使用版本号解决ABA问题
demonstrateVersionedPointer := func() {
fmt.Println("\n版本号解决ABA问题:")
type VersionedPointer struct {
pointer unsafe.Pointer
version uint64
}
type SafeNode struct {
value int
next VersionedPointer
}
type SafeStack struct {
head VersionedPointer
}
func (s *SafeStack) Push(value int) {
newNode := &SafeNode{value: value}
for {
currentHead := s.head
newNode.next = currentHead
newHead := VersionedPointer{
pointer: unsafe.Pointer(newNode),
version: currentHead.version + 1,
}
// 使用双字CAS (模拟)
if s.compareAndSwapVersioned(&s.head, currentHead, newHead) {
break
}
}
}
func (s *SafeStack) Pop() (int, bool) {
for {
currentHead := s.head
if currentHead.pointer == nil {
return 0, false
}
headNode := (*SafeNode)(currentHead.pointer)
nextHead := headNode.next
nextHead.version = currentHead.version + 1
if s.compareAndSwapVersioned(&s.head, currentHead, nextHead) {
return headNode.value, true
}
}
}
func (s *SafeStack) compareAndSwapVersioned(addr *VersionedPointer, old, new VersionedPointer) bool {
// 在真实实现中,这需要硬件支持的双字CAS
// 这里简化为两个独立的CAS操作
return atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&addr.pointer)), old.pointer, new.pointer) &&
atomic.CompareAndSwapUint64(&addr.version, old.version, new.version)
}
// 测试版本化栈
stack := &SafeStack{}
fmt.Printf(" 测试版本化栈:\n")
// 推入元素
for i := 1; i <= 3; i++ {
stack.Push(i)
fmt.Printf(" 推入: %d (版本: %d)\n", i, stack.head.version)
}
// 弹出元素
for i := 0; i < 4; i++ {
if value, ok := stack.Pop(); ok {
fmt.Printf(" 弹出: %d (版本: %d)\n", value, stack.head.version)
} else {
fmt.Printf(" 弹出: 栈为空 (版本: %d)\n", stack.head.version)
}
}
}
demonstrateABAProblemScenario()
demonstrateVersionedPointer()
}
func demonstrateLockFreeQueue() {
fmt.Println("\n--- 无锁队列实现 ---")
/*
无锁队列特点:
1. FIFO语义:先进先出
2. 并发安全:多生产者多消费者
3. 无阻塞:操作不会阻塞线程
4. ABA安全:避免ABA问题
*/
type QueueNode struct {
data interface{}
next unsafe.Pointer // *QueueNode
}
type LockFreeQueue struct {
head unsafe.Pointer // *QueueNode
tail unsafe.Pointer // *QueueNode
}
func NewLockFreeQueue() *LockFreeQueue {
dummy := &QueueNode{}
q := &LockFreeQueue{
head: unsafe.Pointer(dummy),
tail: unsafe.Pointer(dummy),
}
return q
}
func (q *LockFreeQueue) Enqueue(data interface{}) {
newNode := &QueueNode{data: data}
for {
tail := atomic.LoadPointer(&q.tail)
tailNode := (*QueueNode)(tail)
next := atomic.LoadPointer(&tailNode.next)
// 检查tail是否仍然指向队列尾部
if tail == atomic.LoadPointer(&q.tail) {
if next == nil {
// tail真的指向尾部,尝试链接新节点
if atomic.CompareAndSwapPointer(&tailNode.next, nil, unsafe.Pointer(newNode)) {
break
}
} else {
// tail落后了,尝试推进tail
atomic.CompareAndSwapPointer(&q.tail, tail, next)
}
}
}
// 推进tail指针
atomic.CompareAndSwapPointer(&q.tail, atomic.LoadPointer(&q.tail), unsafe.Pointer(newNode))
}
func (q *LockFreeQueue) Dequeue() (interface{}, bool) {
for {
head := atomic.LoadPointer(&q.head)
tail := atomic.LoadPointer(&q.tail)
headNode := (*QueueNode)(head)
next := atomic.LoadPointer(&headNode.next)
// 检查head是否仍然指向队列头部
if head == atomic.LoadPointer(&q.head) {
if head == tail {
if next == nil {
// 队列为空
return nil, false
}
// tail落后了,尝试推进tail
atomic.CompareAndSwapPointer(&q.tail, tail, next)
} else {
if next == nil {
// 这种情况不应该发生
continue
}
// 读取数据
nextNode := (*QueueNode)(next)
data := nextNode.data
// 尝试移动head
if atomic.CompareAndSwapPointer(&q.head, head, next) {
return data, true
}
}
}
}
}
func (q *LockFreeQueue) IsEmpty() bool {
head := atomic.LoadPointer(&q.head)
tail := atomic.LoadPointer(&q.tail)
headNode := (*QueueNode)(head)
next := atomic.LoadPointer(&headNode.next)
return head == tail && next == nil
}
// 测试无锁队列
queue := NewLockFreeQueue()
var wg sync.WaitGroup
fmt.Printf("测试无锁队列:\n")
// 启动多个生产者
producers := 3
itemsPerProducer := 5
for i := 0; i < producers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < itemsPerProducer; j++ {
data := fmt.Sprintf("数据_%d_%d", id, j)
queue.Enqueue(data)
fmt.Printf(" 生产者%d入队: %s\n", id, data)
time.Sleep(10 * time.Millisecond)
}
}(i)
}
// 启动多个消费者
consumers := 2
expectedItems := producers * itemsPerProducer
for i := 0; i < consumers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
consumeCount := 0
for consumeCount < expectedItems/consumers+2 { // 多尝试几次
if data, ok := queue.Dequeue(); ok {
fmt.Printf(" 消费者%d出队: %s\n", id, data)
consumeCount++
} else {
if consumeCount >= expectedItems/consumers {
break
}
time.Sleep(5 * time.Millisecond)
}
}
}(i)
}
wg.Wait()
fmt.Printf(" 队列是否为空: %t\n", queue.IsEmpty())
}
func demonstrateAtomicReference() {
fmt.Println("\n--- 原子引用和标记指针 ---")
/*
原子引用技术:
1. 标记指针:在指针中嵌入标记位
2. 引用计数:自动内存管理
3. 弱引用:避免循环引用
4. 世代管理:内存回收优化
*/
// 标记指针实现
demonstrateMarkedPointer := func() {
fmt.Println("标记指针:")
const markBit = 1
type MarkedPointer struct {
ptr uintptr
}
func NewMarkedPointer(ptr unsafe.Pointer, marked bool) MarkedPointer {
addr := uintptr(ptr)
if marked {
addr |= markBit
}
return MarkedPointer{ptr: addr}
}
func (mp *MarkedPointer) GetPointer() unsafe.Pointer {
return unsafe.Pointer(mp.ptr &^ markBit)
}
func (mp *MarkedPointer) IsMarked() bool {
return (mp.ptr & markBit) != 0
}
func (mp *MarkedPointer) CompareAndSwap(old, new MarkedPointer) bool {
return atomic.CompareAndSwapUintptr(&mp.ptr, old.ptr, new.ptr)
}
// 测试标记指针
type TestData struct {
value int
}
data1 := &TestData{value: 42}
data2 := &TestData{value: 84}
var markedPtr MarkedPointer
// 设置未标记的指针
markedPtr = NewMarkedPointer(unsafe.Pointer(data1), false)
fmt.Printf(" 初始: 指针=%p, 标记=%t, 值=%d\n",
markedPtr.GetPointer(), markedPtr.IsMarked(),
(*TestData)(markedPtr.GetPointer()).value)
// 设置标记的指针
newMarked := NewMarkedPointer(unsafe.Pointer(data2), true)
success := markedPtr.CompareAndSwap(markedPtr, newMarked)
fmt.Printf(" 更新: 成功=%t, 指针=%p, 标记=%t, 值=%d\n",
success, newMarked.GetPointer(), newMarked.IsMarked(),
(*TestData)(newMarked.GetPointer()).value)
}
// 原子引用计数
demonstrateAtomicRefCount := func() {
fmt.Println("\n原子引用计数:")
type RefCountedData struct {
value int
refs int64
}
func (rcd *RefCountedData) AddRef() {
atomic.AddInt64(&rcd.refs, 1)
}
func (rcd *RefCountedData) Release() bool {
if atomic.AddInt64(&rcd.refs, -1) == 0 {
return true // 可以释放
}
return false
}
func (rcd *RefCountedData) RefCount() int64 {
return atomic.LoadInt64(&rcd.refs)
}
type AtomicRef struct {
ptr unsafe.Pointer // *RefCountedData
}
func (ar *AtomicRef) Load() *RefCountedData {
for {
ptr := atomic.LoadPointer(&ar.ptr)
if ptr == nil {
return nil
}
data := (*RefCountedData)(ptr)
data.AddRef()
// 检查指针是否仍然有效
if atomic.LoadPointer(&ar.ptr) == ptr {
return data
}
// 指针已改变,释放引用并重试
data.Release()
}
}
func (ar *AtomicRef) Store(data *RefCountedData) {
if data != nil {
data.AddRef()
}
old := atomic.SwapPointer(&ar.ptr, unsafe.Pointer(data))
if old != nil {
oldData := (*RefCountedData)(old)
if oldData.Release() {
fmt.Printf(" 释放了旧对象 (值: %d)\n", oldData.value)
}
}
}
// 测试原子引用计数
var atomicRef AtomicRef
// 创建数据
data1 := &RefCountedData{value: 100, refs: 0}
atomicRef.Store(data1)
fmt.Printf(" 存储数据1: 值=%d, 引用数=%d\n",
data1.value, data1.RefCount())
// 多个引用
ref1 := atomicRef.Load()
ref2 := atomicRef.Load()
fmt.Printf(" 加载后引用数: %d\n", data1.RefCount())
// 更新数据
data2 := &RefCountedData{value: 200, refs: 0}
atomicRef.Store(data2)
fmt.Printf(" 存储数据2: 值=%d, 引用数=%d\n",
data2.value, data2.RefCount())
// 释放引用
if ref1.Release() {
fmt.Printf(" ref1释放: 对象可回收\n")
}
if ref2.Release() {
fmt.Printf(" ref2释放: 对象可回收\n")
}
fmt.Printf(" 最终引用数: data1=%d, data2=%d\n",
data1.RefCount(), data2.RefCount())
}
demonstrateMarkedPointer()
demonstrateAtomicRefCount()
}
func demonstrateWaitFreeAlgorithms() {
fmt.Println("\n--- 等待无关算法 ---")
/*
等待无关算法特点:
1. 进度保证:每个线程都能在有限步骤内完成操作
2. 无阻塞:没有线程会被无限期阻塞
3. 公平性:避免饥饿问题
4. 性能:通常比无锁算法有更好的延迟保证
*/
// 等待无关计数器
demonstrateWaitFreeCounter := func() {
fmt.Println("等待无关计数器:")
type WaitFreeCounter struct {
counters []int64 // 每个线程的私有计数器
numThreads int
}
func NewWaitFreeCounter(numThreads int) *WaitFreeCounter {
return &WaitFreeCounter{
counters: make([]int64, numThreads),
numThreads: numThreads,
}
}
func (wfc *WaitFreeCounter) Increment(threadID int) {
atomic.AddInt64(&wfc.counters[threadID], 1)
}
func (wfc *WaitFreeCounter) Get() int64 {
total := int64(0)
for i := 0; i < wfc.numThreads; i++ {
total += atomic.LoadInt64(&wfc.counters[i])
}
return total
}
func (wfc *WaitFreeCounter) GetThreadCounter(threadID int) int64 {
return atomic.LoadInt64(&wfc.counters[threadID])
}
// 测试等待无关计数器
numThreads := 4
counter := NewWaitFreeCounter(numThreads)
var wg sync.WaitGroup
fmt.Printf(" 启动 %d 个线程:\n", numThreads)
for i := 0; i < numThreads; i++ {
wg.Add(1)
go func(threadID int) {
defer wg.Done()
for j := 0; j < 1000; j++ {
counter.Increment(threadID)
if j%200 == 0 {
fmt.Printf(" 线程%d: 本地计数=%d, 全局计数=%d\n",
threadID, counter.GetThreadCounter(threadID), counter.Get())
}
}
}(i)
}
wg.Wait()
fmt.Printf(" 最终计数: %d\n", counter.Get())
for i := 0; i < numThreads; i++ {
fmt.Printf(" 线程%d贡献: %d\n", i, counter.GetThreadCounter(i))
}
}
// 等待无关读写器
demonstrateWaitFreeSnapshot := func() {
fmt.Println("\n等待无关快照:")
type WaitFreeSnapshot struct {
values []int64
versions []int64
size int
}
func NewWaitFreeSnapshot(size int) *WaitFreeSnapshot {
return &WaitFreeSnapshot{
values: make([]int64, size),
versions: make([]int64, size),
size: size,
}
}
func (wfs *WaitFreeSnapshot) Update(index int, value int64) {
atomic.StoreInt64(&wfs.values[index], value)
atomic.AddInt64(&wfs.versions[index], 1)
}
func (wfs *WaitFreeSnapshot) Scan() []int64 {
for {
// 第一次扫描:记录版本号和值
oldVersions := make([]int64, wfs.size)
values := make([]int64, wfs.size)
for i := 0; i < wfs.size; i++ {
oldVersions[i] = atomic.LoadInt64(&wfs.versions[i])
values[i] = atomic.LoadInt64(&wfs.values[i])
}
// 第二次扫描:检查版本号是否改变
consistent := true
for i := 0; i < wfs.size; i++ {
newVersion := atomic.LoadInt64(&wfs.versions[i])
if newVersion != oldVersions[i] {
consistent = false
break
}
}
if consistent {
return values
}
// 版本号改变了,重试
runtime.Gosched() // 让出CPU给其他goroutine
}
}
// 测试等待无关快照
snapshot := NewWaitFreeSnapshot(3)
var wg sync.WaitGroup
// 启动更新器
for i := 0; i < 3; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
for j := 0; j < 50; j++ {
value := int64(index*100 + j)
snapshot.Update(index, value)
fmt.Printf(" 更新器%d: 设置值为 %d\n", index, value)
time.Sleep(time.Millisecond)
}
}(i)
}
// 启动读取器
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
values := snapshot.Scan()
fmt.Printf(" 读取器: 快照 %v\n", values)
time.Sleep(5 * time.Millisecond)
}
}()
wg.Wait()
finalSnapshot := snapshot.Scan()
fmt.Printf(" 最终快照: %v\n", finalSnapshot)
}
demonstrateWaitFreeCounter()
demonstrateWaitFreeSnapshot()
}
func main() {
demonstrateAtomicOperations()
demonstrateAdvancedAtomicPatterns()
}🎯 核心知识点总结
原子操作基础要点
- 原子性保证: 操作要么完全执行,要么完全不执行
- 内存可见性: 提供内存屏障,保证操作顺序
- 支持类型: int32/64、uint32/64、uintptr、unsafe.Pointer
- 基本操作: Load/Store、Add、Swap、CompareAndSwap
CAS操作要点
- 条件更新: 只在当前值等于期望值时更新
- 原子性: 比较和交换是原子操作
- 无锁编程: CAS是无锁编程的基础
- ABA问题: 需要版本号等机制解决
性能优化要点
- 低开销: 比互斥锁开销更小
- 高并发: 避免线程阻塞,支持更高并发
- 缓存友好: 减少内存竞争和缓存失效
- 适用场景: 简单操作、高频访问、读多写少
高级模式要点
- 无锁数据结构: 栈、队列、链表等无锁实现
- 内存管理: 引用计数、标记指针等技术
- 等待无关: 提供更强的进度保证
- 错误处理: ABA问题、内存序等高级话题
🔍 面试准备建议
- 理解原理: 深入理解原子操作的硬件基础和实现原理
- 掌握API: 熟练使用sync/atomic包的各种操作
- 实际应用: 在项目中合理使用原子操作替代锁
- 性能分析: 了解原子操作的性能特征和适用场景
- 高级技术: 学习无锁编程和等待无关算法的设计
