之前的文章《Java分布式锁实现》中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。
首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。
现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。以下详细讲解了数据库的交易同步锁和交易重试补偿锁的实现。
一、数据库的设计
数据库锁表的表结构如下:
field | type | comment |
ID | bigint | 主键 |
OUTER\_SERIAL\_NO | varchar | 流水号 |
CUST\_NO | char | 客户号 |
SOURCE\_CODE | varchar | 锁操作 |
THREAD\_NO | varchar | 线程号 |
STATUS | char | 锁状态 |
REMARK | varchar | 备注 |
CREATED\_AT | timestamp | 创建时间 |
UPDATED\_AT | timestamp | 更新时间 |
作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性
流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)
客户号:客户的唯一标识
锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作
线程号:当前操作线程的线程号,比如取当前线程的uuid
锁状态:P处理中,F失败,Y成功
二、代码设计
代码的目录结构如下:
主要贴一下锁操作的核心代码实现:
锁接口定义:DbLockManager.java
锁接口实现类:DbLockManagerImpl.java
/\*\* \* \* 数据库锁实现<br> \* \* @author fugaoyang \* \*/ @Service public class DbLockManagerImpl implements DbLockManager {</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">final</span> Logger LOG = LoggerFactory.getLogger(<span style="color: #0000ff">this</span><span style="color: #000000">.getClass()); @Autowired </span><span style="color: #0000ff">private</span><span style="color: #000000"> DbSyncLockMapper lockMapper; @Transactional </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> lock(String outerSerialNo, String custNo, LockSource source) { </span><span style="color: #0000ff">boolean</span> isLock = <span style="color: #0000ff">false</span><span style="color: #000000">; TradeSyncLock lock </span>= <span style="color: #0000ff">null</span><span style="color: #000000">; </span><span style="color: #0000ff">try</span><span style="color: #000000"> { lock </span>=<span style="color: #000000"> lockMapper.find(outerSerialNo, custNo, source.getCode()); </span><span style="color: #0000ff">if</span> (<span style="color: #0000ff">null</span> ==<span style="color: #000000"> lock) { lock </span>= <span style="color: #0000ff">new</span><span style="color: #000000"> TradeSyncLock(); createLock(lock, outerSerialNo, custNo, source); </span><span style="color: #0000ff">int</span> num =<span style="color: #000000"> lockMapper.insert(lock); </span><span style="color: #0000ff">if</span> (num == 1<span style="color: #000000">) { isLock </span>= <span style="color: #0000ff">true</span><span style="color: #000000">; } LOG.info(ThreadLogUtils.getLogPrefix() </span>+ "加入锁,客户号[{}],锁类型[{}]"<span style="color: #000000">, custNo, source.getCode()); </span><span style="color: #0000ff">return</span><span style="color: #000000"> isLock; } </span><span style="color: #008000">//</span><span style="color: #008000"> 根据交易类型进行加锁</span> isLock =<span style="color: #000000"> switchSynsLock(lock, source); LOG.info(ThreadLogUtils.getLogPrefix() </span>+ "更新锁,客户号[{}],锁类型[{}]"<span style="color: #000000">, custNo, source.getCode()); } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e) { LOG.error(ThreadLogUtils.getLogPrefix() </span>+ "交易加锁异常, 客户号:" +<span style="color: #000000"> custNo, e); } </span><span style="color: #0000ff">return</span><span style="color: #000000"> isLock; } @Transactional </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span><span style="color: #000000"> unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) { </span><span style="color: #0000ff">try</span><span style="color: #000000"> { TradeSyncLock lock </span>=<span style="color: #000000"> lockMapper.find(outerSerialNo, custNo, source.getCode()); </span><span style="color: #0000ff">if</span> (<span style="color: #0000ff">null</span> !=<span style="color: #000000"> lock) { lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(), ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid()); } LOG.info(ThreadLogUtils.getLogPrefix() </span>+ "释放锁,客户号[{}],锁类型[{}]"<span style="color: #000000">, custNo, source.getCode()); } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e) { LOG.error(ThreadLogUtils.getLogPrefix() </span>+ "释放锁异常, 客户号:{}"<span style="color: #000000">, custNo, e); } } </span><span style="color: #008000">/**</span><span style="color: #008000"> * 匹配加锁 </span><span style="color: #008000">*/</span> <span style="color: #0000ff">private</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> switchSynsLock(TradeSyncLock lock, LockSource source) { </span><span style="color: #0000ff">boolean</span> isLock = <span style="color: #0000ff">false</span><span style="color: #000000">; </span><span style="color: #0000ff">switch</span><span style="color: #000000"> (source) { </span><span style="color: #0000ff">case</span><span style="color: #000000"> WITHDRAW: ; isLock </span>=<span style="color: #000000"> tradeSynsLock(lock); </span><span style="color: #0000ff">break</span><span style="color: #000000">; </span><span style="color: #0000ff">case</span><span style="color: #000000"> WITHDRAW_RETRY: ; isLock </span>=<span style="color: #000000"> retrySynsLock(lock); </span><span style="color: #0000ff">break</span><span style="color: #000000">; </span><span style="color: #0000ff">default</span><span style="color: #000000">: ; } </span><span style="color: #0000ff">return</span><span style="color: #000000"> isLock; } </span><span style="color: #008000">/**</span><span style="color: #008000"> * 交易同步锁 </span><span style="color: #008000">*/</span> <span style="color: #0000ff">private</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> tradeSynsLock(TradeSyncLock lock) { </span><span style="color: #008000">//</span><span style="color: #008000"> 处理中的不加锁,即不执行交易操作</span> <span style="color: #0000ff">if</span><span style="color: #000000"> (LockStatus.P.getName().equals(lock.getStatus())) { </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">; } </span><span style="color: #0000ff">int</span> num =<span style="color: #000000"> lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(), ThreadLogUtils.getCurrThreadUuid(), </span><span style="color: #0000ff">null</span><span style="color: #000000">); </span><span style="color: #0000ff">if</span> (num == 1<span style="color: #000000">) { </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">true</span><span style="color: #000000">; } </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">; } </span><span style="color: #008000">/**</span><span style="color: #008000"> * 补偿同步锁 </span><span style="color: #008000">*/</span> <span style="color: #0000ff">private</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> retrySynsLock(TradeSyncLock lock) { </span><span style="color: #008000">//</span><span style="color: #008000"> 处理中或处理完成的不加锁,即不执行补偿操作</span> <span style="color: #0000ff">if</span> (LockStatus.P.getName().equals(lock.getStatus()) ||<span style="color: #000000"> LockStatus.S.getName().equals(lock.getStatus())) { </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">; } </span><span style="color: #0000ff">int</span> num =<span style="color: #000000"> lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(), ThreadLogUtils.getCurrThreadUuid(), </span><span style="color: #0000ff">null</span><span style="color: #000000">); </span><span style="color: #0000ff">if</span> (num == 1<span style="color: #000000">) { </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">true</span><span style="color: #000000">; } </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">; } </span><span style="color: #0000ff">private</span> <span style="color: #0000ff">void</span><span style="color: #000000"> createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) { lock.setOuterSerialNo(outerSerialNo); lock.setCustNo(custNo); lock.setSourceCode(source.getCode()); lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid()); lock.setStatus(LockStatus.P.getName()); lock.setRemark(source.getDesc()); }
}
获取当前线程号以及打印uuid工具类ThreadLogUtils.Java
/\*\* \* \* 线程处理<br> \* \* @author fugaoyang \* \*/ public class ThreadLogUtils {</span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> ThreadLogUtils instance = <span style="color: #0000ff">null</span><span style="color: #000000">; </span><span style="color: #0000ff">private</span><span style="color: #000000"> ThreadLogUtils() { setInstance(</span><span style="color: #0000ff">this</span><span style="color: #000000">); } </span><span style="color: #008000">//</span><span style="color: #008000"> 初始化标志</span> <span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">final</span> Object __noop = <span style="color: #0000ff">new</span><span style="color: #000000"> Object(); </span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> ThreadLocal<Object> __flag = <span style="color: #0000ff">new</span> InheritableThreadLocal<Object><span style="color: #000000">() { @Override </span><span style="color: #0000ff">protected</span><span style="color: #000000"> Object initialValue() { </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">null</span><span style="color: #000000">; } }; </span><span style="color: #008000">//</span><span style="color: #008000"> 当前线程的UUID信息,主要用于打印日志;</span> <span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> ThreadLocal<String> currLogUuid = <span style="color: #0000ff">new</span> InheritableThreadLocal<String><span style="color: #000000">() { @Override </span><span style="color: #0000ff">protected</span><span style="color: #000000"> String initialValue() { </span><span style="color: #0000ff">return</span> UUID.randomUUID().toString()<span style="color: #008000">/*</span><span style="color: #008000"> .toUpperCase() </span><span style="color: #008000">*/</span><span style="color: #000000">; } }; </span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> ThreadLocal<String> currThreadUuid = <span style="color: #0000ff">new</span> ThreadLocal<String><span style="color: #000000">() { @Override </span><span style="color: #0000ff">protected</span><span style="color: #000000"> String initialValue() { </span><span style="color: #0000ff">return</span><span style="color: #000000"> UUIDGenerator.getUuid(); } }; </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span><span style="color: #000000"> clear(Boolean isNew) { </span><span style="color: #0000ff">if</span><span style="color: #000000"> (isNew) { currLogUuid.remove(); __flag.remove(); currThreadUuid.remove(); } } </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> String getCurrLogUuid() { </span><span style="color: #0000ff">if</span> (!<span style="color: #000000">isInitialized()) { </span><span style="color: #0000ff">throw</span> <span style="color: #0000ff">new</span> IllegalStateException("TLS未初始化"<span style="color: #000000">); } </span><span style="color: #0000ff">return</span><span style="color: #000000"> currLogUuid.get(); } </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> String getCurrThreadUuid() { </span><span style="color: #0000ff">return</span><span style="color: #000000"> currThreadUuid.get(); } </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span><span style="color: #000000"> clearCurrThreadUuid() { currThreadUuid.remove(); } </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> String getLogPrefix() { </span><span style="color: #0000ff">if</span> (!<span style="color: #000000">isInitialized()) { </span><span style="color: #0000ff">return</span> ""<span style="color: #000000">; } </span><span style="color: #0000ff">return</span> "<uuid=" + getCurrLogUuid() + ">"<span style="color: #000000">; } </span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> isInitialized() { </span><span style="color: #0000ff">return</span> __flag.get() != <span style="color: #0000ff">null</span><span style="color: #000000">; } </span><span style="color: #008000">/**</span><span style="color: #008000"> * 初始化上下文,如果已经初始化则返回false,否则返回true<br/> * * </span><span style="color: #808080">@return</span> <span style="color: #008000">*/</span> <span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">boolean</span><span style="color: #000000"> initialize() { </span><span style="color: #0000ff">if</span><span style="color: #000000"> (isInitialized()) { </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">false</span><span style="color: #000000">; } __flag.set(__noop); </span><span style="color: #0000ff">return</span> <span style="color: #0000ff">true</span><span style="color: #000000">; } </span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span><span style="color: #000000"> setInstance(ThreadLogUtils instance) { ThreadLogUtils.instance </span>=<span style="color: #000000"> instance; } </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> ThreadLogUtils getInstance() { </span><span style="color: #0000ff">return</span><span style="color: #000000"> instance; }
}
两种锁的实现的大致思路如下:
1.交易同步锁
当一个客户来取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;
当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;
当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;
2.交易重试补偿锁
1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;
2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。
上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。
后续可以优化的部分:
1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。
2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。
3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。
因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~
完整示例代码后续会更新到github。