前言

        在高并发场景下,同时操作数据库和缓存会存在数据不一致性的问题。这常常在面试时,面试官很喜欢问的一个问题,你们系统有用 Redis?使用Redis实现了哪些业务场景?如何保证数据的一致性?

问题

总体归纳 Redis 缓存与数据库 存在的问题,有以下两个:

1. 双写不一致的情况

数据库最终的 stock 库存值是12,但缓存最终的值却是10

2.读写并发不一致的情况

数据库正确的是stock=8 ,而缓存却是10

 针对以上的问题,如何解决?

解决方案:

  1. 对于并发几率很小的数据(如个人维度的订单数据、用户数据等),这种几乎不用考虑这个问题,很少会发生缓存不一致,可以给缓存数据加上过期时间,每隔一段时间触发读的主动更新即可。
  2. 就算并发很高,如果业务上能容忍短时间的缓存数据不一致(如商品名称,商品分类菜单等),缓存加上过期时间依然可以解决大部分业务对于缓存的要求。
  3. 如果不能容忍缓存数据不一致,可以通过加分布式读写锁保证并发读写或写写的时候按顺序排好队,读读的时候相当于无锁。
  4. 也可以用阿里开源的canal通过监听数据库的binlog日志及时的去修改缓存,但是引入了新的中间件,增加了系统的复杂度。

以下使用分布式锁来解决并发问题。

案例实践

环境准备

1. 引入Maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
    </parent>

    <groupId>com.xinxin</groupId>
    <artifactId>cyh</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.16.5</version>
        </dependency>
    </dependencies>
</project>

 2. 配置 application.properties

spring.redis.host=localhost
spring.redis.port=6379

 3. 代码实现

  • redis 工具类
@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate redisTemplate;

    public void set(String key, Object value, long timeout, TimeUnit unit) {
        redisTemplate.opsForValue().set(key, value, timeout, unit);
    }


    public String get(String key) {
        return (String) redisTemplate.opsForValue().get(key);
    }

    public void expire(String key, long time, TimeUnit unit) {
        redisTemplate.expire(key, time, unit);
    }

}
  •  Model 类
@Data
public class Product {

    private Long  productId;
    /**
     * 商品名称
     */
    private String productName;
    /**
     * 商品图片
     */
    private String imageUrl;
    /**
     * 商品价格
     */
    private BigDecimal price;
    /**
     * 库存
     */
    private Integer stock;
}
  • Dao 类
@Component
public class ProductMapper {


    public Product update(Product product) {
        System.out.println("修改商品成功");
        return product;
    }

    public Product get(Long productId) {
        System.out.println("查询商品成功");
        Product product = new Product();
        product.setProductName("小米手机");
        product.setPrice(new BigDecimal(3999));
        product.setStock(10);
        product.setImageUrl("www.baidu.com");
        return product;
    }

}
  • Service 类
@Service
public class ProductService {

    @Autowired
    private ProductMapper productMapper;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private RedissonClient redissonClient;

    public static final String LOCK_PRODUCT_UPDATE_PREFIX = "lock:product:update:";
    public static final String LOCK_PRODUCT_HOT_CACHE_PREFIX = "lock:product:hot_cache:";
    
    public static final String EMPTY_CACHE = "{}";

    public Product update(Product productParam) {
        Product productResult = productMapper.update(productParam);
        redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getProductId(), JSON.toJSONString(productResult),
                genProductCacheTimeout(), TimeUnit.SECONDS);
        return productResult;
    }

    public Product get(Long productId) {
        String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;

        String productStr = redisUtil.get(productCacheKey);
        if (productStr != null) {
            return JSON.parseObject(productStr, Product.class);
        }

        Product productResult = productMapper.get(productId);
        if (productResult != null) {
            redisUtil.set(productCacheKey, JSON.toJSONString(productResult), genProductCacheTimeout(), TimeUnit.SECONDS);
            return productResult;
        }
        return null;
    }


    private Integer genProductCacheTimeout() {
        return getDayRemainingTime(new Date()) + new Random().nextInt(5) * 60 * 60 ;
    }

    private Integer genEmptyCacheTimeout() {
        return 60 + new Random().nextInt(30);
    }

    /**
     * 获取一天中剩余的时间(秒数)
     */
    public static Integer getDayRemainingTime(Date currentDate) {
        LocalDateTime midnight = LocalDateTime.ofInstant(currentDate.toInstant(),
                ZoneId.systemDefault()).plusDays(1).withHour(0).withMinute(0)
                .withSecond(0).withNano(0);
        LocalDateTime currentDateTime = LocalDateTime.ofInstant(currentDate.toInstant(),
                ZoneId.systemDefault());
        long seconds = ChronoUnit.SECONDS.between(currentDateTime, midnight);
        return (int) seconds;
    }

}
  • Controller 类
@RestController("/product")
public class ProductController {

    @Autowired
    private ProductService productService;


    @PostMapping(value = "/update")
    public Product update(@RequestBody Product productParam) {
        return productService.update(productParam);
    }

    @PostMapping("/get/{productId}")
    public Product getProduct(@PathVariable Long productId) {
        return productService.get(productId);
    }

}

针对双写不一致的问题,我们可以加把分布式锁,来使得写写能够按顺序排好队,进行操作。使得写数据库和写缓存变成原子性。我们针对update方法进行改造,用Redis 加一把写锁,代码如下:

    public Product update(Product productParam) {
        Product productResult = null;
        RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX.concat(productParam.getProductId().toString()));
        RLock writeLock = readWriteLock.writeLock();
        writeLock.lock();
        try {
            productResult = productMapper.update(productParam);
            redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getProductId(), JSON.toJSONString(productResult),
                    genProductCacheTimeout(), TimeUnit.SECONDS);
        } finally {
            writeLock.unlock();
        }

        return productResult;
    }

至此双写不一致问题就解决了,但有个问题是,所有的更新都走这个方法,不要有多个更新的方法,防止怕有些地方漏改到。

写写我们解决了,但读写并发问题不一致的情况还是会存在。结合上面图二 和 Service 层代码分析,当线程1 执行update方法,获取写锁,把商品库存10,写数据库和写缓存(图是删除缓存,结果是一样的,不影响),然后释放锁。此时,线程3 过来,执行 get方法,查询 redis 缓存,缓存为空,然后查询数据库,获取到商品的库存为10,可能因系统负载高,执行得有点慢,还没更新缓存的时候。线程2 ,可能已经获取到写锁,将库存8,更新到数据库和缓存。此时,线程3 再将商品库存10更新缓存,造成数据库和缓存的不一致。

如何解决读写并发不一致问题?

可以在get 方法的写数据库和更新缓存(删除缓存)前后加一把读锁,这个读锁的锁key和update方法的锁key是同一个key。加了读锁之后,多个线程来执行 get 方法读数据,不会阻塞,因为读读并行,但多个线程同时执行get 和 update 方法,没获取到锁的线程会被阻塞,因为读写阻塞。我们针对get方法进行改造,改造如下:

   public Product get(Long productId) {
        String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;

        String productStr = redisUtil.get(productCacheKey);
        if (productStr != null) {
            return JSON.parseObject(productStr, Product.class);
        }

        RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
        RLock readLock = readWriteLock.readLock();
        readLock.lock();
        try {
            Product productResult = productMapper.get(productId);
            if (productResult != null) {
                redisUtil.set(productCacheKey, JSON.toJSONString(productResult), genProductCacheTimeout(), TimeUnit.SECONDS);
                return productResult;
            }
        } finally {
            readLock.unlock();
        }
        return null;
    }

至此双写不一致就解决了,但再想想这样的代码还有没有问题?

其实上面这些代码在中小型公司里,完全够用,但如果并发度很高的话,上面代码还是会出现问题。比如是缓存中经典的三个问题:缓存穿透,缓存失效(击穿),缓存雪崩。

缓存穿透

缓存穿透是指查询一个根本不存在的数据, 缓存层和存储层都不会命中,通常出于容错的考虑, 如果从存储层查不到数据则不写入缓存层。
缓存穿透将导致不存在的数据每次请求都要到存储层去查询, 失去了缓存保护后端存储的意义。

解决方法有两个:

  • 缓存空对象
  • 布隆过滤器

本文使用缓存空对象,来解决缓存穿透问题,改造代码如下:

 public Product get(Long productId) {
        String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;

        String productStr = redisUtil.get(productCacheKey);
        if (productStr != null) {
            return JSON.parseObject(productStr, Product.class);
        }

        RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
        RLock readLock = readWriteLock.readLock();
        readLock.lock();
        try {
            Product productResult = productMapper.get(productId);
            if (productResult != null) {
                redisUtil.set(productCacheKey, JSON.toJSONString(productResult), genProductCacheTimeout(), TimeUnit.SECONDS);
                return productResult;
            }else {
                //解决缓存穿透问题,缓存空对象
                redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
            }
        } finally {
            readLock.unlock();
        }
        return null;
    }

缓存了空对象是否就解决了缓存穿透问题呢?如果有恶意攻击,同时调用 get 方法,如果缓存为空,那么大量请求都会打到数据库,那还是会存在缓存穿透问题,可能会造成数据库的崩溃,此时可以在查询数据库之前再加把锁,防止其他线程同时访问数据库。改造代码如下:

public Product get(Long productId) {
        String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;

        String productStr = redisUtil.get(productCacheKey);
        if (productStr != null) {
            return JSON.parseObject(productStr, Product.class);
        }
        RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
        hotCacheLock.lock();
        //DCL 双重检测
        productStr = redisUtil.get(productCacheKey);
        if (productStr != null) {
            return JSON.parseObject(productStr, Product.class);
        }

        try {
            RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
            RLock readLock = readWriteLock.readLock();
            readLock.lock();
            try {
                Product productResult = productMapper.get(productId);
                if (productResult != null) {
                    redisUtil.set(productCacheKey, JSON.toJSONString(productResult), genProductCacheTimeout(), TimeUnit.SECONDS);
                    return productResult;
                } else {
                    //解决缓存穿透问题,缓存空对象
                    redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
                }
            } finally {
                readLock.unlock();
            }
        } finally {
            hotCacheLock.unlock();
        }
        return null;
    }

至此穿透问题解决了。

缓存失效(击穿)

由于大批量缓存在同一时间失效可能导致大量请求同时穿透缓存直达数据库,可能会造成数据库瞬间压力过大甚至挂掉。

解决方法:对于这种情况我们在批量增加缓存时最好将这一批数据的缓存过期时间设置为一个时间段内的不同时间。

可以查看上面代码中,我们设置redis的过期时间的时候,都会加上一个随机数,这样就可以把过期时间设置在不同的时间段,可以参考上面的genProductCacheTimeout方法和genEmptyCacheTimeout方法。

缓存雪崩

缓存雪崩指的是缓存层支撑不住或宕掉后, 流量会像奔逃的野牛一样, 打向后端存储层。大量请求都会打到存储层, 存储层的调用量会暴增, 造成存储层也会级联宕机的情况。

解决方法:

  1. 保证缓存层服务高可用性,比如使用Redis Sentinel或Redis Cluster。
  2. 依赖隔离组件为后端限流熔断并降级。比如使用Sentinel或Hystrix限流降级组件。

优化到此,上面的代码是否还有优化的空间。答案是肯定的,比如引入本地缓存,虽然Redis能够抗很大并发,但与本地缓存相比,还是差了一点。下面我们用本地缓存对代码再进行改造一下。那用什么来做本地缓存,Map可以吗?我觉得不行,因为Map的话,如果我们一直往里面存数据的话,会越来越大,会把我们服务器的内存空间用完,可以使用Google Guava的Caffeine,它可以指定数据大小和淘汰的策略。改造代码如下:

本地缓存

先引入 Maven 依赖

<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>2.9.3</version>
</dependency>

 本地缓存代码改造如下,同时也优化了读缓存时,续命的机制。

@Service
public class ProductService {

    @Autowired
    private ProductMapper productMapper;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private RedissonClient redisson;

    public static final String LOCK_PRODUCT_UPDATE_PREFIX = "lock:product:update:";
    public static final String LOCK_PRODUCT_HOT_CACHE_PREFIX = "lock:product:hot_cache:";

    public static final String EMPTY_CACHE = "{}";

    Cache<String, Product> productMap = Caffeine.newBuilder()     // 构建一个新的caffeine实例
            .maximumSize(100)   // 设置缓存之中保存的最大数据量
            .expireAfterAccess(3L, TimeUnit.MINUTES)    // 缓存数据在3秒内没被访问则失效
            .build();

    public Product update(Product productParam) {
        Product productResult = null;
        RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX.concat(productParam.getProductId().toString()));
        RLock writeLock = readWriteLock.writeLock();
        writeLock.lock();
        try {
            productResult = productMapper.update(productParam);
            redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getProductId(), JSON.toJSONString(productResult),
                    genProductCacheTimeout(), TimeUnit.SECONDS);
        } finally {
            writeLock.unlock();
        }

        return productResult;
    }

    public Product get(Long productId) {
        String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;

        Product productResult = getProductFromCache(productCacheKey);
        if (productResult != null) {
            return productResult;
        }

        RLock hotCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_PREFIX + productId);
        hotCacheLock.lock();
        //DCL 双重检测
        productResult = getProductFromCache(productCacheKey);
        if (productResult != null) {
            return productResult;
        }

        try {
            RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
            RLock readLock = readWriteLock.readLock();
            readLock.lock();
            try {
                productResult = productMapper.get(productId);
                if (productResult != null) {
                    redisUtil.set(productCacheKey, JSON.toJSONString(productResult), genProductCacheTimeout(), TimeUnit.SECONDS);
                    //放多一份到本地缓存
                    productMap.put(productCacheKey, productResult);
                    return productResult;
                } else {
                    //解决缓存穿透问题,缓存空对象
                    redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
                }
            } finally {
                readLock.unlock();
            }
        } finally {
            hotCacheLock.unlock();
        }
        return null;
    }

    private Product getProductFromCache(String productCacheKey) {
        Product productResult = productMap.getIfPresent(productCacheKey);
        if (productResult != null) {
            return productResult;
        }

        String productStr = redisUtil.get(productCacheKey);
        if (productStr != null) {
            if (EMPTY_CACHE.equals(productStr)) {
                redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
                return new Product();
            }
            redisUtil.expire(productCacheKey, genProductCacheTimeout(), TimeUnit.SECONDS); //读延期
            return JSON.parseObject(productStr, Product.class);
        }
        return null;
    }

    private Integer genEmptyCacheTimeout() {
        return 60 + new Random().nextInt(30);
    }

    private Integer genProductCacheTimeout() {
        return getDayRemainingTime(new Date()) + new Random().nextInt(5) * 60 * 60;
    }

    /**
     * 获取一天中剩余的时间(秒数)
     */
    public static Integer getDayRemainingTime(Date currentDate) {
        LocalDateTime midnight = LocalDateTime.ofInstant(currentDate.toInstant(),
                ZoneId.systemDefault()).plusDays(1).withHour(0).withMinute(0)
                .withSecond(0).withNano(0);
        LocalDateTime currentDateTime = LocalDateTime.ofInstant(currentDate.toInstant(),
                ZoneId.systemDefault());
        long seconds = ChronoUnit.SECONDS.between(currentDateTime, midnight);
        return (int) seconds;
    }

}

至此所有的优化完毕,此代码可以直接应用在生产上,能解决缓存和数据库不一致的所有的问题。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部