R2M分布式锁原理及实践

京东云开发者
• 阅读 431

作者:京东科技 张石磊

1 案例引入

名词简介:

资源:可以理解为一条内容,或者图+文字+链接的载体。

档位ID: 资源的分类组,资源必须归属于档位。

问题描述:当同一个档位下2条资源同时审批通过时,收到擎天审批系统2条消息,消费者应用部署了2台机器,此时正好由2台机器分别消费,在并发消费时,先更新资源状态,然后写缓存,每次取前100条资源,类似select * from resource where gear_id=xxx limit 100 order by id desc;

在写档位缓存,此时事务未提交,并发查询时根据档位Id查询时查询不到对方的数据,全量写缓存时导致后写的缓存覆盖了先写的缓存,即缓存被覆盖,导致投放资源缺失。

方案思考 :

方案1:一台机器消费mq–单点问题

方案2:将同档位ID的资源路由到同一个queue,需要审批系统配合根据档位Id做路由,审批系统发的消息不只是cms审批数据,此方案不适用。

方案3:在档位级别加分布式锁。

经比较,最终采用方案3是合适的方案.

2 锁说明和分布式锁选择

synchronized锁的粒度是JVM进程维度,集群模式下,不能对共享资源加锁,此时需要跨JVM进程的分布式锁。

分布式锁方式核心实现方式优点缺点分析

1 数据库:

悲观锁,lock

乐观锁,通过版本号实现version

实现简单,不依赖中间件

数据库IO瓶颈,性能差

单实例存在单点问题,主从架构存在数据不一致,主从切换时其他客户端可重复加锁。

2 zookeeper

创建临时节点

CP模型,可靠性高,不存在主从切换不一致问题

频繁创建和销毁临时节点,且

集群模式下,leader数据需要同步到follower才算加锁成功,性能不如redis

主从切换服务不可用

3 redis集群

setnx+expire

性能高

有封装好的框架redission

支持超时自动删除锁

集群支持高可用,AP模型

主从切换时其他客户端可重复加锁。

R2M是基于开源的Redis cluster(Redis 3.0以上版本)构建的高性能分布式缓存系统,我们系统一直在使用,3.2.5版本开始支持分布式锁。

3 r2m分布式锁原理

示例代码:

String lockKey = CacheKeyHelper.getGearLockKey(EnvEnum.getEnvFlagEnum(envCode),resource.getGearId());

R2mLock lock = (R2mLock) r2mClusterClient.getLock(lockKey);

//获取锁,锁的默认有效期30s,获取不到锁就阻塞

lock.lock();

try {

    //业务代码

    resourceService.afterApprovedHandle(resource);

}  finally {

    //释放锁

    lock.unlock();

}


1 加锁核心流程:

加锁流程图:

R2M分布式锁原理及实践

1):尝试获取锁,通过执行加锁Lua脚本来做;

2):若第一步未获取到锁,则去redis订阅解锁消息

3):一旦持有锁的线程释放了锁,就会广播解锁消息,其他线程自旋重新尝试获取锁。

核心加锁原理:使用lua脚本封装了hset和pexpire命令,保证是一个原子操作, KEYS[1]是加锁的key,argv[2]是加锁的客户端ID(UUID+线程ID),ARGV[1]是锁的有效期,默认30s.

private Object acquireInternal(List<String> args) {

if (!this.setLocked() && this.getHolderId() != Thread.currentThread().getId()) {

return -1L;

} else {

try {

//hash结构,hash的key是加锁的key,键值对的key为客户端的UUID+线程id,value为锁的重入计数器值。

return this.lockSha() != null ? this.executor.evalR2mLockSha(this.lockSha(),

"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;

return -2;", Collections.singletonList(this.lockName), args) : this.executor. == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args);

} catch (Exception var3) {

this.setUnlocked();

throw new R2mLockException("Failed to acquire lock " + this.lockName + ".", var3);

}

}

}


args参数

private List<String> acquireArgs(long leaseTime) {

        List<String> args = new ArrayList();

        if (leaseTime != -1L) {

            args.add(String.valueOf(leaseTime));

        } else {

             //默认30s

            args.add(String.valueOf(this.internalLockLeaseTime()));

        }

        //UUID+当前线程id

        args.add(this.currentThreadLockId(Thread.currentThread().getId()));

        return args;

    }




获取锁失败订阅锁的channel

//获取锁失败,订阅释放锁的消息

 private boolean failedAcquire() {

            this.subLock();

            return false;

        }



 private void subLock() {

            CompletableFuture<Void> cf = R2mLock.this.executor.lockSubscribe(R2mLock.this.lockPubSub(), R2mLock.this.getLockChannelName(), R2mLock.this);

            if (cf != null) {

                cf.handleAsync(this::reSubIfEx);

            }



        }


锁释放后,订阅者通过自旋尝试获取锁。

//tryAcquire获取锁,!tryAcquire就是获取锁失败,锁释放后,通知线程唤醒后返回false,然后通过自旋,尝试获取锁,

public final void acquire(long arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }



 final boolean acquireQueued(final Node node, long arg) {

        boolean failed = true;

        try {

            boolean interrupted = false;

            //内部自旋获取锁

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }


2 释放锁核心逻辑:

1)删除分布式锁key(如果可重入锁计数为0)

  1. 发释放锁的广播消息

3)取消watchdog

private Object unlockInternal(List<String> args) {

        logger.debug("{} trying to unlock.", Thread.currentThread().getId());



        Object var2;

        try {

     //判断锁 key 是否存在,如果存在,然后递减hash的value值,当value值为0时再删除锁key,并且广播释放锁的消息

            if (this.unlockSha() == null) {

                var2 = this.executor. == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);

                return var2;

            }



            var2 = this.executor.evalR2mLockSha(this.unlockSha(), "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);

        } catch (Exception var6) {

            throw new R2mLockException("Failed to unlock " + this.lockName + ".", var6);

        } finally {

            this.finalizeRelease();

        }



        return var2;

    }



//取消当前线程的watchdog

 private void finalizeRelease() {

        long threadId = Thread.currentThread().getId();

        R2mLock.ExpirableEntry entry = (R2mLock.ExpirableEntry)this.entryCache.get(threadId);

        if (entry != null) {

            entry.release(threadId);

            if (entry.isReleased()) {

                //取消这个线程watchdog定时任务

                entry.getExtTask().cancel();

                this.expEntry.compareAndSet(entry, (Object)null);

                //从缓存watchdog线程的map中删除该线程

                this.entryCache.remove(threadId);

            }

        }



    }


3 锁的健壮性思考

1 业务没执行完,锁超时过期怎么办?

客户端加锁默认有效期30s,超过有效期后如果业务没执行完,还需要持有这把锁,r2m客户端提供了续期机制,也就是watchdog机制。

watchdog原理:客户端线程维度(UUID+线程ID,客户端维护一个MAP,key就是UUID+线程ID)的后台定时线程,获取锁成功后,如果客户端还持有当前锁,每隔10s(this.internalLockLeaseTime() / 3L),去延长锁的有效期(internalLockLeaseTime)

//watchdog核心机制 ,internalLockLeaseTime默认30s

private void extendLock(long holderId) {

        if (this.expEntry.get() != null) {

            R2mLock.ExpirableEntry holderEntry = (R2mLock.ExpirableEntry)this.entryCache.get(holderId);

            if (holderEntry != null) {

                 //每隔10s,如果当前线程持有锁,则续期30s

                if (this.expEntry.compareAndSet(holderEntry, holderEntry)) {

                    Timeout task = this.timer().newTimeout((timeout) -> {

                        if (this.extendLockInternal(holderId)) {

                            this.extendLock(holderId);

                        }



                    }, this.internalLockLeaseTime() / 3L, TimeUnit.MILLISECONDS);

                    if (this.expEntry.get() != null) {

                        ((R2mLock.ExpirableEntry)this.expEntry.get()).setExtTask(task);

                    }

                }



            }

        }

    }



 //执行续期lua脚本

 private boolean extendLockInternal(long threadId) {

        Object result;

        try {

           //只续期

            if (this.extendLockSha() != null) {

                result = this.executor.evalR2mLockSha(this.extendLockSha(), "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));

            } else {

                result = this.executor. == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));

            }

        } catch (Exception var5) {

            return false;

        }



        return Long.parseLong(result.toString()) == 1L;

    }


2 客户端宕机,锁如何释放?

分布式锁是有效期的,客户端宕机后,watchdog机制失效,锁过期自动失效。

3 redis分布式锁集群模式下缺陷

r2m集群模式,极端情况,master加锁成功,宕机,还未来得及同步到slave,主从切换,slave切换成master,可以继续加锁,对于非及其严格加锁场景,该方案可满足,属于AP;对于严格场景下的分布式锁,可采用基于zookeeper的分布式锁,属于CP,leader宕机,folllower选举时不可用。性能上redis更优。

4 锁的释放问题

注意锁的释放在finally中释放,必须由锁的持有者释放,不能由其他线程释放别人的锁,示例代码中lock放到try的外面。

点赞
收藏
评论区
推荐文章
peter peter
3年前
Go:分布式锁实现原理与最佳实践
分布式锁应用场景很多应用场景是需要系统保证幂等性的(如api服务或消息消费者),并发情况下或消息重复很容易造成系统重入,那么分布式锁是保障幂等的一个重要手段。另一方面,很多抢单场景或者叫交易撮合场景,如dd司机抢单或唯一商品抢拍等都需要用一把“全局锁”来解决并发造成的问题。在防止并发情况下造成库存超卖的场景,也常用分布式锁来解决。实现
灯灯灯灯 灯灯灯灯
3年前
面试百度和美团,竟然问我多线程安全问题,正好撞在我知识点上
解决多线程安全问题无非两个方法synchronized和lock具体原理以及如何获取锁AQS算法本篇文章主要讲了lock的原理就是AQS算法,还有个姊妹篇讲解synchronized的实现原理也是阿里经常问的,一定要看后面的文章,先说结论:非公平锁tryAcquire的流程是:检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查
Stella981 Stella981
3年前
Redis分布式锁的正确实现方式
前言分布式锁一般有三种实现方式:1.数据库乐观锁;2.基于Redis的分布式锁;3.基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Redis分布式锁。
Stella981 Stella981
3年前
Redis 分布式锁
一.什么是分布式锁   分布式锁其实可以理解为:控制分布式系统有序的去对共享资源进行操作,通过互斥来保持一致性。  举个不太恰当的例子:假设共享的资源就是一个房子,里面有各种书,分布式系统就是要进屋看书的人,分布式锁就是保证这个房子只有一个门并且一次只有一个人可以进,而且门只有一把钥匙。然后许多人要去看书,可以,排队,第一个人拿着钥匙把门打开
Easter79 Easter79
3年前
SpringBoot+Redis分布式锁:模拟抢单
本篇内容主要讲解的是redis分布式锁,这个在各大厂面试几乎都是必备的,下面结合模拟抢单的场景来使用她;本篇不涉及到的redis环境搭建,快速搭建个人测试环境,这里建议使用docker;本篇内容节点如下:jedis的nx生成锁如何删除锁模拟抢单动作(10w个人开抢)jedis的nx生成锁
Stella981 Stella981
3年前
Redis专题(3):锁的基本概念到Redis分布式锁实现
!(https://oscimg.oschina.net/oscnet/5017163e87f6300bb3bfbf64a9abd7815ba.png)近来,分布式的问题被广泛提及,比如分布式事务、分布式框架、ZooKeeper、SpringCloud等等。本文先回顾锁的概念,再介绍分布式锁,以及如何用Redis来实现分布式锁。
Wesley13 Wesley13
3年前
Java分布式锁之数据库实现
<divid"cnblogs\_post\_body"class"blogpostbody"<p之前的文章<ahref"http://www.cnblogs.com/garryfu/p/7978611.html"target"\_blank"《Java分布式锁实现》</a中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存
图解Redis和Zookeeper分布式锁 | 京东云技术团队
使用Redis还是Zookeeper来实现分布式锁,最终还是要基于业务来决定,可以参考以下两种情况:(1)如果业务并发量很大,Redis分布式锁高效的读写性能更能支持高并发(2)如果业务要求锁的强一致性,那么使用Zookeeper可能是更好的选择
linbojue linbojue
9个月前
SpringCloud原理解析与实战技巧
SpringCloud原理详解SpringCloud是一套基于SpringBoot的开源微服务架构构建工具集。它提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话和集群状态)环境中设计、构