更新時間:2022-02-15 10:18:51 來源:動力節點 瀏覽2559次
Redis 單節點實現了分布式鎖。如果你通過 Sentinel 有高可用,如果主會話發生變化,如果某些原因發生變化,就會發生鎖丟失。
(1)客戶端 1 獲得 Redis 的 Master 節點上的鎖。
(2)Master宕機,存儲鎖的鑰匙還沒有到Slave。
(3)Master故障,故障轉移,SLAVE節點升級為Master節點。
(4)Client 2 從新的 master 獲取相同資源對應的鎖。
這樣,客戶端1和客戶端2同時持有相同資源的鎖。鎖的安全性被破壞了。對于這個問題。Redis 作者 Antirez 提出了 Redlock 算法來解決這個問題。
當不同進程需要互斥訪問共享資源時,分布式鎖是一種非常有用的技術手段。實現高效分布式鎖的三個屬性需要考慮:
(1)安全屬性:互斥,無論何時,只有一個客戶端持有鎖。
(2)效率屬性 A:不會死。
(3)效率屬性B:容錯性,只要大部分Redis節點正常工作,客戶端都可以獲取和釋放鎖。
在算法的分布式版本中,我們假設我們有 N 個完全獨立的 Redis Master 節點,我們沒有任何副本或其他隱式分布式協調算法。我們已經描述了如何在單節點環境中安全地獲取和釋放鎖。因此,我們自然應該使用這種方法來獲取和釋放每個單個節點中的鎖。在我們的示例中,我們將 N 設置為 5,這個數字是一個比較合理的值,所以我們需要在不同的計算機或虛擬機上運行 5 個 Master 節點,以確保它們不會同時出現。機器。客戶端需要執行以下操作才能獲得鎖:
(1)獲取當前時間(單位為毫秒)。
(2)turnt用于n個節點上相同的key和隨機值。在這一步中,當客戶端請求每個master上的鎖時,會有一個比總鎖釋放時間小的超時。時間。比如鎖自動釋放時間為10秒,每個節點鎖請求的超時時間可能在5-50毫秒的范圍內,這樣可以防止客戶端在一個過期的Master節點上阻塞太久,如果一個Master節點是不可用,我們應該盡快嘗試下一個Master節點。
(3)客戶端計算第二步花費鎖的時間,只有當客戶端成功獲取鎖(這里是3),并且總時間消耗不超過鎖釋放時間,這個鎖才算成功.
(4)如果鎖獲取成功,鎖自動釋放時間是在鎖消耗之前花費初始鎖釋放時間的時間。
(5)如果鎖獲取失敗,無論是因為鎖成功不超過一半(N/2+1)還是因為總耗時時間超過鎖釋放時間,客戶端都會釋放每個Master節點上的鎖,即使是那些相信沒有成功鎖定的人。
Redisson 包已經封裝了 RedLock 算法,該算法需要使用 Redisson 包查看分布式鎖的正確姿勢。
<!-- JDK 1.8 + 兼容 -->
<依賴>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<版本> 3.9。0 </版本>
</依賴>
<!-- JDK 1.6 + 兼容 -->
<依賴>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<版本> 2.14。0 </版本>
</依賴>
Redisson管理類:
導入 org.redisson.Redisson;
導入 org.redisson.api.RAtomicLong;
導入 org.redisson.api.RedissonClient;
導入 org.redisson.config.Config;
公共 類RedissonManager {
私有 靜態配置配置 =新配置();
私有 靜態RedissonClient redisson = null ;
private static final String RAtomicName = " genId_ " ;
公共 靜態 無效初始化(){
嘗試{
config.useClusterServers()
.setScanInterval( 200000 ) //設置集群狀態掃描間隔
.setMasterConnectionPoolSize( 10000 ) //設置Master節點的連接池最大連接數為10000
.setSlaveConnectionPoolSize( 10000 ) //設置Master節點的最大連接數SLAVE 節點的連接池為 500
.setIdleConnectionTimeout( 10000 ) //如果當前連接池中的連接數超過了最小空閑連接,同時還有一個連接空閑時間超過了該值,那么這些連接將被自動關閉并從連接池中移除。時間單位是毫秒。
.setConnectTimeout( 30000 ) //等待與任意節點建立連接。時間單位是毫秒。
.setTimeout( 3000 ) //等待節點回復命令。該時間從命令發送成功時開始。
.setRetryInterval( 3000 ) //當與節點斷開連接時,等待時間間隔重新建立連接。時間單位是毫秒。
.addNodeAddress( " redis://127.0.0.1:7000 " , " redis://127.0.0.1:7001 " , " redis://127.0.0.1:7002 " , " redis://127.0.0.1:7003 " , " redis://127.0.0.1:7004 " , " redis://127.0.0.1:7005 " );
redisson = Redisson.create(config);
RAtomicLong atomicLong = redisson.getAtomicLong (RAtomicName);
原子長。設置(0);//自增設置為從 0 開始
} catch (Exception e){
e.printStackTrace();
}
}
公共 靜態RedissonClient getRedisson(){
if (redisson == null ){
RedissonManager.init(); //初始化
}
return redisson;
}
我們已經配置了很多參數,其實有十個參數,我們只是設置了幾個比較重要的。
getredisson 方法是用戶初始化 Redisson。
NEXTID 方法 返回 RatomicName 變量的總次數,也就是我成功使用分布式鎖的次數。
分布式鎖定操作類:
導入 com.config.RedissonManager;
導入 org.redisson.api.RLock;
導入 org.redisson.api.RedissonClient;
導入 org.slf4j.Logger;
導入 org.slf4j.LoggerFactory;
導入 org.springframework.stereotype.Component;
導入 java.util.concurrent.TimeUnit;
@零件
public class RedissonLock {
private static final Logger LOGGER = LoggerFactory.getLogger(RedissonLock.class ) ;
私有 靜態RedissonClient redissonClient = RedissonManager.getRedisson();
公共 無效 鎖(字符串鎖名){
字符串鍵=鎖名;
RLock myLock = redissonClient.getLock(key);
// Lock 提供 Timeout 參數,Timeout 結束強制解鎖防止死鎖
myLock. 鎖定(2 ,TimeUnit.SECONDS);
// 1. 最常用的使用方法
// lock.lock();
// 2. 支持過期解鎖功能,10秒后自動解鎖,無需調用UNLOCK方法手動解鎖
// lock.lock(10, TimeUnit.SECONDS);
// 3. 嘗試加鎖,等待3秒,加鎖后10秒自動解鎖
// try {
// boolean res = mylock.tryLock(3, 10, TimeUnit.SECONDS);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.err.println( " ======lock====== " + Thread.currentThread().getName());
}
公共 無效解鎖(字符串鎖名){
字符串鍵=鎖名;
RLock myLock = redissonClient.getLock(key);
myLock.unlock();
System.err.println( " ======解鎖====== " + Thread.currentThread().getName());
}
}
LOCK方法是鎖定操作,UNLOCK方法是解鎖。如果您想了解更多相關知識,可以關注一下動力節點的Java在線學習,里面的課程從入門到精通,通俗易懂,適合小白學習,希望對大家能夠有所幫助。
0基礎 0學費 15天面授
有基礎 直達就業
業余時間 高薪轉行
工作1~3年,加薪神器
工作3~5年,晉升架構
提交申請后,顧問老師會電話與您溝通安排學習