关于Kafka-Connect:
(1) 是否可以动态添加已有数据的新表?
不可以,Kafka-Connect需要配置先行。如果是已有数据的新表,无法通过修改已有的kafka-connect配置进行新表的Snapshot初始化。
建议通过table white list功能,进行新表的snapshot。然后等到稳定后,再合并到同一个Kafka-connect配置里面。
(2) Kafka-Connect什么时候回触发Snapshot ?
参考 https://debezium.io/docs/connectors/mysql/
关于配置项:snapshot.mode
(A) initial
表示指定connector在logic server name 没有offset记录 的情况下,会运行一次snapshot。(默认值)
(B) when_needed
这表示connector在启动的时候,判断是否需要运行snapshot。当没有offsets数据,或者之前的binlog offset记录或者 GTID 在mysql服务器上面,找不到了。
配置binlog超过现有的可读binlog范围,则会强制触发一个snapshot。(手动修改 kafka-connect distribute版本的内建kafka对列:connect-offsets 的值,达到这个效果)
(C) never
nerver表示,不会使用snapshot操作,所以当启动connector的时候,会根据logical server name,会从binlog的最开始处进行数据读取。这需要谨慎地使用,因为只有在binlog数据包含了所有的数据库历史记录的时候,才会有效。
如果你不需要topics包含一致性历史快照,且只需要增量变化的数据,那么可以使用schema_only。
(D) schema_only
只对表结构进行snapshot,数据会按照增量的形式进行添加。
(E) schema_only_recovery
schema_only_recovery
用于修复一个现有的connector,因为崩溃 或者丢失数据库历史topic,或者是周期性执行了clean up操作,清空了数据库历史topic(database history topic). 它不会同步任何数据,只是将schema修复到与数据库一致。
(3) 遇到问题:
com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
at java.lang.Thread.run(Thread.java:748)
注意修改database.server.id 是唯一值,用来区分每一个mysql client instance
[database.server.id](https://www.oschina.net/action/GoToLink?url=http%3A%2F%2Fdatabase.server.id%2F)
random
A numeric ID of this database client, which must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL database cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.
(4) 遇到问题:
org.apache.kafka.connect.errors.ConnectException: no viable alternative at.... antlr ... 遇到不能解析的ddl语句
解决:配置database.history.skip.unparseable.ddl = true
(5) 遇到问题:
Encountered change event for table xxx.xxxx whose schema isn't known to this connector
解决思路:配置 inconsistent.schema.handling.mode = warn
原理,在遇到表schema不一致的时候,内部维护的表结构与数据库表结构不一致的时候,默认会failed整个处理逻辑。
但是通过配置 handling.mode = warn 可以使Kafka对该表重新进行一次snapshot操作。
不过过程可能会对Kafka插入重复的数据。需要在写入端进行数据去重操作。
(6) 遇到问题:
Error recording the DDL statement(s) in the database history Kafka topic ori_xxx_history:0 using brokers at null
解决方法:
配置 snapshot.mode = schema_only_recovery 修复完成后,再改回来 when_needed
(7) 遇到问题:
ConnectException: OffsetStorageWriter is already flushing
解决方法:
使用restful PUT API 进行配置更新,出现背景是,Kafka Broker进行了维护重启,上一次的offset提交出现了异常。
重启Kafka_connect的task后, 应该可以解决。PS. 这个异常或许会自己自动恢复,或者需要重启。
(8) 由于修改了数据库链接配置,导致Kafka-Connect 的Debezium无法读取Binlog异常
解决方法:
shutdown 所有的Kafka-Connect 集群服务器,并且进行重启。故障消除。
(9) 遇到问题 binlog event 的列和数据库表的列数据不对应:
The binlog event does not contain expected number of columns; the internal schema representation is probably out of sync with the real database schema, or the binlog contains events recorded with binlog_row_image other than FULL or the table in question is an NDB table。
2020-6-23:先使用schemal_only进行修复,等2分钟后,然后再使用schema_only_recovery 进行增量同步。
(10) 遇到金融云数据库binlog归档异常:
Could not find existing binlog information while attempting schema only recovery snapshot
将配置改从SCHEMA_ONLY_RECOVERY 改成 "snapshot.mode": "SCHEMA_ONLY" 。问题修复。
Schema_only只能修复schema不完整的问题,然后再使用snapshot.mode 如 never或者 schema_only_recovery 读取binlog数据。
如果schema数据不完整,就会遇到错误:Encountered change event for table xxx.xxx.xxx whose schema isn't known to this
connector。当遇到这个错误的时候,就需要使用Schema_only 或者 schema_only_recovery 来修复元数据。
(11) 遇到binlog文件归档后丢失的情况:
Could not find existsing binlog information while attempting schema only recovery snapshot
实际上,是由于binlog服务器归档操作,导致binlog日志文件,在slave同步数据之前,被磁盘清理或转移。导致binlog tcp dump无法读取之前的binlog文件。
解决方法有2个:
(a) 让运维同事,把归档或转移掉的binlog文件,重新迁移到binlog的目录中,纳入mysql的管理。
(b) 更改配置文件的name 比如原配置name叫: "name": "debezium-mysql-xxx" ,名字可以改成 "name": "debezium-mysql-xxx2" 这样。
改了名字以后,kafka-connect会把这个配置认为是新的配置项,然后从当前可以读到的binlog position中读取,但是会丢失部分数据,需要重新拉取数据。
(12) 遇到change event isn't known to this connector:
org.apache.kafka.connect.errors.ConnectException: Encountered change event for table pay.comp_accountwhose schema isn't known to this connector
查询google,结果指向https://gitter.im/debezium/dev/archives/2018/04/05
里面说,history topic 数据异常,建议使用schema_only_recovery来修复数据。
后来发现是Mysql MASTER-MASTER架构的原因,MHA架构部署debezium,请参考:https://github.com/debezium/debezium-examples/tree/master/failover
(13) 解决Kafka-Connect History 数据过大的情况:
由于History存放是表结构变更的数据,而且过期时间为无限,所以时间久了,History的topic会变得非常大。
首先,配置 database.history.store.only.monitored.tables.ddl = true。该配置根据table filter,来减少记录到history topic的数据。
最后,如果history topic实在是过大,可以重新创建一个topic,并通过schema_only_recovery来重启connector。由于binlog offset信息存储在connector的topic里面,所以不会受到影响。
(14) Kafka-Connect配置binlog的读取位置:
可以,参考Url:https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database
通过修改对应配置offset.storage.topic的值,可以做到。
$ kafkacat -b localhost -C -t my_connect_offsets -f
'Partition(%p) %k %s\n'
Partition(11) [
"inventory-connector"
,{
"server"
:
"dbserver1"
}] {
"ts_sec"
:1530088501,
"file"
:
"mysql-bin.000003"
,
"pos"
:817,
"row"
:1,
"server_id"
:223344,
"event"
:2}
Partition(11) [
"inventory-connector"
,{
"server"
:
"dbserver1"
}] {
"ts_sec"
:1530168941,
"file"
:
"mysql-bin.000004"
,
"pos"
:3261,
"row"
:1,
"server_id"
:223344,
"event"
:2}
如上述所示,根据你的connector_name所示 对应的offset.storage.topic的key是这样的:["inventory-connector",{"server":"dbserver1"}]
对应的Kafka分区固定是11
最后的**Value**值是:{"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}
这里用到了一个kafkacat工具,当然,也可以通过python代码的方式进行写入。
(15) 遇到异常如下:
org.apache.kafka.connect.errors.ConnectException: Encountered change event for table xxxx. xxxx whose schema isn't known to this connector
判断:有可能是kafka-connect的source程序,对于使用history topic生成schema的元数据与生产库订阅binlog而来的数据的元数据不一致。
解决方法是,shutdown 当前connect配置,修改database.history.kafka.topic的名称。使用一个新的topic名称,一般就在原本topic名称后+1。
然后重启kafka-source程序。注意,当前使用的snapshot.mode为schema_only_recovery。问题得到解决。