深入理解分布式锁:原理、应用与挑战| 京东物流技术团队

京东云开发者
• 阅读 233

前言

在单机环境中,我们主要通过线程间的加锁机制来确保同一时间只有一个线程能够访问某个共享资源或执行某个关键代码块,从而防止各种并发修改异常。例如,在Java中提供了synchronized/Lock。但是在分布式环境中,这种线程间的锁机制已经不起作用了,因为系统会被部署在不同机器上,这些资源已经不是在线程间共享了,而是进程之间共享资源。为了解决这个问题,分布式锁应运而生。本文将详细解析分布式锁的原理、应用与挑战,以帮助读者更好地理解和应用分布式锁。

分布式锁的原理

首先,从最原始的锁定义来看,锁是一种同步机制,主要用于协调并发访问共享资源的行为。 分布式锁也符合这个定义,只不过运行环境从单机变为分布式环境。它们的核心操作都可以分为以下三个步骤:

  1. 获取:在访问共享资源前,先获取一个锁

  2. 占有:获取成功的进程或线程可以访问共享资源,其他进程或线程则需要等待锁释放后才能进行访问

  3. 释放:释放锁

同时,分布式锁也具备一般锁的以下特性:

  1. 互斥性:这是锁的核心特性,确保在任意时刻,同一个锁只能被一个进程或线程所持有。这种特性对于确保资源的独占访问和防止并发冲突至关重要。

  2. 一致性:加锁和释放锁的过程应尽量由同一个线程或进程完成,以确保锁状态的一致性,防止因锁状态不一致而导致的错误或混乱。

  3. 可重入性:这意味着已经持有锁的线程或进程可以再次获得同一个锁,这在某些情况下是有用的,例如递归函数中的锁操作。

还有分布式锁的特性问题:

  1. 锁租期问题:在分布式锁的场景中,为避免死锁或无法正常释放,锁通常设置有效时间。当有效时间过期但业务还在执行时,需要通过特定的机制(如watchdog)来续租,确保锁的持有者能够继续完成其操作。

  2. 性能:避免锁成为分布式系统的瓶颈。



分布式锁的主流实现方案

常见的分布式锁实现方案可以分为以下三大类:基于数据库(比如MySQL),基于缓存(比如 Redis)和基于分布式一致性协调服务组件(比如 ZooKeeper、etcd)



深入理解分布式锁:原理、应用与挑战| 京东物流技术团队



基于数据库的分布式锁(以MySQL为例)

要实现一套基于数据库的分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现分布式锁。

为了更好的演示,我们先创建一张数据库表,例如:

CREATE TABLE `database_lock` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `resource` int(11) NOT NULL COMMENT '锁定的资源',
  `desc` varchar(128) NOT NULL DEFAULT '' COMMENT '描述',
  `create_time` datetime COMMENT '创建时间', 
  `update_time` datetime COMMENT '更新时间'
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_idx_resource` (`resource`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式锁表';

记录锁

  1. 获取锁:

当想要获取锁时,可以插入一条数据:

INSERT INTO `database_lock` (resource, desc, create_time, update_time) VALUES (1,'lock',now(), now());

由于表中对resource设置了唯一索引,也就存在唯一性约束,这样如果有多个请求同时提交到数据库的话,数据库可以保证只有一个操作成功,那么我们就可以认为操作成功的请求获得了锁。

  1. 占有锁:

成功获取锁后,就可以继续操作共享资源了。

  1. 释放锁:

当需要释放锁时,可以删除这条数据:

DELETE FROM database_lock WHERE resource = 1;

以上实现方式非常简单,但是以下几点需要特别注意:

  1. 这种锁没有失效时间,一旦释放锁的操作失败就会导致锁记录一直存在数据库中,锁无法释放,其他线程无法获得锁。这个缺陷也很好解决,比如可以增加一个定时任务定时清理未正常释放的锁记录。

  2. 这种锁的可靠性依赖于数据库。可以设置备库,避免单点,进一步提升可靠性。

  3. 这种锁时非阻塞的,因为插入数据失败后会立即报错,想要获得锁就需要再次操作。如果需要阻塞式的,可以通过For循环、while循环模拟,直至成功再返回。

  4. 这种锁时非可重入的,因为同一个线程在没有释放锁之前无法再次获得锁,因为数据库中已经存在同一份记录了。想要实现可重入,可以在数据库中添加一些锁的唯一标识字段,比如 主机信息、线程信息等,那么再次获取锁的时候可以先查询数据,如果当前的主机信息和线程信息等能被查询到的话,可以直接分配锁。

乐观锁

如果数据的更新在大多数情况下是不会产生冲突的,那么只在数据库更新操作提交的时候对数据作冲突检测,如果检测的结果与预期一致,则获得锁,如果出现了与预期数据不一致的情况,则丢弃本次更新。

乐观锁大多数是基于版本控制实现的。即给数据增加一个版本标识,比如通过为数据库表添加一个"version"字段来实现。

为了更好的理解数据库乐观锁在实际项目中的使用,这里就列举一个典型的电商库存更新的例子。电商平台中,当用户提单的时候就会对库存进行操作(库存减1代表已经卖出了一件)。我们将这个库存模型用下面的一张表optimistic_lock来表述:

CREATE TABLE `optimistic_lock` (
 `id` BIGINT NOT NULL AUTO_INCREMENT,
 `resource` int NOT NULL COMMENT '锁定的资源',
 `version` int NOT NULL COMMENT '锁的版本信息',
 `create_time` datetime COMMENT '创建时间',
 `update_time` datetime COMMENT '更新时间',
 `delete_time` datetime COMMENT '删除时间', 
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式锁表-乐观锁';

其中:resource表示具体操作的资源,在这里也就是特指库存;version表示版本号。

在使用乐观锁之前要确保表中有相应的数据,比如:

INSERT INTO optimistic_lock (resource, version, create_at, update_at) VALUES(20, 10, now(), now());

如果只有一个线程进行操作,数据库本身就能保证操作的正确性。主要步骤如下:

  1. 获取资源信息:SELECT resource FROM optimistic_lock WHERE id = 1

  2. 执行业务逻辑

  3. 提交数据:UPDATE optimistic_lock SET resource = resource -1 WHERE id = 1

但是当有两个用户同时购买一件商品时,库存实际操作应该是库存(resource)减2,但是由于有高并发的存在,第一个用户请求执行之后(执行了1、2,但是还没有完成3),第二个用户在购买相同的商品(执行1),此时查询出的库存并没有完成减1的动作,那么最终会导致2个线程购买的商品却出现库存只减1的情况,最终导致库存异常。

在引入了version版本控制之后,具体的操作就会演变成如下步骤:

  1. 获取资源信息: SELECT resource, version as oldVersion FROM optimistic_lock WHERE id = 1

  2. 执行业务逻辑

  3. 更新资源:UPDATE optimistic_lock SET resource = resource -1, version = version + 1 WHERE id = 1 AND version = oldVersion

另外,借助更新时间戳(update_at)也可以实现乐观锁,和采用version字段的方式相似:更新操作执行前先获取并记录当前的更新时间,在提交更新时,检测当前更新时间是否与更新开始时获取的更新时间戳相等。

由于在检测数据冲突时并不依赖唯一索引,不会影响请求的性能,在并发量较小的时候只有少部分请求会失败,适用于竞争较少的场景。缺点是当应用并发量高的时候,version值在频繁变化,则会导致大量请求失败,影响系统的可用性。另外,我们通过上述sql语句还可以看到,数据库锁都是作用于同一行数据记录上,这就会导致热点数据,在一些特殊场景,如大促、秒杀等活动的时候,大量的请求同时请求同一条记录的行锁,会对数据库产生很大的写压力。所以综合数据库乐观锁的优缺点,可以看出乐观锁比较适合并发量不高,写操作不频繁的场景。

悲观锁

我们还可以借助数据库中自带的锁来实现分布式锁。例如在查询语句后面增加FOR UPDATE,数据库会在查询过程中给数据库表增加悲观锁,也称排他锁。当某条记录被加上悲观锁之后,其它线程也就无法再该行上增加悲观锁。

悲观锁,与乐观锁相反,总是假设最坏的情况,它认为数据的更新在大多数情况下是会产生冲突的。

在使用悲观锁的同时,我们需要注意一下锁的级别。MySQL InnoDB引擎在加锁的时候,只有明确地指定主键(或唯一索引)的才会执行行锁 (只锁住被选取的数据)。 在使用悲观锁时,我们必须关闭MySQL数据库的自动提交属性(参考下面的示例),因为MySQL默认使用autocommit模式,也就是说,当你执行一个更新操作后,MySQL会立刻将结果进行提交。

mysql> SET AUTOCOMMIT = 0;
Query OK, 0 rows affected (0.00 sec)

这样在使用FOR UPDATE获得锁之后可以执行相应的业务逻辑,执行完之后再使用COMMIT来释放锁。

下面通过前面的database_lock表来具体表述一下用法。假设有一线程A需要获得锁并执行相应的操作,那么它的具体步骤如下:

  1. 获取锁:SELECT * FROM database_lock WHERE id = 1 FOR UPDATE;。

  2. 执行业务逻辑。

  3. 释放锁:COMMIT。

如果另一个线程B在线程A释放锁之前执行步骤1,那么它会被阻塞,直至线程A释放锁之后才能继续。注意,如果线程A长时间未释放锁,那么线程B会报错,参考如下(lock wait time可以通过innodb_lock_wait_timeout来进行配置):

ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction

注意事项:

  1. 上面的示例中演示了指定主键并且能查询到数据的过程(触发行锁),如果查不到数据那么也就无从“锁”起了。 2. 如果未指定主键(或者唯一索引)且能查询到数据,那么就会触发表锁或间隙锁,比如步骤1改为执行:
SELECT * FROM database_lock WHERE desc='lock' FOR UPDATE;

或者主键不明确也会触发表锁,又比如步骤1改为执行:

SELECT * FROM database_lock WHERE id>0 FOR UPDATE;

在悲观锁中,每一次行数据的访问都是独占的,只有当正在访问该行数据的请求事务提交以后,其他请求才能依次访问该数据,否则将阻塞等待锁的获取。悲观锁可以严格保证数据访问的安全。但是缺点也明显,即每次请求都会额外产生加锁的开销且未获取到锁的请求将会阻塞等待锁的获取,在高并发环境下,容易造成大量请求阻塞,影响系统性能。另外,悲观锁使用不当还可能产生死锁的情况。

小结

基于以上讨论,借助与数据库自身的能力(唯一索引,数据库排他锁),基于数据库实现分布式锁还是挺简单的。下面对其实用性其进行简单分析:

优点

▪实现简单,容易理解,不需要额外的第三方中间件。

▪通过数据库的事务特性可以确保锁的原子性、互斥性。

不足

▪性能相对较低,特别是在高并发场景下,频繁的数据库操作可能导致性能瓶颈。

▪需要自己考虑锁超时等问题,实现起来较为繁琐。

▪依赖本地事务,不支持集群部署,不能保证高可用。

基于Redis实现的分布式锁

方案一:SETNX+EXPIRE

这种是最简单的实现方式,先通过setNX或取到锁,然后通过expire命令添加超时时间。这种方式存在一个很大的问题:这两个命令不是原子操作,需要和redis交互两次,客户端可能会在第一个命令执行完之后挂掉,导致没有设置超时时间,锁无法正常失效。于是产生了以下优化方案。

方案二:SETNX+VALUE

这种方式的value值中保存的是客户端计算出的过期时间,通过setnx命令一次性写入redis中

public boolean getLock(String key,Long expireTime) {
    long now = System.currentTimeMills();
    //绝对超时时间
    long expireTime = now + expireTime; 
    String expiresStr = String.valueOf(expireTime); 
    // 加锁成功 
    if ( jedis.setnx(key, expiresStr)==1) { 
        return true; 
    } 
    // 检查锁是否过期,获取锁的value 
    String currentValueStr = jedis.get(key); 
    // 如果记录的过期时间小于系统时间,则表示已过期 
    if (currentValueStr != null && Long.parseLong(currentValueStr) < now) { 
        // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间 
        String oldValueStr = jedis.getSet(key, expiresStr); 
        if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { 
            // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才可以加锁 
            return true; 
        } 
    } 
    //其他情况,均返回加锁失败 
    return false;
}

这种方式通过value将超时时间赋值,解决了第一种方案的两次操作不能保证原子性的问题。但是这种方式也有问题:

  1. 在锁过期时,如果多个线程同时来加锁,可能会导致多个线程都加锁成功(不满足互斥性);

  2. 在多个线程都加锁成功后,因为锁中没有加锁线程的标识,会导致多个线程都可以解锁(不满足一致性);

  3. 超时时间是在客户端计算的,不同的客户端的时钟可能会存在差异,导致在加锁客户端没有超时的锁,在另一个客户端已经超时(基于客户端时钟,不满足一致性)。

方案三:使用Lua脚本

同样是为了解决第一种方案中的原子性问题,我们可以采用Lua脚本,来保证SETNX+EXPIRE操作的原子性。

if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then 
    redis.call('expire',KEYS[1],ARGV[2])
else
    return 0
end;

在Java代码中,使用jedis.eval()执行加锁。

public boolean getLock(String key, String value, long expireTime) {  
    String lua_scripts = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +  
                          "redis.call('expire', KEYS[1], ARGV[2]) " +  
                          "return 1 " +  
                          "else " +  
                          "return 0 " +  
                          "end";  
    List<String> keys = Collections.singletonList(key);  
    List<String> argv = Arrays.asList(value, String.valueOf(expireTime));  
    Long result = (Long) jedis.eval(lua_scripts, keys, argv);  
    return result != null && result == 1;  
}

这种方式可以完全避免在加锁后中断设置不上超时时间的问题。也不会存在有时钟不一致的问题,和高并发情况下多个线程都加上锁的问题。但是这种方式就一定没有问题了吗?答案是否定的。考虑以下场景:

当服务A加锁成功后,正在执行业务的过程中,锁过期啦,这时服务A是没有感知的;

接着服务B这时来获取锁,成功获取到了;

紧接着,服务A处理完业务了,来释放锁,成功释放掉了,而服务B这时还以为它的锁还在,在执行代码。

全乱套了有没有?以为自己加锁了,其实你没加;

以为自己解锁成功了,其实解的是别人的锁;

这种方案的问题主要是因为两点:锁过期释放,业务没处理完;锁没有唯一身份标识

备注:从Redis 2.6.12版本开始支持setNx同时设置超时时间

如果你想要在设置key的同时为其设置过期时间,并希望这是一个原子操作,你可以考虑使用Redis的 SET 命令,如下所示:

SET mykey "myvalue" NX EX 10  # 设置mykey的值为myvalue,仅当mykey不存在时,并设置过期时间为10秒

方案四:SET NX PX EX + 唯一标识

对于误删锁的问题,我们可以在加锁时,由客户端生成一个唯一ID作为value设置在锁中,在删除锁时先进行身份判断,再删除;加锁逻辑如下:

public boolean getLock(String key,String uniId,Long expireTime) {    
    //加锁    
    return jedis.set(key, uniId, "NX", "EX", expireTime) == 1;
}
// 解锁
public boolean releaseLock(String key,String uniId) {    
    // 因为get和del操作并不是原子的,所以使用lua脚本    
    String lua_script = "if redis.call('get',KEYS[1]) == ARGV[1] then  return redis.call('del',KEYS[1]) else return 0  end;"; 
    List<String> keys = Collections.singletonList(key); 
    List<String> argv = Arrays.asList(uuiId);  
    Object result = jedis.eval(lua_scripts, keys, argv);    
    return result !=null && result.equals(1L);
}

这种方式解决了锁被误删的问题,但是同样存在锁超时失效,但是业务还未处理完的问题

方案五:Redission框架

那么对于锁过期失效,业务未处理完毕的问题,该如何处理呢?

我们可以在加锁成功后,启动一个守护线程,在守护线程中隔一段时间就对锁的超时时间再续长一点,直到业务处理完成后再释放锁,防止锁在业务处理完毕之前提前释放。而Redission框架就是使用的这种机制来解决的这个问题。

  1. 当一个线程去获取锁,在加锁成功的情况下,那么它已经通过Lua脚本将数据保存在了redis中;

  2. 然后在加锁成功的同时,启动Watch Dog看门狗,每隔10秒检查是否还持有锁,如果是则将锁超时时间延长。

  3. 如果一开始就获取锁失败,则会一直循环获取。

方案六:RedLock

以上的这些方案,都只是在Redis单机模式下讨论的方案,如果Redis是采用集群模式,还会存在一些问题,比如:

在集群模式下,一般Master节点会将数据同步到Salve节点,如果我们先在Master节点上加锁成功,在同步到Salve节点之前,这个Master节点挂了,然后另一台Salve节点升级为Master节点,这时这个节点上并没有我们的加锁数据;

此时另一个客户端线程来获取相同的锁,它就会获取成功,这时在我们的应用中将会有两个线程同时获取到这个锁,这个锁也就不安全了。

为了解决这个问题,Redis的作者提出了一种高级的分布式锁算法,叫:RedLock, 即:Redis Distributed Lock, Redis分布式锁。

RedLock的核心原理:

•在Redis集群中选出多个Master节点,保证这些Master节点不会同时宕机;

•并且各个Master节点之间相互独立,数据不同步;

•使用与Redis单实例相同的方法来加锁和解锁。

那么RedLock到底是如何来保证在有节点宕机的情况下,还能安全的呢?

1.假设集群中有N台Master节点,首先,获取当前时间戳;

2.客户端按照顺序使用相同的key,value依次获取锁,并且获取时间要比锁超时时间足够小;比如超时时间5s,那么获取锁时间最多1s,超过1s则放弃,继续获取下一个;

3.客户端通过获取所有能获取的锁之后减去第一步的时间戳,这个时间差要小于锁超时时间,并且要至少有N/2 + 1台节点获取成功,才表示锁获取成功,否则获取失败;

4.如果成功获取锁,则锁的有效时间是原本超时时间减去第三步的时间差;

5.如果获取锁失败,则要解锁所有的节点,不管该节点加锁时是否成功,防止有漏网之鱼。

Redssion库对RedLock方案已经做了实现,如果你的Redis是集群部署,可以看看使用方法。

参考文档:https://redis.io/topics/distlock

小结

优点

▪实现简单,性能较高。

▪可以利用Redis的集群特性实现高可用性和可扩展性。

▪有现成的第三方包和工具支持,实现起来相对简单。

缺点

▪如果Redis节点故障,可能导致锁失效或死锁。

▪RedLock算法虽然提高了容错性,但增加了实现的复杂性和开销。

基于Zookeeper等实现的分布式锁

zookeeper 锁相关基础知识

zk 一般由多个节点构成(单数),采用 zab 一致性协议。因此可以将 zk 看成一个单点结构,对其修改数据其内部自动将所有节点数据进行修改而后才提供查询服务。zk 的数据以目录树的形式,每个目录称为 znode,znode 中可存储数据(一般不超过 1M),还可以在其中增加子节点。

znode节点有三种类型。序列化节点,每在该节点下增加一个节点自动给该节点的名称上添加序号并且自增1。临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除。最后就是普通节点。

Watch 机制,client 可以监控每个节点的变化,当产生变化时 client 会接受到一个事件通知。

zk 基本锁

原理:利用临时节点与 watch 机制。每个锁占用一个普通节点 /lock,当需要获取锁时在 /lock 目录下创建一个临时节点,创建成功则表示获取锁成功,失败则 watch/lock 节点,有删除操作后再去争锁。临时节点好处在于当进程挂掉后能自动上锁的节点自动删除即取消锁。

缺点:所有取锁失败的进程都监听父节点,很容易发生羊群效应,即当释放锁后所有等待进程一起来创建节点,并发量很大,增加zk集群压力。

zk 锁优化

原理:上锁改为创建临时有序节点,每个上锁的节点均能创建节点成功,只是其序号不同。只有序号最小的可以拥有锁,如果这个节点序号不是最小的则 watch 序号比本身小的前一个节点 (公平锁)。

步骤:

•在 /lock 节点下创建一个有序临时节点 (EPHEMERAL_SEQUENTIAL)。

•判断创建的节点序号是否最小,如果是最小则获取锁成功。不是则获取锁失败,然后 watch 序号比本身小的前一个节点。

•当取锁失败,设置 watch 后则等待 watch 事件到来后,再次判断是否序号最小。

•取锁成功则执行代码,最后释放锁(删除该节点)。

参考代码:

@Slf4j
public class DistributedLock implements Lock, Watcher{
     /**
      * zk客户端
      */
      private ZooKeeper zk;
     /**
     * 根目录
     */
     private final String root = "/locks";
     /**
     * 锁名称
     */
     private final String lockName;

     /**
     * 等待前一个锁
     */
     private String waitNode;

     /**
     * 当前锁
     */
     private String myZnode;
     /**
     * 计数器
     */
     private CountDownLatch latch;
     /**
     * 会话超时时间
     */
     private final int sessionTimeout = 30000;
     /**
     * 异常列表
     */
     private final List<Exception> exception = new ArrayList<>();

     /**
     * 创建分布式锁
     * @param config 服务器配置
     * @param lockName 竞争资源标志,lockName中不能包含单词lock
     */
     public DistributedLock(String config, String lockName){
         this.lockName = lockName;
         // 创建与服务器的连接
         try {
             zk = new ZooKeeper(config, sessionTimeout, this);
             Stat stat = zk.exists(root, false);
             if(stat == null){
                 // 创建根节点
                 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
             }
         } catch (IOException | KeeperException | InterruptedException e) {
              exception.add(e);
         }
     }

    /**
     * zookeeper节点的监视器
     */
     @Override
     public void process(WatchedEvent event) {
         if(this.latch != null) {
             this.latch.countDown();
         }
     }

     @Override
     public void lock() {
         if(!exception.isEmpty()){
              throw new LockException(exception.get(0));
         }
         try {
             if(this.tryLock()){
                 log.info("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
             } else{
                 //等待锁
                 waitForLock(waitNode, sessionTimeout);
             }
         } catch (KeeperException | InterruptedException e) {
             throw new LockException(e);
         }
     }

     @Override
     public boolean tryLock() {
         try {
             String splitStr = "_lock_";
             if(lockName.contains(splitStr)) {
                 throw new LockException("lockName can not contains \u000B");
         }
         //创建临时有序子节点
         myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
         log.info(myZnode + " is created ");
         //取出所有子节点
         List<String> subNodes = zk.getChildren(root, false);
         //取出所有lockName的锁
         List<String> lockObjNodes = new ArrayList<String>();
         for (String node : subNodes) {
             String _node = node.split(splitStr)[0];
             if(_node.equals(lockName)){
                 lockObjNodes.add(node);
             }
         }
         Collections.sort(lockObjNodes);
         log.info("myZnode={} minZnode={}", myZnode, lockObjNodes.get(0));
         if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
             //如果是最小的节点,则表示取得锁
             return true;
         }
         //如果不是最小的节点,找到比自己小1的节点
         String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
         waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
         } catch (KeeperException | InterruptedException e) {
             throw new LockException(e);
         }
         return false;
     }

     @Override
     public boolean tryLock(long time,@NonNull TimeUnit unit) {
         try {
             if(this.tryLock()){
                 return true;
             }
             return waitForLock(waitNode,time);
         } catch (Exception e) {
             log.error("tryLock exception:", e);
         }
         return false;
     }

     /**
     * @param lower 监视节点
     * @param waitTime 等待超时时间
     * @return 是否获得锁
     * @throws InterruptedException
     * @throws KeeperException
     */
      private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
          Stat stat = zk.exists(root + "/" + lower,true);
          //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
          if(stat != null){
             log.info("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
             this.latch = new CountDownLatch(1);
             this.latch.await(waitTime, TimeUnit.MILLISECONDS);
             this.latch = null;
          }
          return true;
     }


      /**
      * 解锁方法
      * @throws InterruptedException 线程中断异常
      * @throws KeeperException ZooKeeper异常
      */
      @Override
      public void unlock() {
          try {
              log.info("unlock " + myZnode);
              zk.delete(myZnode,-1);
              myZnode = null;
              zk.close();
         } catch (InterruptedException | KeeperException e) {
             log.error("unlock exception:", e);
         }
     }

     @Override
     public void lockInterruptibly() throws InterruptedException {
          this.lock();
     }

     @Override
     public Condition newCondition() {
         return null;
     }

     /**
     * 自定义锁异常
     */
     public static class LockException extends RuntimeException {
         private static final long serialVersionUID = 1L;

         /**
         * @param e 异常
         */
         public LockException(String e){
             super(e);
         }

         /**
         * @param e 异常
         */
         public LockException(Exception e){
             super(e);
         }
     }
}

小结

优点

▪有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。

▪具有良好的顺序性和公平性,可以有效的避免死锁和竞争问题。

▪支持高可用,容错性较好,通过zookeeper集群可以确保锁的可靠性和强一致性。

▪有现成的第三方包和工具支持,实现起来相对简单。

不足

▪性能相对较低,ZK中创建和删除节点只能通过 Leader 服务器来执行,然后将数据同步到所有的 Follower 机器上。

▪需要维护ZooKeeper集群,增加了系统的复杂性和维护成本。

▪在高并发场景下,频繁的锁操作可能导致ZooKeeper集群成为性能瓶颈。

分布式锁的应用

分布式锁的应用场景

分布式锁在分布式系统中有着广泛的应用,主要体现在以下几个方面:

1.共享资源竞争:当多个进程或线程尝试同时访问或修改共享资源时,为了避免数据冲突和不一致,可以使用分布式锁来确保同一时刻只有一个节点可以访问资源。这在多机器或多节点的分布式系统中尤为重要,因为传统的单机并发控制策略可能不再适用。

2.效率性:使用分布式锁可以避免不同节点或进程重复执行相同的任务或操作。例如,在任务调度系统中,如果多个节点都尝试执行同一任务,通过使用分布式锁,可以确保只有一个节点执行该任务,从而提高系统的整体效率。

3.特殊业务场景:在电商业务中,分布式锁常用于处理高并发场景下的资源竞争问题。例如,在扣减库存或防止流量过载时,通过分布式锁可以确保操作的原子性和一致性。此外,秒杀抢购、优惠券领取等场景也常利用分布式锁来确保数据的一致性。

4.微服务架构:在微服务架构的系统中,分布式锁发挥着至关重要的作用。特别是在金融支付系统等对一致性要求极高的场景中,分布式锁被广泛应用于实现各种特殊需求,确保操作的原子性、数据的准确性和一致性。

总的来说,分布式锁的主要应用场景涉及需要确保数据一致性、防止数据冲突和提高系统效率的场景。通过使用分布式锁,可以在分布式系统中实现更精细化的控制和协调,确保系统的稳定性和可靠性。

选型分析

根据以上实现原理的分析,选择哪种分布式锁方案取决于具体的应用场景和需求。对于简单的应用场景和对性能要求不高的系统,基于MySQL的分布式锁可能是一个不错的选择。对于高并发、高性能要求的系统,基于Redis的分布式锁可能更合适。而如果需要确保锁的公平性和一致性,并且对性能要求不是特别高,那么基于ZooKeeper的分布式锁可能是一个更好的选择。在实际应用中,还需要根据系统的具体情况和需求进行权衡和选择。

关于布式锁互斥性的进一步讨论

经过以探讨,我们可以得出一个结论:基于单机模式的MySQL、Redis以及ZooKeeper集群,均能够严格实现分布式锁,从而确保锁的互斥性。这里之所以强调锁的互斥性,是因为它确保了同一时刻仅有一个进程或线程能够访问特定的共享资源,从而避免了数据冲突和不一致性的发生。

然而,当我们转向MySQL主从模式或Redis主从模式时,情况便发生了变化。这些模式在保障锁的互斥性方面存在明显的不足。要深入探究这一现象的根源,我们不得不提及分布式领域中的一个关键理论——CAP理论。

从锁的定义和特性出发,我们知道,在获取锁的过程中,需要一个全局可见的标识。当一个进程或线程成功获取锁后,该标识会被设置并变得全局可见,这样其他线程就无法突破锁的互斥性限制,确保锁的互斥性得到维护。而这一切的前提,便是数据必须保持一致性

然而,主从模式更倾向于保障可用性和分区容忍性,即AP模型,这在一定程度上牺牲了数据的一致性。相比之下,ZooKeeper集群则采用了CP模型,即保证一致性和分区容忍性。因此,在分布式环境下,ZooKeeper集群能够确保数据的一致性,从而确保锁的互斥性得到严格保障。

综上所述,在分布式系统中,确保锁的互斥性至关重要。我们在选择和设计分布式锁时,必须充分考虑其互斥性保障能力,并结合实际场景和需求,选择最合适的实现方案。当业务场景需要高可靠性的分布式锁时,ZooKeeper集群因其出色的数据一致性保障能力,自然成为了一个更加值得考虑的优秀选择。

分布式锁的挑战

虽然分布式锁为分布式系统带来了诸多好处,但在实际应用中也面临一些挑战:

1.性能问题:分布式锁的获取和释放需要通过网络通信,这可能会引入额外的性能开销。在高并发场景下,如果大量进程或线程争用同一个锁,可能导致性能瓶颈。

2.可靠性问题:分布式锁的可靠性受到网络、硬件、软件等多方面因素的影响。如果锁服务出现故障或网络中断,可能导致死锁或数据不一致等问题。

3.可扩展性问题:随着分布式系统的规模不断扩大,如何确保分布式锁的可扩展性成为一个重要问题。需要设计合理的分布式锁策略,以适应不同规模和需求的系统。

本文主要讨论了分布式锁的原理和不同的实现方案,有基于数据库,Redis和ZooKeeper三种选择,并且各有优缺点。项目开发过程中根据自己实际的业务场景,选择适合自己项目的方案。



文章中难免会有不足之处,希望读者能给予宝贵的意见和建议。谢谢!

参考文档

https://cloud.tencent.com/developer/article/1909596

https://zhuanlan.zhihu.com/p/42056183

作者:京东物流 刘浩

来源:京东云开发者社区

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
2年前
java 多线程总结篇4——锁机制
在开发Java多线程应用程序中,各个线程之间由于要共享资源,必须用到锁机制。Java提供了多种多线程锁机制的实现方式,常见的有synchronized、ReentrantLock、Semaphore、AtomicInteger等。每种机制都有优缺点与各自的适用场景,必须熟练掌握他们的特点才能在Java多线程应用开发时得心应手。——《Java锁机制详解》(
Wesley13 Wesley13
2年前
java 面试知识点笔记(十)多线程与并发
问:线程安全问题的主要诱因?1.存在共享数据(也称临界资源)2.存在多条线程共同操作这些共享数据解决方法:同一时刻有且只有一个线程在操作共享数据,其他线程必须等到该线程处理完数据后再对共享数据进行操作互斥锁的特征:1.互斥性:即在同一时间只允许一个线程持有某个对象锁,通过这种特性来实现多线程协调机制,这样在同一时间只有一
zdd小小菜鸟 zdd小小菜鸟
1年前
分布式锁面试
分布式锁面试引言tex为什么要使用分布式锁?为了保证一个方法在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLcok或synchroniz
Stella981 Stella981
2年前
Redis的锁
分布式与集群什么是锁在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须
Wesley13 Wesley13
2年前
Java 之 synchronized 详解
一、概念synchronized是Java中的关键字,是利用锁的机制来实现同步的。锁机制有如下两种特性:互斥性:即在同一时间只允许一个线程持有某个对象锁,通过这种特性来实现多线程中的协调机制,这样在同一时间只有一个线程对需同步的代码块(复合操作)进行访问。互斥性我们也往往称为操作的原子性。可见性:必须确
Wesley13 Wesley13
2年前
Java多线程synchronized关键字引出的多种锁
前言Java中的 synchronized关键字可以在多线程环境下用来作为线程安全的同步锁。本文不讨论 synchronized 的具体使用,而是研究下synchronized底层的锁机制,以及这些锁分别的优缺点。一、synchronized机制synchro
Wesley13 Wesley13
2年前
Java并发编程:Lock
一.synchronized的缺陷synchronized是java中的一个关键字,也就是说是Java语言内置的特性。那么为什么会出现Lock呢?  在上面一篇文章中,我们了解到如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁
Wesley13 Wesley13
2年前
Java多线程锁释放
Java多线程运行环境中,在哪些情况下会使对象锁释放?由于等待一个锁的线程只有在获得这把锁之后,才能恢复运行,所以让持有锁的线程在不再需要锁的时候及时释放锁是很重要的。在以下情况下,持有锁的线程会释放锁:(1)执行完同步代码块,就会释放锁。(synchronized)(2)在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放。(exc
Stella981 Stella981
2年前
Linux 多线程
I.同步机制线程间的同步机制主要包括三个:互斥锁:以排他的方式,防止共享资源被并发访问;互斥锁为二元变量,状态为0开锁、1上锁;开锁必须由上锁的线程执行,不受其它线程干扰.条件变量:
Wesley13 Wesley13
2年前
Java分布式锁看这篇就够了
\什么是锁?在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到