我用Redis分布式锁,抢了瓶茅台,然后GG了~~"/>
我用Redis分布式锁,抢了瓶茅台,然后GG了~~
在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。通常,我们以synchronized 、Lock
来使用它(单机情况)
我们来看一个案例:
高并发下单超卖问题
@Autowired
RedisTemplate<String,String> redisTemplate;
String maotai = "maotai20210321001";//茅台商品编号
@PostConstruct
public void init(){
//此处模拟向缓存中存入商品库存操作
redisTemplate.opsForValue().set(maotai,"100");
}
@GetMapping("/get/maotai2")
public String seckillMaotai2() {
synchronized (this) {
Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1
//如果还有库存
if (count > 0) {
//抢到了茅台,库存减一
redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));
//后续操作 do something
log.info("我抢到茅台了!");
return "ok";
}else {
return "no";
}
}
}
复制代码
问题分析:
- 现象:本地锁在多节点下失效(集群/分布式)
- 原因:本地锁它只能锁住本地JVM进程中的多个线程,对于多个JVM进程的不同线程间是锁不住的
- 解决:分布式锁(在分布式环境下提供锁服务,并且达到本地锁的效果)
何为分布式锁
- 当在分布式架构下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
- 用一个状态值表示锁,对锁的占用和释放通过状态值来标识。
分布式锁特点
- 互斥性:不仅要在同一jvm进程下的不同线程间互斥,更要在不同jvm进程下的不同线程间互斥
- 锁超时:支持锁的自动释放,防止死锁
- 正确,高效,高可用:解铃还须系铃人(加锁和解锁必须是同一个线程),加锁和解锁操作一定要高效,提供锁的服务要具备容错性
- 可重入:如果一个线程拿到了锁之后继续去获取锁还能获取到,我们称锁是可重入的(方法的递归调用)
- 阻塞/非阻塞:如果获取不到直接返回视为非阻塞的,如果获取不到会等待锁的释放直到获取锁或者等待超时,视为阻塞的
- 公平/非公平:按照请求的顺序获取锁视为公平的
基于Redis实现分布式锁
实现思路:
锁的实现主要基于redis的SETNX
命令:
SETNX key value
将 key 的值设为 value ,当且仅当 key 不存在。
若给定的 key 已经存在,则 SETNX 不做任何动作。
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
返回值: 设置成功,返回 1 设置失败,返回 0
使用SETNX
完成同步锁的流程及事项如下:
- 使用
SETNX
命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功 - 为了防止获取锁后程序出现异常,导致其他线程/进程调用
SETNX
命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间 - 释放锁,使用
DEL
命令将锁数据删除
实现代码版本1:
@GetMapping("/get/maotai3")public String seckillMaotai3() {//获取锁Boolean islock = redisTemplate.opsForValue().setIfAbsent(lockey, "1");if (islock) {//设置锁的过期时间redisTemplate.expire(lockey,5, TimeUnit.SECONDS);try {Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1//如果还有库存if (count > 0) {//抢到了茅台,库存减一redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));//后续操作 do somethinglog.info("我抢到茅台了!");return "ok";}else {return "no";}} catch (Exception e) {e.printStackTrace();} finally {//释放锁redisTemplate.delete(lockey);}}return "dont get lock";}
复制代码
问题分析:
-
- setnx 和 expire是非原子性操作(解决:2.6以前可用使用lua脚本,2.6以后可用set命令)
- 2.错误解锁(如何保证解铃还须系铃人:给锁加一个唯一标识)
错误解锁问题解决:
@GetMapping("/get/maotai4")public String seckillMaotai4() {String requestid = UUID.randomUUID().toString() + Thread.currentThread().getId();/*String locklua ="" +"if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then redis.call('expire',KEYS[1],ARGV[2]) ; return true " +"else return false " +"end";Boolean islock = redisTemplate.execute(new RedisCallback<Boolean>() {@Overridepublic Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {Boolean eval = redisConnection.eval(locklua.getBytes(),ReturnType.BOOLEAN,1,lockey.getBytes(),requestid.getBytes(),"5".getBytes());return eval;}});*///获取锁Boolean islock = redisTemplate.opsForValue().setIfAbsent(lockey,requestid,5,TimeUnit.SECONDS);if (islock) {try {Integer count = Integer.parseInt(redisTemplate.opsForValue().get(maotai)); // 1//如果还有库存if (count > 0) {//抢到了茅台,库存减一redisTemplate.opsForValue().set(maotai,String.valueOf(count-1));//后续操作 do somethinglog.info("我抢到茅台了!");return "ok";}else {return "no";}} catch (Exception e) {e.printStackTrace();} finally {//释放锁//判断是自己的锁才能去释放 这种操作不是原子性的/*String id = redisTemplate.opsForValue().get(lockey);if (id !=null && id.equals(requestid)) {redisTemplate.delete(lockey);}*/String unlocklua = "" +"if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('del',KEYS[1]) ; return true " +"else return false " +"end";redisTemplate.execute(new RedisCallback<Boolean>() {@Overridepublic Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {Boolean eval = redisConnection.eval(unlocklua.getBytes(),ReturnType.BOOLEAN,1,lockey.getBytes(),requestid.getBytes());return eval;}});}}return "dont get lock";}
复制代码
锁续期/锁续命
/*** 3,锁续期/锁续命* 拿到锁之后执行业务,业务的执行时间超过了锁的过期时间** 如何做?* 给拿到锁的线程创建一个守护线程(看门狗),守护线程定时/延迟 判断拿到锁的线程是否还继续持有锁,如果持有则为其续期**///模拟一下守护线程为其续期ScheduledExecutorService executorService;//创建守护线程池ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<String>();//队列@PostConstructpublic void init2(){executorService = Executors.newScheduledThreadPool(1);//编写续期的luaString expirrenew = "" +"if redis.call('get',KEYS[1]) == ARGV[1] then redis.call('expire',KEYS[1],ARGV[2]) ; return true " +"else return false " +"end";executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {Iterator<String> iterator = set.iterato
更多推荐
我用Redis分布式锁,抢了瓶茅台,然后GG了~~
发布评论