Kafka Producer端封装自定义消息

Stella981
• 阅读 821

这篇文章主要讲kafka producer端的编程,通过一个应用案例来描述kafka在实际应用中的作用。如果你还没有搭建起kafka的开发环境,可以先参考:<kafka开发环境搭建>

首先描述一下应用的情况:一个站内的搜索引擎,运营人员想知道某一时段,各类用户对商品的不同需求。通过对这些数据的分析,从而获得更多有价值的市场分析报表。这样的情况,就需要我们对每次的搜索进行记录,当然,不太可能使用数据库区记录这些信息(数据库存这些数据我会觉得是种浪费,个人意见)。最好的办法是存日志。然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。

完成上述一系列的工作,可以按照以下步骤来执行:

  1. 搭建kafka系统运行环境。

  2. 设计数据存储格式(按照自定义格式来封装消息)

  3. Producer端获取真实数据(搜索记录),并对数据按上述2中设计的格式进行编码。

  4. Producer将已经编码的数据发送到broker上,在broker上进行存储(分配存储策略)。

  5. Consumer端从broker中获取数据,分析计算。

如果用淘宝数据服务平台的架构来匹配这一过程,broker就好比数据中心中存储的角色,producer端基本是放在了应用中心的开放API中,consumer端则一般用于数据产品和应用中心的获取数据中使用。

Kafka Producer端封装自定义消息

今天主要写的是2、3、4三个步骤。我们先看第二步。为了快速实现,这里就设计一个比较简单的消息格式,复杂的原理和这个一样。

Kafka Producer端封装自定义消息

用四个字段分别表示消息的ID、用户、查询关键词和查询时间。当然你如果要设计的更复杂,可以加入IP这些信息。这些用java写就是一个简单的pojo类,这是getter/setter方法即可。由于在封转成kafka的message时需要将数据转化成bytep[]类型,可以提供一个序列化的方法。我在这里直接重写toString了:

@Override
    public String toString() {
        String keyword = "[info kafka producer:]";
        keyword = keyword + this.getId() + "-" + this.getUser() + "-"
                + this.getKeyword() + "-" + this.getCurrent();
        return keyword;
    }

这样还没有完成,这只是将数据格式用java对象表现出来,解析来要对其按照kafka的消息类型进行封装,在这里我们只需要实现Encoder类即可:

package org.gfg.kafka.message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.message.Message;

public class KeywordMessage implements kafka.serializer.Encoder<Keyword>{
    
    public static final Logger LOG=LoggerFactory.getLogger(Keyword.class); 
    
    @Override
    public Message toMessage(Keyword words) {
        LOG.info("start in encoding...");
        return new Message(words.toString().getBytes());
    }

}

注意泛型和返回类型即可。这样KeywordMessage就是一个可以被kafka发送和存储的对象了。

接下来,我们可以编写一部分producer,获取业务系统的数据。要注意,producer数据的推送到broker的,所以发起者还是业务系统,下面的代码就能直接发送一次数据,注释都很详细:

/**配置producer必要的参数*/
        Properties props = new Properties();
        props.put("zk.connect", "192.168.10.11:2181");
        /**选择用哪个类来进行序列化*/
        props.put("serializer.class", "org.gfg.kafka.message.KeywordMessage");
        props.put("zk.connectiontimeout.ms", "6000");
        ProducerConfig config=new ProducerConfig(props);
        
        /**制造数据*/
        Keyword keyword=new Keyword();
        keyword.setUser("Chenhui");
        keyword.setId(0);
        keyword.setKeyword("china");
        
        List<Keyword> msg=new ArrayList<Keyword>();
        msg.add(keyword);
        
        /**构造数据发送对象*/
        Producer<String, Keyword> producer=new Producer<String, Keyword>(config);        
        ProducerData<String,Keyword> data=new ProducerData<String, Keyword>("test", msg);
        producer.send(data);

发送完之后,我们可以用bin目录下的kafka-console-consumer来看发送的结果(当然现在用的topic是test)。可以用命令:

./kafka-console-consumer –zookeeper 192.168.10.11:2181 –topic test –from-beginning

Kafka Producer端封装自定义消息

如果是在使用zookeeper搭建分布式的情况下(zookeeper based broker discovery),我们可以执行第三个步骤,用编码来实现partition的分配策略。这里需要我们实现Partitioner对象:

package org.gfg.kafka.partitioner;

import org.gfg.kafka.message.Keyword;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.producer.Partitioner;

/**
 * 
 * @author Chen.Hui
 * 
 */
public class ProducerPartitioner implements Partitioner<String> {
    
    public static final Logger LOG=LoggerFactory.getLogger(Keyword.class); 
    
    @Override
    public int partition(String key, int numPartitions) {
        LOG.info("ProducerPartitioner key:"+key+" partitions:"+numPartitions);
        return key.length() % numPartitions;
    }

}

在上面的partition方法中,值得注意的是,key我们是在构造数据发送对象时设置的,这个key是区分存储的关键,比如我想将我的数据按照不同的用户类别存储。Partition的好处是可以并发的获取同类数据,提高效率,具体可以看之前的文章。

所以在第二部时的producer代码需要有所改进:

/**选择用哪个类来进行设置partition*/
        props.put("partitioner.class", "org.gfg.kafka.partitioner.ProducerPartitioner");

        ProducerData<String,Keyword> data=new ProducerData<String, Keyword>("test","developer", msg);

增加了对partition的配置,并且修改了ProducerData的参数,其中,中间的就是key,如果不设置partition,kafka则随机的向broker中发送请求。我们可以看一眼ProducerData的源码:

package kafka.javaapi.producer

import scala.collection.JavaConversions._

class ProducerData[K, V](private val topic: String,
                         private val key: K,
                         private val data: java.util.List[V]) {

  def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)

  def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))

  def getTopic: String = topic

  def getKey: K = key

  def getData: java.util.List[V] = data
}

至此,producer端的事情都做完了,当然这就是个demo,还有很多性能上的优化需要做,当然有了这个基础,我们就能将数据存储到broker上,下一步,就是用consumer来消费这些日志,形成有价值的数据产品。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
10小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(