Kafka Connect深度解读之单消息转换

Stella981
• 阅读 967

Kafka Connect是Apache Kafka®的一部分,在Kafka和其它系统之间提供可靠的、可扩展的分布式流式集成。Kafka Connect具有可用于许多系统的连接器,它是一个配置驱动的工具,不需要编码。

Kafka Connect API还提供了一个简单的接口,用于处理从源端通过数据管道到接收端的记录,该API称为单消息转换(SMT),顾名思义,当数据通过Kafka Connect连接器时,它可以对数据管道中的每条消息进行操作。

连接器分为源端或接收端,它们要么从Kafka上游的系统中提取数据,要么向Kafka的下游推送数据。这个转换可以配置为在任何一侧进行,源端连接器可以在写入Kafka主题之前对数据进行转换,接收端连接器也可以在将数据写入接收端之前对其进行转换。

转换的一些常见用途是:

  • 对字段重命名;
  • 掩蔽值;
  • 根据值将记录路由到主题;
  • 将时间戳转换或插入记录中;
  • 操作主键,例如根据字段的值设置主键。

Kafka自带了许多转换器,但是开发自定义的转换器也非常容易。

配置Kafka Connect的单消息转换

需要给转换器指定一个名字,该名字将用于指定该转换器的其他属性。例如,下面是JDBC源端利用RegexRouter转换器的配置片段,该转换器将固定字符串附加到要写入的主题的末尾:

{
  “name”: "jdbcSource", 
    "config": {
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  # —- other JDBC config properties —-
  "transforms": "routeRecords",
  "transforms.routeRecords.type":  "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeRecords.regex": "(.*)",
  "transforms.routeRecords.replacement": "$1-test"
[…]  }
}

该转换器被命名为routeRecords,且在后续中用于传递属性。注意,上面的示例显示了RegexRouter的两个配置属性:正则表达式regex和匹配组引用replacement。此设置将从JDBC源端获取表名,并将其加上-test后缀。根据转换器的功能不同,也可能会有不同的配置属性,具体可以参见相关的文档。

执行多次转换

有时需要执行多次转换,Kafka Connect支持定义多个转化器,他们在配置中链接在一起。这些消息按照在transforms属性中定义的顺序执行转换。

转换链示例

下面的转换使用ValueToKey转换器将值转换为主键,并使用ExtractField转换器仅使用ID整数值作为主键:

“transforms”:”createKey,extractInt”,
“transforms.createKey.type”:”org.apache.kafka.connect.transforms.ValueToKey”,
“transforms.createKey.fields”:”c1”,
“transforms.extractInt.type”:”org.apache.kafka.connect.transforms.ExtractField$Key”,
“transforms.extractInt.field”:”c1”

注意,使用上述$Key符号,会指定此转换将作用于记录的Key,如果要针对记录的Value,需要在这里指定$Value。最后ConnectRecord看起来像这样:

key        value
------------------------------
null       {"c1":{"int":100},"c2":{"string":"bar"}}

转换后:

key        value
------------------------------
100       {"c1":{"int":100},"c2":{"string":"bar"}}

转换器适合做什么,不适合做什么

转换是一个功能强大的概念,但仅应将其用于简单、有限的数据突变,不要调用外部API或状态存储,也不应尝试做任何繁琐的处理。应该使用Kafka Streams或KSQL之类的流处理解决方案在连接器之间的流处理层中处理更重的转换和数据集成。转换不能将一条消息拆分成多条消息,也不能关联其他流来进行扩充或进行任何类型的聚合,此类任务应留给流处理器。

单消息转换深入解读

下面深入地看下连接器如何处理数据。转换器被编译为JAR,并通过Connect工作节点的属性文件中的plugin.path属性,指定其可用于Kafka Connect,安装后就可以在连接器属性中配置转换。

配置和部署后,源端连接器将从上游系统接收记录,将其转换为ConnectRecord,然后将该记录传递给配置的转换器的apply()函数,然后等待返回记录。接收端连接器也是执行类似的过程,从Kafka主题读取并反序列化每个消息之后,将调用转换器的apply()方法,并将结果记录发送到目标系统。

如何开发单消息转换器

要开发将UUID插入每个记录的简单转换器,需要逐步执行以下的步骤。

apply方法是转换器的核心,这种转换支持带有模式和不带有模式的数据,因此每个都有一个转换:

@Override
  public R apply(R record) {
    if (operatingSchema(record) == null) {
      return applySchemaless(record);
    } else {
      return applyWithSchema(record);
    }
  }

  private R applySchemaless(R record) {
    final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);

    final Map<String, Object> updatedValue = new HashMap<>(value);

    updatedValue.put(fieldName, getRandomUuid());

    return newRecord(record, null, updatedValue);
  }

  private R applyWithSchema(R record) {
    final Struct value = requireStruct(operatingValue(record), PURPOSE);

    Schema updatedSchema = schemaUpdateCache.get(value.schema());
    if(updatedSchema == null) {
      updatedSchema = makeUpdatedSchema(value.schema());
      schemaUpdateCache.put(value.schema(), updatedSchema);
    }

    final Struct updatedValue = new Struct(updatedSchema);

    for (Field field : value.schema().fields()) {
      updatedValue.put(field.name(), value.get(field));
    }

    updatedValue.put(fieldName, getRandomUuid());

    return newRecord(record, updatedSchema, updatedValue);
  }

此转换器可以应用于记录的键或值,因此需要实现KeyValue子类,其扩展了InsertUuid类并实现apply方法调用的newRecord方法:

public static class Key<R extends ConnectRecord<R>> extends InsertUuid<R> {

    @Override
    protected Schema operatingSchema(R record) {
      return record.keySchema();
    }

    @Override
    protected Object operatingValue(R record) {
      return record.key();
    }

    @Override
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
      return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
    }

  }

  public static class Value<R extends ConnectRecord<R>> extends InsertUuid<R> {

    @Override
    protected Schema operatingSchema(R record) {
      return record.valueSchema();
    }

    @Override
    protected Object operatingValue(R record) {
      return record.value();
    }

    @Override
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
      return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
    }

  }

该转换器仅改变了模式和值,但是要注意其可以操纵ConnectRecord的所有部分:KeyValueKeyValue的模式、目标主题、目标分区和时间戳。

该转换器具有可选的参数,这些参数可以在运行时配置,并可以通过转换器类中重写的configure()方法访问:

 @Override
  public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    fieldName = config.getString(ConfigName.UUID_FIELD_NAME);

    schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
  }

如上所示,该Transformation接口很简单,它实现了一个apply()方法来接收ConnectRecord然后再返回ConnectRecord,它可以选择通过configure()方法接收参数。

接下来,编译此JAR并将其放入Connect工作节点中plugin.path指定的路径中。注意需要将转换器所依赖的任何依赖项打包到它的路径中或编译为胖JAR。然后在连接器配置中调用它,如下所示(注意$Value内部类约定,以指示此转换应作用于记录的值):

transforms=insertuuid
transforms.insertuuid.type=kafka.connect.smt.InsertUuid$Value
transforms.insertuuid.uuid.field.name="uuid"
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
Kafka连接器深度解读之错误处理和死信队列
Kafka连接器是Kafka的一部分,是在Kafka和其它技术之间构建流式管道的一个强有力的框架。它可用于将数据从多个地方(包括数据库、消息队列和文本文件)流式注入到Kafka,以及从Kafka将数据流式传输到目标端(如文档存储、NoSQL、数据库、对象存储等)中。现实世界并不完美,出错是难免的,因此在出错时Kafka的管道能尽可能优雅地处理是最好的。一
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
可莉 可莉
3年前
2020Kafka最新最全面试题!
1、请说明什么是ApacheKafka?ApacheKafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和可复制的提交日志服务。2、说说Kafka的使用场景?①异步处理②应用解耦③流量削峰④日志处理⑤消息通讯等。3、使用Kafka有什么优点和缺点?优点:①支持跨数据中心的消息复制;②单
Stella981 Stella981
3年前
2020Kafka最新最全面试题!
1、请说明什么是ApacheKafka?ApacheKafka是由Apache开发的一种发布订阅消息系统,它是一个分布式的、分区的和可复制的提交日志服务。2、说说Kafka的使用场景?①异步处理②应用解耦③流量削峰④日志处理⑤消息通讯等。3、使用Kafka有什么优点和缺点?优点:①支持跨数据中心的消息复制;②单