Bifrost ---- 面向生产环境的 MySQL 同步到 Redis,MongoDB 等服务的异构中间件
源码下载 (你的点击 star 就是对 Bifrost 最大的支持!!!): Github Gitee
现在行里有不少的数据同步的工具,比如 datalink, 阿里的DTS , otter 等工具,甚至很多企业都在 otter 等开源工具里进行二次开发,实更了很多功能。
也和不少的开发人员,聊过类似工具的架构设计
大多数的开发人员架构设计 都是先用 canal binlog解析出来,每个表一个topic,然后再起额外的topic 消费线程,进行数据转换,再写入相对应的目标库里。其实这个设计没有毛病,而且看上去解偶设计非常得好。
咱在这里也先不讨论说一个表一个topic,等太多topic造成 kafka 磁盘IO性能下降明显 等问题。
只是有不少开发人员,对 kafka 集群的数据认为是绝对安全,并没有考滤到 kafka 数据集群崩溃等不可控的情况下造成数据丢失,
假如Kafka等工具出现严重不可恢复的情况下,怎么通知canal该从哪一个位点开始进行重新解析呢?当然完全重新来,也不是不可以,再全量一次。比如用阿里的datax,再来一次全量。
Bifrost 在开发设计的时候,考滤到性能,还有数据位点安全,还有资源最小化等多情况下,全局采用当前进程里的内存队列(当然在出现同步阻塞的时候,会自动启动文件队列功能)。全局采用内存队列,减少网络延迟,还有减少硬件资源的消耗。
Bifrost 全局有多个地方有对位点进行保存管理:
1. Bristol 模块里,在解析出 query,commit 事件的时候,对记录一次位点信息,保存到一个变量中,row_event,map_event 等其他事件的位点,并不会记录,这和binlog数据有关,map_event紧跟着row_event,row_event的解析又依懒map_event,假如 map_event之后,连接出错了,Bristol 需要自动重连等原因,假如重连接,没有map_event事件的数据,不就解析出错了么?
binlog.go
func (parser *eventParser) saveBinlog(event *EventReslut) { switch event.Header.EventType { case QUERY_EVENT,XID_EVENT: if event.BinlogFileName == "" { return } parser.binlogDump.Lock() parser.binlogFileName = event.BinlogFileName parser.binlogPosition = event.Header.LogPos parser.binlogTimestamp = event.Header.Timestamp parser.binlogDump.Unlock() break case ROTATE_EVENT: parser.binlogDump.Lock() parser.currentBinlogFileName = event.BinlogFileName parser.binlogDump.Unlock() default: break } }
2. 每个数据源列表数据里,每3秒,调用 Bristol 模块,拿Bristol 最后一次解析的 query,commit事件的位点.
1).将位点保存到 Db数据源在内存里对应的变量中,用于界面显示显示,还有正常关闭的时候,将配置文件刷到db.bifrost文件中
2). 将DB数据源位点保存到 leveldb中,每隔2秒刷一次leveldb数据到磁盘
func (db *db) saveBinlog(){ FileName,Position,Timestamp := db.binlogDump.GetBinlog() if FileName == ""{ return } //db.Lock() //保存位点,这个位点在重启 配置文件恢复的时候 //一个db有可能有多个channel,数据顺序不用担心,因为实际在重启的时候 会根据最小的 ToServerList 的位点进行自动替换 db.binlogDumpFileName,db.binlogDumpPosition,db.binlogDumpTimestamp = FileName,Position,Timestamp if db.DBBinlogKey == nil{ db.DBBinlogKey = getDBBinlogkey(db) } //db.Unlock() index := strings.IndexAny(FileName, ".")
BinlogFileNum,_ := strconv.Atoi(FileName[index+1:]) saveBinlogPosition(db.DBBinlogKey,BinlogFileNum,db.binlogDumpPosition) }
func saveBinlogPositionToStorageFromCache() { for { time.Sleep(2 * time.Second) for _, t := range TmpPositioin { t.Lock() for k, v := range t.Data { Val, _ := json.Marshal(v) storage.PutKeyVal([]byte(k) , Val) } t.Data = make(map[string]positionStruct,0) t.Unlock() } } }
3. 每个数据同步队列对应最后同步成功和最后进入队列的2个位点
1). 最后同步成功的位点
这个就是字段上的意思 ,最后同步到目标库的位点
2). 最后进入队列的位点
每次写数据到队列的时候,都会更新最后一条更新到当前队列的数据位点
不管是 最后同步成功 还是 最后进入队列的位点 ,都会和数据源位点一样,一份存到内存变量中,另一份存到leveldb中,每隔2秒进行刷到磁盘
以上 除了 Bristol 模块为了解决 mysql 连接异常,造成的自动重连的位点,不会被保存到磁盘,另外2种位点都会刷到磁盘,而且还会刷到leveldb中,为了解决中途断点或者被kill -9 等非正常退出的情况下,数据恢复的时候,才知道是从哪一个位点开始重新解析!
位点恢复
即然每个同步都保存了有相对应的位点信息,那肯定 就能算出一个,所有同步中,最小的位点,然后进行从算出来最小的位点进行重新开始解析。实际有,也有个别情况,是不需要参与位点大写计算的或者说,有个别情况,在两者之前位点比较的时候取更大位点的
1). 不管数据源的位点,还是同步队列的最后成功的位点,或者 最后进入队列的位点,首先配置文件中的位点数据和leveldb中取出来位点数据做对比,取大值。
刷到leveldb的数据 是每2秒刷一次,数据理论上是延迟的,除非非正常关闭的时候,leveldb 前2位的位点数据已经进入到 磁盘,但是同步文件中的数据没刷到db.bifrost 文件的时候进程就被强制退出了。所以这里取两者的大值
2). 数据同步队列的 最后成功的位点 和 最后进入队列的位点 相同步的时候,放弃位点比较。
这里解决的问题是 假如有2个表配置了同步到 ClickHouse 或者 Redis 等目标库,Table1 更新了一条数据当前位点是 1000,然后再也没更新过数据了,以后都是Table2 这个表的数据更新,当Table2 同步数据的位点到了 3000 的时候,重启了Bfirost,那这个时候,应该是按3000这个位点开始重新解析,对不对?而不是从1000这个位点开始对吧?
会进行 保存 最后进入队列的位点 的位点,就是为了解决这个问题的
3). 假如所有同步队列 的 最后成功的位点 和 最后进入队列的位点 都是相同的情况下,直接取 数据源对应的最后的位点.
相关源码:
server/recovery.go
func recoveryData(data map[string]dbSaveInfo,isStop bool){}
为了实现这个数据位点安全及性能,在位点管理上,做了不少事情 ,并不是代表这个方案好,而是我们在设计系统上,需要考滤极端情况下,数据怎么恢复,重新拉取一次全量任务,其实也是很不错的选择!
GitHub : https://github.com/brokercap/Bifrost