Redis 缓存策略实战:穿透、雪崩、击穿的解决方案

Redis 做缓存看起来简单——查之前先查 Redis,没有就查数据库然后写入 Redis。但生产环境中,这个"简单"的流程有三个经典的坑。

一、缓存穿透

问题:查询一个数据库中不存在的数据。缓存永远不会命中,每次请求都打到数据库。

场景:恶意用户用不存在的 ID 频繁查询接口。

方案一:缓存空值

def get_user(user_id):
    cache_key = f"user:{user_id}"
    cached = redis.get(cache_key)

    if cached is not None:
        if cached == "NULL":
            return None
        return json.loads(cached)

    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    if user is None:
        redis.setex(cache_key, 300, "NULL")  # 缓存空值 5 分钟
        return None

    redis.setex(cache_key, 3600, json.dumps(user))
    return user

方案二:布隆过滤器

在缓存前加一层布隆过滤器,快速判断 key 是否可能存在:

# 初始化时把所有有效 ID 加入布隆过滤器
bloom_filter = BloomFilter(capacity=1000000, error_rate=0.01)

def get_user(user_id):
    if not bloom_filter.exists(f"user:{user_id}"):
        return None  # 一定不存在,直接返回
    # 继续正常缓存流程...

二、缓存雪崩

问题:大量缓存同时过期,请求瞬间全部打到数据库。

场景:系统重启时,所有缓存同时失效。

方案一:过期时间加随机值

import random

def set_cache(key, value, base_ttl=3600):
    random_offset = random.randint(0, 600)  # 0-10 分钟随机偏移
    ttl = base_ttl + random_offset
    redis.setex(key, ttl, json.dumps(value))

方案二:缓存预热

系统启动时主动加载热点数据:

def warm_up_cache():
    hot_users = db.query("SELECT * FROM users ORDER BY access_count DESC LIMIT 1000")
    for user in hot_users:
        redis.setex(f"user:{user['id']}", 3600, json.dumps(user))

方案三:多级缓存

# L1: 本地内存缓存(TTL 短)
# L2: Redis 缓存(TTL 长)
# L3: 数据库

local_cache = TTLCache(maxsize=1000, ttl=60)

def get_data(key):
    # L1
    if key in local_cache:
        return local_cache[key]

    # L2
    cached = redis.get(key)
    if cached:
        local_cache[key] = json.loads(cached)
        return json.loads(cached)

    # L3
    data = db.query(key)
    redis.setex(key, 3600, json.dumps(data))
    local_cache[key] = data
    return data

三、缓存击穿

问题:某个热点 key 过期的瞬间,大量并发请求同时查数据库。

场景:明星发微博,缓存的热门帖子刚好过期。

方案:互斥锁

import time

def get_hot_data(key):
    cached = redis.get(key)
    if cached:
        return json.loads(cached)

    # 尝试获取锁
    lock_key = f"lock:{key}"
    if redis.set(lock_key, "1", nx=True, ex=10):
        try:
            data = db.query(key)
            redis.setex(key, 3600, json.dumps(data))
            return data
        finally:
            redis.delete(lock_key)
    else:
        # 等待其他进程加载完
        time.sleep(0.1)
        return get_hot_data(key)

四、缓存与数据库的一致性

Cache Aside Pattern(推荐)

  1. **读**:先读缓存,未命中则读数据库,然后写入缓存
  2. **写**:先更新数据库,再删除缓存
def update_user(user_id, data):
    db.update("UPDATE users SET ... WHERE id = %s", user_id)
    redis.delete(f"user:{user_id}")  # 删除而不是更新

为什么不更新缓存而是删除?因为并发写入时,更新缓存的顺序可能和数据库不一致。

五、监控指标

生产环境必须监控:

  • **缓存命中率**:低于 80% 要警惕
  • **Redis 内存使用率**:超过 80% 要扩容
  • **慢查询**:超过 10ms 的命令要排查
  • **连接数**:接近上限要调整连接池

Redis 缓存不是"加一层"那么简单。穿透、雪崩、击穿三个问题,每个都有对应的工程方案。理解原理,才能在具体场景中做出正确的选择。