TiKV 源码解析系列文章(十九)read index 和 local read 情景分析

Easter79
• 阅读 774

在上篇文章中,我们讲解了 Raft Propose 的 Commit 和 Apply 情景分析,相信大家对 TiKV 的 Raft 写流程有了大概了解。这篇文章将尝试向大家较为完整的介绍下 TiKV 中的 Raft 读流程的实现,特别是 read index 和 lease read(或称 local read)。关于 read index 和 lease read 的介绍和理论基础,请大家参阅 TiKV 功能介绍 - Lease Read 或者 Raft 论文第 6.4 节,不在这里赘述。

如何发起 Raft 读请求?

TiKV 的实现是分层的, 不同模块负责不同事情,下图直观地介绍了 TiKV 的模块的层级关系。

TiKV 源码解析系列文章(十九)read index 和 local read 情景分析

TiKV 中所有 Raft 相关的逻辑都在  Raftstore 模块 ,如何发起 Raft 读请求就是说如何通过 Raftstore 发起读请求。Raftstore 对外(TXN/MVCC)提供接口叫做  RaftStoreRouter ,它提供了多方s法,但能供外面发起读写请求的只有一个,叫做  send_command

/// Routes messages to the raftstore. pub trait RaftStoreRouter<E>: Send + Clone where E: KvEngine, { /// Sends RaftCmdRequest to local store. fn send_command(&self, req: RaftCmdRequest, cb: Callback<E>) -> RaftStoreResult<()>; // Other methods are elided. }

所有的读写请求统一使用这个方法发起。当操作完成后,不管成功与否,都调用 cb: Callbck<E>,并将回复传入。

这篇文章接下来的部分将围绕图中黄色部分展开。

读请求有哪些?

既然这么问,肯定意味着 TiKV 中有多个不同类型的读请求。这就需要了解下 RaftCmdRequest 的构成了。TiKV 对外的请求都是 Protocol buffer message,RaftCmdRequest 定义在 kvproto/raft_cmd.proto,它包含了所有 TiKV 支持的读写请求。

message Request { CmdType cmd_type = 1; GetRequest get = 2; PutRequest put = 4; DeleteRequest delete = 5; SnapRequest snap = 6; PrewriteRequest prewrite = 7; DeleteRangeRequest delete_range = 8; IngestSSTRequest ingest_sst = 9; ReadIndexRequest read_index = 10; }

上面代码中加粗的就是 TiKV 目前支持的几种读请求。

  • GetRequest:读取一个 key value 对。

  • SnapRequest:获取当前时刻 RocksDB 的 snapshot。

  • ReadIndexRequest:获取当前时刻能保证线性一致的 Raft log index。

注意:不要把 ReadIndexRequst 和 Read Index 搞混。ReadIndexRequest 是一种读的请求,ReadIndex 是一种处理读请求的方式。

Raft 如何处理读请求?

我们以日常使用中最常见的 SnapRequest 为例,说一下 Read Index 和 Local read 的流程。

在  TXN/MVCC 层通过 send_command  发起一个读请求后,Raftstore 中对应的 PeerFsm (就是一个 Raft 状态机)会在  PeerFsm::handld_msgs  中收到该请求。

PeerFsm::propose_raft_command

fn propose_raft_command(&mut self, mut msg: RaftCmdRequest, cb: Callback<RocksEngine>) { // Irrelevant code is elided. match self.pre_propose_raft_command(&msg) { Ok(Some(resp)) => { cb.invoke_with_response(resp); return; } Err(e) => { cb.invoke_with_response(new_error(e)); return; } _ => (), } let mut resp = RaftCmdResponse::default(); if self.fsm.peer.propose(self.ctx, cb, msg, resp) { self.fsm.has_ready = true; } }

PeerFsm 在会将该请求传入 PeerFsm::propose_raft_command 做进一步处理。为了突出重点,无关代码已被删去。

  1. pre_propose_raft_command:检查能否处理该请求,包括:

    a. 检查 store id,确认是否发送到发送到了对的 TiKV;

    b. 检查 peer id,确认是否发送到了对的 Peer;

    c. 检查 leadership,确认当前 Peer 是否为 leader;

    d. 检查 Raft 任期,确认当前 leader 的任期是否符合请求中的要求;

    e. 检查 peer 初始化状态,确认当前 Peer 已经初始化,有完整数据;

    f. 检查 region epoch,确认当前 Region 的 epoch 符合请求中的要求。

  2. peer.propose: 当全部检查通过后,正式进入 Raft 的 Propose 环节。

Peer::propose

pub fn propose<T: Transport, C>( &mut self, ctx: &mut PollContext<T, C>, cb: Callback<RocksEngine>, req: RaftCmdRequest, mut err_resp: RaftCmdResponse, ) -> bool { // Irrelevant code is elided. let policy = self.inspect(&req); let res = match policy { Ok(RequestPolicy::ReadLocal) => { self.read_local(ctx, req, cb); return false; } Ok(RequestPolicy::ReadIndex) => return self.read_index(ctx, req, err_resp, cb), Ok(RequestPolicy::ProposeTransferLeader) | Ok(RequestPolicy::ProposeConfChange) | Ok(RequestPolicy::ProposeNormal) => { // Irrelevant code is elided. } Err(e) => Err(e), }; }

由于 RaftCmdRequest 可能包含了多种请求,加上请求间的处理方式各有不同,所以我们需要判断下该如何处理。

  • inspect:判断请求类别和处理方式。让我们聚焦到读请求,处理方式总共有两种:

  • RequestPolicy::ReadLocal,也就是 local read,说明该 Peer 是 leader 且在 lease 内,可以直接读取数据。

  • RequestPolicy::ReadIndex,也就是 read index,说明该 Peer 是 leader 但不在 lease 内,或者该请求明确要求使用 read index 处理。

  • self.read_local:以 loca read 方式处理请求,直接读取 RocksDB。

  • self.read_index:以 read index 方式处理请求,询问一遍大多数节点,确保自己是合法 leader,然后到达或超过线性一致性的点(read index)后读取 RocksDB。

Peer::inspect

fn inspect(&mut self, req: &RaftCmdRequest) -> Result<RequestPolicy> { // Irrelevant code is elided. if req.get_header().get_read_quorum() { return Ok(RequestPolicy::ReadIndex); } if !self.has_applied_to_current_term() { return Ok(RequestPolicy::ReadIndex); } match self.inspect_lease() { LeaseState::Valid => Ok(RequestPolicy::ReadLocal), LeaseState::Expired | LeaseState::Suspect => { Ok(RequestPolicy::ReadIndex) } } } fn inspect_lease(&mut self) -> LeaseState { if !self.raft_group.raft.in_lease() { return LeaseState::Suspect; } // None means now. let state = self.leader_lease.inspect(None); if LeaseState::Expired == state { self.leader_lease.expire(); } state }

inspect 方法也不复杂,我们住逐行看一下:

  • req.get_header().get_read_quorum():该请求明确要求需要用 read index 方式处理,所以返回 ReadIndex。

  • self.has_applied_to_current_term():如果该 leader 尚未 apply 到它自己的 term,则使用 ReadIndex 处理,原因见  TiKV 功能介绍 - Lease Read 。

  • self.raft_group.raft.in_lease():如果该 leader 不在 raft 的 lease 内,说明可能出现了一些问题,比如网络不稳定,心跳没成功等。使用 ReadIndex 处理。

  • self.leader_lease.inspect(None):使用 CPU 时钟判断 leader 是否在 lease 内,如果在,则使用 ReadLocal 处理。

这判断总的来说就是,如果不确定能安全地读 RocksDB 就用 read index,否则大胆地使用 local read 处理。

多线程 local read

细心的读者可能已经发现,是否能 local read 关键在 leader 是否在 lease 内,而判断 lease 其实是不用经过 Raft 状态机的,所以我们能不能扩展下 lease,让它能在多线程间共享,特别是在 TXN/MVCC 层,这样读请求就能绕过 Raft 直接执行了。答案是可以的,而且 TiKV 已经实现了。话不多说,直接看代码。

impl<E> RaftStoreRouter<E> for ServerRaftStoreRouter<E> where E: KvEngine { fn send_command(&self, req: RaftCmdRequest, cb: Callback<E>) -> RaftStoreResult<()> { let cmd = RaftCommand::new(req, cb); if LocalReader::<RaftRouter<E>, E>::acceptable(&cmd.request) { self.local_reader.execute_raft_command(cmd); Ok(()) } else { let region_id = cmd.request.get_header().get_region_id(); self.router .send_raft_command(cmd) .map_err(|e| handle_send_error(region_id, e)) } } }

这个实现的有些取巧,我们直接把它做到 raftstore 的入口处,也就是 RaftStoreRouter 中。这里的 LocalReader 其实就是一个 cache,缓存了现有 leader 处理读请求时的一些状态。

  • acceptable(): 检查这个请求是否允许用 local read 方式处理。

  • execute_raft_command(): 尝试以 local read 方式处理该请求。

LocalReader::execute_raft_command

p``ub fn execute_raft_command(&self, cmd: RaftCommand<E>) { // Irrelevant code is elided. let region_id = cmd.request.get_header().get_region_id(); let mut executor = ReadExecutor::new( self.kv_engine.clone(), false, /* dont check region epoch */ true, /* we need snapshot time */ ); match self.pre_propose_raft_command(&cmd.request) { Ok(Some(delegate)) => { if let Some(resp) = delegate.handle_read(&cmd.request, &mut executor, &mut *metrics) { cmd.callback.invoke_read(resp); } else { self.redirect(cmd) } } Ok(None) => { if self.delegates.borrow().get(&region_id).is_some() { self.redirect(cmd); } let meta = self.store_meta.lock().unwrap(); match meta.readers.get(&region_id).cloned() { Some(reader) => { self.delegates.borrow_mut().insert(region_id, Some(reader)); } None => self.redirect(cmd), } } Err(e) => { let mut response = cmd_resp::new_error(e); if let Some(Some(ref delegate)) = self.delegates.borrow().get(&region_id) { cmd_resp::bind_term(&mut response, delegate.term); } cmd.callback.invoke_read(ReadResponse { response, snapshot: None }); self.delegates.borrow_mut().remove(&region_id); } } }

上述代码就是 Localreader 中处理请求的关键逻辑。注意为了突出重点,我们对该函数做了适当精简,完整代码请参考 链接。

  • pre_propose_raft_command(): 这个函数和 PeerFsm 中的同名函数做的事情是类似的,对 lease 的检查也在这里发生,如果所有检查通过,就会返回 Ok(Some(delegate)),用来执行读请求。

  • redirect():如果 Localreader 不确定如何处理,那它就用该方法将请求重新转发到 raftstore 中,一切以 raftstore 为准。

Localreader 中对 lease 的处理和 raftstore 略有不同,关键代码在  这里  和  这里 ,至于为什么可以这么写,在这就不说了,作为课后作业留给读者思考 :-p

最后

read index 和 local read 的源码阅读就到这结束了,希望读者看完后能了解并掌握 TiKV 处理读请求的逻辑。

💡 文中划线部分均有跳转,请点击【阅读原文】查看原版

TiKV 源码解析系列文章

(一)序

(二)raft-rs proposal 示例情景分析

(三)Prometheus(上)

(四)Prometheus(下)

(五)fail-rs 介绍

(六)raft-rs 日志复制过程分析

(七)gRPC Server 的初始化和启动流程

(八)grpc-rs 的封装与实现

(九)Service 层处理流程解析

(十)Snapshot 的发送和接收

(十一)Storage - 事务控制层

(十二)分布式事务

(十三)MVCC 数据读取

(十四)Coprocessor 概览

(十五)表达式计算框架

(十六)TiKV Coprocessor Executor 源码解析

(十七)raftstore 概览

(十八)Raft Propose 的 Commit 和 Apply 情景分析

TiKV 是一个开源的分布式事务 Key-Value 数据库,支持跨行 ACID 事务,同时实现了自动水平伸缩、数据强一致性、跨数据中心高可用和云原生等重要特性。作为一个基础组件,TiKV 可作为构建其它系统的基石。目前,TiKV 已用于支持分布式 HTAP 数据库—— TiDB 中,负责存储数据,并已被多个行业的领先企业应用在实际生产环境。2019 年 5 月,CNCF 的 TOC(技术监督委员会)投票决定接受 TiKV 晋级为孵化项目。

· 源码地址https://github.com/tikv/tikv

· 更多信息https://tikv.org

TiKV 源码解析系列文章(十九)read index 和 local read 情景分析

TiKV 源码解析系列文章(十九)read index 和 local read 情景分析

文章转载自PingCAP。点击这里阅读原文了解更多

TiKV 源码解析系列文章(十九)read index 和 local read 情景分析

扫描二维码联系我们!


CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux  Foundation,是非营利性组织。

_CNCF云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请长按以下二维码进行关注。_

TiKV 源码解析系列文章(十九)read index 和 local read 情景分析

本文分享自微信公众号 - CNCF(lf_cncf)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
6
获赞
1.2k