细谈分布式锁

简介

锁是对于资源的生产消费,占用与解除占用来说无法逃避的话题,而分布式的依赖中间件的锁是咱今天要说的。
不同以往的是,咱们今天不老生常谈,原理基础大多博文其实这一块都说明的很好了,咱们今天说说存在的问题和对于问题解决的折中方案。

getSet redis锁

不说那么多,先读下代码,咱们再来分析问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class RedisLock {

private static Logger log = LoggerFactory.getLogger(RedisLock.class);

private final int DEFAULT_SLEEP_MILLIS;

private RedissonClient redisson;

private final String nameSpace;

private final String name;

private Long lockValue;

/**
* 锁失效时间(毫秒)
*/
private long lockExpiresMilliseconds;

public RedisLock(RedissonClient redisson, String nameSpace, String name) {
//默认30分钟
this(redisson, nameSpace, name, 30, TimeUnit.MINUTES, 100);
}

public RedisLock(RedissonClient redisson, String nameSpace, String name, long expire, TimeUnit unit, int sleepMillis) {
this.redisson = redisson;
this.nameSpace = nameSpace;
this.name = name;
this.lockExpiresMilliseconds = unit.toMillis(expire);
this.DEFAULT_SLEEP_MILLIS = sleepMillis;
}

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
String lockKey = getLockKey();

long deadline = System.currentTimeMillis() + unit.toMillis(time);

while (deadline >= System.currentTimeMillis()) {
Long serverTime = getServerTime();
lockValue = serverTime + lockExpiresMilliseconds;

if (redisson.getBucket(lockKey).trySet(lockValue)) {
log.info(lockKey + " locked by getSet");
return true;
}
Long currentValue = (Long) redisson.getBucket(lockKey).get();
//判断锁是否失效
if (currentValue != null && currentValue < serverTime) {
Long oldValueStr = (Long) redisson.getBucket(lockKey).getAndSet(lockValue);
if (oldValueStr != null && oldValueStr.equals(currentValue)) {
log.info("locked by getSet");
return true;
}
}
Thread.sleep(DEFAULT_SLEEP_MILLIS);
}
return false;
}

public void unlock() {
String lockKey = getLockKey();
Long currValue = (Long) redisson.getBucket(lockKey).get();
if (currValue != null && currValue.equals(lockValue)) {
log.info(lockKey + " unlock");
redisson.getBucket(lockKey).delete();
}
}

/**
* @return
*/
private String getLockKey() {
return nameSpace + ":" + name;
}

/**
* 获取服务器时间
*
* @return
*/
private Long getServerTime() {
return System.currentTimeMillis();
}
}

我们可以发现,这是一个getSet标记时间的redis锁的实现:
getSet lock

这里咱们很容易产生以下的思考:

  • expire设置多大呢?
  • 过大再应用获取资源后挂掉该资源长期不可用咋办?
  • 过小无法cover业务执行时间导致无法保证资源独占怎们办?
  • 设置的不大不小万一有个rpc服务间调用因为网络的问题阻塞正好在锁过期后完成,且已有线程抢占获取该锁时,因为没有锁id,释放了其它线程的锁怎么办?

这还只是基于代码层面的思考,我们继续往下想:

  • 如果我们用的是主从架构,在同步时另一线程再令一节点抢占是否会获取锁?
  • 为了解决该问题我们需要同时指定所有的getSet去master来保证原子性,而本来我们的操作算作是CAS做了不断的重试,在这类业务聚集时,咱们就相当于在对master做ddos
  • 那是不是意味着咱们必须做分片?做完分片完事存在主备是否依旧要把getSet打在同一服务上?

上面这块的锁是在阿里云的哨兵集群redis中跑的,锁过期给的30分钟。

setNx redis锁

老规矩,show me the code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final class RedisLock {

private RedisLock() { }

/**
* 加锁
* @param key redis key
* @param expire 过期时间,单位秒
* @return true:加锁成功,false,加锁失败
*/
public static boolean lock(String key, int expire) {

RedisService redisService = SpringUtils.getBean(RedisService.class);
long status = redisService.setnx(key, "1");

if(status == 1) {
redisService.expire(key, expire);
return true;
}

return false;
}

public static void unLock(String key) {
RedisService redisService = SpringUtils.getBean(RedisService.class);
redisService.del(key);
}

}

这逻辑简单到不想画图,但是这个代码说是问题最大的也不为过,首先

1
2
3
4
5
6
7
8
9
10
11
12
public static boolean lock(String key, int expire) {

RedisService redisService = SpringUtils.getBean(RedisService.class);
long status = redisService.setnx(key, "1");
// <-------------------------------在这停顿
if(status == 1) {
redisService.expire(key, expire);
return true;
}

return false;
}

在这里挂掉,这个资源永远锁死,你永远不知道你的docker会啥时候把你的容器秒了,可能是超出了分配内存,可能是滚动更新,你的服务能完整的跑完代码才是一件幸运的事情。

我们改做:

1
redisService.evel("set " + key + " 1 NX PX " + expire);

保证原子性

再者,我们为了解锁的安全, 要添加锁id:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private ThreadLocal<Long> lid = new ThreadLocal<>();

public static boolean lock(String key, int expire) {
RedisService redisService = SpringUtils.getBean(RedisService.class);
lid.set(IdUtils.genId());

long status = redisService.evel("set " + key + " " + lid.get() + " NX PX " + expire);
return status == lid.get();
}
public static void unLock(String key) {
RedisService redisService = SpringUtils.getBean(RedisService.class);

long olid = redisService.get(key, "0");
// <------------------------------- 在这停顿
if (olid == lid.get()) {
redisService.del(key);
}
}

同样的,我们为了保证解锁的原子性

1
redisService.evel("if redis.call('get', '" + key + "') == " + lid.get() + " then return redis.call('del', '" + key + "'); end return 0");

一个逻辑较为简单、原子性都还行且没有大量getSet存在redis锁就成了,但是仅仅也只是代码上看。

咱们从整体架构上呢? 依然没有解决可用性的问题,如果只依赖单点的redis不存在问题但是可用性不高,如果使用主从则同步会发生问题,必须将读写打在同一台上。

更扯淡和令人难受的是,某些云厂商的托管redis不给你用evel。

redlock方案

redis社区推荐的方案是redlock,为了保证redis锁服务的高可用且资源的隔离,我们部署N个redis单点,这些节点间不存在主从复制,我们确保将在N个实例上使用与单例redis下同样的方法
获取锁和释放锁。

而当获取一个资源的锁时:

  • 获取当前时间
  • 依次尝试从N个实例下获取锁,而每个单例锁获取都有个超时时间,这个超时时间用于当某个节点挂掉时不至于等待过长的时间来获取该实例的锁,可以快速的跳到下个实例
    进行尝试
  • 当获取超过半数实例的锁时才会真正的获得资源
  • 当没有获取半数实例的锁的情况下(过半数失败)释放以获得所有的锁

而具体的实现方案redisson已经做好了封装,详细可看:
Distributed locks and synchronizers

缺点就是为了获取各实例的锁所消耗的时间,以及你为了买各个实例所花掉的钱。。。。。

zookeeper方案

其实我很喜欢这个方案,zk天生就是做这玩意的料,做个可重入、公平锁简单可靠,而临时节点基于会话,对于应用程序错误导致资源的释放不依赖过期判断。

但是毕竟是引入的中间件,而且其确实有一定的成本和风险(CP系统的可用性),在场景上我们要有些取舍。

利用名称唯一实现独占锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public class ZookeeperLock implements Lock, Watcher {
private ZooKeeper zk;
private String root = "/locks";//根
private String lockName;//竞争资源的标志
private String myZnode;//当前锁
private int sessionTimeout = 30000;
private List<Exception> exception = new ArrayList<Exception>();

/**
* 创建分布式锁,使用前请确认config配置的zookeeper服务可用
* @param config 127.0.0.1:2181
* @param lockName 竞争资源标志,lockName中不能包含单词lock
*/
public ZookeeperLock(String config, String lockName){
this.lockName = lockName;
// 创建一个与服务器的连接
try {
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(root, false);
if(stat == null){
// 创建根节点
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException e) {
exception.add(e);
} catch (KeeperException e) {
exception.add(e);
} catch (InterruptedException e) {
exception.add(e);
}
}

public void lock() {
if(exception.size() > 0){
throw new LockException(exception.get(0));
}
if(!tryLock()) {
throw new LockException("您的操作太频繁,请稍后再试");
}
}

public void lockInterruptibly() throws InterruptedException {
this.lock();
}

public boolean tryLock() {
try {
myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return tryLock();
}

public void unlock() {
try {
zk.delete(myZnode, -1);
myZnode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

public Condition newCondition() {
return null;
}

public void process(WatchedEvent watchedEvent) {
//
}

}

整一个公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
public class DistributedLock implements Lock, Watcher{
private ZooKeeper zk;
private String root = "/locks";//根
private String lockName;//竞争资源的标志
private String waitNode;//等待前一个锁
private String myZnode;//当前锁
private CountDownLatch latch;//计数器
private int sessionTimeout = 30000;
private List<Exception> exception = new ArrayList<Exception>();

/**
* 创建分布式锁,使用前请确认config配置的zookeeper服务可用
* @param config 127.0.0.1:2181
* @param lockName 竞争资源标志,lockName中不能包含单词lock
*/
public DistributedLock(String config, String lockName){
this.lockName = lockName;
// 创建一个与服务器的连接
try {
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(root, false);
if(stat == null){
// 创建根节点
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
exception.add(e);
} catch (KeeperException e) {
exception.add(e);
} catch (InterruptedException e) {
exception.add(e);
}
}

/**
* zookeeper节点的监视器
*/
public void process(WatchedEvent event) {
if(this.latch != null) {
this.latch.countDown();
}
}

public void lock() {
if(exception.size() > 0){
throw new LockException(exception.get(0));
}
try {
if(this.tryLock()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
return;
}
else{
waitForLock(waitNode, sessionTimeout);//等待锁
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}

public boolean tryLock() {
try {
String splitStr = "_lock_";
if(lockName.contains(splitStr))
throw new LockException("lockName can not contains \\u000B");
//创建临时子节点
myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + " is created ");
//取出所有子节点
List<String> subNodes = zk.getChildren(root, false);
//取出所有lockName的锁
List<String> lockObjNodes = new ArrayList<String>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if(_node.equals(lockName)){
lockObjNodes.add(node);
}
}
Collections.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}

public boolean tryLock(long time, TimeUnit unit) {
try {
if(this.tryLock()){
return true;
}
return waitForLock(waitNode,time);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if(stat != null){
System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}

public void unlock() {
try {
System.out.println("unlock " + myZnode);
zk.delete(myZnode,-1);
myZnode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

public void lockInterruptibly() throws InterruptedException {
this.lock();
}

public Condition newCondition() {
return null;
}

public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e){
super(e);
}
public LockException(Exception e){
super(e);
}
}

}

更多工程上的支持

无论是redis或者zk的DLock实现,对于工程上我们都有更多的需求:

  • 可重入锁实现
  • 锁扩散策略
  • 基于注解和aop的易用性支持等

咱们将在后续慢慢补完