MySQL Binlog同步HDFS的方案

Wesley13
• 阅读 855

这个问题我想只要是在做数据开发的,有一定数据实时性要求、需要做数据的增量同步的公司都会遇到。

19年的时候我曾经写过一点canal的文章。

现在你只要看这个文章就可以了。

这篇文章是一个读者推荐给我的,原地址:https://dwz.cn/XYdYpNiI,作者:混绅士

我对其中的一些内容做了修改。

关系型数据库和Hadoop生态的沟通越来越密集,时效要求也越来越高。本篇就来调研下实时抓取MySQL更新数据到HDFS。

初步调研了canal(Ali)+kafka connect+kafka、maxwell(Zendesk)+kafka和mysql_streamer(Yelp)+kafka。_这几个工具抓取MySQL的方式都是通过扫描binlog,模拟MySQL master和slave(Mysql Replication架构–解决了:数据多点备份,提高数据可用性;读写分流,提高集群的并发能力。(并非是负载均衡);让一些非实时的数据操作,转移到slaves上进行。)之间的协议来实现实时更新的_。

先科普下Canal

Canal简介

原理

MySQL Binlog同步HDFS的方案

Canal原理图

原理相对比较简单:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

  • mysql master收到dump请求,开始推送(slave拉取,不是master主动push给slaves)binary log给slave(也就是canal)

  • canal解析binary log对象(原始为byte流)

架构

MySQL Binlog同步HDFS的方案

Canal架构图

组件说明:

  • server代表一个canal运行实例,对应于一个jvm

  • instance对应于一个数据队列(1个server对应1..n个instance)

而instance模块又由eventParser(数据源接入,模拟slave协议和master进行交互,协议解析)、eventSink(Parser和Store连接器,进行数据过滤,加工,分发的工作)、eventStore(数据存储)和metaManager(增量订阅&消费信息管理器)组成。

EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功 ),传送成功之后更新Log Position。流程图如下:

MySQL Binlog同步HDFS的方案

EventParser流程图

  • EventSink起到一个类似channel的功能,可以对数据进行_过滤、分发/路由(1:n)、归并(n:1)和加工_。EventSink是连接EventParser和EventStore的桥梁。

  • EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。

  • MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:

1. Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]
2. void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作

3. void ack(long batchId),顾名思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

增量订阅和消费之间的协议交互如下:

MySQL Binlog同步HDFS的方案

增量订阅和消费协议

canal的get/ack/rollback协议和常规的jms协议有所不同,_允许get/ack异步处理_,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.

流式api设计的好处:

  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)

  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

流式api设计示意图如下:

MySQL Binlog同步HDFS的方案

流式api

  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性

  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取

  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor

  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

这个流式api是不是类似hdfs write在pipeline中传输packet的形式,先将packet放入dataQueue,然后向下游传输,此时将packet放入ackQueue等到下游返回的ack,这也是异步的。

HA机制

canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA类似。

canal的ha分为两部分,canal server和canal client分别有对应的ha实现

  • canal server: 为了减少对mysql dump的请求,_不同_server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态(standby是instance的状态)。

  • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

server ha的架构图如下:
MySQL Binlog同步HDFS的方案

ha

大致步骤:

  • canal server要启动某个_canal instance_时都先向zookeeper_进行一次尝试启动判断_(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)

  • 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,_没有创建成功的canal instance就会处于standby状态_。

  • 一旦zookeeper发现canal server A创建的_instance节点_消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance。

  • canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

Canal部署及使用

MySQL配置

canal同步数据需要扫描MySQL的binlog日志,而binlog默认是关闭的,需要开启,并且为了保证_同步数据的一致性_,使用的日志格式为_row-based replication(RBR)_,在my.conf中开启binlog,

[mysqld]log-bin=mysql-bin #添加这一行就okbinlog-format=ROW #选择row模式server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

更改my.conf之后,需要_重启MySQL_,重启的方式有很多找到合适自己的就行。

Canal配置

由上面的介绍得知Canal由ServerInstance组成,而Server中又可以包含很多个Instance,_一个Instance对应一个数据库实例_,则Canal将配置分为两类,一类是server的配置,名字为canal.properties,另一类是instance的配置,名字为instance.properties,一般会在conf目录下新建一个instance同名的目录,将其放入此目录中。

先介绍canal.properties中的几个关键属性:

MySQL Binlog同步HDFS的方案

MySQL Binlog同步HDFS的方案

下面看下instance.properties,这里的属性较少:

MySQL Binlog同步HDFS的方案

除了上面两个配置文件,conf目录下还有一个目录需要强调下,那就是spring目录,里面存放的是instance.xml配置文件,目前默认支持的instance.xml有_memory-instance.xml、file-instance.xml、default-instance.xml和group-instance.xml_。这里主要维护的增量订阅和消费的关系信息(解析位点和消费位点)。

对应的两个位点组件,目前都有几种实现:

  • memory (memory-instance.xml中使用)

  • zookeeper

  • mixed

  • file (file-instance.xml中使用,集合了file+memory模式,先写内存,定时刷新数据到本地file上)

  • period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)

分别介绍下这几种配置的功能

  • memory-instance.xml:

_所有的组件(parser , sink , store)都选择了内存版模式_,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析

特点:速度最快,依赖最少(不需要zookeeper)

场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境

  • file-instance.xml:

所有的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制。

特点:支持单机持久化

场景:生产环境,无HA需求,简单可用.

  • default-instance.xml:

所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.(所有组件持久化的内容只有位置信息吧?__??)

特点:支持HA

场景:生产环境,集群化部署.

  • group-instance.xml:

主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。

场景:分库业务。比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.

canal example 部署

  • 在需要同步的MySQL数据库中创建一个用户,用来replica数据,这里新建的用户名和密码都为canal,命令如下:

    CREATE USER canal IDENTIFIED BY 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON . TO 'canal'@'%' ;FLUSH PRIVILEGES;

Mysql创建canal用户并为其赋所需权限之后,需要对Canal的配置文件(canal.properties和instance.properties)进行设置。

canal.properties和instance.properties里采用默认配置即可(这里只是运行个样例,生产中可以参考具体的参数属性进行设置),

  • Canal配置好之后,启动Canal client(client的作用是将Canal里的解析的binlog日志固化到存储介质中)。

client组件Canal本身是不提供的,需要根据api进行开发,这里将官方提供的client代码打包成jar进行消费Canal信息。

canal HA配置

canal的HA机制是依赖zk来实现的,需要更改canal.properties文件,修改内容如下:

# zk集群地址canal.zkServers=10.20.144.51:2181# 选择记录方式canal.instance.global.spring.xml = classpath:spring/default-instance.xml

更改两台canal机器上instance实例的配置instance.properties,修改内容如下:

canal.instance.mysql.slaveId = 1234 ##另外一台机器改成1235,保证slaveId不重复即可canal.instance.master.address = 10.20.144.15:3306

配置好之后启动canal进程,在两台服务器上执行sh bin/startup.sh

client进行消费时,可以直接指定zookeeper地址和instance name,也可以让canal client会自动从zookeeper中的running节点,获取当前服务的工作节点,然后与其建立链接。

maxwell简介

maxwell实时抓取mysql数据的原理也是基于binlog,和canal相比,maxwell更像是canal server + 实时client。(数据抽取 + 数据转换)

maxwell集成了kafka producer,直接从binlog获取数据更新并写入kafka,而canal则需要自己开发实时client将canal读取的binlog内容写入kafka中

maxwell特色:

  • 支持bootstrap启动,同步历史数据

  • 集成kafka,直接将数据落地到kafka

  • 已将binlog中的DML和DDL进行了模式匹配,将其解码为有schema的json(有利于后期将其重组为nosql支持的语言)
    {“database”:”test”,”table”:”e”,”type”:”update”,”ts”:1488857869,”xid”:8924,”commit”:true,”data”:{“id”:1,”m”:5.556666,”torvalds”:null},”old”:{“m”:5.55}}

缺点:

  • 一个MySQL实例需要对应一个maxwell进程

  • bootstrap的方案使用的是select *

maxwell的配置文件只有一个_config.properties_,在home目录。其中除了需要配置mysql master的地址、kafka地址还需要配置一个用于存放maxwell相关信息的mysql地址,maxwell会把读取binlog关系的信息,如binlog name、position。

工具对比

以上是Canal的原理及部署,其余类似maxwell和mysql_streamer对mysql进行实时数据抓取的原理一样就不再进行一一介绍,这里只对他们进行下对比:

MySQL Binlog同步HDFS的方案

以上只是将mysql里的实时变化数据的binlog以同种形式同步到kafka,但要实时更新到hadoop还需要使用一个实时数据库来存储数据,并自定制开发将kafka中数据解析为nosql数据库可以识别的DML进行实时更新Nosql数据库,使其与MySQL里的数据实时同步。

基础架构

MySQL Binlog同步HDFS的方案

基础架构图

虚线框是可选的方案

方案对比

  • _方案1_使用阿里开源的Canal进行Mysql binlog数据的抽取,另需开发一个数据转换工具将从binlog中解析出的数据转换成自带schema的json数据并写入kafka中。而_方案2_使用maxwell可直接完成对mysql binlog数据的抽取和转换成自带schema的json数据写入到kafka中。

  • _方案1_中不支持表中已存在的历史数据进行同步,此功能需要开发(如果使用sqoop进行历史数据同步,不够灵活,会使结果表与原始表结构相同,有区别于数据交换平台所需的schema)。_方案2_提供同步历史数据的解决方案。

  • _方案1_支持HA部署,而_方案2_不支持HA

方案1和方案2的区别只在于kafka之前,当数据缓存到kafka之后,需要一个定制的数据路由组件来将自带schema的数据解析到目标存储中。
数据路由组件主要负责将kafka中的数据实时读出,写入到目标存储中。(如将所有日志数据保存到HDFS中,也可以将数据落地到所有支持jdbc的数据库,落地到HBase,Elasticsearch等。)

综上,
方案1需要开发的功能有:

  • bootstrap功能

  • 实时数据转换工具

  • 数据路由工具

方案2需要开发的功能有:

  • 数据路由工具

  • HA模块(初期可暂不支持HA,所以开发紧急度不高)

数据路由工具是两个方案都需要开发的,则我比较偏向于第二种方案,因为在初期试水阶段可以短期出成果,可以较快的验证想法,并在尝试中能够较快的发现问题,好及时的调整方案。即使方案2中maxwell最终不能满足需求,而使用canal的话,我们也可能将实时数据转换工具的数据输出模式与maxwell一致,这样初始投入人力开发的数据路由工具依然可以继续使用,而不需要重新开发。

把增量的Log作为一切系统的基础。后续的数据使用方,通过订阅kafka来消费log。

比如:
大数据的使用方可以将数据保存到Hive表或者Parquet文件给Hive或Spark查询;
提供搜索服务的使用方可以保存到Elasticsearch或HBase 中;
提供缓存服务的使用方可以将日志缓存到Redis或alluxio中;
数据同步的使用方可以将数据保存到自己的数据库中;
由于kafka的日志是可以重复消费的,并且缓存一段时间,各个使用方可以通过消费kafka的日志来达到既能保持与数据库的一致性,也能保证实时性;

{“database”:”test”,”table”:”e”,”type”:”update”,”ts”:1488857869,”xid”:8924,”commit”:true,”data”:{“id”:1,”m”:5.556666,”torvalds”:null},”old”:{“m”:5.55}}

{“database”:”test”,”table”:”e”,”type”:”insert”,”ts”:1488857922,”xid”:8932,”commit”:true,”data”:{“id”:2,”m”:4.2,”torvalds”:null}}

欢迎点赞+收藏+转发朋友圈素质三连

MySQL Binlog同步HDFS的方案 MySQL Binlog同步HDFS的方案

文章不错?点个【在看】吧!** 👇**

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Python3:sqlalchemy对mysql数据库操作,非sql语句
Python3:sqlalchemy对mysql数据库操作,非sql语句python3authorlizmdatetime2018020110:00:00coding:utf8'''
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这