Bifrost 位点管理 之 异构中间件实现难点(1)

Stella981
• 阅读 669

Bifrost ---- 面向生产环境的 MySQL 同步到 Redis,MongoDB 等服务的异构中间件

源码下载 (你的点击 star 就是对 Bifrost 最大的支持!!!): Github Gitee

现在行里有不少的数据同步的工具,比如 datalink, 阿里的DTS , otter 等工具,甚至很多企业都在 otter 等开源工具里进行二次开发,实更了很多功能。

也和不少的开发人员,聊过类似工具的架构设计

大多数的开发人员架构设计 都是先用 canal binlog解析出来,每个表一个topic,然后再起额外的topic 消费线程,进行数据转换,再写入相对应的目标库里。其实这个设计没有毛病,而且看上去解偶设计非常得好。

咱在这里也先不讨论说一个表一个topic,等太多topic造成 kafka 磁盘IO性能下降明显 等问题。

只是有不少开发人员,对 kafka 集群的数据认为是绝对安全,并没有考滤到 kafka 数据集群崩溃等不可控的情况下造成数据丢失,

假如Kafka等工具出现严重不可恢复的情况下,怎么通知canal该从哪一个位点开始进行重新解析呢?当然完全重新来,也不是不可以,再全量一次。比如用阿里的datax,再来一次全量。

Bifrost 在开发设计的时候,考滤到性能,还有数据位点安全,还有资源最小化等多情况下,全局采用当前进程里的内存队列(当然在出现同步阻塞的时候,会自动启动文件队列功能)。全局采用内存队列,减少网络延迟,还有减少硬件资源的消耗。

Bifrost 全局有多个地方有对位点进行保存管理:

1. Bristol 模块里,在解析出 query,commit 事件的时候,对记录一次位点信息,保存到一个变量中,row_event,map_event 等其他事件的位点,并不会记录,这和binlog数据有关,map_event紧跟着row_event,row_event的解析又依懒map_event,假如 map_event之后,连接出错了,Bristol 需要自动重连等原因,假如重连接,没有map_event事件的数据,不就解析出错了么?

binlog.go

func (parser *eventParser) saveBinlog(event *EventReslut) { switch event.Header.EventType { case QUERY_EVENT,XID_EVENT: if event.BinlogFileName == "" { return } parser.binlogDump.Lock() parser.binlogFileName = event.BinlogFileName parser.binlogPosition = event.Header.LogPos parser.binlogTimestamp = event.Header.Timestamp parser.binlogDump.Unlock() break case ROTATE_EVENT: parser.binlogDump.Lock() parser.currentBinlogFileName = event.BinlogFileName parser.binlogDump.Unlock() default: break } }

2. 每个数据源列表数据里,每3秒,调用 Bristol 模块,拿Bristol 最后一次解析的 query,commit事件的位点.

1).将位点保存到 Db数据源在内存里对应的变量中,用于界面显示显示,还有正常关闭的时候,将配置文件刷到db.bifrost文件中

2). 将DB数据源位点保存到 leveldb中,每隔2秒刷一次leveldb数据到磁盘

func (db *db) saveBinlog(){ FileName,Position,Timestamp := db.binlogDump.GetBinlog() if FileName == ""{ return } //db.Lock() //保存位点,这个位点在重启 配置文件恢复的时候 //一个db有可能有多个channel,数据顺序不用担心,因为实际在重启的时候 会根据最小的 ToServerList 的位点进行自动替换 db.binlogDumpFileName,db.binlogDumpPosition,db.binlogDumpTimestamp = FileName,Position,Timestamp if db.DBBinlogKey == nil{ db.DBBinlogKey = getDBBinlogkey(db) } //db.Unlock() index := strings.IndexAny(FileName, ".")

BinlogFileNum,_ := strconv.Atoi(FileName[index+1:]) saveBinlogPosition(db.DBBinlogKey,BinlogFileNum,db.binlogDumpPosition) }

func saveBinlogPositionToStorageFromCache() { for { time.Sleep(2 * time.Second) for _, t := range TmpPositioin { t.Lock() for k, v := range t.Data { Val, _ := json.Marshal(v) storage.PutKeyVal([]byte(k) , Val) } t.Data = make(map[string]positionStruct,0) t.Unlock() } } }

3. 每个数据同步队列对应最后同步成功和最后进入队列的2个位点

1). 最后同步成功的位点

    这个就是字段上的意思 ,最后同步到目标库的位点

2). 最后进入队列的位点

    每次写数据到队列的时候,都会更新最后一条更新到当前队列的数据位点

 不管是 最后同步成功 还是 最后进入队列的位点 ,都会和数据源位点一样,一份存到内存变量中,另一份存到leveldb中,每隔2秒进行刷到磁盘

以上 除了 Bristol 模块为了解决 mysql 连接异常,造成的自动重连的位点,不会被保存到磁盘,另外2种位点都会刷到磁盘,而且还会刷到leveldb中,为了解决中途断点或者被kill -9 等非正常退出的情况下,数据恢复的时候,才知道是从哪一个位点开始重新解析!

位点恢复

即然每个同步都保存了有相对应的位点信息,那肯定 就能算出一个,所有同步中,最小的位点,然后进行从算出来最小的位点进行重新开始解析。实际有,也有个别情况,是不需要参与位点大写计算的或者说,有个别情况,在两者之前位点比较的时候取更大位点的

1).  不管数据源的位点,还是同步队列的最后成功的位点,或者 最后进入队列的位点,首先配置文件中的位点数据和leveldb中取出来位点数据做对比,取大值。

 刷到leveldb的数据 是每2秒刷一次,数据理论上是延迟的,除非非正常关闭的时候,leveldb 前2位的位点数据已经进入到 磁盘,但是同步文件中的数据没刷到db.bifrost 文件的时候进程就被强制退出了。所以这里取两者的大值

2). 数据同步队列的 最后成功的位点 和 最后进入队列的位点 相同步的时候,放弃位点比较。

这里解决的问题是  假如有2个表配置了同步到 ClickHouse 或者 Redis 等目标库,Table1 更新了一条数据当前位点是 1000,然后再也没更新过数据了,以后都是Table2 这个表的数据更新,当Table2 同步数据的位点到了 3000 的时候,重启了Bfirost,那这个时候,应该是按3000这个位点开始重新解析,对不对?而不是从1000这个位点开始对吧?

会进行 保存 最后进入队列的位点 的位点,就是为了解决这个问题的

3). 假如所有同步队列 的 最后成功的位点 和 最后进入队列的位点 都是相同的情况下,直接取 数据源对应的最后的位点. 

相关源码:

server/recovery.go

func recoveryData(data map[string]dbSaveInfo,isStop bool){}

为了实现这个数据位点安全及性能,在位点管理上,做了不少事情 ,并不是代表这个方案好,而是我们在设计系统上,需要考滤极端情况下,数据怎么恢复,重新拉取一次全量任务,其实也是很不错的选择!

GitHub : https://github.com/brokercap/Bifrost

Gitee : https://gitee.com/jc3wish/Bifrost

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Easter79 Easter79
3年前
SpringBoot整合Redis乱码原因及解决方案
问题描述:springboot使用springdataredis存储数据时乱码rediskey/value出现\\xAC\\xED\\x00\\x05t\\x00\\x05问题分析:查看RedisTemplate类!(https://oscimg.oschina.net/oscnet/0a85565fa
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这