Redis Cluster Architecture: Advanced Topics

🏗️ Redis Cluster Deep Dive

Cluster Architecture Fundamentals

Q1: How are Redis Cluster's architecture and data-sharding mechanism designed? How does automatic failover work?

Difficulty: ⭐⭐⭐⭐⭐

Answer: Redis Cluster uses a decentralized architecture, sharding data and balancing load across 16384 hash slots.

1. Cluster architecture design:

Core architecture components:

bash
# Redis Cluster topology
┌─────────────────────────────────────────────────────────────┐
│                        Redis Cluster                        │
├───────────────┬───────────────┬───────────────┬─────────────┤
│   Master 1    │   Master 2    │   Master 3    │  Master N   │
│  Slots 0-5460 │  Slots 5461-  │ Slots 10923-  │     ...     │
│               │     10922     │     16383     │             │
├───────────────┼───────────────┼───────────────┼─────────────┤
│   Replica 1   │   Replica 2   │   Replica 3   │  Replica N  │
│  (Master 1)   │  (Master 2)   │  (Master 3)   │     ...     │
└───────────────┴───────────────┴───────────────┴─────────────┘

Cluster setup configuration:

bash
# Create a 6-node cluster (3 masters, 3 replicas)
mkdir redis-cluster
cd redis-cluster

# Generate a config for each of the 6 Redis instances
for port in $(seq 7000 7005); do
  mkdir $port
  cat << EOF > $port/redis.conf
port $port
cluster-enabled yes
cluster-config-file nodes-${port}.conf
cluster-node-timeout 5000
cluster-announce-port $port
cluster-announce-bus-port 1${port}
appendonly yes
daemonize yes
pidfile /var/run/redis_${port}.pid
logfile redis.log
dir ./${port}/
EOF
done

# Start all instances
for port in $(seq 7000 7005); do
  redis-server $port/redis.conf
done

# Create the cluster (one replica per master)
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
--cluster-replicas 1

2. Hash slot sharding:

Hash slot algorithm:

python
import crc16  # third-party package providing crc16xmodem

def calculate_slot(key):
    """
    Compute the hash slot for a key.
    Redis uses CRC16 (XMODEM variant) modulo 16384.
    """
    # Honor hash tags: only the substring between the first '{' and the
    # next '}' is hashed, provided it is non-empty
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end != -1 and end != start + 1:
            key = key[start + 1:end]

    # CRC16 of the key, modulo the 16384 slots
    return crc16.crc16xmodem(key.encode('utf-8')) % 16384

# Example calculation
keys = ['user:1001', 'user:1002', 'user:{1001}:profile', 'user:{1001}:orders']
for key in keys:
    slot = calculate_slot(key)
    print(f"Key: {key} -> Slot: {slot}")

# Sample output:
# Key: user:1001 -> Slot: 9166
# Key: user:1002 -> Slot: 5586
# Key: user:{1001}:profile -> Slot: 9166  (same hash tag)
# Key: user:{1001}:orders -> Slot: 9166   (same hash tag)
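
If the third-party crc16 package is unavailable, the XMODEM variant Redis uses (polynomial 0x1021, initial value 0) is short enough to implement directly; the sketch below is a drop-in replacement for crc16.crc16xmodem:

python
def crc16_xmodem(data: bytes) -> int:
    """CRC16-XMODEM: polynomial 0x1021, initial value 0x0000.
    This is the CRC variant Redis Cluster uses for slot hashing."""
    crc = 0x0000
    for byte in data:
        crc ^= byte << 8              # fold the next byte into the high bits
        for _ in range(8):
            if crc & 0x8000:          # shift out the top bit...
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF  # ...XOR the polynomial
            else:
                crc = (crc << 1) & 0xFFFF
    return crc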

Slot assignment:

bash
# Inspect the cluster's slot assignment
redis-cli -p 7000 cluster slots
# Output: start slot, end slot, master info, replica info

# List masters and the slots they own
redis-cli -p 7000 cluster nodes | grep master

# Assign slots manually (bootstrap-style; the slots must be unassigned)
redis-cli -p 7000 cluster addslots {0..5460}
redis-cli -p 7001 cluster addslots {5461..10922}
redis-cli -p 7002 cluster addslots {10923..16383}
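
The same mapping can be pulled programmatically from any node; a minimal sketch with redis-py, assuming the local 7000-7005 test cluster from above:

python
import redis

def slot_map(host='127.0.0.1', port=7000):
    """Build a {(start_slot, end_slot): 'host:port'} map of slot owners
    from CLUSTER SLOTS. A minimal sketch against the local test cluster."""
    r = redis.Redis(host=host, port=port)
    mapping = {}
    # Each reply entry: [start, end, [master_ip, master_port, master_id], replicas...]
    for start, end, master, *replicas in r.execute_command('CLUSTER', 'SLOTS'):
        ip = master[0].decode() if isinstance(master[0], bytes) else master[0]
        mapping[(start, end)] = f"{ip}:{master[1]}"
    return mapping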

3. Data routing:

Client-side routing:

python
import redis
from rediscluster import RedisCluster  # redis-py-cluster package

# Redis Cluster client configuration
startup_nodes = [
    {"host": "127.0.0.1", "port": "7000"},
    {"host": "127.0.0.1", "port": "7001"},
    {"host": "127.0.0.1", "port": "7002"}
]

# A smart client routes commands to the owning node automatically
rc = RedisCluster(
    startup_nodes=startup_nodes,
    decode_responses=True,
    skip_full_coverage_check=True,
    max_connections=32
)

# Optimized batch operations
def batch_operations(rc, operations):
    """
    Group operations by owning node to cut network round trips.
    """
    # Group operations by slot owner
    slot_groups = {}

    for op_type, key, value in operations:
        slot = calculate_slot(key)
        node = rc.get_node_from_slot(slot)  # helper name varies by client version
        node_id = f"{node['host']}:{node['port']}"

        if node_id not in slot_groups:
            slot_groups[node_id] = []
        slot_groups[node_id].append((op_type, key, value))

    # Run each node's operations in its own pipeline
    results = {}
    for node_id, ops in slot_groups.items():
        host, port = node_id.split(':')
        node_client = redis.Redis(host=host, port=int(port))

        pipe = node_client.pipeline()
        for op_type, key, value in ops:
            if op_type == 'SET':
                pipe.set(key, value)
            elif op_type == 'GET':
                pipe.get(key)

        results[node_id] = pipe.execute()

    return results

4. Failure detection:

Gossip protocol in practice:

bash
# Cluster node communication ports
# Data ports: 7000-7005
# Bus ports: 17000-17005 (data port + 10000)

# Check cluster state
redis-cli -p 7000 cluster info
# cluster_state:ok
# cluster_slots_assigned:16384
# cluster_slots_ok:16384
# cluster_slots_pfail:0
# cluster_slots_fail:0
# cluster_known_nodes:6
# cluster_size:3

# Failure-detection tuning
redis-cli -p 7000 config set cluster-node-timeout 5000
redis-cli -p 7000 config set cluster-slave-validity-factor 10
redis-cli -p 7000 config set cluster-migration-barrier 1

Failure detection flow:

python
def failure_detection_process():
    """
    Redis Cluster failure detection, step by step.
    """
    print("Failure detection flow:")
    print("1. Nodes periodically exchange PING messages")
    print("2. A node that misses PONG within the timeout is marked PFAIL (possibly failed)")
    print("3. PFAIL reports spread via the gossip protocol")
    print("4. Once a majority of masters report PFAIL, the node is marked FAIL")
    print("5. The FAIL state is broadcast to the whole cluster")
    print("6. A replica election promotes a new master")

    return {
        'ping_interval': '1 second',
        'pong_timeout': 'cluster-node-timeout',
        'pfail_threshold': '1 node',
        'fail_threshold': 'majority of masters',
        'election_timeout': '2 * cluster-node-timeout'
    }
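
The PFAIL/FAIL states can be observed directly: in CLUSTER NODES output a suspected node carries the fail? flag and a confirmed-down node carries fail. A quick check across the example ports:

bash
# Look for fail?/fail flags on each node (assumes the 7000-7005 cluster above)
for port in $(seq 7000 7005); do
  redis-cli -p $port cluster nodes | grep -E 'fail\??' || true
done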

5. Automatic failover:

Master-replica switchover:

bash
# Simulate a master failure
redis-cli -p 7000 debug segfault  # crash the master deliberately

# Watch the failover happen
redis-cli -p 7003 cluster nodes
# Inspect the new master/replica relationships

# Manual failover (run on a replica)
redis-cli -p 7003 cluster failover

# Manual failover (forced, skips coordination with the master)
redis-cli -p 7003 cluster failover force

Replica election algorithm:

python
def slave_election_algorithm(slaves):
    """
    Simplified sketch of how a new master is chosen. Real Redis ranks
    replicas by replication offset, staggers their election attempts,
    and has the surviving masters vote on the winner.
    """
    eligible_slaves = []

    for slave in slaves:
        # Eligibility checks
        if (slave.replication_offset_delay < 5 and  # replication lag under 5 s
            slave.last_ping_time < 2000 and          # last ping within 2 s
            slave.is_connected):                      # link is healthy
            eligible_slaves.append(slave)

    if not eligible_slaves:
        return None

    # Sort by replication offset, most up-to-date first
    eligible_slaves.sort(key=lambda x: x.replication_offset, reverse=True)

    # Pick the replica with the largest replication offset
    winner = eligible_slaves[0]

    print(f"Selected new master: {winner.node_id}")
    print(f"Replication offset: {winner.replication_offset}")

    return winner
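
Step 6 glosses over timing: per the Redis Cluster spec, competing replicas stagger their vote requests by a rank-based delay so the most up-to-date replica usually starts (and wins) the election first. A sketch of the documented formula:

python
import random

def election_delay_ms(rank):
    """Delay before a replica starts its election, per the Redis Cluster
    spec: a fixed 500 ms, plus 0-500 ms of random jitter, plus 1000 ms
    per rank position. Rank 0 is the replica with the largest offset."""
    return 500 + random.randint(0, 500) + rank * 1000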

Interview takeaways:

  • Understand the advantages of hash-slot sharding and how slots are computed
  • Know how cluster nodes communicate (gossip over the cluster bus)
  • Be able to walk through failure detection and automatic failover end to end

Q2: How does Redis Cluster scale out and in? How does it migrate data online?

Difficulty: ⭐⭐⭐⭐

Answer: Redis Cluster scales dynamically by migrating hash slots between nodes, redistributing data without interrupting service.

1. Scale-out procedure:

Adding new nodes:

bash
# Start new Redis instances
redis-server --port 7006 --cluster-enabled yes \
--cluster-config-file nodes-7006.conf --daemonize yes

redis-server --port 7007 --cluster-enabled yes \
--cluster-config-file nodes-7007.conf --daemonize yes

# Add a new master to the cluster
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000

# Add a replica attached to the new master
redis-cli --cluster add-node 127.0.0.1:7007 127.0.0.1:7000 \
--cluster-slave --cluster-master-id <7006-node-id>

# Check cluster state
redis-cli -p 7000 cluster nodes

Reassigning slots:

bash
# Automated resharding (recommended)
redis-cli --cluster reshard 127.0.0.1:7000 \
--cluster-from all \
--cluster-to <new-node-id> \
--cluster-slots 4096 \
--cluster-yes
# 4096 = 16384 / 4, an even share once the fourth master joins

# Manual resharding
redis-cli --cluster reshard 127.0.0.1:7000
# Interactive prompts let you pick source and target nodes

2. Online data migration:

Tracking migration state:

bash
# Find nodes with slots in importing/migrating state
redis-cli -p 7000 cluster nodes | grep importing
redis-cli -p 7000 cluster nodes | grep migrating

# Inspect current slot ownership
redis-cli -p 7000 cluster slots

ASK redirection:

python
import redis
import time

def simulate_migration_process():
    """
    Walk through the slot-migration protocol. target_node_id,
    source_node_id, and cluster_nodes are placeholders for the real
    node IDs and clients in your cluster.
    """
    source_client = redis.Redis(host='127.0.0.1', port=7000)
    target_client = redis.Redis(host='127.0.0.1', port=7006)

    slot = 1000

    print("1. Flag the slot as migrating")
    # Source node enters MIGRATING state for the slot
    source_client.execute_command('CLUSTER', 'SETSLOT', slot, 'MIGRATING', target_node_id)

    # Target node enters IMPORTING state
    target_client.execute_command('CLUSTER', 'SETSLOT', slot, 'IMPORTING', source_node_id)

    print("2. Move the keys in the slot")
    keys_in_slot = source_client.execute_command('CLUSTER', 'GETKEYSINSLOT', slot, 100)

    for key in keys_in_slot:
        try:
            # MIGRATE transfers the key to the target node
            source_client.execute_command(
                'MIGRATE', '127.0.0.1', 7006, key, 0, 5000,
                'COPY', 'REPLACE'
            )
            print(f"Migrated key: {key}")

        except redis.RedisError as e:
            print(f"Migration failed for {key}: {e}")

    print("3. Finalize the migration")
    # Point the slot at its new owner on every node
    for node in cluster_nodes:
        node.execute_command('CLUSTER', 'SETSLOT', slot, 'NODE', target_node_id)

def handle_ask_redirection(key):
    """
    Client-side handling of an ASK redirect: a one-shot redirect that
    does NOT update the client's slot cache.
    """
    try:
        # Try the source node first
        result = source_client.get(key)
        return result

    except redis.ResponseError as e:
        if 'ASK' in str(e):
            # Parse the ASK reply: ASK <slot> <target_host>:<target_port>
            parts = str(e).split(' ')
            target_host, target_port = parts[2].split(':')

            # Send ASKING to the target node, then retry the command
            temp_client = redis.Redis(host=target_host, port=int(target_port))
            temp_client.execute_command('ASKING')
            result = temp_client.get(key)
            return result
        else:
            raise
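
A MOVED redirect, by contrast, signals a permanent change of slot ownership, so the client should also refresh its cached slot table; a minimal sketch for a GET (slot_table is an assumed client-side cache):

python
import redis

def get_with_moved_handling(client, slot_table, key):
    """Handle a MOVED redirect: unlike ASK, MOVED is permanent, so the
    client updates its slot table before retrying.
    slot_table is an assumed {slot: (host, port)} cache."""
    try:
        return client.get(key)
    except redis.ResponseError as e:
        msg = str(e)
        if msg.startswith('MOVED'):
            # Reply format: MOVED <slot> <host>:<port>
            _, slot, addr = msg.split(' ')
            host, port = addr.split(':')
            slot_table[int(slot)] = (host, int(port))  # refresh the cache
            return redis.Redis(host=host, port=int(port)).get(key)
        raise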

3. Data consistency guarantees:

Atomic migration:

python
import redis

def atomic_key_migration(source, target, key, timeout=5000):
    """
    Migrate a single key with MIGRATE. Note that MIGRATE without COPY
    already moves the key atomically (deleting it from the source on
    success); COPY plus an explicit delete, as below, keeps the source
    copy around until the transfer is confirmed.
    """
    try:
        # 1. Transfer the key with MIGRATE
        result = source.execute_command(
            'MIGRATE',
            target.connection_pool.connection_kwargs['host'],
            target.connection_pool.connection_kwargs['port'],
            key,
            0,  # destination database
            timeout,
            'COPY',    # keep the source key for now
            'REPLACE'  # overwrite the target key if it exists
        )

        if result in (b'OK', 'OK', True):  # reply shape depends on client settings
            # 2. Delete the source key once the transfer succeeded
            source.delete(key)
            return True

    except redis.RedisError as e:
        print(f"Migration failed: {e}")
        return False

    return False

def batch_migration_with_consistency(source, target, keys):
    """
    Migrate a batch of keys, tracking failures for retry.
    """
    success_count = 0
    failed_keys = []

    for key in keys:
        # Skip keys that have already left the source node
        if not source.exists(key):
            continue

        # Migrate each key individually
        if atomic_key_migration(source, target, key):
            success_count += 1
        else:
            failed_keys.append(key)

    return {
        'success': success_count,
        'failed': failed_keys,
        'total': len(keys)
    }

4. Scale-in procedure:

Removing nodes:

bash
# 1. Move the departing node's slots to the remaining nodes first
redis-cli --cluster reshard 127.0.0.1:7000 \
--cluster-from <node-to-remove-id> \
--cluster-to <target-node-id> \
--cluster-slots <all-slots-count> \
--cluster-yes

# 2. Remove the replica
redis-cli --cluster del-node 127.0.0.1:7007 <slave-node-id>

# 3. Remove the master (only after all its slots are gone)
redis-cli --cluster del-node 127.0.0.1:7006 <master-node-id>

# 4. Verify cluster health
redis-cli --cluster check 127.0.0.1:7000

Pre-removal safety checks:

python
def safe_node_removal_check(node_to_remove):
    """
    Pre-flight checks before removing a node. node_to_remove is assumed
    to be a redis client with a node_id attribute attached by the caller.
    """
    checks = {
        'has_slots': False,
        'has_data': False,
        'replication_ok': True,
        'cluster_stable': True
    }

    # Does the node still own slots? Each CLUSTER SLOTS entry looks like
    # [start, end, [master_ip, master_port, master_id], [replica...], ...]
    slots = node_to_remove.execute_command('CLUSTER', 'SLOTS')
    for slot_info in slots:
        ids = {entry[2] for entry in slot_info[2:] if len(entry) > 2}
        if node_to_remove.node_id in ids or node_to_remove.node_id.encode() in ids:
            checks['has_slots'] = True
            break

    # Does the node still hold data?
    db_size = node_to_remove.dbsize()
    if db_size > 0:
        checks['has_data'] = True

    # Is the cluster healthy?
    cluster_info = node_to_remove.execute_command('CLUSTER', 'INFO')
    if isinstance(cluster_info, bytes):
        cluster_info = cluster_info.decode()
    if 'cluster_state:fail' in cluster_info:
        checks['cluster_stable'] = False

    return checks

def perform_safe_removal(node_to_remove):
    """
    Remove a node only once the safety checks pass.
    """
    checks = safe_node_removal_check(node_to_remove)

    if checks['has_slots']:
        print("Error: node still owns slots; migrate them away first")
        return False

    if checks['has_data']:
        print("Warning: node still holds data; consider cleaning up first")

    if not checks['cluster_stable']:
        print("Error: cluster state is unstable; wait for recovery")
        return False

    # Proceed with the removal
    return True

5. Monitoring and performance tuning:

Monitoring migration performance:

python
import time
import threading

class MigrationMonitor:
    def __init__(self, source_nodes, target_nodes):
        self.source_nodes = source_nodes
        self.target_nodes = target_nodes
        self.metrics = {
            'keys_migrated': 0,
            'migration_rate': 0,
            'errors': 0,
            'start_time': time.time()
        }

    def monitor_migration_progress(self):
        """
        Periodically report migration progress.
        """
        while self.is_migration_active():
            # Approximate migrated keys by the target nodes' total key count
            total_keys = sum(node.dbsize() for node in self.target_nodes)

            # Compute the migration rate
            elapsed = time.time() - self.metrics['start_time']
            self.metrics['migration_rate'] = total_keys / elapsed if elapsed > 0 else 0

            print(f"Migration Progress:")
            print(f"  Keys migrated: {total_keys}")
            print(f"  Rate: {self.metrics['migration_rate']:.2f} keys/sec")
            print(f"  Elapsed: {elapsed:.2f} seconds")

            time.sleep(5)

    def is_migration_active(self):
        """
        Check whether any slot is still migrating. The MIGRATING/IMPORTING
        markers appear in CLUSTER NODES output (as [slot->-id] / [slot-<-id]),
        not in CLUSTER INFO.
        """
        for node in self.source_nodes + self.target_nodes:
            nodes_info = node.execute_command('CLUSTER', 'NODES')
            if isinstance(nodes_info, bytes):
                nodes_info = nodes_info.decode()
            if '->-' in nodes_info or '-<-' in nodes_info:
                return True
        return False

# Usage (source_nodes / target_nodes are lists of redis clients)
monitor = MigrationMonitor(source_nodes, target_nodes)
monitor_thread = threading.Thread(target=monitor.monitor_migration_progress)
monitor_thread.start()

Interview takeaways:

  • Understand the atomicity and consistency guarantees of slot migration
  • Know how ASK redirection works under the hood
  • Understand how scaling affects live traffic and how to mitigate it

⚡ Performance Optimization Strategies

Q3: How do you optimize Redis Cluster performance? How do you handle hot keys and cross-slot operations?

Difficulty: ⭐⭐⭐⭐⭐

Answer: Optimizing Redis Cluster is a systematic effort spanning the network, memory, and CPU dimensions.

1. Handling hot keys:

Hotspot detection algorithm:

python
import time
from collections import defaultdict
from threading import Lock

class HotspotDetector:
    def __init__(self, window_size=60, threshold=1000):
        self.window_size = window_size  # sliding window in seconds
        self.threshold = threshold      # accesses per window to count as hot
        self.access_counts = defaultdict(list)
        self.lock = Lock()

    def record_access(self, key):
        """
        Record one access to a key.
        """
        current_time = time.time()

        with self.lock:
            # Drop records that fell out of the window
            self.access_counts[key] = [
                timestamp for timestamp in self.access_counts[key]
                if current_time - timestamp <= self.window_size
            ]

            # Record the new access
            self.access_counts[key].append(current_time)

    def detect_hotspots(self):
        """
        Return the keys whose access count exceeds the threshold.
        """
        hotspots = {}
        current_time = time.time()

        with self.lock:
            for key, timestamps in self.access_counts.items():
                # Count accesses inside the window
                recent_accesses = [
                    t for t in timestamps
                    if current_time - t <= self.window_size
                ]

                if len(recent_accesses) >= self.threshold:
                    hotspots[key] = {
                        'access_count': len(recent_accesses),
                        'qps': len(recent_accesses) / self.window_size,
                        'slot': calculate_slot(key)
                    }

        return hotspots

def hotspot_mitigation_strategy(hotspots, cluster_client):
    """
    Pick a mitigation strategy per hot key, tiered by QPS.
    """
    strategies = {}

    for key, info in hotspots.items():
        qps = info['qps']
        slot = info['slot']

        if qps > 10000:  # extreme hotspot
            strategies[key] = {
                'strategy': 'local_cache',
                'ttl': 60,  # cache locally for 60 seconds
                'description': 'Serve from a local cache to shed Redis load'
            }
        elif qps > 5000:  # heavy hotspot
            strategies[key] = {
                'strategy': 'read_replica',
                'replicas': 3,
                'description': 'Add read replicas to spread read load'
            }
        elif qps > 1000:  # moderate hotspot
            strategies[key] = {
                'strategy': 'hash_tag_split',
                'split_count': 4,
                'description': 'Split the key across slots with hash tags'
            }

    return strategies
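
The hash_tag_split strategy above can be implemented by writing N copies of a hot key under distinct hash tags and reading a random copy; a minimal sketch (the key-naming scheme is illustrative):

python
import random

def write_hot_key_copies(rc, key, value, split_count=4):
    """Write N copies under different hash tags so they land in
    different slots, and usually on different nodes."""
    for i in range(split_count):
        rc.set(f"{key}:copy:{{{i}}}", value)

def read_hot_key_copy(rc, key, split_count=4):
    """Read a random copy, spreading the read load across the copies."""
    i = random.randrange(split_count)
    return rc.get(f"{key}:copy:{{{i}}}")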

A local-cache implementation:

python
import threading
import time
from functools import wraps

class LocalCache:
    def __init__(self, max_size=1000, ttl=300):
        self.cache = {}
        self.access_times = {}  # write timestamps; double as the TTL clock
        self.max_size = max_size
        self.ttl = ttl
        self.lock = threading.Lock()

    def get(self, key):
        with self.lock:
            if key in self.cache:
                # Expire entries older than the TTL
                if time.time() - self.access_times[key] <= self.ttl:
                    return self.cache[key]
                else:
                    del self.cache[key]
                    del self.access_times[key]
        return None

    def set(self, key, value):
        with self.lock:
            # Evict the entry with the oldest write time (FIFO-style;
            # a true LRU would refresh the timestamp on every get)
            if len(self.cache) >= self.max_size:
                oldest_key = min(self.access_times, key=self.access_times.get)
                del self.cache[oldest_key]
                del self.access_times[oldest_key]

            self.cache[key] = value
            self.access_times[key] = time.time()

# A decorator that adds transparent local caching
def with_local_cache(ttl=60):
    local_cache = LocalCache(ttl=ttl)

    def decorator(func):
        @wraps(func)
        def wrapper(key):
            # Check the local cache first
            cached_value = local_cache.get(key)
            if cached_value is not None:
                return cached_value

            # Cache miss: fall through to Redis
            value = func(key)
            if value is not None:
                local_cache.set(key, value)

            return value
        return wrapper
    return decorator

# Usage (redis_cluster is an assumed cluster client instance)
@with_local_cache(ttl=60)
def get_user_info(user_id):
    return redis_cluster.hgetall(f"user:{user_id}")

2. Optimizing cross-slot operations:

Hash tag strategy:

python
def design_hash_tag_strategy(related_keys):
    """
    Hash tag patterns that keep related keys in the same slot.
    """
    strategies = {
        'user_data': {
            'pattern': 'user:{user_id}:{suffix}',
            'hash_tag': '{user_id}',
            'examples': [
                'user:{1001}:profile',
                'user:{1001}:orders',
                'user:{1001}:preferences'
            ]
        },
        'session_data': {
            'pattern': 'session:{session_id}:{type}',
            'hash_tag': '{session_id}',
            'examples': [
                'session:{abc123}:data',
                'session:{abc123}:expires',
                'session:{abc123}:permissions'
            ]
        },
        'shopping_cart': {
            'pattern': 'cart:{user_id}:{item_type}',
            'hash_tag': '{user_id}',
            'examples': [
                'cart:{1001}:items',
                'cart:{1001}:total',
                'cart:{1001}:discount'
            ]
        }
    }

    return strategies

from collections import defaultdict

def batch_operation_with_hash_tags(operations):
    """
    Batch operations grouped by slot. get_node_by_slot is an assumed
    helper that returns the client for a slot's owner.
    """
    # Group operations by slot
    slot_groups = defaultdict(list)

    for op_type, key, value in operations:
        slot = calculate_slot(key)
        slot_groups[slot].append((op_type, key, value))

    # Run each slot's operations in its own pipeline
    results = {}
    for slot, ops in slot_groups.items():
        node = get_node_by_slot(slot)
        pipe = node.pipeline()

        for op_type, key, value in ops:
            if op_type == 'SET':
                pipe.set(key, value)
            elif op_type == 'GET':
                pipe.get(key)
            elif op_type == 'HSET':
                pipe.hset(key, value['field'], value['value'])

        results[slot] = pipe.execute()

    return results

def transaction_with_hash_tag(user_id, operations):
    """
    Run a multi-key transaction by forcing all keys onto one slot
    via a shared hash tag.
    """
    # Make sure every key carries the same hash tag
    tagged_ops = []
    for op_type, key, value in operations:
        # Add the user id as a hash tag if the key has none
        if '{' not in key:
            tagged_key = f"{{{user_id}}}:{key}"
        else:
            tagged_key = key
        tagged_ops.append((op_type, tagged_key, value))

    # All keys now hash to the same slot, hence the same node
    slot = calculate_slot(f"{{{user_id}}}:")
    node = get_node_by_slot(slot)

    # Execute as a MULTI/EXEC transaction
    pipe = node.pipeline()
    pipe.multi()

    for op_type, key, value in tagged_ops:
        if op_type == 'SET':
            pipe.set(key, value)
        elif op_type == 'INCR':
            pipe.incr(key)

    return pipe.execute()
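
The payoff of shared hash tags shows up with any multi-key command: when keys hash to different slots, the cluster rejects the command with a CROSSSLOT error. A quick demonstration against a single node (assumes that node owns the tagged slot; a cluster-aware client would follow redirects):

python
import redis

rc = redis.Redis(host='127.0.0.1', port=7000)  # any node of the test cluster

try:
    # These keys hash to different slots, so the node refuses the command
    rc.mget('user:1001', 'user:1002')
except redis.ResponseError as e:
    print(e)  # CROSSSLOT Keys in request don't hash to the same slot

# Keys sharing the {1001} tag sit in one slot, so this can succeed
# (provided this node owns that slot)
print(rc.mget('user:{1001}:profile', 'user:{1001}:orders'))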

3. Network and connection tuning:

Connection pool tuning:

python
import socket
from collections import defaultdict
from rediscluster import RedisCluster  # redis-py-cluster
import redis

def create_optimized_cluster_client():
    """
    Build a cluster client tuned for throughput. Some of these kwargs
    (e.g. max_connections_per_node, health_check_interval) are specific
    to particular redis-py-cluster versions; check your client's docs.
    """
    startup_nodes = [
        {"host": "127.0.0.1", "port": "7000"},
        {"host": "127.0.0.1", "port": "7001"},
        {"host": "127.0.0.1", "port": "7002"}
    ]

    return RedisCluster(
        startup_nodes=startup_nodes,
        decode_responses=True,

        # Connection pool
        max_connections=32,             # pool size cap
        max_connections_per_node=True,  # apply the cap per node

        # Health checks
        health_check_interval=30,       # seconds between PING health checks

        # Retry policy
        retry_on_timeout=True,

        # Performance
        skip_full_coverage_check=True,  # skip the full slot-coverage check
        socket_keepalive=True,          # enable TCP keepalive
        socket_keepalive_options={
            socket.TCP_KEEPIDLE: 600,   # idle seconds before probes start
            socket.TCP_KEEPINTVL: 30,   # seconds between probes
            socket.TCP_KEEPCNT: 3       # failed probes before dropping the link
        },

        # Timeouts
        socket_timeout=5,
        socket_connect_timeout=5,
    )

def pipeline_optimization_example():
    """
    Group pipeline commands by owning node and run them per node.
    get_node_from_slot / get_redis_connection are client helpers whose
    names vary across redis-py-cluster versions.
    """
    rc = create_optimized_cluster_client()

    # Node-aware pipelining: group commands by node
    operations = [
        ('SET', 'user:1001', 'data1'),
        ('SET', 'user:1002', 'data2'),
        ('GET', 'user:1001'),
        ('INCR', 'counter:1001')
    ]

    # Group operations by owning node
    node_operations = defaultdict(list)
    for op in operations:
        key = op[1]
        slot = calculate_slot(key)
        node = rc.get_node_from_slot(slot)
        node_operations[node['name']].append(op)

    # Execute each node's pipeline
    results = {}
    for node_name, ops in node_operations.items():
        node_client = rc.get_redis_connection(node_name)
        pipe = node_client.pipeline()

        for op_type, key, *args in ops:
            getattr(pipe, op_type.lower())(key, *args)

        results[node_name] = pipe.execute()

    return results

4. Memory optimization:

Analyzing memory usage:

python
def analyze_cluster_memory_usage(cluster_nodes):
    """
    Collect per-node memory statistics across the cluster.
    """
    memory_stats = {}

    for node in cluster_nodes:
        info = node.info('memory')
        slots_info = node.execute_command('CLUSTER', 'SLOTS')

        # Count the slots this node owns
        node_slots = sum(
            slot_range[1] - slot_range[0] + 1
            for slot_range in slots_info
            if node.connection_pool.connection_kwargs['port'] in slot_range[2]
        )

        memory_stats[f"{node.connection_pool.connection_kwargs['host']}:{node.connection_pool.connection_kwargs['port']}"] = {
            'used_memory': info['used_memory'],
            'used_memory_human': info['used_memory_human'],
            'used_memory_rss': info['used_memory_rss'],
            'mem_fragmentation_ratio': info['mem_fragmentation_ratio'],
            'slots_count': node_slots,
            'keys_count': node.dbsize(),
            'avg_memory_per_key': info['used_memory'] / max(node.dbsize(), 1),
            'memory_usage_rate': info['used_memory'] / info['maxmemory'] if info.get('maxmemory') else 0
        }

    return memory_stats

def optimize_memory_configuration(node):
    """
    Apply memory-friendly encodings. Note: list-max-ziplist-entries/value
    were replaced by list-max-ziplist-size in Redis 3.2, and Redis 7
    renames the ziplist options to listpack equivalents.
    """
    optimizations = []

    # Eviction policy and compact-encoding thresholds
    configs = {
        'maxmemory-policy': 'allkeys-lru',  # evict via approximate LRU
        'hash-max-ziplist-entries': 512,    # compact encoding for small hashes
        'hash-max-ziplist-value': 64,
        'list-max-ziplist-size': 128,       # compact encoding for small lists
        'set-max-intset-entries': 512,      # integer encoding for small sets
        'zset-max-ziplist-entries': 128,    # compact encoding for small zsets
        'zset-max-ziplist-value': 64
    }

    for config, value in configs.items():
        try:
            node.config_set(config, value)
            optimizations.append(f"Set {config} = {value}")
        except Exception as e:
            print(f"Failed to set {config}: {e}")

    return optimizations
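
Beyond configuration, oversized keys are a common memory sink, and redis-cli can sample for them directly (the key name below is illustrative):

bash
# Sample the keyspace for the largest key per type
redis-cli -p 7000 --bigkeys

# Measure one key's footprint in bytes
redis-cli -p 7000 memory usage user:1001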

5. Monitoring and alerting:

python
import psutil
from datetime import datetime

class ClusterMonitor:
    def __init__(self, cluster_nodes):
        self.nodes = cluster_nodes
        self.thresholds = {
            'memory_usage': 0.8,        # memory usage ratio
            'cpu_usage': 0.7,           # CPU usage ratio
            'connection_usage': 0.9,    # connection usage ratio
            'response_time': 100,       # response time in ms
            'error_rate': 0.01          # error rate
        }

    def collect_metrics(self):
        """
        Collect cluster metrics. The psutil numbers describe the host this
        monitor runs on, so run it on (or alongside) each Redis node.
        """
        metrics = {
            'timestamp': datetime.now().isoformat(),
            'nodes': {}
        }

        for node in self.nodes:
            node_key = f"{node.connection_pool.connection_kwargs['host']}:{node.connection_pool.connection_kwargs['port']}"

            # Redis metrics
            info = node.info()

            # Host metrics
            cpu_percent = psutil.cpu_percent()
            memory = psutil.virtual_memory()

            # CLUSTER INFO's first line is "cluster_state:ok" (or ":fail")
            cluster_info = node.execute_command('CLUSTER', 'INFO')
            if isinstance(cluster_info, bytes):
                cluster_info = cluster_info.decode()
            cluster_state = cluster_info.split('\r\n')[0].split(':')[1]

            hits = info['keyspace_hits']
            misses = info['keyspace_misses']

            metrics['nodes'][node_key] = {
                # Redis metrics
                'connected_clients': info['connected_clients'],
                'used_memory': info['used_memory'],
                'instantaneous_ops_per_sec': info['instantaneous_ops_per_sec'],
                'keyspace_hits': hits,
                'keyspace_misses': misses,
                'hit_rate': hits / (hits + misses) if (hits + misses) > 0 else 0,

                # Host metrics
                'cpu_usage': cpu_percent,
                'memory_usage': memory.percent / 100,

                # Cluster metrics
                'cluster_state': cluster_state,
                'cluster_slots_assigned': 16384,  # total slot count
            }

        return metrics
    
    def check_alerts(self, metrics):
        """
        Evaluate alert conditions against the thresholds.
        """
        alerts = []

        for node_key, node_metrics in metrics['nodes'].items():
            # Memory usage alert
            if node_metrics['memory_usage'] > self.thresholds['memory_usage']:
                alerts.append({
                    'type': 'high_memory_usage',
                    'node': node_key,
                    'value': node_metrics['memory_usage'],
                    'threshold': self.thresholds['memory_usage']
                })

            # CPU usage alert
            if node_metrics['cpu_usage'] > self.thresholds['cpu_usage']:
                alerts.append({
                    'type': 'high_cpu_usage',
                    'node': node_key,
                    'value': node_metrics['cpu_usage'],
                    'threshold': self.thresholds['cpu_usage']
                })

            # Cluster state alert
            if node_metrics['cluster_state'] != 'ok':
                alerts.append({
                    'type': 'cluster_state_error',
                    'node': node_key,
                    'state': node_metrics['cluster_state']
                })

        return alerts

# Usage (cluster_nodes is a list of redis clients, one per node)
monitor = ClusterMonitor(cluster_nodes)
metrics = monitor.collect_metrics()
alerts = monitor.check_alerts(metrics)

if alerts:
    print("⚠️  Alerts raised:")
    for alert in alerts:
        print(f"  - {alert}")

Interview takeaways:

  • Understand how to detect and mitigate hot keys
  • Know the techniques for optimizing cross-slot operations
  • Know the key metrics for monitoring cluster performance

These advanced Redis Cluster interview questions dig into the core topics of sharding, failover, scaling, and performance tuning, and are well suited to technical assessments for senior backend engineer and architect roles.
