package com.sqi;
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.params.SetParams;
import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException;
/** * @author 麒 * @version 1.0 * @since 2019-03-22 */ public class RedisLockUtil { private static final String LOCK_SUCCESS = "OK"; private JedisPool jedisPool; private final Map<String, Boolean> DAEMON_MAP = new ConcurrentHashMap<String, Boolean>(); private static final String REENTRANT_LOCK_SCRIPT = "local value = redis.call('get',KEYS[1]) " + "if value then " + " if value == ARGV[1] then " + " return redis.call('pexpire', KEYS[1], ARGV[2]) " + " else " + " return 0 " + " end " + "else " + " redis.call('set',KEYS[1],ARGV[1],'NX','PX',ARGV[2]) " + " return 1 " + "end"; private static final String WAIT_LOCK_SCRIPT = "local value = redis.call('get',KEYS[1]) " + "if value then " + " if value == ARGV[1] then " + " return redis.call('pexpire', KEYS[1], ARGV[2]) " + " else " + " return redis.call('pttl', KEYS[1]) " + " end " + "else " + " redis.call('set',KEYS[1],ARGV[1],'NX','PX',ARGV[2]) " + " return 1 " + "end"; private static final String REFRESH_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('pexpire', KEYS[1],ARGV[2]) " + "else " + " return 0 " + "end"; private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end";
public RedisLockUtil(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
protected Jedis getJedis() {
return jedisPool.getResource();
}
/\*\*
\* 获取redis分布式锁,只要锁存在就返回false
\*
\* @param lockKey 锁的名字
\* @param lockOwner 锁的所有者,通常为当前请求的requestID
\* @param expire 锁过期时间单位毫秒
\* @return
\*/
public boolean getSimpleRedisLock(String lockKey, String lockOwner, Long expire) {
SetParams setParams = SetParams.setParams().nx().px(expire);
String result = getJedis().set(lockKey, lockOwner, setParams);
return result != null ? LOCK\_SUCCESS.equals(result) : false;
}
/\*\*
\* 获取redis分布式锁, 如果锁存在且属于当前申请者,则刷新锁过期时间,返回true
\*
\* @param lockKey 锁的名字
\* @param lockOwner 锁的所有者,通常为当前请求的requestID
\* @param expire 锁过期时间单位毫秒
\* @return
\*/
public boolean getReentrantRedisLock(String lockKey, String lockOwner, Long expire) {
List keyList = Collections.singletonList(lockKey);
List argvList = Arrays.asList(lockOwner,expire.toString());
Long result = (Long) getJedis().eval(REENTRANT\_LOCK\_SCRIPT, keyList, argvList);
return result > 0;
}
/\*\*
\* 获取Redis锁,如果锁被占用则阻塞
\*
\* @param lockKey 锁的名字
\* @param lockOwner 锁的所有者,通常为当前请求的requestID
\* @param expire 锁过期时间单位毫秒
\* @param timeOut 超时时间毫秒
\* @return
\* @throws InterruptedException
\* @throws TimeoutException
\*/
public boolean waitRedisLock(String lockKey, String lockOwner, Long expire, Long timeOut) throws InterruptedException, TimeoutException {
long startTime = System.currentTimeMillis();
List keyList = Collections.singletonList(lockKey);
List argvList = Arrays.asList(lockOwner,expire.toString());
while (true) {
long nowTime = System.currentTimeMillis();
if (timeOut > 0 && (nowTime - startTime) >= timeOut) {
throw new TimeoutException();
}
try {
Long result = (Long) getJedis().eval(WAIT\_LOCK\_SCRIPT, keyList, argvList);
if (result == 1) {
return true;
}
Thread.sleep(result);
} catch (Exception e) {
Thread.sleep((timeOut >> 8) + 10);
}
}
}
/\*\*
\* 刷新锁过期时间
\*
\* @param lockKey
\* @param lockOwner
\* @param expire
\*/
public boolean refreshLockExpire(String lockKey, String lockOwner, Long expire) {
List keyList = Collections.singletonList(lockKey);
List argvList = Arrays.asList(lockOwner,expire.toString());
return (Long) getJedis().eval(REFRESH\_LOCK\_SCRIPT, keyList, argvList) > 0;
}
/\*\*
\* 释放锁
\*
\* @param lockKey
\* @param lockOwner
\* @return
\*/
public boolean releaseLock(String lockKey, String lockOwner) {
Long result = (Long) getJedis().eval(RELEASE\_LOCK\_SCRIPT, Collections.singletonList(lockKey), Collections.singletonList(lockOwner));
if (result > 0) {
DAEMON\_MAP.remove(lockKey);
return true;
}
return false;
}
/\*\*
\* 获取Redis锁,如果锁被占用则阻塞,启动一个守护线程用来保证在任务进行中锁不过期
\*
\* @param lockKey 锁的名字
\* @param lockOwner 锁的所有者,通常为当前请求的requestID
\* @param expire 锁过期时间单位毫秒
\* @return
\*/
public boolean waitRedisLockWithDaemon(final String lockKey, final String lockOwner, final Long expire, final long timeOut) throws TimeoutException, InterruptedException {
boolean sig = waitRedisLock(lockKey, lockOwner, expire, timeOut);
if (sig) {
DAEMON\_MAP.put(lockKey, true);
lockDaemon(lockKey, lockOwner, expire);
}
return sig;
}
/\*\*
\* 获取redis分布式锁,启动一个守护线程用来保证在任务进行中锁不过期
\*
\* @param lockKey 锁的名字
\* @param lockOwner 锁的所有者,通常为当前请求的requestID
\* @param expire 锁过期时间单位毫秒
\* @return
\*/
public boolean getRedisLockWithDaemon(final String lockKey, final String lockOwner, final Long expire) {
boolean sig = getSimpleRedisLock(lockKey, lockOwner, expire);
if (sig) {
DAEMON\_MAP.put(lockKey, true);
lockDaemon(lockKey, lockOwner, expire);
}
return sig;
}
/\*\*
\* 锁的守护线程,保证在锁持有期内不会过期
\*
\* @param lockKey
\* @param lockOwner
\* @param expire
\*/
protected void lockDaemon(final String lockKey, final String lockOwner, final Long expire) {
long exp;
if (expire > 1000) {
exp = expire >> 1;
} else {
exp = expire;
}
final long finalExp = exp;
Thread thread = new Thread(new Runnable() {
public void run() {
Boolean sig = DAEMON\_MAP.get(lockKey);
if (sig == null) {
sig = false;
}
try {
while (sig) {
if (refreshLockExpire(lockKey, lockOwner, expire)) {
Thread.sleep(finalExp);
sig = DAEMON\_MAP.get(lockKey);
} else {
sig = false;
DAEMON\_MAP.remove(lockKey);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
thread.setDaemon(true);
thread.start();
}
}