Advanced Redis Cluster Architecture
🏗️ A Deep Dive into Redis Cluster
Cluster architecture fundamentals
Q1: How do Redis Cluster's architecture and data sharding work? How does it implement automatic failover?
Difficulty: ⭐⭐⭐⭐⭐
Answer: Redis Cluster uses a decentralized architecture and relies on hash slots to shard data and balance load.
1. Cluster architecture design:
Core architectural components:
```bash
# Redis cluster topology
┌─────────────────────────────────────────────────────────────┐
│ Redis Cluster │
├───────────────┬───────────────┬───────────────┬─────────────┤
│ Master 1 │ Master 2 │ Master 3 │ Master N │
│ Slot 0-5460 │ Slot 5461- │ Slot 10923- │ ... │
│ │ 10922 │ 16383 │ │
├───────────────┼───────────────┼───────────────┼─────────────┤
│ Slave 1 │ Slave 2 │ Slave 3 │ Slave N │
│ (Master 1) │ (Master 2) │ (Master 3) │ ... │
└───────────────┴───────────────┴───────────────┴─────────────┘
```
Cluster setup and configuration:
```bash
# Create a 6-node cluster (3 masters, 3 replicas)
mkdir redis-cluster
cd redis-cluster
# Generate configuration for the 6 Redis instances
for port in $(seq 7000 7005); do
mkdir $port
cat << EOF > $port/redis.conf
port $port
cluster-enabled yes
cluster-config-file nodes-${port}.conf
cluster-node-timeout 5000
cluster-announce-port $port
cluster-announce-bus-port 1${port}
appendonly yes
daemonize yes
pidfile /var/run/redis_${port}.pid
logfile ${port}/redis.log
dir ./${port}/
EOF
done
# Start all instances
for port in $(seq 7000 7005); do
redis-server $port/redis.conf
done
# Create the cluster
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
--cluster-replicas 1
```
2. Hash slot sharding mechanism:
Hash slot algorithm implementation:
```python
import crc16  # third-party "crc16" package

def calculate_slot(key):
    """
    Compute the hash slot for a key.
    Redis uses the CRC16 (XMODEM) algorithm.
    """
    # Handle hash tags: only the substring inside the first {...} is hashed
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end != -1 and end != start + 1:
            key = key[start + 1:end]
    # CRC16 modulo 16384
    return crc16.crc16xmodem(key.encode('utf-8')) % 16384
# Example calculation
keys = ['user:1001', 'user:1002', 'user:{1001}:profile', 'user:{1001}:orders']
for key in keys:
slot = calculate_slot(key)
print(f"Key: {key} -> Slot: {slot}")
# Sample output:
# Key: user:1001 -> Slot: 9166
# Key: user:1002 -> Slot: 5586
# Key: user:{1001}:profile -> Slot: 9166 (same hash tag)
# Key: user:{1001}:orders -> Slot: 9166 (same hash tag)
```
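To sanity-check the Python implementation, the same computation can be done server-side: CLUSTER KEYSLOT is a built-in command, and its results should match the sample output above.

```bash
redis-cli -p 7000 cluster keyslot "user:1001"
# (integer) 9166  -- matching the Python sample above
redis-cli -p 7000 cluster keyslot "user:{1001}:profile"
# (integer) 9166  -- only the hash tag is hashed
```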
Slot allocation strategy:
```bash
# View the cluster's slot assignment
redis-cli -p 7000 cluster slots
# Output format: start slot, end slot, master info, replica info
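# Illustrative reply for the 3-master layout above (IDs and values will differ):
# 1) 1) (integer) 0
#    2) (integer) 5460
#    3) 1) "127.0.0.1"
#       2) (integer) 7000
#       3) "node-id-of-master-1"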
# View the slots owned by a specific node
redis-cli -p 7000 cluster nodes | grep master
# Manually assign slots (used when bootstrapping or rebalancing a cluster)
redis-cli -p 7000 cluster addslots {0..5460}
redis-cli -p 7001 cluster addslots {5461..10922}
redis-cli -p 7002 cluster addslots {10923..16383}
```
3. Data routing mechanism:
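For a quick feel of how routing behaves, redis-cli has a cluster mode (-c) that follows redirects automatically, while a plain connection surfaces the raw MOVED error (the slot and port below follow the sample layout above):

```bash
# Without -c the client sees the raw redirect
redis-cli -p 7000 get user:1002
# (error) MOVED 5586 127.0.0.1:7001
# With -c the client follows it transparently
redis-cli -c -p 7000 get user:1002
# -> Redirected to slot [5586] located at 127.0.0.1:7001
```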
Client-side routing implementation:
```python
import redis
from rediscluster import RedisCluster  # redis-py-cluster package

# Redis Cluster client configuration
startup_nodes = [
    {"host": "127.0.0.1", "port": "7000"},
    {"host": "127.0.0.1", "port": "7001"},
    {"host": "127.0.0.1", "port": "7002"}
]

# Smart client with automatic routing
rc = RedisCluster(
    startup_nodes=startup_nodes,
    decode_responses=True,
    skip_full_coverage_check=True,
    max_connections=32
)

# Optimizing batch operations
def batch_operations(rc, operations):
    """
    Optimize batch operations by grouping them per node
    to reduce network round-trips.
    """
    # Group operations by slot owner
    # (the slot-to-node lookup below is illustrative; the exact API differs across client versions)
    slot_groups = {}
    for op_type, key, value in operations:
        slot = calculate_slot(key)
        node = rc.get_node_from_slot(slot)
        node_id = f"{node['host']}:{node['port']}"
        if node_id not in slot_groups:
            slot_groups[node_id] = []
        slot_groups[node_id].append((op_type, key, value))
    # Execute each node's operations through a pipeline
    results = {}
    for node_id, ops in slot_groups.items():
        host, port = node_id.split(':')
        node_client = redis.Redis(host=host, port=int(port))
        pipe = node_client.pipeline()
        for op_type, key, value in ops:
            if op_type == 'SET':
                pipe.set(key, value)
            elif op_type == 'GET':
                pipe.get(key)
        results[node_id] = pipe.execute()
    return results
```
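Redirections come in two flavors: MOVED (the slot has permanently moved, so the client should refresh its slot cache) and ASK (a one-off redirect during migration, covered in Q2). A minimal sketch of handling MOVED in a hand-rolled client — real smart clients do this internally:

```python
import redis

def get_with_moved_handling(client, key, max_redirects=5):
    """Follow MOVED redirects manually; 'client' is a plain redis.Redis
    pointed at any cluster node."""
    for _ in range(max_redirects):
        try:
            return client.get(key)
        except redis.ResponseError as e:
            msg = str(e)
            if msg.startswith('MOVED'):
                # Reply format: MOVED <slot> <host>:<port>
                host, port = msg.split(' ')[2].split(':')
                client = redis.Redis(host=host, port=int(port))
                # A real client would also refresh its slot->node map here
            else:
                raise
    raise RuntimeError('too many redirects')
```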
4. Failure detection mechanism:
Gossip protocol in practice:
```bash
# Cluster node communication ports
# Data ports: 7000-7005
# Bus ports: 17000-17005 (data port + 10000)
# Check cluster state
redis-cli -p 7000 cluster info
# cluster_state:ok
# cluster_slots_assigned:16384
# cluster_slots_ok:16384
# cluster_slots_pfail:0
# cluster_slots_fail:0
# cluster_known_nodes:6
# cluster_size:3
# Failure detection configuration
redis-cli -p 7000 config set cluster-node-timeout 5000
redis-cli -p 7000 config set cluster-slave-validity-factor 10
redis-cli -p 7000 config set cluster-migration-barrier 1
```
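The PFAIL/FAIL states described in the flow that follows show up as node flags in CLUSTER NODES output, which makes them easy to watch from the shell:

```bash
# 'fail?' marks PFAIL (suspected); 'fail' marks confirmed FAIL
redis-cli -p 7000 cluster nodes | grep 'fail'
```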
Failure detection flow:
```python
def failure_detection_process():
    """
    Redis Cluster failure detection flow
    """
    print("Failure detection flow:")
    print("1. Nodes periodically send PING messages")
    print("2. A node that misses PONG within the timeout is marked PFAIL (possibly failed)")
    print("3. PFAIL reports spread through the gossip protocol")
    print("4. When a majority of masters report PFAIL, the node is marked FAIL")
    print("5. The FAIL message is broadcast to the whole cluster")
    print("6. A replica election is triggered to promote a new master")
    return {
        'ping_interval': '1 second',
        'pong_timeout': 'cluster-node-timeout',
        'pfail_threshold': '1 node',
        'fail_threshold': 'majority of masters',
        'election_timeout': '2 * cluster-node-timeout'
    }
```
5. Automatic failover:
Master/replica switchover:
```bash
# Simulate a master failure
redis-cli -p 7000 debug segfault  # force the master to crash
# Watch the failover progress
redis-cli -p 7003 cluster nodes
# Inspect the new master/replica topology
# Manual failover
redis-cli -p 7003 cluster failover
# Manual failover (forced, without waiting for the old master's consent)
redis-cli -p 7003 cluster failover force
```
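Besides FORCE, CLUSTER FAILOVER also accepts a TAKEOVER option, which promotes the replica without any cluster-wide agreement; it is a last resort for disaster recovery:

```bash
# Promote unilaterally, bypassing the vote entirely (use with care)
redis-cli -p 7003 cluster failover takeover
```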
Replica election algorithm (simplified):
```python
def slave_election_algorithm(slaves):
    """
    Simplified model of how a new master is chosen among replicas
    """
    eligible_slaves = []
    for slave in slaves:
        # Eligibility checks
        if (slave.replication_offset_delay < 5 and  # replication lag under 5 seconds
                slave.last_ping_time < 2000 and     # last ping within 2 seconds
                slave.is_connected):                # link is healthy
            eligible_slaves.append(slave)
    if not eligible_slaves:
        return None
    # Sort by replication offset (larger offset = more complete data = higher priority)
    eligible_slaves.sort(key=lambda x: x.replication_offset, reverse=True)
    # Pick the replica with the largest replication offset
    winner = eligible_slaves[0]
    print(f"Selected new master: {winner.node_id}")
    print(f"Replication offset: {winner.replication_offset}")
    return winner
```
Interview points:
- Understand the advantages of hash-slot sharding and how slots are computed
- Master the communication mechanism between cluster nodes
- Know the complete failure detection and automatic failover flow (a sketch of the rank-based election delay follows)
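The pseudocode above picks the best replica directly; in the real protocol, each eligible replica instead delays its vote request by a rank-based amount, so the most up-to-date replica usually asks first. A minimal sketch of that delay formula, following the Redis Cluster specification:

```python
import random

def election_delay_ms(rank):
    """Delay before a replica requests votes; rank 0 is the replica with
    the largest replication offset. Per the Redis Cluster spec:
    DELAY = 500 ms + random(0..500) ms + rank * 1000 ms."""
    return 500 + random.randint(0, 500) + rank * 1000

# Example: ranks 0..2 yield roughly 0.5-1s, 1.5-2s, and 2.5-3s delays
for rank in range(3):
    print(rank, election_delay_ms(rank))
```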
Q2: How does Redis Cluster scale out and in? How does it perform online data migration?
Difficulty: ⭐⭐⭐⭐
Answer: Redis Cluster supports dynamic scaling: slot migration redistributes data without interrupting service.
1. Cluster scale-out flow:
Adding new nodes:
```bash
# Start new Redis instances
redis-server --port 7006 --cluster-enabled yes \
  --cluster-config-file nodes-7006.conf --daemonize yes
redis-server --port 7007 --cluster-enabled yes \
  --cluster-config-file nodes-7007.conf --daemonize yes
# Add a new master node to the cluster
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000
# Add a replica and bind it to the new master
redis-cli --cluster add-node 127.0.0.1:7007 127.0.0.1:7000 \
  --cluster-slave --cluster-master-id <7006-node-id>
# Check cluster state
redis-cli -p 7000 cluster nodes
```
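The <7006-node-id> placeholder above can be fetched directly from the new node:

```bash
# Print the 40-character node ID used by --cluster-master-id
redis-cli -p 7006 cluster myid
```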
Slot reallocation:
```bash
# Automated resharding (recommended)
redis-cli --cluster reshard 127.0.0.1:7000 \
  --cluster-from all \
  --cluster-to <new-node-id> \
  --cluster-slots 1365 \
  --cluster-yes
# Manual resharding
redis-cli --cluster reshard 127.0.0.1:7000
# Interactively choose the source and target nodes
```
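As an alternative to computing slot counts by hand, redis-cli can also balance slots automatically, including moving slots onto empty masters that just joined:

```bash
# Even out slot counts across masters; include slot-less new masters
redis-cli --cluster rebalance 127.0.0.1:7000 --cluster-use-empty-masters
```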
2. Online data migration mechanism:
Migration state management:
```bash
# Slots in flight appear in CLUSTER NODES output as [slot->-<dest-id>]
# (migrating) and [slot-<-<src-id>] (importing) markers
redis-cli -p 7000 cluster nodes | grep -- '->-'
redis-cli -p 7000 cluster nodes | grep -- '-<-'
# Inspect the current slot map
redis-cli -p 7000 cluster slots
```
The ASK redirection mechanism:
```python
import redis
import time

def simulate_migration_process(source_node_id, target_node_id, cluster_nodes):
    """
    Simulate ASK redirection during slot migration.
    source_node_id / target_node_id are cluster node IDs;
    cluster_nodes is a list of clients, one per master.
    """
    source_client = redis.Redis(host='127.0.0.1', port=7000)
    target_client = redis.Redis(host='127.0.0.1', port=7006)
    slot = 1000
    print("1. Mark the slot as migrating")
    # Source node: MIGRATING state
    source_client.execute_command('CLUSTER', 'SETSLOT', slot, 'MIGRATING', target_node_id)
    # Target node: IMPORTING state
    target_client.execute_command('CLUSTER', 'SETSLOT', slot, 'IMPORTING', source_node_id)
    print("2. Migrate the keys in the slot")
    keys_in_slot = source_client.execute_command('CLUSTER', 'GETKEYSINSLOT', slot, 100)
    for key in keys_in_slot:
        try:
            # Move each key with the MIGRATE command
            source_client.execute_command(
                'MIGRATE', '127.0.0.1', 7006, key, 0, 5000,
                'COPY', 'REPLACE'
            )
            print(f"Migrated key: {key}")
        except redis.RedisError as e:
            print(f"Migration failed for {key}: {e}")
    print("3. Finalize the migration")
    # Point the slot at its new owner on every node
    for node in cluster_nodes:
        node.execute_command('CLUSTER', 'SETSLOT', slot, 'NODE', target_node_id)

def handle_ask_redirection(source_client, key):
    """
    Client-side handling of an ASK redirection
    """
    try:
        # Try the source node first
        result = source_client.get(key)
        return result
    except redis.ResponseError as e:
        if 'ASK' in str(e):
            # Parse the ASK reply: ASK <slot> <target_host>:<target_port>
            parts = str(e).split(' ')
            target_host, target_port = parts[2].split(':')
            # Send ASKING to the target node, then retry the command
            temp_client = redis.Redis(host=target_host, port=int(target_port))
            temp_client.execute_command('ASKING')
            result = temp_client.get(key)
            return result
        else:
            raise e
```
3. Data consistency guarantees:
Atomic migration:
```python
def atomic_key_migration(source, target, key, timeout=5000):
    """
    Atomic per-key migration
    """
    try:
        # 1. MIGRATE moves the key atomically
        result = source.execute_command(
            'MIGRATE',
            target.connection_pool.connection_kwargs['host'],
            target.connection_pool.connection_kwargs['port'],
            key,
            0,         # destination database
            timeout,
            'COPY',    # copy mode: do not delete the source key yet
            'REPLACE'  # overwrite if the key already exists on the target
        )
        if result == b'OK':
            # 2. Delete the source key only after a successful copy
            source.delete(key)
            return True
    except redis.RedisError as e:
        print(f"Migration failed: {e}")
        return False
    return False

def batch_migration_with_consistency(source, target, keys):
    """
    Batch migration with consistency checks
    """
    success_count = 0
    failed_keys = []
    for key in keys:
        # Skip keys that no longer exist on the source node
        if not source.exists(key):
            continue
        # Migrate atomically
        if atomic_key_migration(source, target, key):
            success_count += 1
        else:
            failed_keys.append(key)
    return {
        'success': success_count,
        'failed': failed_keys,
        'total': len(keys)
    }
```
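A minimal usage sketch for the helper above (the node addresses and key list are assumptions matching the earlier topology):

```python
import redis

# Hypothetical source/target masters from the earlier examples
source = redis.Redis(host='127.0.0.1', port=7000)
target = redis.Redis(host='127.0.0.1', port=7006)

report = batch_migration_with_consistency(source, target, ['user:1001', 'user:1002'])
print(report)  # e.g. {'success': 2, 'failed': [], 'total': 2}
```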
4. Cluster scale-in flow:
Removing nodes:
```bash
# 1. First migrate the departing node's slots to other nodes
redis-cli --cluster reshard 127.0.0.1:7000 \
  --cluster-from <node-to-remove-id> \
  --cluster-to <target-node-id> \
  --cluster-slots <all-slots-count> \
  --cluster-yes
# 2. Remove the replica
redis-cli --cluster del-node 127.0.0.1:7007 <slave-node-id>
# 3. Remove the master (make sure all of its slots were migrated)
redis-cli --cluster del-node 127.0.0.1:7006 <master-node-id>
# 4. Verify cluster state
redis-cli --cluster check 127.0.0.1:7000
```
Pre-removal safety checks:
```python
def safe_node_removal_check(node_to_remove):
    """
    Checks to run before removing a node.
    Assumes node_to_remove is a client wrapper that also exposes node_id.
    """
    checks = {
        'has_slots': False,
        'has_data': False,
        'replication_ok': True,
        'cluster_stable': True
    }
    # Does the node still own any slots?
    slots = node_to_remove.execute_command('CLUSTER', 'SLOTS')
    if any(slot_info for slot_info in slots if node_to_remove.node_id in slot_info):
        checks['has_slots'] = True
    # Does the node still hold data?
    db_size = node_to_remove.dbsize()
    if db_size > 0:
        checks['has_data'] = True
    # Is the cluster healthy?
    cluster_info = node_to_remove.execute_command('CLUSTER', 'INFO')
    if 'cluster_state:fail' in cluster_info:
        checks['cluster_stable'] = False
    return checks

def perform_safe_removal(node_to_remove):
    """
    Remove a node only when it is safe to do so
    """
    checks = safe_node_removal_check(node_to_remove)
    if checks['has_slots']:
        print("Error: node still owns slots; migrate them first")
        return False
    if checks['has_data']:
        print("Warning: node still holds data; consider cleaning it up first")
    if not checks['cluster_stable']:
        print("Error: cluster state is unstable; wait until it recovers")
        return False
    # Proceed with removal
    return True
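```
A hedged sketch of what "proceed with removal" means in practice: CLUSTER FORGET is the primitive that redis-cli --cluster del-node wraps, and it must be issued to every remaining node (remaining_nodes is an assumed list of clients for the other cluster members):

```python
# Run the checks, then make every other node forget the departing one
if perform_safe_removal(node_to_remove):
    departing_id = node_to_remove.execute_command('CLUSTER', 'MYID')
    for node in remaining_nodes:  # assumed: clients for all other nodes
        node.execute_command('CLUSTER', 'FORGET', departing_id)
```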
5. Monitoring and performance optimization:
Migration progress monitoring:
```python
import time
import threading

class MigrationMonitor:
    def __init__(self, source_nodes, target_nodes):
        self.source_nodes = source_nodes
        self.target_nodes = target_nodes
        self.metrics = {
            'keys_migrated': 0,
            'migration_rate': 0,
            'errors': 0,
            'start_time': time.time()
        }

    def monitor_migration_progress(self):
        """
        Track migration progress
        """
        while self.is_migration_active():
            # Count keys that have landed on the target nodes
            total_keys = sum(node.dbsize() for node in self.target_nodes)
            # Compute the migration rate
            elapsed = time.time() - self.metrics['start_time']
            self.metrics['migration_rate'] = total_keys / elapsed if elapsed > 0 else 0
            print("Migration Progress:")
            print(f"  Keys migrated: {total_keys}")
            print(f"  Rate: {self.metrics['migration_rate']:.2f} keys/sec")
            print(f"  Elapsed: {elapsed:.2f} seconds")
            time.sleep(5)

    def is_migration_active(self):
        """
        Check whether any slot is still being migrated.
        Migrating/importing slots appear in CLUSTER NODES output as
        [slot->-<dest-id>] and [slot-<-<src-id>] markers.
        """
        for node in self.source_nodes + self.target_nodes:
            nodes_info = node.execute_command('CLUSTER', 'NODES')
            if '->-' in nodes_info or '-<-' in nodes_info:
                return True
        return False

# Usage (source_nodes / target_nodes are lists of per-node clients)
monitor = MigrationMonitor(source_nodes, target_nodes)
monitor_thread = threading.Thread(target=monitor.monitor_migration_progress)
monitor_thread.start()
```
Interview points:
- Understand how slot migration guarantees atomicity and consistency
- Master how the ASK redirection mechanism works
- Know how scaling operations affect the business and how to mitigate the impact
⚡ Performance Optimization Strategies
Q3: How do you optimize Redis Cluster performance and apply best practices? How do you handle hot keys and cross-slot operations?
Difficulty: ⭐⭐⭐⭐⭐
Answer: Optimizing Redis Cluster is a systematic effort spanning the network, memory, and CPU dimensions.
1. Hot-key handling strategies:
Hotspot detection algorithm:
```python
import time
from collections import defaultdict
from threading import Lock

class HotspotDetector:
    def __init__(self, window_size=60, threshold=1000):
        self.window_size = window_size
        self.threshold = threshold
        self.access_counts = defaultdict(list)
        self.lock = Lock()

    def record_access(self, key, slot):
        """
        Record one access to a key
        """
        current_time = time.time()
        with self.lock:
            # Drop records that fell out of the window
            self.access_counts[key] = [
                timestamp for timestamp in self.access_counts[key]
                if current_time - timestamp <= self.window_size
            ]
            # Record the new access
            self.access_counts[key].append(current_time)

    def detect_hotspots(self):
        """
        Detect hot keys
        """
        hotspots = {}
        current_time = time.time()
        with self.lock:
            for key, timestamps in self.access_counts.items():
                # Count accesses inside the time window
                recent_accesses = [
                    t for t in timestamps
                    if current_time - t <= self.window_size
                ]
                if len(recent_accesses) >= self.threshold:
                    hotspots[key] = {
                        'access_count': len(recent_accesses),
                        'qps': len(recent_accesses) / self.window_size,
                        'slot': calculate_slot(key)
                    }
        return hotspots

def hotspot_mitigation_strategy(hotspots):
    """
    Mitigation strategies for hot keys
    """
    strategies = {}
    for key, info in hotspots.items():
        qps = info['qps']
        slot = info['slot']
        if qps > 10000:  # extremely hot key
            strategies[key] = {
                'strategy': 'local_cache',
                'ttl': 60,  # cache locally for 60 seconds
                'description': 'Use a local cache to cut Redis traffic'
            }
        elif qps > 5000:  # very hot key
            strategies[key] = {
                'strategy': 'read_replica',
                'replicas': 3,
                'description': 'Add read replicas to spread the read load'
            }
        elif qps > 1000:  # moderately hot key
            strategies[key] = {
                'strategy': 'hash_tag_split',
                'split_count': 4,
                'description': 'Split the key across multiple slots'
            }
    return strategies
```
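The 'hash_tag_split' strategy above is only named, not shown; here is a minimal sketch of one common implementation, writing N suffixed copies of a hot key and reading a random one (all names are illustrative):

```python
import random

SPLIT_COUNT = 4  # number of copies, matching split_count above

def split_key(key, i):
    # 'user:1001' -> 'user:1001#0' ... each copy hashes to its own slot
    return f"{key}#{i}"

def write_hot_key(rc, key, value):
    # Fan the value out to every copy
    for i in range(SPLIT_COUNT):
        rc.set(split_key(key, i), value)

def read_hot_key(rc, key):
    # Read one random copy, spreading load across slots and nodes
    return rc.get(split_key(key, random.randint(0, SPLIT_COUNT - 1)))
```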
Local cache implementation:
```python
import threading
import time
from functools import wraps

class LocalCache:
    def __init__(self, max_size=1000, ttl=300):
        self.cache = {}
        self.access_times = {}
        self.max_size = max_size
        self.ttl = ttl
        self.lock = threading.Lock()

    def get(self, key):
        with self.lock:
            if key in self.cache:
                # Expired?
                if time.time() - self.access_times[key] <= self.ttl:
                    return self.cache[key]
                else:
                    del self.cache[key]
                    del self.access_times[key]
            return None

    def set(self, key, value):
        with self.lock:
            # Evict the oldest entry when full (approximate LRU keyed on write time)
            if len(self.cache) >= self.max_size:
                oldest_key = min(self.access_times, key=self.access_times.get)
                del self.cache[oldest_key]
                del self.access_times[oldest_key]
            self.cache[key] = value
            self.access_times[key] = time.time()

# Decorator that adds a transparent local cache
def with_local_cache(ttl=60):
    local_cache = LocalCache(ttl=ttl)
    def decorator(func):
        @wraps(func)
        def wrapper(key):
            # Check the local cache first
            cached_value = local_cache.get(key)
            if cached_value is not None:
                return cached_value
            # Cache miss: fall through to Redis
            value = func(key)
            if value is not None:
                local_cache.set(key, value)
            return value
        return wrapper
    return decorator

# Usage (redis_cluster is the cluster client created earlier)
@with_local_cache(ttl=60)
def get_user_info(user_id):
    return redis_cluster.hgetall(f"user:{user_id}")
```
2. Cross-slot operation optimization:
Hash tag strategy:
```python
from collections import defaultdict

def design_hash_tag_strategy():
    """
    Design hash tag patterns so related keys land in the same slot
    """
    strategies = {
        'user_data': {
            'pattern': 'user:{user_id}:{suffix}',
            'hash_tag': '{user_id}',
            'examples': [
                'user:{1001}:profile',
                'user:{1001}:orders',
                'user:{1001}:preferences'
            ]
        },
        'session_data': {
            'pattern': 'session:{session_id}:{type}',
            'hash_tag': '{session_id}',
            'examples': [
                'session:{abc123}:data',
                'session:{abc123}:expires',
                'session:{abc123}:permissions'
            ]
        },
        'shopping_cart': {
            'pattern': 'cart:{user_id}:{item_type}',
            'hash_tag': '{user_id}',
            'examples': [
                'cart:{1001}:items',
                'cart:{1001}:total',
                'cart:{1001}:discount'
            ]
        }
    }
    return strategies

def batch_operation_with_hash_tags(operations):
    """
    Batch operations optimized via hash tags.
    get_node_by_slot is an assumed helper mapping a slot to a node client.
    """
    # Group operations by slot
    slot_groups = defaultdict(list)
    for op_type, key, value in operations:
        slot = calculate_slot(key)
        slot_groups[slot].append((op_type, key, value))
    # Execute each slot group on its owning node
    results = {}
    for slot, ops in slot_groups.items():
        node = get_node_by_slot(slot)
        pipe = node.pipeline()
        for op_type, key, value in ops:
            if op_type == 'SET':
                pipe.set(key, value)
            elif op_type == 'GET':
                pipe.get(key)
            elif op_type == 'HSET':
                pipe.hset(key, value['field'], value['value'])
        results[slot] = pipe.execute()
    return results

def transaction_with_hash_tag(user_id, operations):
    """
    Transactions made possible by a shared hash tag
    """
    # Force every key to share the same hash tag
    tagged_ops = []
    for op_type, key, value in operations:
        # If the key has no hash tag, prepend the user ID as one
        if '{' not in key:
            tagged_key = f"{{{user_id}}}:{key}"
        else:
            tagged_key = key
        tagged_ops.append((op_type, tagged_key, value))
    # All operations now route to the same node
    slot = calculate_slot(f"{{{user_id}}}:")
    node = get_node_by_slot(slot)
    # Run them in a transaction
    pipe = node.pipeline()
    pipe.multi()
    for op_type, key, value in tagged_ops:
        if op_type == 'SET':
            pipe.set(key, value)
        elif op_type == 'INCR':
            pipe.incr(key)
    return pipe.execute()
```
3. Network and connection optimization:
Connection pool tuning:
```python
from collections import defaultdict
from rediscluster import RedisCluster
import redis

def create_optimized_cluster_client():
    """
    Create a tuned cluster client.
    Option names follow redis-py-cluster; they vary across client versions.
    """
    startup_nodes = [
        {"host": "127.0.0.1", "port": "7000"},
        {"host": "127.0.0.1", "port": "7001"},
        {"host": "127.0.0.1", "port": "7002"}
    ]
    return RedisCluster(
        startup_nodes=startup_nodes,
        decode_responses=True,
        # Connection pool tuning
        max_connections=32,             # connection cap
        max_connections_per_node=True,  # apply the cap per node rather than per pool
        # Health checks
        health_check_interval=30,       # seconds between health checks
        # Retry policy
        retry_on_timeout=True,
        # Performance
        skip_full_coverage_check=True,  # skip the full slot-coverage check
        socket_keepalive=True,          # enable TCP keepalive
        socket_keepalive_options={
            'TCP_KEEPIDLE': 600,
            'TCP_KEEPINTVL': 30,
            'TCP_KEEPCNT': 3
        },
        # Timeouts
        socket_timeout=5,
        socket_connect_timeout=5,
    )

def pipeline_optimization_example():
    """
    Pipeline batching optimization
    """
    rc = create_optimized_cluster_client()
    # Smart pipelining: group operations per node
    operations = [
        ('SET', 'user:1001', 'data1'),
        ('SET', 'user:1002', 'data2'),
        ('GET', 'user:1001'),
        ('INCR', 'counter:1001')
    ]
    # Group by owning node
    # (slot/node lookup calls are illustrative; exact APIs differ per client version)
    node_operations = defaultdict(list)
    for op in operations:
        key = op[1]
        slot = calculate_slot(key)
        node = rc.get_node_from_slot(slot)
        node_operations[node['name']].append(op)
    # Execute one pipeline per node
    results = {}
    for node_name, ops in node_operations.items():
        node_client = rc.get_redis_connection(node_name)
        pipe = node_client.pipeline()
        for op_type, key, *args in ops:
            getattr(pipe, op_type.lower())(key, *args)
        results[node_name] = pipe.execute()
    return results
```
4. Memory optimization strategies:
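Before tuning, it helps to see where memory actually goes; two standard redis-cli tools for that (the sample key name is arbitrary):

```bash
# Scan for the biggest keys of each type on one node
redis-cli -p 7000 --bigkeys
# Per-key memory footprint in bytes
redis-cli -p 7000 memory usage user:1001
```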
Memory usage analysis:
```python
def analyze_cluster_memory_usage(cluster_nodes):
    """
    Analyze memory usage across the cluster
    """
    memory_stats = {}
    for node in cluster_nodes:
        info = node.info('memory')
        slots_info = node.execute_command('CLUSTER', 'SLOTS')
        # Count the slots this node is responsible for
        node_slots = sum(
            slot_range[1] - slot_range[0] + 1
            for slot_range in slots_info
            if node.connection_pool.connection_kwargs['port'] in slot_range[2]
        )
        node_key = (f"{node.connection_pool.connection_kwargs['host']}:"
                    f"{node.connection_pool.connection_kwargs['port']}")
        memory_stats[node_key] = {
            'used_memory': info['used_memory'],
            'used_memory_human': info['used_memory_human'],
            'used_memory_rss': info['used_memory_rss'],
            'mem_fragmentation_ratio': info['mem_fragmentation_ratio'],
            'slots_count': node_slots,
            'keys_count': node.dbsize(),
            'avg_memory_per_key': info['used_memory'] / max(node.dbsize(), 1),
            'memory_usage_rate': (info['used_memory'] / info['maxmemory']
                                  if info.get('maxmemory') else 0)
        }
    return memory_stats

def optimize_memory_configuration(node):
    """
    Tune a node's memory-related configuration.
    The ziplist parameter names apply to older Redis versions;
    Redis 7 renamed them to listpack equivalents.
    """
    optimizations = []
    configs = {
        'maxmemory-policy': 'allkeys-lru',  # LRU eviction
        'hash-max-ziplist-entries': 512,    # compact hash encoding
        'hash-max-ziplist-value': 64,
        'list-max-ziplist-entries': 512,    # compact list encoding
        'list-max-ziplist-value': 64,
        'set-max-intset-entries': 512,      # integer-set encoding for sets
        'zset-max-ziplist-entries': 128,    # compact sorted-set encoding
        'zset-max-ziplist-value': 64
    }
    for config, value in configs.items():
        try:
            node.config_set(config, value)
            optimizations.append(f"Set {config} = {value}")
        except Exception as e:
            print(f"Failed to set {config}: {e}")
    return optimizations
```
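Since Redis 7 renamed the ziplist thresholds, a quick way to check which generation a node speaks (config names follow the official rename):

```bash
# Redis 7+: listpack parameters replace the ziplist ones
redis-cli -p 7000 config get hash-max-listpack-entries
redis-cli -p 7000 config get zset-max-listpack-entries
# Older versions still answer to the ziplist names
redis-cli -p 7000 config get hash-max-ziplist-entries
```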
5. Monitoring and alerting system:
```python
import psutil
from datetime import datetime

class ClusterMonitor:
    def __init__(self, cluster_nodes):
        self.nodes = cluster_nodes
        self.thresholds = {
            'memory_usage': 0.8,      # memory usage ratio
            'cpu_usage': 0.7,         # CPU usage ratio
            'connection_usage': 0.9,  # connection usage ratio
            'response_time': 100,     # response time threshold (ms)
            'error_rate': 0.01        # error rate threshold
        }

    def collect_metrics(self):
        """
        Collect cluster metrics
        """
        metrics = {
            'timestamp': datetime.now().isoformat(),
            'nodes': {}
        }
        for node in self.nodes:
            node_key = (f"{node.connection_pool.connection_kwargs['host']}:"
                        f"{node.connection_pool.connection_kwargs['port']}")
            # Redis metrics
            info = node.info()
            # System metrics
            cpu_percent = psutil.cpu_percent()
            memory = psutil.virtual_memory()
            # Cluster state (decode only if the client returns bytes)
            cluster_info = node.execute_command('CLUSTER', 'INFO')
            if isinstance(cluster_info, bytes):
                cluster_info = cluster_info.decode()
            hits = info['keyspace_hits']
            misses = info['keyspace_misses']
            metrics['nodes'][node_key] = {
                # Redis metrics
                'connected_clients': info['connected_clients'],
                'used_memory': info['used_memory'],
                'instantaneous_ops_per_sec': info['instantaneous_ops_per_sec'],
                'keyspace_hits': hits,
                'keyspace_misses': misses,
                'hit_rate': hits / (hits + misses) if (hits + misses) > 0 else 0,
                # System metrics
                'cpu_usage': cpu_percent,
                'memory_usage': memory.percent / 100,
                # Cluster metrics
                'cluster_state': cluster_info.split('\r\n')[0].split(':')[1],
                'cluster_slots_assigned': 16384,  # total number of slots
            }
        return metrics

    def check_alerts(self, metrics):
        """
        Evaluate alert conditions
        """
        alerts = []
        for node_key, node_metrics in metrics['nodes'].items():
            # Memory usage alert
            if node_metrics['memory_usage'] > self.thresholds['memory_usage']:
                alerts.append({
                    'type': 'high_memory_usage',
                    'node': node_key,
                    'value': node_metrics['memory_usage'],
                    'threshold': self.thresholds['memory_usage']
                })
            # CPU usage alert
            if node_metrics['cpu_usage'] > self.thresholds['cpu_usage']:
                alerts.append({
                    'type': 'high_cpu_usage',
                    'node': node_key,
                    'value': node_metrics['cpu_usage'],
                    'threshold': self.thresholds['cpu_usage']
                })
            # Cluster state alert
            if node_metrics['cluster_state'] != 'ok':
                alerts.append({
                    'type': 'cluster_state_error',
                    'node': node_key,
                    'state': node_metrics['cluster_state']
                })
        return alerts

# Usage (cluster_nodes is a list of per-node clients)
monitor = ClusterMonitor(cluster_nodes)
metrics = monitor.collect_metrics()
alerts = monitor.check_alerts(metrics)
if alerts:
    print("⚠️ Alerts:")
    for alert in alerts:
        print(f"  - {alert}")
```
Interview points:
- Understand how to detect and mitigate hot keys
- Master the techniques for optimizing cross-slot operations
- Know the key metrics for cluster performance monitoring
These advanced Redis Cluster interview questions dig into the core topics of sharding, failover, scaling, and performance optimization, and are well suited to technical assessments for senior backend engineer and architect roles.
