getSet redis锁
| public class RedisLock {
private static Logger log = LoggerFactory.getLogger(RedisLock.class);
private final int DEFAULT_SLEEP_MILLIS;
private RedissonClient redisson;
private final String nameSpace;
private final String name;
private Long lockValue;
private long lockExpiresMilliseconds;
public RedisLock(RedissonClient redisson, String nameSpace, String name) { this(redisson, nameSpace, name, 30, TimeUnit.MINUTES, 100); }
public RedisLock(RedissonClient redisson, String nameSpace, String name, long expire, TimeUnit unit, int sleepMillis) { this.redisson = redisson; this.nameSpace = nameSpace; this.name = name; this.lockExpiresMilliseconds = unit.toMillis(expire); this.DEFAULT_SLEEP_MILLIS = sleepMillis; }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { String lockKey = getLockKey();
long deadline = System.currentTimeMillis() + unit.toMillis(time);
while (deadline >= System.currentTimeMillis()) { Long serverTime = getServerTime(); lockValue = serverTime + lockExpiresMilliseconds;
if (redisson.getBucket(lockKey).trySet(lockValue)) { log.info(lockKey + " locked by getSet"); return true; } Long currentValue = (Long) redisson.getBucket(lockKey).get(); if (currentValue != null && currentValue < serverTime) { Long oldValueStr = (Long) redisson.getBucket(lockKey).getAndSet(lockValue); if (oldValueStr != null && oldValueStr.equals(currentValue)) { log.info("locked by getSet"); return true; } } Thread.sleep(DEFAULT_SLEEP_MILLIS); } return false; }
public void unlock() { String lockKey = getLockKey(); Long currValue = (Long) redisson.getBucket(lockKey).get(); if (currValue != null && currValue.equals(lockValue)) { log.info(lockKey + " unlock"); redisson.getBucket(lockKey).delete(); } }
private String getLockKey() { return nameSpace + ":" + name; }
private Long getServerTime() { return System.currentTimeMillis(); } }
- expire设置多大呢?
- 过大:应用在获取资源后挂掉,该资源会长期不可用,怎么办?
- 过小:无法覆盖业务执行时间,导致无法保证资源独占,怎么办?
- 设置的不大不小万一有个rpc服务间调用因为网络的问题阻塞正好在锁过期后完成,且已有线程抢占获取该锁时,因为没有锁id,释放了其它线程的锁怎么办?
- 如果我们用的是主从架构,在主从同步尚未完成时,另一线程在另一节点上抢占,是否也会获取到锁?
- 为了解决该问题我们需要同时指定所有的getSet去master来保证原子性,而本来我们的操作算作是CAS做了不断的重试,在这类业务聚集时,咱们就相当于在对master做ddos
- 那是不是意味着咱们必须做分片?做完分片之后如果仍存在主备,是否依旧要把getSet打在同一服务上?
setNx redis锁
老规矩,show me the code:
| public final class RedisLock {
private RedisLock() { }
public static boolean lock(String key, int expire) {
RedisService redisService = SpringUtils.getBean(RedisService.class); long status = redisService.setnx(key, "1");
if(status == 1) { redisService.expire(key, expire); return true; }
return false; }
public static void unLock(String key) { RedisService redisService = SpringUtils.getBean(RedisService.class); redisService.del(key); }
| public static boolean lock(String key, int expire) {
RedisService redisService = SpringUtils.getBean(RedisService.class); long status = redisService.setnx(key, "1"); if(status == 1) { redisService.expire(key, expire); return true; }
return false; }
| redisService.evel("set " + key + " 1 NX PX " + expire);
再者,我们为了解锁的安全, 要添加锁id:
| private ThreadLocal<Long> lid = new ThreadLocal<>();
public static boolean lock(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService.class); lid.set(IdUtils.genId());
long status = redisService.evel("set " + key + " " + lid.get() + " NX PX " + expire); return status == lid.get(); } public static void unLock(String key) { RedisService redisService = SpringUtils.getBean(RedisService.class);
long olid = redisService.get(key, "0"); if (olid == lid.get()) { redisService.del(key); } }
// Compare-and-delete in one Lua script so the check and the DEL cannot be interleaved.
// The id is quoted on purpose: redis.call('get', ...) yields a Lua string, and Lua's ==
// is false for a string compared with a number, so an unquoted id would never match.
// NOTE(review): key and id are spliced into the script text; prefer KEYS/ARGV if the
// RedisService wrapper supports passing them.
redisService.evel("if redis.call('get', '" + key + "') == '" + lid.get() + "' then return redis.call('del', '" + key + "'); end return 0");
咱们从整体架构上呢? 依然没有解决可用性的问题,如果只依赖单点的redis不存在问题但是可用性不高,如果使用主从则同步会发生问题,必须将读写打在同一台上。
- 获取当前时间
- 依次尝试从N个实例下获取锁,而每个单例锁获取都有个超时时间,这个超时时间用于当某个节点挂掉时不至于等待过长的时间来获取该实例的锁,可以快速的跳到下个实例
- 当获取超过半数实例的锁时才会真正的获得资源
- 当没有获取到超过半数实例的锁时(过半数失败),释放已获得的所有锁
Distributed locks and synchronizers
| public class ZookeeperLock implements Lock, Watcher { private ZooKeeper zk; private String root = "/locks"; private String lockName; private String myZnode; private int sessionTimeout = 30000; private List<Exception> exception = new ArrayList<Exception>();
public ZookeeperLock(String config, String lockName){ this.lockName = lockName; try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } }
public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } if(!tryLock()) { throw new LockException("您的操作太频繁,请稍后再试"); } }
public void lockInterruptibly() throws InterruptedException { this.lock(); }
public boolean tryLock() { try { myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryLock(); }
public void unlock() { try { zk.delete(myZnode, -1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }
public Condition newCondition() { return null; }
public void process(WatchedEvent watchedEvent) { }
| public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks"; private String lockName; private String waitNode; private String myZnode; private CountDownLatch latch; private int sessionTimeout = 30000; private List<Exception> exception = new ArrayList<Exception>();
public DistributedLock(String config, String lockName){ this.lockName = lockName; try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } }
public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } }
public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, sessionTimeout); } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } }
public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created "); List<String> subNodes = zk.getChildren(root, false); List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ return true; } String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; }
public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; }
private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; }
public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }
public void lockInterruptibly() throws InterruptedException { this.lock(); }
public Condition newCondition() { return null; }
/**
 * Unchecked exception signalling that a distributed lock could not be acquired
 * or that the underlying coordination service reported a failure.
 */
public class LockException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /**
     * Creates an exception carrying only a description of the failure.
     *
     * @param message description of what went wrong
     */
    public LockException(String message) {
        super(message);
    }

    /**
     * Wraps a lower-level failure, keeping it as the cause.
     *
     * @param cause the exception that triggered this failure
     */
    public LockException(Exception cause) {
        super(cause);
    }
}
- 可重入锁实现
- 锁扩散策略
- 基于注解和aop的易用性支持等