当前位置:网站首页>分布式笔记(05)— 分布式锁之 etcd(分布式锁原理、etcd特点、分布式锁实现方案)
分布式笔记(05)— 分布式锁之 etcd(分布式锁原理、etcd特点、分布式锁实现方案)
2022-07-20 04:23:00 【wohu1104】
1. 分布式锁原理
分布式环境下,多台机器上多个进程对同一个共享资源(数据、文件等)进行操作,如果不做互斥,就有可能出现“余额扣成负数”,或者“商品超卖”的情况。为了解决这个问题,需要分布式锁服务。首先,来看一下分布式锁应该具备哪些条件。
互斥性:在任意时刻,对于同一个锁,只有一个客户端能持有,从而保证一个共享资源同一时间只能被一个客户端操作;
安全性:即不会形成死锁,当一个客户端在持有锁的期间崩溃而没有主动解锁的情况下,其持有的锁也能够被正确释放,并保证后续其它客户端能加锁;
可用性:当提供锁服务的节点发生宕机等不可恢复性故障时,“热备” 节点能够接替故障的节点继续提供服务,并保证自身持有的数据与故障节点一致。
对称性:对于任意一个锁,其加锁和解锁必须是同一个客户端,即客户端 A 不能把客户端 B 加的锁给解了。
2. etcd 关键特点
Etcd
支持的以下机制:Watch
机制、Lease
机制、Revision
机制和 Prefix
机制,正是这些机制赋予了 Etcd
实现分布式锁的能力。
Watch
机制:即监听机制,Watch
机制支持监听某个固定的Key
,也支持监听一个范围(前缀机制),当被监听的Key
或范围发生变化,客户端将收到通知;在实现分布式锁时,如果抢锁失败,可通过Prefix
机制返回的Key-Value
列表获得Revision
比自己小且相差最小的Key
(称为Pre-Key
),对Pre-Key
进行监听,因为只有它释放锁,自己才能获得锁,如果监听到Pre-Key
的DELETE
事件,则说明Pre-Key
已经释放,自己已经持有锁。Lease
机制:即租约机制(TTL
,Time To Live
),Etcd
可以为存储的Key-Value
对设置租约,当租约到期,Key-Value
将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约,以避免Key-Value
对过期失效。Lease
机制可以保证分布式锁的安全性,为锁对应的Key
配置租约,即使锁的持有者因故障而不能主动释放锁,锁也会因租约到期而自动释放。Revision
机制:每个Key
带有一个Revision
号,每进行一次事务便加一,因此它是全局唯一的,如初始值为 0,进行一次put(key, value)
,Key
的Revision
变为 1,同样的操作,再进行一次,Revision
变为 2;换成key1
进行put(key1, value)
操作,Revision
将变为 3;这种机制有一个作用:通过Revision
的大小就可以知道写操作的顺序。在实现分布式锁时,多个客户端同时抢锁,根据Revision
号大小依次获得锁,可以避免 “羊群效应” (也称“惊群效应”),实现公平锁。Prefix
机制:即前缀机制,也称目录机制,例如,一个名为/mylock
的锁,两个争抢它的客户端进行写操作,实际写入的Key
分别为:key1="/mylock/UUID1"
,key2="/mylock/UUID2"
,其中,UUID
表示全局唯一的ID
,确保两个Key
的唯一性。很显然,写操作都会成功,但返回的Revision
不一样,那么,如何判断谁获得了锁呢?通过前缀/mylock
查询,返回包含两个Key-Value
对的Key-Value
列表,同时也包含它们的Revision
,通过Revision
大小,客户端可以判断自己是否获得锁,如果抢锁失败,则等待锁释放(对应的Key
被删除或者租约过期),然后再判断自己是否可以获得锁。
3. 基于 Etcd 的分布式锁方案一
下面描述了使用 Etcd
实现分布式锁的业务流程,假设对某个共享资源设置的锁名为:/lock/mylock
。
步骤1:准备
客户端连接 Etcd
,以 /lock/mylock
为前缀创建全局唯一的 Key
,假设第一个客户端对应的 Key="/lock/mylock/UUID1"
,第二个为 Key="/lock/mylock/UUID2"
;客户端分别为自己的 Key
创建租约 Lease
,租约的长度根据业务耗时确定,假设为 15s。
步骤2:创建定时任务作为租约的“心跳”
在一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,Key
将因租约到期而被删除,从而锁释放,避免死锁。
步骤3:客户端将自己全局唯一的 Key 写入 Etcd
进行 Put
操作,将步骤 1 中创建的 Key
绑定租约写入 Etcd
,根据 Etcd
的 Revision
机制,假设两个客户端 Put
操作返回的 Revision
分别为1、2,客户端需记录 Revision
用以接下来判断自己是否获得锁。
步骤4:客户端判断是否获得锁
客户端以前缀 /lock/mylock
读取 Key-Value
列表(Key-Value
中带有 Key
对应的 Revision
),判断自己 Key
的 Revision
是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision
比自己小的 Key
的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。
步骤5:执行业务
获得锁后,操作共享资源,执行业务代码。
步骤6:释放锁
完成业务流程后,删除对应的 Key
释放锁。
代码实现
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.coreos.jetcd.Client;
import com.coreos.jetcd.KV;
import com.coreos.jetcd.Lease;
import com.coreos.jetcd.Watch.Watcher;
import com.coreos.jetcd.options.GetOption;
import com.coreos.jetcd.options.GetOption.SortTarget;
import com.coreos.jetcd.options.PutOption;
import com.coreos.jetcd.watch.WatchEvent;
import com.coreos.jetcd.watch.WatchResponse;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.KeyValue;
import com.coreos.jetcd.kv.PutResponse;
import java.util.UUID;
/** * Etcd 客户端代码,用多个线程“抢锁”模拟分布式系统中,多个进程“抢锁” * */
public class EtcdClient
{
public static void main(String[] args) throws InterruptedException, ExecutionException,
TimeoutException, ClassNotFoundException
{
// 创建Etcd客户端,Etcd服务端为单机模式
Client client = Client.builder().endpoints("http://localhost:2379").build();
// 对于某共享资源制定的锁名
String lockName = "/lock/mylock";
// 模拟分布式场景下,多个进程“抢锁”
for (int i = 0; i < 3; i++)
{
new MyThread(lockName, client).start();
}
}
/** * 加锁方法,返回值为加锁操作中实际存储于Etcd中的Key,即:lockName+UUID, * 根据返回的Key,可删除存储于Etcd中的键值对,达到释放锁的目的。 * * @param lockName * @param client * @param leaseId * @return */
public static String lock(String lockName, Client client, long leaseId)
{
// lockName作为实际存储在Etcd的中的Key的前缀,后缀是一个全局唯一的ID,从而确保:对于同一个锁,不同进程存储的Key具有相同的前缀,不同的后缀
StringBuffer strBufOfRealKey = new StringBuffer();
strBufOfRealKey.append(lockName);
strBufOfRealKey.append("/");
strBufOfRealKey.append(UUID.randomUUID().toString());
// 加锁操作实际上是一个put操作,每一次put操作都会使revision增加1,因此,对于任何一次操作,这都是唯一的。(get,delete也一样)
// 可以通过revision的大小确定进行抢锁操作的时序,先进行抢锁的,revision较小,后面依次增加。
// 用于记录自己“抢锁”的Revision,初始值为0L
long revisionOfMyself = 0L;
KV kvClient = client.getKVClient();
// lock,尝试加锁,加锁只关注Key,value不为空即可。
// 注意:这里没有考虑可靠性和重试机制,实际应用中应考虑put操作而重试
try
{
PutResponse putResponse = kvClient
.put(ByteSequence.fromString(strBufOfRealKey.toString()),
ByteSequence.fromString("value"),
PutOption.newBuilder().withLeaseId(leaseId).build())
.get(10, TimeUnit.SECONDS);
// 获取自己加锁操作的Revision号
revisionOfMyself = putResponse.getHeader().getRevision();
}
catch (InterruptedException | ExecutionException | TimeoutException e1)
{
System.out.println("[error]: lock operation failed:" + e1);
}
try
{
// lockName作为前缀,取出所有键值对,并且根据Revision进行升序排列,版本号小的在前
List<KeyValue> kvList = kvClient.get(ByteSequence.fromString(lockName),
GetOption.newBuilder().withPrefix(ByteSequence.fromString(lockName))
.withSortField(SortTarget.MOD).build())
.get().getKvs();
// 如果自己的版本号最小,则表明自己持有锁成功,否则进入监听流程,等待锁释放
if (revisionOfMyself == kvList.get(0).getModRevision())
{
System.out.println("[lock]: lock successfully. [revision]:" + revisionOfMyself);
// 加锁成功,返回实际存储于Etcd中的Key
return strBufOfRealKey.toString();
}
else
{
// 记录自己加锁操作的前一个加锁操作的索引,因为只有前一个加锁操作完成并释放,自己才能获得锁
int preIndex = 0;
for (int index = 0; index < kvList.size(); index++)
{
if (kvList.get(index).getModRevision() == revisionOfMyself)
{
preIndex = index - 1;// 前一个加锁操作,故比自己的索引小1
}
}
// 根据索引,获得前一个加锁操作对应的Key
ByteSequence preKeyBS = kvList.get(preIndex).getKey();
// 创建一个Watcher,用于监听前一个Key
Watcher watcher = client.getWatchClient().watch(preKeyBS);
WatchResponse res = null;
// 监听前一个Key,将处于阻塞状态,直到前一个Key发生delete事件
// 需要注意的是,一个Key对应的事件不只有delete,不过,对于分布式锁来说,除了加锁就是释放锁
// 因此,这里只要监听到事件,必然是delete事件或者Key因租约过期而失效删除,结果都是锁被释放
try
{
System.out.println("[lock]: keep waiting until the lock is released.");
res = watcher.listen();
}
catch (InterruptedException e)
{
System.out.println("[error]: failed to listen key.");
}
// 为了便于读者理解,此处写一点冗余代码,判断监听事件是否为DELETE,即释放锁
List<WatchEvent> eventlist = res.getEvents();
for (WatchEvent event : eventlist)
{
// 如果监听到DELETE事件,说明前一个加锁操作完成并已经释放,自己获得锁,返回
if (event.getEventType().toString().equals("DELETE"))
{
System.out.println("[lock]: lock successfully. [revision]:" + revisionOfMyself);
return strBufOfRealKey.toString();
}
}
}
}
catch (InterruptedException | ExecutionException e)
{
System.out.println("[error]: lock operation failed:" + e);
}
return strBufOfRealKey.toString();
}
/** * 释放锁方法,本质上就是删除实际存储于Etcd中的Key * * @param lockName * @param client */
public static void unLock(String realLockName, Client client)
{
try
{
client.getKVClient().delete(ByteSequence.fromString(realLockName)).get(10,
TimeUnit.SECONDS);
System.out.println("[unLock]: unlock successfully.[lockName]:" + realLockName);
}
catch (InterruptedException | ExecutionException | TimeoutException e)
{
System.out.println("[error]: unlock failed:" + e);
}
}
/** * 自定义一个线程类,模拟分布式场景下多个进程 "抢锁" */
public static class MyThread extends Thread
{
private String lockName;
private Client client;
MyThread(String lockName, Client client)
{
this.client = client;
this.lockName = lockName;
}
@Override
public void run()
{
// 创建一个租约,有效期15s
Lease leaseClient = client.getLeaseClient();
Long leaseId = null;
try
{
leaseId = leaseClient.grant(15).get(10, TimeUnit.SECONDS).getID();
}
catch (InterruptedException | ExecutionException | TimeoutException e1)
{
System.out.println("[error]: create lease failed:" + e1);
return;
}
// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;
// 同时,一旦客户端发生故障,心跳便会中断,锁也会应租约过期而被动释放,避免死锁
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
// 续约心跳为12s,仅作为举例
service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), 1, 12, TimeUnit.SECONDS);
// 1. try to lock
String realLoclName = lock(lockName, client, leaseId);
// 2. to do something
try
{
Thread.sleep(6000);
}
catch (InterruptedException e2)
{
System.out.println("[error]:" + e2);
}
// 3. unlock
service.shutdown();// 关闭续约的定时任务
unLock(realLoclName, client);
}
}
/** * 在等待其它客户端释放锁期间,通过心跳续约,保证自己的Key-Value不会失效 * */
public static class KeepAliveTask implements Runnable
{
private Lease leaseClient;
private long leaseId;
KeepAliveTask(Lease leaseClient, long leaseId)
{
this.leaseClient = leaseClient;
this.leaseId = leaseId;
}
@Override
public void run()
{
leaseClient.keepAliveOnce(leaseId);
}
}
}
输出结果
[lock]: lock successfully. [revision]:44
[lock]: keep waiting until the lock is released.
[lock]: keep waiting until the lock is released.
[unLock]: unlock successfully.
[lock]: lock successfully. [revision]:45
[unLock]: unlock successfully.
[lock]: lock successfully. [revision]:46
[unLock]: unlock successfully.
4. 基于 Etcd 的分布式锁方案二
Etcd
最新版本提供了支持分布式锁的基础接口,其本质就是将上节介绍的实现细节进行了封装。单从使用的角度来看,这是非常有益的,大大降低了分布式锁的实现难度。但与此同时,简化的接口也无形中为用户理解内部原理设置了屏障。
我们先用 Etcd
官方提供的 Go
语言客户端(etcdctl
)验证一下分布式锁的实现原理。解压官方提供的 Etcd
安装包,里面有两个可执行文件:etcd
和 etcdctl
,其中 etcd
是服务端,etcdctl
是客户端。在服务端启动的前提下,执行以下命令验证分布式锁原理。
(1)分别开启两个窗口,进入 etcdctl
所在目录,执行以下命令,显式指定 API
版本为 V3
,老版本 V2
不支持分布式锁接口。
export ETCDCTL_API=3
(2)分别在两个窗口执行相同的加锁命令:
./etcdctl.exe lock mylock
(3)可以观察到,只有一个加锁成功,并返回了实际存储与 Etcd 中 Key 值:
$ ./etcdctl.exe lock mylock
mylock/694d65eb367c7ec4
(4)在加锁成功的窗口执行命令:Ctrl+c
,释放锁;与此同时,另一个窗口加锁成功,如下所示:
$ ./etcdctl.exe lock mylock
mylock/694d65eb367c7ec8
很明显,同样的锁名 mylock
,两个客户端分别进行加锁操作,实际存储于 Etcd
中的 Key
并不是 mylock
,而是以 mylock
为前缀,分别加了一个全局唯一的 ID
。是不是和 “路线一” 中介绍得原理一致呢。
分布式锁代码实现
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.coreos.jetcd.Client;
import com.coreos.jetcd.Lease;
import com.coreos.jetcd.Lock;
import com.coreos.jetcd.data.ByteSequence;
public class DistributedLock
{
private static DistributedLock lockProvider = null;
private static Object mutex = new Object();
private Client client;
private Lock lockClient;
private Lease leaseClient;
private DistributedLock()
{
super();
// 创建Etcd客户端,本例中Etcd集群只有一个节点
this.client = Client.builder().endpoints("http://localhost:2379").build();
this.lockClient = client.getLockClient();
this.leaseClient = client.getLeaseClient();
}
public static DistributedLock getInstance()
{
synchronized (mutex)
{
if (null == lockProvider)
{
lockProvider = new DistributedLock();
}
}
return lockProvider;
}
/** * 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。 * * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名 * @param TTL : Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁 * @return LockResult */
public LockResult lock(String lockName, long TTL)
{
LockResult lockResult = new LockResult();
/*1.准备阶段*/
// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;
// 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
// 初始化返回值lockResult
lockResult.setIsLockSuccess(false);
lockResult.setService(service);
// 记录租约ID,初始值设为 0L
Long leaseId = 0L;
/*2.创建租约*/
// 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定。
try
{
leaseId = leaseClient.grant(TTL).get().getID();
lockResult.setLeaseId(leaseId);
// 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定。
long period = TTL - TTL / 5;
service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period,
TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException e)
{
System.out.println("[error]: Create lease failed:" + e);
return lockResult;
}
System.out.println("[lock]: start to lock." + Thread.currentThread().getName());
/*3.加锁操作*/
// 执行加锁操作,并为锁对应的Key绑定租约
try
{
lockClient.lock(ByteSequence.fromString(lockName), leaseId).get();
}
catch (InterruptedException | ExecutionException e1)
{
System.out.println("[error]: lock failed:" + e1);
return lockResult;
}
System.out.println("[lock]: lock successfully." + Thread.currentThread().getName());
lockResult.setIsLockSuccess(true);
return lockResult;
}
/** * 解锁操作,释放锁、关闭定时任务、解除租约 * * @param lockName:锁名 * @param lockResult:加锁操作返回的结果 */
public void unLock(String lockName, LockResult lockResult)
{
System.out.println("[unlock]: start to unlock." + Thread.currentThread().getName());
try
{
// 释放锁
lockClient.unlock(ByteSequence.fromString(lockName)).get();
// 关闭定时任务
lockResult.getService().shutdown();
// 删除租约
if (lockResult.getLeaseId() != 0L)
{
leaseClient.revoke(lockResult.getLeaseId());
}
}
catch (InterruptedException | ExecutionException e)
{
System.out.println("[error]: unlock failed: " + e);
}
System.out.println("[unlock]: unlock successfully." + Thread.currentThread().getName());
}
/** * 在等待其它客户端释放锁期间,通过心跳续约,保证自己的锁对应租约不会失效 * */
public static class KeepAliveTask implements Runnable
{
private Lease leaseClient;
private long leaseId;
KeepAliveTask(Lease leaseClient, long leaseId)
{
this.leaseClient = leaseClient;
this.leaseId = leaseId;
}
@Override
public void run()
{
// 续约一次
leaseClient.keepAliveOnce(leaseId);
}
}
/** * 该class用于描述加锁的结果,同时携带解锁操作所需参数 * */
public static class LockResult
{
private boolean isLockSuccess;
private long leaseId;
private ScheduledExecutorService service;
LockResult()
{
super();
}
public void setIsLockSuccess(boolean isLockSuccess)
{
this.isLockSuccess = isLockSuccess;
}
public void setLeaseId(long leaseId)
{
this.leaseId = leaseId;
}
public void setService(ScheduledExecutorService service)
{
this.service = service;
}
public boolean getIsLockSuccess()
{
return this.isLockSuccess;
}
public long getLeaseId()
{
return this.leaseId;
}
public ScheduledExecutorService getService()
{
return this.service;
}
}
}
测试代码
public class DistributedLockTest
{
public static void main(String[] args)
{
// 模拟分布式场景下,多个进程 “抢锁”
for (int i = 0; i < 5; i++)
{
new MyThread().start();
}
}
public static class MyThread extends Thread
{
@Override
public void run()
{
String lockName = "/lock/mylock";
// 1. 加锁
LockResult lockResult = DistributedLock.getInstance().lock(lockName, 30);
// 2. 执行业务
if (lockResult.getIsLockSuccess())
{
// 获得锁后,执行业务,用sleep方法模拟.
try
{
Thread.sleep(10000);
}
catch (InterruptedException e)
{
System.out.println("[error]:" + e);
}
}
// 3. 解锁
DistributedLock.getInstance().unLock(lockName, lockResult);
}
}
}
输出结果:
[lock]: start to lock.Thread-4
[lock]: start to lock.Thread-3
[lock]: start to lock.Thread-1
[lock]: start to lock.Thread-0
[lock]: start to lock.Thread-2
[lock]: lock successfully.Thread-3
[unlock]: start to unlock.Thread-3
[unlock]: unlock successfully.Thread-3
[lock]: lock successfully.Thread-2
[unlock]: start to unlock.Thread-2
[unlock]: unlock successfully.Thread-2
[lock]: lock successfully.Thread-1
[unlock]: start to unlock.Thread-1
[unlock]: unlock successfully.Thread-1
[lock]: lock successfully.Thread-0
[unlock]: start to unlock.Thread-0
[unlock]: unlock successfully.Thread-0
[lock]: lock successfully.Thread-4
[unlock]: start to unlock.Thread-4
[unlock]: unlock successfully.Thread-4
参考:
https://download.csdn.net/columnTopic/5b8760d3ea7bc512755e5fad
边栏推荐
- Please stop using simpledateformat to format time. Datetimeformatter is better!
- 【MUDUO 日志系统1】Logger输出
- MySQL field values contain_ And \ escape characters, how to eliminate the influence when comparing the values of two tables?
- apple 为什么要改 objc
- Relationship between accuracy, recall and confidence
- A detailed explanation of the implementation principle of go Distributed Link Tracking
- RGB dot matrix display based on esp32
- 数据可视化之堆叠柱状图带你了解宠物数据
- Inserting data in stored procedures and for loops
- Returns the quarter and year of the date provided
猜你喜欢
基于ESP32的RGB点阵显示器
excel怎么选取特定数字求和?excel选中特定数字求和的方法
EF Core学习笔记:额外的外键属性 / 单项导航属性
从0到1建设智能灰度数据体系:以vivo游戏中心为例
Solve gradient explosion and gradient disappearance
Exploration and practice of dewu app data simulation platform
I summarize some experience of the whole R & D process
Explain IOU, giou, Diou, ciou, eiou and Diou NMS in detail
cnvd_2019_22238
Advanced one stage of target detection
随机推荐
怎么 111
Word文档怎么删除页眉页脚和横线
买量洞察与渠道评估,助力营销决策优化
一文详解|Go 分布式链路追踪实现原理
jdbc error code
HMS Core音频编辑服务支持7种音频特效,助力一站式音频处理
typora测试版过期无法正常使用
EF Core学习笔记:额外的外键属性 / 单项导航属性
excel怎么选取特定数字求和?excel选中特定数字求和的方法
Explain IOU, giou, Diou, ciou, eiou and Diou NMS in detail
爱学啊的博客-人生苦短,只做好课!
Structure completion (flexible array)
Idea activation
返回日期对应的一年中的第几周
Get the way to optimize the one-stop worktable of customer service
MySQL 字段值里包含_和\转义符,如何在比对两个表的值时消除影响?
Redis distributed lock + thread pool
添加给定的工作日数后计算日期
Relationship between accuracy, recall and confidence
MySQL field values contain_ And \ escape characters, how to eliminate the influence when comparing the values of two tables?