Introduction
Locks are unavoidable wherever resources are produced, consumed, occupied, and released, and distributed locks built on middleware are today's topic.
Unlike the usual treatment, we won't rehash the basics here; most blog posts already explain the fundamentals well. Instead, we'll look at the problems these locks have in practice and at pragmatic trade-offs for working around them.
The getSet Redis lock
Rather than talking it through first, let's read the code and then analyze the problems.
```java
import java.util.concurrent.TimeUnit;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisLock {
    private static Logger log = LoggerFactory.getLogger(RedisLock.class);
    private final int DEFAULT_SLEEP_MILLIS;
    private RedissonClient redisson;
    private final String nameSpace;
    private final String name;
    private Long lockValue;
    private long lockExpiresMilliseconds;

    public RedisLock(RedissonClient redisson, String nameSpace, String name) {
        this(redisson, nameSpace, name, 30, TimeUnit.MINUTES, 100);
    }

    public RedisLock(RedissonClient redisson, String nameSpace, String name,
                     long expire, TimeUnit unit, int sleepMillis) {
        this.redisson = redisson;
        this.nameSpace = nameSpace;
        this.name = name;
        this.lockExpiresMilliseconds = unit.toMillis(expire);
        this.DEFAULT_SLEEP_MILLIS = sleepMillis;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        String lockKey = getLockKey();
        long deadline = System.currentTimeMillis() + unit.toMillis(time);
        while (deadline >= System.currentTimeMillis()) {
            Long serverTime = getServerTime();
            // the stored value is the timestamp at which this lock expires
            lockValue = serverTime + lockExpiresMilliseconds;
            // SETNX-style attempt: succeeds only if the key is absent
            if (redisson.getBucket(lockKey).trySet(lockValue)) {
                log.info(lockKey + " locked by getSet");
                return true;
            }
            // key exists: if its expiry timestamp has passed, race to take it over
            Long currentValue = (Long) redisson.getBucket(lockKey).get();
            if (currentValue != null && currentValue < serverTime) {
                Long oldValue = (Long) redisson.getBucket(lockKey).getAndSet(lockValue);
                // only the thread whose getAndSet saw the old value wins the race
                if (oldValue != null && oldValue.equals(currentValue)) {
                    log.info("locked by getSet");
                    return true;
                }
            }
            Thread.sleep(DEFAULT_SLEEP_MILLIS);
        }
        return false;
    }

    public void unlock() {
        String lockKey = getLockKey();
        Long currValue = (Long) redisson.getBucket(lockKey).get();
        // delete only if the stored value still matches the one we wrote
        if (currValue != null && currValue.equals(lockValue)) {
            log.info(lockKey + " unlock");
            redisson.getBucket(lockKey).delete();
        }
    }

    private String getLockKey() {
        return nameSpace + ":" + name;
    }

    private Long getServerTime() {
        return System.currentTimeMillis();
    }
}
```
As we can see, this is a Redis lock built on getSet with a timestamp marker, and it immediately invites a few questions:
- How large should expire be?
- If it is too large and the application dies after acquiring the resource, the resource stays unavailable for a long time.
- If it is too small, it can't cover the business execution time, and exclusive access to the resource is no longer guaranteed.
- Even with a "just right" value: suppose an RPC call between services blocks on the network and completes right after the lock expires, while another thread has already grabbed the lock. Because there is no lock id, the slow thread releases the other thread's lock.
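The last bullet is worth making concrete. In the code above, lockValue is a plain instance field, so every acquiring thread overwrites the same slot. The sketch below (a plain map standing in for Redis; all names are hypothetical, not from the code above) shows how that lets a slow thread's unlock pass the value check and delete a lock now owned by someone else:

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch of the race: the lock value is shared state, not a
// per-owner id, so owner A's unlock passes the value check against
// owner B's value and deletes B's lock.
public class SharedValueUnlockDemo {
    static final ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();
    static Long lockValue; // shared field, like the RedisLock instance field

    static void lock(String key, long value) {
        lockValue = value;          // every acquirer overwrites the same field
        store.put(key, value);
    }

    static void unlock(String key) {
        Long current = store.get(key);
        if (current != null && current.equals(lockValue)) {
            store.remove(key);      // check passes even though someone else owns it
        }
    }

    public static boolean demo() {
        lock("order:42", 1_000L);   // thread A acquires
        lock("order:42", 2_000L);   // A's lock expires, thread B re-acquires
        unlock("order:42");         // A returns from its slow RPC and unlocks
        return store.containsKey("order:42"); // false: B's lock is gone
    }

    public static void main(String[] args) {
        System.out.println("B still holds the lock? " + demo());
    }
}
```

A per-acquisition unique id, checked on release, is what closes this hole; the setNx variant later in the post adds exactly that.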
And that is only what we see at the code level. Going further:
- With a master/replica setup, can another thread acquire the lock on a different node while replication is still in flight?
- To avoid that, we have to route every getSet to the master to preserve atomicity. But our scheme is effectively CAS with constant retries, so when this kind of traffic clusters, we are more or less DDoSing our own master.
- Does that mean we must shard? And after sharding, with primaries and replicas, do we still have to pin every getSet to the same node?
The lock above ran against an Alibaba Cloud sentinel-mode Redis cluster, with the lock expiry set to 30 minutes.
The setNx Redis lock
As usual, show me the code:
```java
public final class RedisLock {

    private RedisLock() { }

    public static boolean lock(String key, int expire) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        long status = redisService.setnx(key, "1");
        if (status == 1) {
            // a second, separate call: not atomic with the setnx above
            redisService.expire(key, expire);
            return true;
        }
        return false;
    }

    public static void unLock(String key) {
        RedisService redisService = SpringUtils.getBean(RedisService.class);
        redisService.del(key);
    }
}
```
The logic is so simple it hardly deserves a diagram, yet it is fair to say this code has the most problems of the lot. First:
```java
public static boolean lock(String key, int expire) {
    RedisService redisService = SpringUtils.getBean(RedisService.class);
    long status = redisService.setnx(key, "1");
    if (status == 1) {
        // <-- if the process dies right here, the key never gets a TTL
        redisService.expire(key, expire);
        return true;
    }
    return false;
}
```
Die between the setnx and the expire, and the resource is locked forever. You never know when Docker will kill your container: maybe it exceeded its memory allocation, maybe a rolling update is in progress. Your service running a code path to completion is luck, not a guarantee.
We should do this instead:

```java
redisService.eval("set " + key + " 1 NX PX " + expire);
```

so that writing the value and attaching the TTL happen atomically.
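What the single command buys can be modeled in one atomic step. The sketch below is not a Redis client; it is an in-memory stand-in (all names hypothetical) showing the property we are after: the value and its expiry are written together, so there is never a window where the key exists without a TTL.

```java
import java.util.concurrent.ConcurrentHashMap;

// Models "SET key value NX PX ttl" as one atomic step against a map of
// key -> (value, expiry). A setnx-then-expire pair leaves a window with
// no expiry; compute() closes it here the way the single command does
// on the Redis server.
public class AtomicSetNxPx {
    record Entry(String value, long expiresAt) {}
    static final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();

    static boolean setNxPx(String key, String value, long ttlMillis) {
        long now = System.currentTimeMillis();
        // compute() runs atomically per key: NX check, write, and TTL together
        Entry result = store.compute(key, (k, old) ->
                (old == null || old.expiresAt() <= now)
                        ? new Entry(value, now + ttlMillis)
                        : old);
        return value.equals(result.value());
    }

    public static void main(String[] args) {
        System.out.println(setNxPx("stock:7", "owner-a", 30_000)); // true: acquired
        System.out.println(setNxPx("stock:7", "owner-b", 30_000)); // false: still held
    }
}
```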
Next, to make unlocking safe, we add a lock id:
```java
// per-thread lock id; static so the static lock/unLock methods can use it
private static final ThreadLocal<Long> lid = new ThreadLocal<>();

public static boolean lock(String key, int expire) {
    RedisService redisService = SpringUtils.getBean(RedisService.class);
    lid.set(IdUtils.genId());
    long status = redisService.eval("set " + key + " " + lid.get() + " NX PX " + expire);
    return status == lid.get();
}

public static void unLock(String key) {
    RedisService redisService = SpringUtils.getBean(RedisService.class);
    long olid = redisService.get(key, "0");
    // release only if the stored id is our own
    if (olid == lid.get()) {
        redisService.del(key);
    }
}
```
And likewise, to make the unlock itself atomic:

```java
redisService.eval("if redis.call('get', '" + key + "') == '" + lid.get()
        + "' then return redis.call('del', '" + key + "') end return 0");
```

(Lua gets the stored value back as a string, so the comparison must quote the id.)
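The compare-then-delete that the script performs on the server can be modeled locally the same way (an in-memory sketch with hypothetical names): delete only when the stored id matches the caller's, with comparison and deletion as one atomic step.

```java
import java.util.concurrent.ConcurrentHashMap;

// Models the Lua unlock script: delete the key only when the stored
// lock id matches the caller's id. ConcurrentHashMap.remove(key, value)
// is exactly that atomic compare-and-delete contract.
public class CompareAndDelete {
    static final ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();

    static boolean unlock(String key, long myLockId) {
        // atomic compare-and-delete, like the get/del pair inside the script
        return store.remove(key, myLockId);
    }

    public static void main(String[] args) {
        store.put("order:42", 111L);                  // held by lock id 111
        System.out.println(unlock("order:42", 222L)); // false: not our lock
        System.out.println(unlock("order:42", 111L)); // true: released
    }
}
```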
With that we have a Redis lock whose logic is fairly simple, whose operations are reasonably atomic, and which avoids the flood of getSet calls. But again, that is only the code-level view.
What about the overall architecture? The availability problem remains: a single Redis node works but isn't highly available, and with master/replica replication the synchronization issue reappears, so reads and writes must be pinned to the same node.
Even more absurd and painful: some cloud vendors' managed Redis won't let you use eval at all.
The Redlock approach
The approach recommended by the Redis community is Redlock. To make the lock service highly available while keeping resources isolated, we deploy N standalone Redis nodes with no master/replica replication between them, and we acquire and release the lock on all N instances using the same method we would use against a single instance.
To acquire the lock on a resource:
- Get the current time.
- Try to acquire the lock on each of the N instances in turn. Each per-instance attempt has its own timeout, so a dead node doesn't stall us for long and we can quickly move on to the next instance.
- The resource is truly acquired only when the lock is obtained on more than half of the instances.
- If a majority is not reached (more than half of the attempts fail), release all the locks that were obtained.
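The steps above boil down to a quorum count. The sketch below shows only the decision rule, not the networking; the instances are stand-in predicates, the names are made up, and Redisson's RedissonRedLock is the real implementation:

```java
import java.util.List;
import java.util.function.Predicate;

// Sketch of the Redlock decision rule: try each independent instance,
// count the successes, and hold the lock only with a strict majority.
public class RedlockQuorum {
    static boolean tryAcquire(List<Predicate<String>> instances, String resource) {
        int acquired = 0;
        for (Predicate<String> instance : instances) {
            // each test() stands in for a per-instance try-lock with its
            // own short timeout, so one dead node cannot stall the loop
            if (instance.test(resource)) {
                acquired++;
            }
        }
        boolean quorum = acquired > instances.size() / 2;
        // without a quorum, a real implementation now releases every
        // per-instance lock it did manage to acquire (omitted here)
        return quorum;
    }

    public static void main(String[] args) {
        List<Predicate<String>> fiveNodesTwoDown = List.of(
                r -> true, r -> true, r -> true, r -> false, r -> false);
        System.out.println(tryAcquire(fiveNodesTwoDown, "order:42")); // true: 3 of 5
    }
}
```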
Redisson already ships a well-packaged implementation of this scheme; for details see:
Distributed locks and synchronizers
The downsides are the time it takes to acquire the lock on every instance, and the money it takes to buy all those instances...
The ZooKeeper approach
Honestly, this is my favorite. ZooKeeper was practically born for this job: building a reentrant, fair lock on it is simple and reliable, and because ephemeral nodes are tied to the session, releasing the resource after an application failure does not depend on any expiry check.
But it is still an extra piece of middleware, with real cost and risk (it is a CP system, so availability suffers), so we have to weigh the scenario.
Using node-name uniqueness to implement an exclusive lock:
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class ZookeeperLock implements Lock, Watcher {
    private ZooKeeper zk;
    private String root = "/locks";
    private String lockName;
    private String myZnode;
    private int sessionTimeout = 30000;
    private List<Exception> exception = new ArrayList<Exception>();

    public ZookeeperLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if (stat == null) {
                // create the parent node for all locks if it doesn't exist yet
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException | KeeperException | InterruptedException e) {
            exception.add(e);
        }
    }

    public void lock() {
        if (exception.size() > 0) {
            throw new LockException(exception.get(0));
        }
        if (!tryLock()) {
            throw new LockException("Operation too frequent, please try again later");
        }
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public boolean tryLock() {
        try {
            // node names are unique: only one client can create this znode
            myZnode = zk.create(root + "/" + lockName, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return true;
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return tryLock();
    }

    public void unlock() {
        try {
            zk.delete(myZnode, -1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public Condition newCondition() {
        return null;
    }

    public void process(WatchedEvent watchedEvent) { }
}
```
And a fair lock:
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class DistributedLock implements Lock, Watcher {
    private ZooKeeper zk;
    private String root = "/locks";
    private String lockName;
    private String waitNode;   // the node directly ahead of us in the queue
    private String myZnode;
    private CountDownLatch latch;
    private int sessionTimeout = 30000;
    private List<Exception> exception = new ArrayList<Exception>();

    public DistributedLock(String config, String lockName) {
        this.lockName = lockName;
        try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if (stat == null) {
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException | KeeperException | InterruptedException e) {
            exception.add(e);
        }
    }

    // watcher callback: the node ahead of us changed, wake the waiter
    public void process(WatchedEvent event) {
        if (this.latch != null) {
            this.latch.countDown();
        }
    }

    public void lock() {
        if (exception.size() > 0) {
            throw new LockException(exception.get(0));
        }
        try {
            if (this.tryLock()) {
                System.out.println("Thread " + Thread.currentThread().getId()
                        + " " + myZnode + " get lock true");
            } else {
                waitForLock(waitNode, sessionTimeout);
            }
        } catch (KeeperException | InterruptedException e) {
            throw new LockException(e);
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr))
                throw new LockException("lockName can not contain " + splitStr);
            // ephemeral sequential node: our position in the fair queue
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            // collect all queue nodes belonging to this lock name
            List<String> subNodes = zk.getChildren(root, false);
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if (_node.equals(lockName)) {
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);
            System.out.println(myZnode + "==" + lockObjNodes.get(0));
            if (myZnode.equals(root + "/" + lockObjNodes.get(0))) {
                // our node has the smallest sequence number: we own the lock
                return true;
            }
            // otherwise remember the node directly ahead of us and wait on it
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException | InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(waitNode, time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitForLock(String lower, long waitTime)
            throws InterruptedException, KeeperException {
        // set a watch on the node ahead of us; process() fires when it changes
        Stat stat = zk.exists(root + "/" + lower, true);
        if (stat != null) {
            System.out.println("Thread " + Thread.currentThread().getId()
                    + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode, -1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public Condition newCondition() {
        return null;
    }

    public static class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        public LockException(String e) { super(e); }
        public LockException(Exception e) { super(e); }
    }
}
```
More engineering-side support
Whether the DLock implementation sits on Redis or on zk, engineering practice asks for more:
- a reentrant lock implementation
- a lock diffusion strategy
- annotation- and AOP-based ease-of-use support, and so on
We will fill these in gradually in follow-up posts.
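As a taste of the first item, reentrancy is mostly local bookkeeping layered over any of the locks above. The sketch below is a hypothetical wrapper (not from any code in this post): it tracks the owning thread and a hold count, and only touches the remote lock on the first acquire and the last release.

```java
// Sketch of reentrancy layered over a distributed lock: the remote
// acquire/release (Redis or zk) runs only at the outermost lock/unlock;
// nested calls just bump a local counter.
public class ReentrantWrapper {
    private Thread owner;
    private int holdCount;
    private final Runnable remoteAcquire; // e.g. the Redis or zk tryLock
    private final Runnable remoteRelease;

    public ReentrantWrapper(Runnable remoteAcquire, Runnable remoteRelease) {
        this.remoteAcquire = remoteAcquire;
        this.remoteRelease = remoteRelease;
    }

    public synchronized void lock() {
        if (owner == Thread.currentThread()) {
            holdCount++;              // re-entry: no remote round-trip
            return;
        }
        remoteAcquire.run();          // first acquire hits the middleware
        owner = Thread.currentThread();
        holdCount = 1;
    }

    public synchronized void unlock() {
        if (owner != Thread.currentThread()) {
            throw new IllegalMonitorStateException("not the lock owner");
        }
        if (--holdCount == 0) {
            owner = null;
            remoteRelease.run();      // last release frees the remote lock
        }
    }

    public synchronized int holds() {
        return holdCount;
    }
}
```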