Skip to content

分布式事务解决方案

一、问题描述

1.1 业务背景

在分布式系统中,一个业务操作往往需要跨多个服务、多个数据库,如何保证数据一致性是核心难题。

典型场景

  • 电商下单:扣库存(库存服务)、创建订单(订单服务)、扣款(支付服务)
  • 转账:A账户扣款、B账户加款
  • 积分兑换:扣积分、发放优惠券、增加兑换记录

传统单体架构

sql
BEGIN TRANSACTION;
UPDATE inventory SET stock = stock - 1 WHERE product_id = 1001;
INSERT INTO orders (user_id, product_id) VALUES (1, 1001);
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
COMMIT;

✅ 数据库本地事务ACID保证

分布式架构

库存服务(DB1) → 扣库存
订单服务(DB2) → 创建订单  
支付服务(DB3) → 扣款

❌ 跨服务、跨数据库,无法使用本地事务

1.2 核心挑战

原子性(Atomicity)

  • 要么全部成功,要么全部失败
  • 部分成功导致数据不一致

一致性(Consistency)

  • 不同服务之间数据要保持一致
  • 不能出现库存扣了但订单没创建

隔离性(Isolation)

  • 并发事务不能互相干扰

持久性(Durability)

  • 事务提交后数据永久保存

1.3 CAP理论

分布式系统无法同时满足:

  • C (Consistency):一致性
  • A (Availability):可用性
  • P (Partition Tolerance):分区容错性

常见选择

  • CP:金融系统(牺牲可用性保证一致性)
  • AP:社交系统(牺牲强一致性保证可用性)

BASE理论(最终一致性):

  • BA (Basically Available):基本可用
  • S (Soft State):软状态(中间状态)
  • E (Eventually Consistent):最终一致性

1.4 面试考察点

  • 方案选择:2PC vs TCC vs SAGA vs 本地消息表
  • 原理理解:每种方案的工作原理
  • 优缺点对比:各方案的适用场景
  • 工程实践:如何在实际项目中落地
  • Seata框架:AT/TCC/SAGA模式源码

二、分布式事务解决方案

2.1 方案对比

方案类型一致性性能复杂度适用场景
2PC强一致金融、支付
3PC强一致很少使用
TCC最终一致最终电商、订单 ⭐
SAGA最终一致最终长事务、流程
本地消息表最终一致最终异步场景 ⭐
最大努力通知最终一致最终支付回调

三、2PC(两阶段提交)

3.1 原理

Two-Phase Commit:协调者-参与者模型

阶段一:准备阶段(Prepare)

mermaid
sequenceDiagram
    participant C as 协调者
    participant P1 as 参与者1
    participant P2 as 参与者2
    participant P3 as 参与者3
    
    C->>P1: Prepare(准备事务)
    C->>P2: Prepare
    C->>P3: Prepare
    
    P1->>C: Yes(准备就绪)
    P2->>C: Yes
    P3->>C: Yes

阶段二:提交阶段(Commit)

mermaid
sequenceDiagram
    participant C as 协调者
    participant P1 as 参与者1
    participant P2 as 参与者2
    participant P3 as 参与者3
    
    C->>P1: Commit(提交事务)
    C->>P2: Commit
    C->>P3: Commit
    
    P1->>C: ACK
    P2->>C: ACK
    P3->>C: ACK

3.2 实现

go
package twopc

import (
    "context"
    "fmt"
)

// Coordinator 协调者
type Coordinator struct {
    participants []Participant
}

// Participant 参与者接口
type Participant interface {
    Prepare(ctx context.Context, txID string) error
    Commit(ctx context.Context, txID string) error
    Rollback(ctx context.Context, txID string) error
}

// Execute 执行2PC事务
func (c *Coordinator) Execute(ctx context.Context, txID string) error {
    // 阶段一:准备
    err := c.preparePhase(ctx, txID)
    if err != nil {
        // 准备失败,回滚
        c.rollbackPhase(ctx, txID)
        return fmt.Errorf("prepare phase failed: %w", err)
    }
    
    // 阶段二:提交
    err = c.commitPhase(ctx, txID)
    if err != nil {
        // 提交失败(极少发生)
        return fmt.Errorf("commit phase failed: %w", err)
    }
    
    return nil
}

// preparePhase 准备阶段
func (c *Coordinator) preparePhase(ctx context.Context, txID string) error {
    for _, p := range c.participants {
        err := p.Prepare(ctx, txID)
        if err != nil {
            return err
        }
    }
    return nil
}

// commitPhase 提交阶段
func (c *Coordinator) commitPhase(ctx context.Context, txID string) error {
    for _, p := range c.participants {
        err := p.Commit(ctx, txID)
        if err != nil {
            return err
        }
    }
    return nil
}

// rollbackPhase 回滚阶段
func (c *Coordinator) rollbackPhase(ctx context.Context, txID string) {
    for _, p := range c.participants {
        p.Rollback(ctx, txID)
    }
}

// 示例:库存服务参与者
type InventoryParticipant struct {
    db *sql.DB
}

func (ip *InventoryParticipant) Prepare(ctx context.Context, txID string) error {
    // 开启事务,锁定资源
    tx, err := ip.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    
    // 检查库存并锁定
    var stock int
    err = tx.QueryRow("SELECT stock FROM inventory WHERE product_id = ? FOR UPDATE", 1001).Scan(&stock)
    if err != nil {
        tx.Rollback()
        return err
    }
    
    if stock < 1 {
        tx.Rollback()
        return fmt.Errorf("insufficient stock")
    }
    
    // 预扣库存
    _, err = tx.Exec("UPDATE inventory SET stock = stock - 1 WHERE product_id = ?", 1001)
    if err != nil {
        tx.Rollback()
        return err
    }
    
    // 保存事务上下文(不提交)
    saveTxContext(txID, tx)
    
    return nil // 返回准备就绪
}

func (ip *InventoryParticipant) Commit(ctx context.Context, txID string) error {
    // 获取事务上下文
    tx := getTxContext(txID)
    
    // 提交事务
    return tx.Commit()
}

func (ip *InventoryParticipant) Rollback(ctx context.Context, txID string) error {
    tx := getTxContext(txID)
    return tx.Rollback()
}
java
// 协调者
public class TwoPCCoordinator {
    
    private List<Participant> participants;
    
    public void execute(String txId) throws Exception {
        try {
            // 阶段一:准备
            preparePhase(txId);
            
            // 阶段二:提交
            commitPhase(txId);
            
        } catch (Exception e) {
            // 准备失败,回滚
            rollbackPhase(txId);
            throw e;
        }
    }
    
    private void preparePhase(String txId) throws Exception {
        for (Participant p : participants) {
            boolean success = p.prepare(txId);
            if (!success) {
                throw new Exception("Prepare failed");
            }
        }
    }
    
    private void commitPhase(String txId) throws Exception {
        for (Participant p : participants) {
            p.commit(txId);
        }
    }
    
    private void rollbackPhase(String txId) {
        for (Participant p : participants) {
            try {
                p.rollback(txId);
            } catch (Exception e) {
                // 记录日志
            }
        }
    }
}

// 参与者接口
interface Participant {
    boolean prepare(String txId) throws Exception;
    void commit(String txId) throws Exception;
    void rollback(String txId) throws Exception;
}

3.3 优缺点

优点

  • ✅ 强一致性保证
  • ✅ 实现相对简单

缺点

  • 同步阻塞:所有参与者等待协调者指令
  • 单点故障:协调者宕机导致所有参与者阻塞
  • 数据不一致:网络分区时可能部分提交部分未提交
  • 性能差:锁资源时间长

适用场景

  • 金融系统(强一致性要求)
  • 数据量小、并发低的场景

四、3PC(三阶段提交)

4.1 原理

在2PC基础上增加CanCommit阶段,减少阻塞时间。

阶段一:CanCommit(询问)

协调者 → 参与者:你能执行这个事务吗?
参与者 → 协调者:可以 / 不可以

阶段二:PreCommit(预提交)

协调者 → 参与者:准备提交
参与者 → 协调者:准备完成

阶段三:DoCommit(提交)

协调者 → 参与者:提交事务
参与者 → 协调者:提交完成

4.2 对比2PC

维度2PC3PC
阶段数23
阻塞时间
单点故障严重改善(超时机制)
复杂度
实际应用

为什么3PC很少使用?

  • 复杂度高
  • 性能提升有限
  • 依然无法完全解决数据不一致问题

五、TCC(Try-Confirm-Cancel)

5.1 原理

将事务分为三个阶段:

Try(尝试)

  • 预留资源
  • 检查业务规则
  • 预扣库存、预冻结金额

Confirm(确认)

  • 确认提交
  • 使用预留资源
  • 扣减库存、扣款

Cancel(取消)

  • 取消预留
  • 释放资源
  • 恢复库存、解冻金额

5.2 流程图

mermaid
sequenceDiagram
    participant M as 主业务
    participant S1 as 库存服务
    participant S2 as 订单服务
    participant S3 as 支付服务
    
    M->>S1: Try(预扣库存)
    S1-->>M: 成功
    
    M->>S2: Try(预创建订单)
    S2-->>M: 成功
    
    M->>S3: Try(预冻结金额)
    S3-->>M: 成功
    
    alt 全部成功
        M->>S1: Confirm(确认扣库存)
        M->>S2: Confirm(确认订单)
        M->>S3: Confirm(确认扣款)
    else 任一失败
        M->>S1: Cancel(恢复库存)
        M->>S2: Cancel(取消订单)
        M->>S3: Cancel(解冻金额)
    end

5.3 实现

go
package tcc

import (
    "context"
    "fmt"
)

// TCCTransaction TCC事务管理器
type TCCTransaction struct {
    participants []TCCParticipant
}

// TCCParticipant TCC参与者接口
type TCCParticipant interface {
    Try(ctx context.Context, txID string) error
    Confirm(ctx context.Context, txID string) error
    Cancel(ctx context.Context, txID string) error
}

// Execute 执行TCC事务
func (t *TCCTransaction) Execute(ctx context.Context, txID string) error {
    // Try阶段
    err := t.tryPhase(ctx, txID)
    if err != nil {
        // Try失败,Cancel
        t.cancelPhase(ctx, txID)
        return fmt.Errorf("try phase failed: %w", err)
    }
    
    // Confirm阶段
    err = t.confirmPhase(ctx, txID)
    if err != nil {
        // Confirm失败,重试(幂等)
        return fmt.Errorf("confirm phase failed: %w", err)
    }
    
    return nil
}

func (t *TCCTransaction) tryPhase(ctx context.Context, txID string) error {
    for _, p := range t.participants {
        err := p.Try(ctx, txID)
        if err != nil {
            return err
        }
    }
    return nil
}

func (t *TCCTransaction) confirmPhase(ctx context.Context, txID string) error {
    for _, p := range t.participants {
        err := p.Confirm(ctx, txID)
        if err != nil {
            return err
        }
    }
    return nil
}

func (t *TCCTransaction) cancelPhase(ctx context.Context, txID string) {
    for _, p := range t.participants {
        p.Cancel(ctx, txID)
    }
}

// 示例:库存服务TCC实现
type InventoryTCC struct {
    db *gorm.DB
}

// Try 预扣库存
func (i *InventoryTCC) Try(ctx context.Context, txID string) error {
    // 1. 检查库存
    var inv Inventory
    err := i.db.Where("product_id = ?", 1001).First(&inv).Error
    if err != nil {
        return err
    }
    
    if inv.AvailableStock < 1 {
        return fmt.Errorf("insufficient stock")
    }
    
    // 2. 预扣库存(从可用库存转到冻结库存)
    err = i.db.Exec(`
        UPDATE inventory 
        SET available_stock = available_stock - 1,
            frozen_stock = frozen_stock + 1
        WHERE product_id = ? AND available_stock >= 1
    `, 1001).Error
    
    if err != nil {
        return err
    }
    
    // 3. 记录TCC事务日志
    log := &TCCLog{
        TxID:      txID,
        Service:   "inventory",
        Action:    "try",
        ProductID: 1001,
        Quantity:  1,
        Status:    "prepared",
    }
    i.db.Create(log)
    
    return nil
}

// Confirm 确认扣库存
func (i *InventoryTCC) Confirm(ctx context.Context, txID string) error {
    // 1. 检查幂等性
    var log TCCLog
    err := i.db.Where("tx_id = ? AND service = ?", txID, "inventory").First(&log).Error
    if err != nil {
        return err
    }
    
    if log.Status == "confirmed" {
        return nil // 已确认,幂等返回
    }
    
    // 2. 扣减冻结库存
    err = i.db.Exec(`
        UPDATE inventory 
        SET frozen_stock = frozen_stock - 1,
            sold_stock = sold_stock + 1
        WHERE product_id = ?
    `, log.ProductID).Error
    
    if err != nil {
        return err
    }
    
    // 3. 更新日志状态
    i.db.Model(&log).Update("status", "confirmed")
    
    return nil
}

// Cancel 取消预扣
func (i *InventoryTCC) Cancel(ctx context.Context, txID string) error {
    // 1. 检查幂等性
    var log TCCLog
    err := i.db.Where("tx_id = ? AND service = ?", txID, "inventory").First(&log).Error
    if err != nil {
        return err
    }
    
    if log.Status == "cancelled" {
        return nil // 已取消
    }
    
    // 2. 处理空回滚(Try未执行,直接Cancel)
    if log.Status == "" {
        // 记录空回滚
        i.db.Create(&TCCLog{
            TxID:    txID,
            Service: "inventory",
            Action:  "cancel",
            Status:  "empty_rollback",
        })
        return nil
    }
    
    // 3. 恢复库存(从冻结库存回到可用库存)
    err = i.db.Exec(`
        UPDATE inventory 
        SET available_stock = available_stock + 1,
            frozen_stock = frozen_stock - 1
        WHERE product_id = ?
    `, log.ProductID).Error
    
    if err != nil {
        return err
    }
    
    // 4. 更新日志状态
    i.db.Model(&log).Update("status", "cancelled")
    
    return nil
}

// Inventory 库存表
type Inventory struct {
    ProductID      int64 `gorm:"primary_key"`
    AvailableStock int   `gorm:"column:available_stock"` // 可用库存
    FrozenStock    int   `gorm:"column:frozen_stock"`    // 冻结库存
    SoldStock      int   `gorm:"column:sold_stock"`      // 已售库存
}

// TCCLog TCC事务日志
type TCCLog struct {
    ID        int64  `gorm:"primary_key"`
    TxID      string `gorm:"column:tx_id;index"`
    Service   string `gorm:"column:service"`
    Action    string `gorm:"column:action"` // try/confirm/cancel
    ProductID int64  `gorm:"column:product_id"`
    Quantity  int    `gorm:"column:quantity"`
    Status    string `gorm:"column:status"` // prepared/confirmed/cancelled/empty_rollback
}
java
// TCC事务管理器
@Service
public class TCCTransactionManager {
    
    public void execute(String txId, List<TCCParticipant> participants) throws Exception {
        try {
            // Try阶段
            for (TCCParticipant p : participants) {
                p.tryExecute(txId);
            }
            
            // Confirm阶段
            for (TCCParticipant p : participants) {
                p.confirm(txId);
            }
            
        } catch (Exception e) {
            // Cancel阶段
            for (TCCParticipant p : participants) {
                try {
                    p.cancel(txId);
                } catch (Exception ex) {
                    // 记录日志,后续补偿
                }
            }
            throw e;
        }
    }
}

// TCC参与者接口
interface TCCParticipant {
    void tryExecute(String txId) throws Exception;
    void confirm(String txId) throws Exception;
    void cancel(String txId) throws Exception;
}

// 库存服务TCC实现
@Service
public class InventoryTCCService implements TCCParticipant {
    
    @Autowired
    private InventoryMapper inventoryMapper;
    
    @Autowired
    private TCCLogMapper tccLogMapper;
    
    @Override
    @Transactional
    public void tryExecute(String txId) throws Exception {
        // 预扣库存
        int rows = inventoryMapper.preDeduct(1001, 1);
        if (rows == 0) {
            throw new Exception("库存不足");
        }
        
        // 记录TCC日志
        TCCLog log = new TCCLog();
        log.setTxId(txId);
        log.setService("inventory");
        log.setAction("try");
        log.setProductId(1001L);
        log.setQuantity(1);
        log.setStatus("prepared");
        tccLogMapper.insert(log);
    }
    
    @Override
    @Transactional
    public void confirm(String txId) throws Exception {
        // 检查幂等
        TCCLog log = tccLogMapper.findByTxIdAndService(txId, "inventory");
        if ("confirmed".equals(log.getStatus())) {
            return; // 已确认
        }
        
        // 确认扣库存
        inventoryMapper.confirmDeduct(log.getProductId(), log.getQuantity());
        
        // 更新日志
        log.setStatus("confirmed");
        tccLogMapper.update(log);
    }
    
    @Override
    @Transactional
    public void cancel(String txId) throws Exception {
        // 检查幂等
        TCCLog log = tccLogMapper.findByTxIdAndService(txId, "inventory");
        if (log == null) {
            // 空回滚
            TCCLog emptyLog = new TCCLog();
            emptyLog.setTxId(txId);
            emptyLog.setService("inventory");
            emptyLog.setStatus("empty_rollback");
            tccLogMapper.insert(emptyLog);
            return;
        }
        
        if ("cancelled".equals(log.getStatus())) {
            return; // 已取消
        }
        
        // 恢复库存
        inventoryMapper.rollbackDeduct(log.getProductId(), log.getQuantity());
        
        // 更新日志
        log.setStatus("cancelled");
        tccLogMapper.update(log);
    }
}

5.4 TCC三大问题

1. 幂等性

问题:Confirm/Cancel可能被重复调用

解决:

go
// 通过状态检查保证幂等
if log.Status == "confirmed" {
    return nil // 已确认,直接返回
}

2. 空回滚

问题:Try未执行,直接调用Cancel

解决:

go
// 检查Try是否执行
if log.Status == "" {
    // 记录空回滚,不做实际操作
    return nil
}

3. 悬挂

问题:Cancel先于Try执行

解决:

go
// Try时检查是否已Cancel
var cancelLog TCCLog
i.db.Where("tx_id = ? AND status = ?", txID, "cancelled").First(&cancelLog)
if cancelLog.ID > 0 {
    return fmt.Errorf("transaction已取消")
}

5.5 优缺点

优点

  • ✅ 性能高(无锁,不阻塞)
  • ✅ 最终一致性
  • ✅ 灵活(业务可控)

缺点

  • ❌ 业务侵入性强(需实现Try/Confirm/Cancel)
  • ❌ 复杂度高(需处理幂等、空回滚、悬挂)
  • ❌ 开发成本高

适用场景 ⭐:

  • 电商订单
  • 支付系统
  • 对一致性要求高但允许短暂延迟

六、SAGA模式

6.1 原理

将长事务拆分为多个本地短事务,每个事务有对应的补偿事务。

正向服务:Ti(Transaction) 补偿服务:Ci(Compensation)

执行顺序

成功:T1 → T2 → T3 → ... → Tn
失败:T1 → T2 → T3(失败) → C2 → C1

6.2 两种模式

编排模式(Orchestration)

  • 中心协调器控制流程
  • 类似2PC的协调者

协调模式(Choreography)

  • 事件驱动
  • 每个服务监听事件,自行决策

6.3 流程图

mermaid
sequenceDiagram
    participant O as 订单编排器
    participant I as 库存服务
    participant P as 支付服务
    participant L as 物流服务
    
    O->>I: T1: 扣库存
    I-->>O: 成功
    
    O->>P: T2: 扣款
    P-->>O: 成功
    
    O->>L: T3: 创建物流单
    L-->>O: 失败
    
    O->>P: C2: 退款(补偿)
    O->>I: C1: 恢复库存(补偿)

6.4 实现

go
package saga

import (
    "context"
    "fmt"
)

// SagaOrchestrator SAGA编排器
type SagaOrchestrator struct {
    steps []*SagaStep
}

// SagaStep SAGA步骤
type SagaStep struct {
    Name        string
    Transaction func(ctx context.Context) error
    Compensation func(ctx context.Context) error
}

// Execute 执行SAGA
func (s *SagaOrchestrator) Execute(ctx context.Context) error {
    executedSteps := 0
    
    // 执行正向事务
    for i, step := range s.steps {
        err := step.Transaction(ctx)
        if err != nil {
            // 事务失败,执行补偿
            s.compensate(ctx, executedSteps)
            return fmt.Errorf("step %s failed: %w", step.Name, err)
        }
        executedSteps = i + 1
    }
    
    return nil
}

// compensate 执行补偿
func (s *SagaOrchestrator) compensate(ctx context.Context, executedSteps int) {
    // 逆序执行补偿
    for i := executedSteps - 1; i >= 0; i-- {
        step := s.steps[i]
        err := step.Compensation(ctx)
        if err != nil {
            // 补偿失败,记录日志,后续人工介入
            fmt.Printf("compensation failed for step %s: %v\n", step.Name, err)
        }
    }
}

// 示例:订单SAGA
func CreateOrderSaga() *SagaOrchestrator {
    return &SagaOrchestrator{
        steps: []*SagaStep{
            {
                Name: "扣库存",
                Transaction: func(ctx context.Context) error {
                    return inventoryService.Deduct(ctx, 1001, 1)
                },
                Compensation: func(ctx context.Context) error {
                    return inventoryService.Restore(ctx, 1001, 1)
                },
            },
            {
                Name: "创建订单",
                Transaction: func(ctx context.Context) error {
                    return orderService.Create(ctx, &Order{...})
                },
                Compensation: func(ctx context.Context) error {
                    return orderService.Cancel(ctx, orderID)
                },
            },
            {
                Name: "扣款",
                Transaction: func(ctx context.Context) error {
                    return paymentService.Deduct(ctx, userID, 100)
                },
                Compensation: func(ctx context.Context) error {
                    return paymentService.Refund(ctx, userID, 100)
                },
            },
        },
    }
}
java
// SAGA编排器
public class SagaOrchestrator {
    
    private List<SagaStep> steps = new ArrayList<>();
    
    public void execute() throws Exception {
        int executedSteps = 0;
        
        try {
            // 执行正向事务
            for (SagaStep step : steps) {
                step.transaction();
                executedSteps++;
            }
        } catch (Exception e) {
            // 执行补偿
            compensate(executedSteps);
            throw e;
        }
    }
    
    private void compensate(int executedSteps) {
        // 逆序补偿
        for (int i = executedSteps - 1; i >= 0; i--) {
            try {
                steps.get(i).compensation();
            } catch (Exception e) {
                // 补偿失败,记录日志
                log.error("Compensation failed for step {}", i, e);
            }
        }
    }
}

// SAGA步骤
class SagaStep {
    private Runnable transaction;
    private Runnable compensation;
    
    public void transaction() throws Exception {
        transaction.run();
    }
    
    public void compensation() {
        compensation.run();
    }
}

6.5 优缺点

优点

  • ✅ 适合长事务
  • ✅ 不需要锁资源
  • ✅ 实现相对简单

缺点

  • ❌ 缺乏隔离性(补偿前数据可被读取)
  • ❌ 补偿复杂(需要业务逆操作)

适用场景

  • 长流程事务(订单流程、审批流程)
  • 对一致性要求不那么严格

七、本地消息表

7.1 原理

利用本地事务保证消息和业务操作的原子性。

核心思想

  1. 业务操作和消息插入在同一本地事务
  2. 定时扫描消息表,发送消息
  3. 消费者消费消息,执行业务

7.2 流程图

mermaid
sequenceDiagram
    participant A as 服务A
    participant DB as 数据库
    participant MQ as 消息队列
    participant B as 服务B
    
    A->>DB: BEGIN TRANSACTION
    A->>DB: 更新业务数据
    A->>DB: 插入消息表
    A->>DB: COMMIT
    
    loop 定时扫描
        A->>DB: 查询未发送消息
        A->>MQ: 发送消息
        A->>DB: 更新消息状态为已发送
    end
    
    MQ->>B: 消费消息
    B->>B: 执行业务逻辑
    B->>MQ: ACK

7.3 实现

go
package localmsg

import (
    "context"
    "encoding/json"
    "time"
    
    "gorm.io/gorm"
)

// LocalMessageService 本地消息服务
type LocalMessageService struct {
    db          *gorm.DB
    mqProducer  *MQProducer
}

// Message 本地消息表
type Message struct {
    ID          int64     `gorm:"primary_key"`
    Topic       string    `gorm:"column:topic"`
    Content     string    `gorm:"column:content"`
    Status      int       `gorm:"column:status"` // 0待发送 1已发送 2失败
    RetryCount  int       `gorm:"column:retry_count"`
    MaxRetry    int       `gorm:"column:max_retry"`
    CreateTime  time.Time `gorm:"column:create_time"`
    UpdateTime  time.Time `gorm:"column:update_time"`
}

// CreateOrderWithMessage 创建订单(本地消息表)
func (s *LocalMessageService) CreateOrderWithMessage(ctx context.Context, order *Order) error {
    // 开启事务
    return s.db.Transaction(func(tx *gorm.DB) error {
        // 1. 创建订单
        err := tx.Create(order).Error
        if err != nil {
            return err
        }
        
        // 2. 插入本地消息
        msgContent, _ := json.Marshal(order)
        msg := &Message{
            Topic:      "order_created",
            Content:    string(msgContent),
            Status:     0, // 待发送
            RetryCount: 0,
            MaxRetry:   3,
            CreateTime: time.Now(),
        }
        err = tx.Create(msg).Error
        if err != nil {
            return err
        }
        
        // 3. 提交事务(原子性保证)
        return nil
    })
}

// SendPendingMessages 发送待发送消息(定时任务)
func (s *LocalMessageService) SendPendingMessages(ctx context.Context) error {
    // 查询待发送消息
    var messages []Message
    err := s.db.Where("status = ? AND retry_count < max_retry", 0).
        Limit(100).
        Find(&messages).Error
    if err != nil {
        return err
    }
    
    for _, msg := range messages {
        // 发送到MQ
        err := s.mqProducer.Send(msg.Topic, msg.Content)
        if err != nil {
            // 发送失败,增加重试次数
            s.db.Model(&msg).Updates(map[string]interface{}{
                "retry_count": gorm.Expr("retry_count + 1"),
                "update_time": time.Now(),
            })
            continue
        }
        
        // 发送成功,更新状态
        s.db.Model(&msg).Updates(map[string]interface{}{
            "status":      1, // 已发送
            "update_time": time.Now(),
        })
    }
    
    return nil
}

// ConsumeOrderMessage 消费订单消息(服务B)
func ConsumeOrderMessage(msg string) error {
    var order Order
    json.Unmarshal([]byte(msg), &order)
    
    // 执行业务逻辑(幂等性保证)
    // 例如:发送通知、更新统计等
    
    return nil
}

7.4 优缺点

优点

  • ✅ 实现简单
  • ✅ 可靠性高(本地事务保证)
  • ✅ 性能好

缺点

  • ❌ 需要定时任务扫描
  • ❌ 消息表会膨胀(需定期清理)

适用场景 ⭐:

  • 异步场景(通知、日志)
  • 对实时性要求不高
  • 高可靠性要求

八、Seata框架

8.1 Seata简介

Seata(Simple Extensible Autonomous Transaction Architecture)是阿里开源的分布式事务解决方案。

支持模式

  • AT模式:自动两阶段提交
  • TCC模式:手动Try-Confirm-Cancel
  • SAGA模式:长事务补偿
  • XA模式:标准XA协议

8.2 AT模式(重点)

原理

  1. 一阶段:执行业务SQL,记录回滚日志(undo_log)
  2. 二阶段:
    • 提交:删除undo_log
    • 回滚:根据undo_log还原数据

示例

java
@GlobalTransactional
public void createOrder() {
    // 扣库存
    inventoryService.deduct(1001, 1);
    
    // 创建订单
    orderService.create(order);
    
    // 扣款
    paymentService.pay(100);
}

undo_log表

sql
CREATE TABLE undo_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    branch_id BIGINT NOT NULL,
    xid VARCHAR(100) NOT NULL,
    context VARCHAR(128),
    rollback_info LONGBLOB NOT NULL,
    log_status INT NOT NULL,
    log_created DATETIME NOT NULL,
    log_modified DATETIME NOT NULL,
    INDEX idx_xid (xid)
);

8.3 对比

模式业务侵入性能一致性适用场景
AT最终一致通用场景 ⭐
TCC最终一致核心业务
SAGA最终一致长事务
XA强一致金融系统

九、方案选择

9.1 选择建议

金融支付(强一致性)

  • 首选:XA / 2PC
  • 备选:TCC

电商订单(最终一致性)

  • 首选:TCC ⭐
  • 备选:SAGA / 本地消息表

异步通知(可靠性)

  • 首选:本地消息表 ⭐
  • 备选:MQ事务消息

长流程(审批、工单)

  • 首选:SAGA ⭐

9.2 实际应用

公司方案场景
阿里TCC + SAGA电商订单
蚂蚁金服TCC支付系统
美团本地消息表异步通知
滴滴SAGA订单流程

十、面试要点

10.1 常见追问

Q1: 2PC和3PC的区别?

A:

维度2PC3PC
阶段准备、提交询问、准备、提交
阻塞短(增加超时)
单点故障严重改善
应用

Q2: TCC如何保证幂等性?

A: 通过状态检查:

go
if log.Status == "confirmed" {
    return nil // 已确认,幂等返回
}

Q3: TCC的空回滚和悬挂如何处理?

A:

  • 空回滚:Try未执行,Cancel先到。记录空回滚状态,不做实际操作。
  • 悬挂:Cancel先于Try执行。Try时检查是否已Cancel。

Q4: SAGA的补偿操作如何设计?

A: 补偿操作要满足:

  • 幂等性:可重复执行
  • 可补偿性:能还原业务状态
  • 重试性:失败可重试

Q5: 如何选择分布式事务方案?

A: 根据业务特点:

  • 强一致性 → 2PC/XA
  • 高性能 + 最终一致 → TCC
  • 长事务 → SAGA
  • 异步场景 → 本地消息表

10.2 扩展知识点

相关场景题

相关技术文档

十一、总结

分布式事务解决方案对比:

方案一致性性能复杂度推荐指数
2PC/3PC⭐⭐
TCC最终⭐⭐⭐⭐⭐
SAGA最终⭐⭐⭐⭐
本地消息表最终⭐⭐⭐⭐⭐

核心原则

  1. 优先使用本地事务
  2. 能用补偿就别用2PC
  3. TCC适合核心业务
  4. 本地消息表适合异步场景

面试重点

  • 理解每种方案的原理和适用场景
  • 能说出优缺点和选择依据
  • 掌握TCC的三大问题
  • 了解Seata框架的AT模式

参考资料

  • Seata官方文档
  • 阿里《企业IT架构转型之道》
  • 美团技术团队分布式事务实践

正在精进