1、前言
本文是在《如何计算实时热门商品》[1]一文上做的扩展,仅在功能上验证了利用Flink消费Kafka数据,把处理后的数据写入到HBase的流程,其具体性能未做调优。此外,文中并未就Flink处理逻辑做过多的分析,只因引文(若不特殊说明,文中引文皆指《如何计算实时热门商品》一文)中写的很详细了,故仅给出博主调试犯下的错。文中若有错误,欢迎大伙留言指出,谢谢!
源码在GitHub上,地址:https://github.com/L-Wg/flinkExample;
环境:Flink 1.6+Kafka 1.1+HBase 1.2
OpenJDK 1.8+Maven 3.5.2
2、获取数据
本文是将Kafka作为数据源(目前业界比较流行的做法),数据的格式和引文的格式一致,数据类型为POJO。为添加源,一般是实现接口SourceFunction
DataStream<UserBehaviorSchema> dataStream=env.addSource(new FlinkKafkaConsumer010<UserBehaviorSchema>(
topic,
new UserBehaviorSerial(),
properties
).setStartFromEarliest());
其中,在代码中需指定的有:要消费的topic、数据序列化的对象以及配置,其中,配置可指定bootstrap.servers即可,其他配置按需设置。调用setStarFromEarliest()是为让Flink从头消费指定topic中数据,这样写的好处是:只要你Kafka topic中存在数据,测试时就不用重新往kafka里写数据了。当然调用该方法不仅仅是这个作用,其在业务上的使用需根据需求。此外,Flink中还有诸多指定消费kafka的方法,详情请见官网**[2]**。
这里值得说的一点是获取数据后,dataStream的值是不变的,不会因为做过flatmap等操作后就会改变。
3、数据转换
对Flink 代码的分析过程见引文,此处仅有以下几点需说明的:
1. 若是kafka中的数据是自己按照因为数据格式随机生成的,请不要按照博主代码中customWaterExtractor()类的写法去定义watermark和timestamp,因为代码中的currentTimeStamp的值可能也是随机的,所以就会造成程序不报错但是卡死等待的情况。
2. timestamp的值要和数据源中数据保持相同的数据级。
public static class customWaterExtractor implements AssignerWithPeriodicWatermarks<UserBehaviorSchema>{
private static final long serialVersionUID = 298015256202705122L;
private final long maxOutOrderness=3500;
private long currentTimeStamp=Long.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentTimeStamp-maxOutOrderness);
}
@Override
public long extractTimestamp(UserBehaviorSchema element, long previousElementTimestamp) {
// 此处需要注意的点:timestamp得到的值是否乘以1000转换为毫秒,需要根据消息中被指定为timestamp字段的单位。
long timeStamp=element.timestamp*1000;
currentTimeStamp=Math.max(timeStamp,currentTimeStamp);
return timeStamp;
}
}
3. 在返回的结果类ResultEvent中,使用sinking字段去保存HotTopN的名次,其默认值为0。
4、数据存储
本文中是通过extends RichSinkFunction来实现将数据写入HBase中,其中,@Override的invoke()方法是针对每条数据都会调用的,其余的open()、close()方法,从日志上看是不是针对每条数据都会调用。对open()方法用于打开链接,最好实现连接池避免链接过多,此处HBase的connection已自身实现不用单独实现。
数据写入HBase时,有两点建议:
1. 将数据写入HBase的表中时,最好先做好表的预分区工作,避免后期因为表的split造成性能下降以及维护上的困难;
2. 为加快HBase的查询速度,可以将制定字段作为HBase表的rowkey,文中是指定时间戳和排名作为表的rowkey,至于二级索引等暂不在此处讨论。
5、参考文献链接:
[1]http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/
[2]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html