针对RedisTemplate实现WatchDog – Karos (wzl1.top)
但是这个方法仍旧有问题存在,下面我们来聊一聊
把稳,这篇文章实质上只是谈论分布式锁的问题,如果要看源代码剖析,可以看看这位大佬的代码,我在这里引入代码只是想要解释一些办理方案:分布式锁 | Joseph's Blog (gitee.io)

最开始大家刚学分布式锁的时候,用的是这个指令,
setnx key value
然后利用expire给他设置过期韶光
看似没有问题
难道真的没问题吗?
试想一下,在高并发下,redis涌现了雪崩,那么你设置了setnx,但是在设置expire之前崩了,呃呃呃~
没错,要办理这个问题,得实现原子性,原子性,我们在MySQL里面通过学习了事务来办理
那么redis,能不能类似实现事务呢?实在redis本身是有事务的,但是这种大略的语句,用Lua也行(没错,便是你打游戏开脚本哪个)
但是在这里我们不讲Lua,紧张说一下思想,实在便是通过lua将两个原子语句封装在一起,再发送给redis做事器进行实行
lua-redis快速入门直接看末了
这个分布式锁实现过于大略,就不在这里说了,hhh~
Redis官方针对SETNX的改动实在Redis官方在后面也看出了SETNX的缺陷,以是他在2.6.12版本开始,加入了一个新的指令
bash复制代码set key value EX|PX nx|ex
EX|PX是expire和pexpire,nx是不存在则实行,ex是存在则实行
大略说下,然后RedisTemplate.setValue().setIfAbsent()方法也进行了重写
这样就担保了原子性,这个方法在我之前的文章里面也用过。
并且我参考Redisson的思想制作了分布式锁看门狗机制
当时实在是在想续期问题如何办理,办理之后就觉得自己
直到最近,群佬看博客,指出了一个问题,这个方法是否可重入呢?
说到这里,你可能对可重入有点迷惑,那么现在,我们来先容一下可重入锁
可重入锁什么是可重入锁?
来看看先容吧。
可重入便是说某个线程已经得到某个锁,可以再次获取锁而不会涌现去世锁。
大略的例子,这里我用伪代码来阐明
java复制代码syn(this){ sout("加锁成功A") syn(this){ sout("加锁成功B") }}
那么这里,我们会发生什么呢?
按照没打仗可重入锁的情形或者没有这样试过的情形来说,实行完 sout("加锁成功A")后便会产生去世锁问题
而可重入锁,便是说,在此时,你依然可以进入并实行sout("加锁成功B")
那么运用处景?
最随意马虎想到的是递归调用,但是还有其他的业务方面可以说一说,
比如你要调用业务方法A,业务A中有操纵了要上锁业务B,同时业务A又须要全局上锁,那么这个地方就须要可重入了
基于Redis-Hash的可重入锁实现在Redisson中,采取的是hash进行锁的存储,然后对hash设置一个过期韶光
大概的数据构造是这样的
hashname为key,hashkey为thread1,value是锁的重入次数
但是这里我要提一点,这里的thread1,可不仅仅是threadId,利用分布式锁常日是在分布式、微做事i项眼前,不同的做事中也有可能涌现线程ID相同的问题,以是这里加一个做事名,实在天生个UUID就可以了
大概的格式便是这样:
bash复制代码mylock:HEX(uuid+theadId):num
但是还有个设置过期韶光的问题,如何设置?
我这里随着我之前的帖子来讲,在那里我是利用的RedisTemplate来实现分布式锁+看门狗机制
但是没有考虑可重入的问题,那么我这次就加上
我们要加过期韶光,同时又要确保原子性,那么就用Lua
加锁对付加锁的Lua如下
lua复制代码 -- 如果Lock存在 if redis.call("exists",KEYS[1]) ~= 0 then -- 如果不是自己的锁 if redis.call("exists",KEYS[1],ARGS[1]) == 0 then -- 不是自己的锁,返回剩余韶光 return redis.call("pttl",KEYS[1]); end -- 如果是自己的锁就记录次数 redis.call("hincrby",KEYS[1],ARGS[1],1); -- 延期 redis.call("pexpire",KEYS[1],ARGS[2]); else redis.call("hset",KEYS[1],ARGS[1],1); -- 设置默认延期 redis.call("pexpire",KEYS[1],ARGS[2]); end -- 如果Lock不存在,那么就直接加上就可以了,hhh return nil;
这里阐明一下KEY和ARG,key是hash名,args是指命令携带参数
key1:索命
args1:做事线程唯一ID
args2:过期韶光
然后在代码里面的实现
解锁
解锁也差不多
lua复制代码-- 解锁的逻辑和加锁相似 -- 如果Lock存在 if redis.call("exists",KEYS[1]) ~= 0 then -- 如果是自己的锁 if redis.call("hexists",KEYS[1],ARGS[1]) ~= 0 then -- 如果是末了一层 直接delete if redis.call("hget",KEYS[1],ARGS[1]) == 0 then redis.call("del",KEYs[1]); a=0 else -- 如果不是,那么久锁层数减一 a=redis.call("hincrby",KEYS[1],ARGS[1],-1); end end return a; end -- 如果Lock不存在,那么就return,hhh return nil;
续期的话本来便是一条语句,不变就可以了
然后我和之前的代码比较,自旋锁改了一下,hhh
看门狗机制实现
之前实在已经实现过,这里就再来看看吧,这里我为了方便一点,用的Hutool来演示,但是实际用的时候还是用Netty等框架比较好,毕竟Redission也是用的Netty
目前还存在的问题+Reddisson源码剖析 —— 自旋锁
没错,别以为这样就完了,细心的话会创造我上面的代码里面,写的是最暴力的自旋锁(图一个方便,hhh)
如果说一贯循环下去,那么无疑是非常摧残浪费蹂躏CPU的
那么如何办理?
办理方案细心的同学已经创造了,在我加锁失落败的时候,会返回一个ttl,也便是当前key还有多久失落效
那么我们是不是可以在while里面是指一个壅塞,然后等过了这么久再唤醒线程就可以了?
没错,Reddisson底层也是这样实现的,基于Redis发布订阅,但是这里我给大家大略引个路子
你可以理解为把壅塞的线程ID放进一个壅塞行列步队里面,而我们的做事器就去订阅这个行列步队,实在这个行列步队在Redis里面叫做Channel,感兴趣的可以去看看。
那么是如何订阅的呢?
其实在源代码中,Redisson是放了一个“检测器”来进行监听
下面来看看Redisson加锁的代码
壅塞加锁源码 lock()java复制代码//壅塞加锁private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return;//这里拿到锁了 } CompletableFuture<RedissonLockEntry> future = subscribe(threadId);//对当前哨程进行订阅 pubSub.timeout(future);//设置订阅超时 RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { // 循环重试获取锁,直至重新获取锁成功才跳出循环 // 此种做法壅塞进程,一贯处于等待锁手动开释或者超时才连续线程 ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); }// get(lockAsync(leaseTime, unit)); }
非壅塞加锁
java复制代码//不壅塞加锁,waitTime是最大容忍韶光,这个观点不做过多阐明,便是等待你自选的韶光@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 打算第一次考试测验获取锁后剩余的韶光 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId);//获取失落败 return false; } current = System.currentTimeMillis(); //订阅 CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); try { subscribeFuture.get(time, TimeUnit.MILLISECONDS);//设置一个最多订阅韶光 } catch (TimeoutException e) { if (!subscribeFuture.completeExceptionally(new RedisTimeoutException( "Unable to acquire subscription lock after " + time + "ms. " + "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) { subscribeFuture.whenComplete((res, ex) -> { if (ex == null) { unsubscribe(res, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } catch (ExecutionException e) { acquireFailed(waitTime, unit, threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);//壅塞,等待 } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);//壅塞,等待 } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); }// return get(tryLockAsync(waitTime, leaseTime, unit)); }
订阅
其上述类容中的订阅,都通过下面的方法进行回调,在解锁的时候会发布
java复制代码package org.redisson.pubsub;import org.redisson.RedissonLockEntry;import java.util.concurrent.CompletableFuture;/ LockPubSub类是一个用于锁的发布-订阅实现。 它继续自PublishSubscribe类,用于处理锁的订阅和发布。 锁的订阅者是RedissonLockEntry工具。 当吸收到特定的时,会实行相应的操作。 /public class LockPubSub extends PublishSubscribe<RedissonLockEntry> { // 解锁 public static final Long UNLOCK_MESSAGE = 0L; // 读锁解锁 public static final Long READ_UNLOCK_MESSAGE = 1L; public LockPubSub(PublishSubscribeService service) { super(service); } @Override protected RedissonLockEntry createEntry(CompletableFuture<RedissonLockEntry> newPromise) { return new RedissonLockEntry(newPromise); } @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(UNLOCK_MESSAGE)) { // 获取等待实行的Runnable工具,并实行 Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } // 开释锁计数器 value.getLatch().release(); } else if (message.equals(READ_UNLOCK_MESSAGE)) { // 循环实行等待实行的Runnable工具,并实行 while (true) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute == null) { break; } runnableToExecute.run(); } // 开释锁计数器,开释所有等待的读锁 value.getLatch().release(value.getLatch().getQueueLength()); } }}
总结
实在这里收成最大的,该当是自旋锁,虽然说在末了利用了发布订阅来完成异步唤醒,但还是有一些缺陷,比如这个锁是否公正,如果说,这里要让你实现公正锁,读者你又打算如何办理?
想到这里,我又想到了Reactor模型,实在我们可以做一个BossGroup来存放一下壅塞线程ID,实在便是一个壅塞行列步队
再用一个WorkerGroup来对每个ThreadID进行处理,当然这里供应的是一个思路,如果要完成的话,相称于是写一个小型中间件,也挺故意思,后面打算试一试,hhh
Redis-Lua快速学习当编写 Lua 脚本与 Redis 进行交互时,以下是一些常用的 Lua 脚本指南和技巧:
命令调用:利用 redis.call 函数来调用 Redis 命令。例如,redis.call('GET', 'mykey') 将调用 Redis 的 GET 命令并返回键为 'mykey' 的值。参数访问:可以利用 KEYS 表来访问通报给 Lua 脚本的键列表,利用 ARGV 表来访问通报给 Lua 脚本的额外参数。例如,KEYS[1] 表示第一个键,ARGV[1] 表示第一个额外参数。返回结果:Lua 脚本可以通过利用 return 语句来返回结果。例如,return redis.call('GET', 'mykey') 将返回键为 'mykey' 的值。循环和条件:Lua 供应了一些基本的循环和条件语句,例如 for、while、if 等,可以在 Lua 脚本中利用。容错处理:在编写 Lua 脚本时,可以考虑添加容错处理,例如利用 pcall 函数来捕获 Redis 命令的缺点并进行处理。事务支持:Redis 的 Lua 脚本支持事务,可以利用 redis.call('MULTI') 开始事务,然后利用 redis.call('EXEC') 实行事务。在事务中,可以实行多个 Redis 命令,并将其作为一个原子操作进行提交或回滚。脚本缓存:Redis 可以缓存 Lua 脚本,以提高实行效率。您可以利用 EVALSHA 命令来实行缓存的脚本。在 Java RedisTemplate 中,您可以利用 execute 方法的 execute(script, keys, args) 形式来实行缓存的脚本。这些指南和技巧可帮助您编写更繁芜和灵巧的 Lua 脚本与 Redis 进行交互。在编写 Lua 脚本时,请参考 Redis 官方文档以及 Lua 官方文档,以理解更多 Lua 编程措辞和 Redis 命令的细节和用法。
当编写 Lua 脚本时,可以利用循环和条件语句来实现逻辑掌握。以下是一些示例:
利用 for 循环:lua复制代码for i = 1, 10 do -- 实行操作,例如打印循环变量 print(i)end
利用 while 循环:
lua复制代码local i = 1while i <= 10 do -- 实行操作,例如打印循环变量 print(i) i = i + 1end
利用 if-else 条件:
lua复制代码local num = 5if num < 0 then print("Number is negative")elseif num == 0 then print("Number is zero")else print("Number is positive")end
利用 repeat-until 循环:
lua复制代码local i = 1repeat -- 实行操作,例如打印循环变量 print(i) i = i + 1until i > 10
这些示例展示了在 Lua 脚本中利用循环和条件语句的基本用法。您可以根据自己的需求和逻辑在 Lua 脚本中编写更繁芜的循环和条件掌握构造。请把稳,在 Lua 中,条件语句利用 if-elseif-else 构造,而不是像其他编程措辞中的 if-else 构造。此外,Lua 的索引从 1 开始,而不是从 0 开始,这与一些其他编程措辞有所不同。
请确保根据您的实际需求和逻辑编写精确的循环和条件掌握构造,并根据 Redis 脚本的哀求将其集成到您的 Lua 脚本中。
作者:Karos链接:https://juejin.cn/post/7244820297290645541