Kafka深入浅出(二)

Stella981
• 阅读 730

Kafka是一个分布式的流处理平台

Kafka深入浅出(二)

Kafka提供封装好的客户端以方便开发者连接服务器,目前常用的客户端有两种:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
</dependency>

上面的一种为官方目前推荐的(但貌似很多生产环境是用下边一种的),具体的历史原因第二种2013年就已经问世了,第一种是在2015年的时候才出现的,为了响应官方号召我将主要分析第一种客户端

1. Producers

Producer API 允许应用程序发布一个或多个流式数据给topic, 如果需要使用这个API,你需要在maven的pom配置文件中,增加一个依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>

我们会看到客户端(刚才添加这个东西会引入一个kafka客户端)中定义了一个Producer接口,并继承于Closeable接口(这个接口源于JDK1.5,通常用于需要关闭对象的资源,其中有一个close方法用于关闭,并抛出一个IOException),我们简单看一下类图,Apache已经为我们搞定了两个实现,如下:

Kafka深入浅出(二)

KafkaProducer

这个是Apache提供的一个Producer接口实现,同时标明了他是单例且线程安全的,使用方法大致如下:

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

        producer.close();

Properties的配置参数大家可以到 http://kafka.apache.org/documentation.html#producerconfigs 查看手册

MockProducer

这个是apache为开发测试提供的一个实现类,他提供了一些扩展的方便,可以方便我们开发的时候用于调试

2. Consumers

消费者的接口设计稍微复杂一些,应该是为了方便使用,同样apache也提供了两个实现,如果需要使用apache提供的客户端,同样需要引入如下包

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>

Consumer接口看以下类图:

Kafka深入浅出(二)

KafkaConsumer

需要注意的一点是这个KafkaConsumer实现,并不是线程安全的,大家在使用过程中,需要保证线程安全,

这个Consumer的实现,我们可以参考如下几种用法:

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset =%d, key =%s, value =%s", record.offset(), record.key(), record.value());
        }

上面这种用法设置了 enable.auto.commit = true, offsets 将会根据 auto.commit.interval.ms 的值,进行自动提交

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);   //这里插入数据库
                consumer.commitSync();
                buffer.clear();
            }
        }

以上这种用法关闭了 enable.auto.commit 参数,通过 consumer的commitSync方法来手动commit(Commit操作表示客户端通知服务端,信息已经收到,服务器会标记信息为 commited 状态),同时使用List做了一个缓存,来批量进行数据库写入操作

public class KafkaConsumerRunner implements Runnable {

    public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.3.37:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        int totleThread = 5;
        CountDownLatch countDown = new CountDownLatch(totleThread);

        List<KafkaConsumerRunner> consumers = startServer(props, totleThread, countDown);

        Thread.sleep(10000);

        stopServer(consumers);

        countDown.await();
        System.out.println("shutdown all done");
        System.exit(0);
    }

    private static void stopServer(List<KafkaConsumerRunner> consumers) {
        System.out.println("start shutdown");
        for(KafkaConsumerRunner consumerRunner : consumers){
            consumerRunner.shutdown();
        }
    }

    private static List<KafkaConsumerRunner> startServer(Properties props, int totleThread, CountDownLatch countDown) {
        ExecutorService threadPool = Executors.newFixedThreadPool(totleThread);
        List<KafkaConsumerRunner> consumers = new ArrayList<KafkaConsumerRunner>();

        for (int i = 0; i < 5; i++) {
            KafkaConsumerRunner consumer = new KafkaConsumerRunner(countDown, props);
            threadPool.execute(consumer);
            consumers.add(consumer);
        }
        return consumers;
    }

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer = null;
    private CountDownLatch countDown;
    private Properties props;
    private KafkaConsumer<String, String> consumer = null;


    public KafkaConsumerRunner(CountDownLatch countDown, Properties props) {
        this.countDown = countDown;
        this.props = props;
        this.consumer = new KafkaConsumer(this.props);
    }
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic1", "demo"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        // // Handle new records
                        System.out.println(record.offset() + ": --" + record.value() + " " + Thread.currentThread().getId());
                    }
                    consumer.commitSync();//同步
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        countDown.countDown();
        closed.set(true);
    }

}

一个多线程使用的例子

MockConsumer

这应该还是一个用来测试的实现

更多的配置参数可以看 http://kafka.apache.org/documentation#producerapi , 我们同样可以实现自己的 partition.class 只要实现 Partitioner 接口就可以

public class RandomPartitioner implements Partitioner {

    private Random ran = new Random();

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        synchronized (this) {
            int partitionerNum = cluster.partitionsForTopic(topic).size();
            return ran.nextInt(partitionerNum);
        }
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {

    }
}

然后通过 Properties 类配置到 Producer 就可以了

props.put("partitioner.class", RandomPartitioner.class.getName());

3. Connectors

Connectors 是Kafka 0.9 版本提供的一种新的工具,他的主要目的是将其他系统(平台)的数据导入、导出到Kafka的集群里,甚至可以把整个数据库导入到Topic中,同时也可以作为数据的二次存储、查询或者批量的进行离线分析

Kafka 已经提供了一个工具给我们玩耍:

第一步:建立一个测试文件

echo "this is large data" > test.txt

第二步:查看source的配置文件

vim config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

里面一个file参数是指定source(需要被导入kafka的数据)文件的位置

第三步:查看sink文件配置

vim config/connect-file-sink.properties 

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

这里配置了数据从kafka集群导出后需要保存的位置(目前为test.sink.txt)

第四部:运行kafka自带脚本

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

运行后会看到当前目录多了一个 text.sink.txt, 内容与 text.txt 一致,同时两个文件(数据)可以通过kafka保持同步

同时我们可以自己写一个配置文件链接数据库,例如

name=test-mssql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://127.0.0.1:1433;user=xd;password=xd;databaseName=scratchpad
mode=incrementing
incrementing.column.name=id
topic.prefix=test-mssql-jdbc-
table.whitelist=data01

同时我们可以通过实现 Connector 和 Task 接口来实现自己的应用,但很可惜我在客户端内没有找到这两个接口,只有在kafka源码中有这两个接口,官方的例子如下:

http://kafka.apache.org/documentation#connect\_developing

4. StreamProcessors

Stream是一个客户端库,用于处理和分析那些存储在Kafka上的数据,同时可以将结果写回到Kafka集群或者发送到外部的系统,其特点如下:

  • 设计一个简单且轻量级的客户端(貌似Stream很多都是用spark),可以方便的嵌入到已有的Java应用当中
  • 消息通讯层只依赖Kafka自己(不依赖其他东西),使用Kafka分区模型水平的拆分处理
  • 支持本地状态容错(不是十分清楚什么鸟意思)

目前对Stream了解并不多,后续跟进

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Peter20 Peter20
3年前
mysql中like用法
like的通配符有两种%(百分号):代表零个、一个或者多个字符。\(下划线):代表一个数字或者字符。1\.name以"李"开头wherenamelike'李%'2\.name中包含"云",“云”可以在任何位置wherenamelike'%云%'3\.第二个和第三个字符是0的值wheresalarylike'\00%'4\
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这