首页 » 网站推广 » phpredistimeout技巧_Redis分布式锁深入分析

phpredistimeout技巧_Redis分布式锁深入分析

访客 2024-11-28 0

扫一扫用手机浏览

文章目录 [+]

针对RedisTemplate实现WatchDog – Karos (wzl1.top)

但是这个方法仍旧有问题存在,下面我们来聊一聊

phpredistimeout技巧_Redis分布式锁深入分析

把稳,这篇文章实质上只是谈论分布式锁的问题,如果要看源代码剖析,可以看看这位大佬的代码,我在这里引入代码只是想要解释一些办理方案:分布式锁 | Joseph's Blog (gitee.io)

phpredistimeout技巧_Redis分布式锁深入分析
(图片来自网络侵删)
最原始的Redis分布式锁

最开始大家刚学分布式锁的时候,用的是这个指令,

setnx key value

然后利用expire给他设置过期韶光

看似没有问题

难道真的没问题吗?

试想一下,在高并发下,redis涌现了雪崩,那么你设置了setnx,但是在设置expire之前崩了,呃呃呃~

没错,要办理这个问题,得实现原子性,原子性,我们在MySQL里面通过学习了事务来办理

那么redis,能不能类似实现事务呢?实在redis本身是有事务的,但是这种大略的语句,用Lua也行(没错,便是你打游戏开脚本哪个)

但是在这里我们不讲Lua,紧张说一下思想,实在便是通过lua将两个原子语句封装在一起,再发送给redis做事器进行实行

lua-redis快速入门直接看末了

这个分布式锁实现过于大略,就不在这里说了,hhh~

Redis官方针对SETNX的改动

实在Redis官方在后面也看出了SETNX的缺陷,以是他在2.6.12版本开始,加入了一个新的指令

bash复制代码set key value EX|PX nx|ex

EX|PX是expire和pexpire,nx是不存在则实行,ex是存在则实行

大略说下,然后RedisTemplate.setValue().setIfAbsent()方法也进行了重写

这样就担保了原子性,这个方法在我之前的文章里面也用过。

并且我参考Redisson的思想制作了分布式锁看门狗机制

当时实在是在想续期问题如何办理,办理之后就觉得自己

直到最近,群佬看博客,指出了一个问题,这个方法是否可重入呢?

说到这里,你可能对可重入有点迷惑,那么现在,我们来先容一下可重入锁

可重入锁

什么是可重入锁?

来看看先容吧。

可重入便是说某个线程已经得到某个锁,可以再次获取锁而不会涌现去世锁。

大略的例子,这里我用伪代码来阐明

java复制代码syn(this){ sout("加锁成功A") syn(this){ sout("加锁成功B") }}

那么这里,我们会发生什么呢?

按照没打仗可重入锁的情形或者没有这样试过的情形来说,实行完 sout("加锁成功A")后便会产生去世锁问题

而可重入锁,便是说,在此时,你依然可以进入并实行sout("加锁成功B")

那么运用处景?

最随意马虎想到的是递归调用,但是还有其他的业务方面可以说一说,

比如你要调用业务方法A,业务A中有操纵了要上锁业务B,同时业务A又须要全局上锁,那么这个地方就须要可重入了

基于Redis-Hash的可重入锁实现

在Redisson中,采取的是hash进行锁的存储,然后对hash设置一个过期韶光

大概的数据构造是这样的

hashname为key,hashkey为thread1,value是锁的重入次数

但是这里我要提一点,这里的thread1,可不仅仅是threadId,利用分布式锁常日是在分布式、微做事i项眼前,不同的做事中也有可能涌现线程ID相同的问题,以是这里加一个做事名,实在天生个UUID就可以了

大概的格式便是这样:

bash复制代码mylock:HEX(uuid+theadId):num

但是还有个设置过期韶光的问题,如何设置?

我这里随着我之前的帖子来讲,在那里我是利用的RedisTemplate来实现分布式锁+看门狗机制

但是没有考虑可重入的问题,那么我这次就加上

我们要加过期韶光,同时又要确保原子性,那么就用Lua

加锁

对付加锁的Lua如下

lua复制代码 -- 如果Lock存在 if redis.call("exists",KEYS[1]) ~= 0 then -- 如果不是自己的锁 if redis.call("exists",KEYS[1],ARGS[1]) == 0 then -- 不是自己的锁,返回剩余韶光 return redis.call("pttl",KEYS[1]); end -- 如果是自己的锁就记录次数 redis.call("hincrby",KEYS[1],ARGS[1],1); -- 延期 redis.call("pexpire",KEYS[1],ARGS[2]); else redis.call("hset",KEYS[1],ARGS[1],1); -- 设置默认延期 redis.call("pexpire",KEYS[1],ARGS[2]); end -- 如果Lock不存在,那么就直接加上就可以了,hhh return nil;

这里阐明一下KEY和ARG,key是hash名,args是指命令携带参数

key1:索命

args1:做事线程唯一ID

args2:过期韶光

然后在代码里面的实现

解锁

解锁也差不多

lua复制代码-- 解锁的逻辑和加锁相似 -- 如果Lock存在 if redis.call("exists",KEYS[1]) ~= 0 then -- 如果是自己的锁 if redis.call("hexists",KEYS[1],ARGS[1]) ~= 0 then -- 如果是末了一层 直接delete if redis.call("hget",KEYS[1],ARGS[1]) == 0 then redis.call("del",KEYs[1]); a=0 else -- 如果不是,那么久锁层数减一 a=redis.call("hincrby",KEYS[1],ARGS[1],-1); end end return a; end -- 如果Lock不存在,那么就return,hhh return nil;

续期的话本来便是一条语句,不变就可以了

然后我和之前的代码比较,自旋锁改了一下,hhh

看门狗机制实现

之前实在已经实现过,这里就再来看看吧,这里我为了方便一点,用的Hutool来演示,但是实际用的时候还是用Netty等框架比较好,毕竟Redission也是用的Netty

目前还存在的问题+Reddisson源码剖析 —— 自旋锁

没错,别以为这样就完了,细心的话会创造我上面的代码里面,写的是最暴力的自旋锁(图一个方便,hhh)

如果说一贯循环下去,那么无疑是非常摧残浪费蹂躏CPU的

那么如何办理?

办理方案

细心的同学已经创造了,在我加锁失落败的时候,会返回一个ttl,也便是当前key还有多久失落效

那么我们是不是可以在while里面是指一个壅塞,然后等过了这么久再唤醒线程就可以了?

没错,Reddisson底层也是这样实现的,基于Redis发布订阅,但是这里我给大家大略引个路子

你可以理解为把壅塞的线程ID放进一个壅塞行列步队里面,而我们的做事器就去订阅这个行列步队,实在这个行列步队在Redis里面叫做Channel,感兴趣的可以去看看。

那么是如何订阅的呢?

其实在源代码中,Redisson是放了一个“检测器”来进行监听

下面来看看Redisson加锁的代码

壅塞加锁源码 lock()

java复制代码//壅塞加锁private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return;//这里拿到锁了 } CompletableFuture<RedissonLockEntry> future = subscribe(threadId);//对当前哨程进行订阅 pubSub.timeout(future);//设置订阅超时 RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { // 循环重试获取锁,直至重新获取锁成功才跳出循环 // 此种做法壅塞进程,一贯处于等待锁手动开释或者超时才连续线程 ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); }// get(lockAsync(leaseTime, unit)); }非壅塞加锁

java复制代码//不壅塞加锁,waitTime是最大容忍韶光,这个观点不做过多阐明,便是等待你自选的韶光@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 打算第一次考试测验获取锁后剩余的韶光 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId);//获取失落败 return false; } current = System.currentTimeMillis(); //订阅 CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); try { subscribeFuture.get(time, TimeUnit.MILLISECONDS);//设置一个最多订阅韶光 } catch (TimeoutException e) { if (!subscribeFuture.completeExceptionally(new RedisTimeoutException( "Unable to acquire subscription lock after " + time + "ms. " + "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) { subscribeFuture.whenComplete((res, ex) -> { if (ex == null) { unsubscribe(res, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } catch (ExecutionException e) { acquireFailed(waitTime, unit, threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);//壅塞,等待 } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);//壅塞,等待 } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); }// return get(tryLockAsync(waitTime, leaseTime, unit)); }订阅

其上述类容中的订阅,都通过下面的方法进行回调,在解锁的时候会发布

java复制代码package org.redisson.pubsub;import org.redisson.RedissonLockEntry;import java.util.concurrent.CompletableFuture;/ LockPubSub类是一个用于锁的发布-订阅实现。
它继续自PublishSubscribe类,用于处理锁的订阅和发布。
锁的订阅者是RedissonLockEntry工具。
当吸收到特定的时,会实行相应的操作。
/public class LockPubSub extends PublishSubscribe<RedissonLockEntry> { // 解锁 public static final Long UNLOCK_MESSAGE = 0L; // 读锁解锁 public static final Long READ_UNLOCK_MESSAGE = 1L; public LockPubSub(PublishSubscribeService service) { super(service); } @Override protected RedissonLockEntry createEntry(CompletableFuture<RedissonLockEntry> newPromise) { return new RedissonLockEntry(newPromise); } @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(UNLOCK_MESSAGE)) { // 获取等待实行的Runnable工具,并实行 Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } // 开释锁计数器 value.getLatch().release(); } else if (message.equals(READ_UNLOCK_MESSAGE)) { // 循环实行等待实行的Runnable工具,并实行 while (true) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute == null) { break; } runnableToExecute.run(); } // 开释锁计数器,开释所有等待的读锁 value.getLatch().release(value.getLatch().getQueueLength()); } }}
总结

实在这里收成最大的,该当是自旋锁,虽然说在末了利用了发布订阅来完成异步唤醒,但还是有一些缺陷,比如这个锁是否公正,如果说,这里要让你实现公正锁,读者你又打算如何办理?

想到这里,我又想到了Reactor模型,实在我们可以做一个BossGroup来存放一下壅塞线程ID,实在便是一个壅塞行列步队

再用一个WorkerGroup来对每个ThreadID进行处理,当然这里供应的是一个思路,如果要完成的话,相称于是写一个小型中间件,也挺故意思,后面打算试一试,hhh

Redis-Lua快速学习

当编写 Lua 脚本与 Redis 进行交互时,以下是一些常用的 Lua 脚本指南和技巧:

命令调用:利用 redis.call 函数来调用 Redis 命令。
例如,redis.call('GET', 'mykey') 将调用 Redis 的 GET 命令并返回键为 'mykey' 的值。
参数访问:可以利用 KEYS 表来访问通报给 Lua 脚本的键列表,利用 ARGV 表来访问通报给 Lua 脚本的额外参数。
例如,KEYS[1] 表示第一个键,ARGV[1] 表示第一个额外参数。
返回结果:Lua 脚本可以通过利用 return 语句来返回结果。
例如,return redis.call('GET', 'mykey') 将返回键为 'mykey' 的值。
循环和条件:Lua 供应了一些基本的循环和条件语句,例如 for、while、if 等,可以在 Lua 脚本中利用。
容错处理:在编写 Lua 脚本时,可以考虑添加容错处理,例如利用 pcall 函数来捕获 Redis 命令的缺点并进行处理。
事务支持:Redis 的 Lua 脚本支持事务,可以利用 redis.call('MULTI') 开始事务,然后利用 redis.call('EXEC') 实行事务。
在事务中,可以实行多个 Redis 命令,并将其作为一个原子操作进行提交或回滚。
脚本缓存:Redis 可以缓存 Lua 脚本,以提高实行效率。
您可以利用 EVALSHA 命令来实行缓存的脚本。
在 Java RedisTemplate 中,您可以利用 execute 方法的 execute(script, keys, args) 形式来实行缓存的脚本。

这些指南和技巧可帮助您编写更繁芜和灵巧的 Lua 脚本与 Redis 进行交互。
在编写 Lua 脚本时,请参考 Redis 官方文档以及 Lua 官方文档,以理解更多 Lua 编程措辞和 Redis 命令的细节和用法。

当编写 Lua 脚本时,可以利用循环和条件语句来实现逻辑掌握。
以下是一些示例:

利用 for 循环:

lua复制代码for i = 1, 10 do -- 实行操作,例如打印循环变量 print(i)end利用 while 循环:

lua复制代码local i = 1while i <= 10 do -- 实行操作,例如打印循环变量 print(i) i = i + 1end利用 if-else 条件:

lua复制代码local num = 5if num < 0 then print("Number is negative")elseif num == 0 then print("Number is zero")else print("Number is positive")end利用 repeat-until 循环:

lua复制代码local i = 1repeat -- 实行操作,例如打印循环变量 print(i) i = i + 1until i > 10

这些示例展示了在 Lua 脚本中利用循环和条件语句的基本用法。
您可以根据自己的需求和逻辑在 Lua 脚本中编写更繁芜的循环和条件掌握构造。
请把稳,在 Lua 中,条件语句利用 if-elseif-else 构造,而不是像其他编程措辞中的 if-else 构造。
此外,Lua 的索引从 1 开始,而不是从 0 开始,这与一些其他编程措辞有所不同。

请确保根据您的实际需求和逻辑编写精确的循环和条件掌握构造,并根据 Redis 脚本的哀求将其集成到您的 Lua 脚本中。

作者:Karos链接:https://juejin.cn/post/7244820297290645541

标签:

相关文章

IT官方自营,品质保证的智慧选择

随着互联网技术的飞速发展,我国信息技术产业迎来了前所未有的机遇。在这个背景下,IT官方自营逐渐成为广大用户的首选。本文将从多个角度...

网站推广 2024-12-26 阅读0 评论0

IT审计,守护企业数字安全的坚实防线

随着信息技术的飞速发展,企业对信息系统的依赖程度日益加深。在这个数字化时代,信息安全已成为企业发展的关键因素。而IT审计作为一项专...

网站推广 2024-12-26 阅读0 评论0

IT审计硕士,未来数字经济的守护者

随着信息技术的飞速发展,数字经济已成为推动经济增长的重要引擎。在数字化浪潮中,IT审计作为一种新兴的职业,扮演着守护者的重要角色。...

网站推广 2024-12-26 阅读0 评论0

CMS一50,介绍中国医疗健康大数据的领航者

随着科技的飞速发展,大数据时代已经来临。在众多领域,大数据都发挥着至关重要的作用。而在医疗健康领域,大数据更是被赋予了特殊的意义。...

网站推广 2024-12-26 阅读0 评论0