Trident API(翻译)

Easter79
• 阅读 747

Trident API Overview

Trident 的核心数据模型是“流”(Stream),进行数据处理的时候,将数据作为一系列的batch(批)来进行。流被分割成多个partition分布在集群中的不同节点上来运行,而且对流的操作也是在流的各个partition上并行运行的。

Trident 中有五类操作:

  • 针对每个小分区(partition)的本地操作,这类操作不会产生网络数据传输(each、map、faltmap、partitionAggregate等)

  • 针对一个数据流的重新分区操作,这类操作不会改变数据流中的内容,但是会产生一定的网络传输(shuffle、partition等)

  • 通过网络数据传输进行的聚合操作(Aggregate)

  • 针对数据流的分组操作(groupBy)

  • 融合与联结操作(merge、join)

Partition-local operations(本地分区操作)

本地分区操作是在每个batch partition上独立运行的操作,其中不涉及网络数据传输。

Functions

Functions函数负责接收一个input fields的集合并选择输出更多的tuple或者不输出tuple。输出tuple的fields会被添加到原始数据流的输入域中。如果一个function不输出tuple,那么原始的输入tuple就会被直接过滤掉。否则,每个输出 tuple 都会复制一份输入tuple。假设你有下面这样的函数:

public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { for(int i=0; i < tuple.getInteger(0); i++) { collector.emit(new Values(i)); } } }

假设你有一个名为 “mystream” 的数据流,这个流中包含下面几个 tuple,每个 tuple 中包含有 "a"、"b"、"c" 三个域:

[1, 2, 3] [4, 1, 6] [3, 0, 8]

如果你运行这段代码:

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))

那么最终输出的结果 tuple 就会包含有 "a"、"b"、"c"、"d"4 个域,就像下面这样:

[1, 2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]

Filters

过滤器负责判断输入的 tuple 是否需要保留。以下面的过滤器为例:

public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2; } }

通过使用这段代码:

mystream.each(new Fields("b", "a"), new MyFilter())

就可以将下面这样带有 "a"、"b"、"c"三个域的 tuple

[1, 2, 3] [2, 1, 1] [2, 3, 4]

最终转化成这样的结果 tuple:

[2, 1, 1]

map and flatMap

map对stream中的tuple应用map函数,并返回结果流。这可以用于对tuples进行one-one(一对一)的转换(transformation)操作。

举例,如果你想将一个stream中的单词转换成大写,你可以定义一个mapping函数,如下:

public class UpperCase extends MapFunction { @Override public Values execute(TridentTuple input) { return new Values(input.getString(0).toUpperCase()); } }

mapping函数应用到stream上并生成一个由大写单词组成的stream。

mystream.map(new UpperCase());

flatMap与map类似,但是被用来对stream中的values进行one-to-many(一对多)操作,然后会将resulting elements(结果元素)flatten平压至一个新的stream中。

public class Split extends FlatMapFunction { @Override public Iterable execute(TridentTuple input) { List valuesList = new ArrayList<>(); for (String word : input.getString(0).split(" ")) { valuesList.add(new Values(word)); } return valuesList; } }

flatMap函数被应用在一个句子stream中,生成一个单词stream。

mystream.flatMap(new Split())

当然这些操作可以被连接在一起,可以从一个sentences stream中获得一个大写单词的stream

mystream.flatMap(new Split()).map(new UpperCase())

peek

peek可以用于在每个trident tuples流经stream时执行附加的操作(主要是为了输出一些信息,并不改变tuples或者fields)。这对于调试查看在管道中某个点上的tuples是有用的。

举例,接下来的代码将打印将单词转换成大写单词后groupBy操作的结果。

mystream.flatMap(new Split()).map(new UpperCase()).peek(new Consumer() { @Override public void accept(TridentTuple input) { System.out.println(input.getString(0)); } }).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

min and minBy

min和minBy操作将返回在trident stream中一个batch of tuples的每个partition的最小值。

假设,一个trident流中包含fields["device-id", "count"],下面是每个partition of tuples。

Partition 0: [123, 2] [113, 54] [23, 28] [237, 37] [12, 23] [62, 17] [98, 42]

Partition 1: [64, 18] [72, 54] [2, 28] [742, 71] [98, 45] [62, 12] [19, 174]

Partition 2: [27, 94] [82, 23] [9, 86] [53, 71] [74, 37] [51, 49] [37, 98]

minBy操作讲对每个partition中的fields named count取最小值,并且输出这个最小值的tuple。

mystream.minBy(new Fields("count"))

上面代码执行后各partition的结果:

Partition 0: [123, 2]

Partition 1: [62, 12]

Partition 2: [82, 23]

你可以看在org.apache.storm.trident.Stream类中查看min和minBy操作。

相关例子可以从下面的链接查看 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology

max and maxBy

max和maxBy操作将返回在trident stream中一个batch of tuples的每个partition的最大值。

假设,一个trident流中包含fields["device-id", "count"],下面是每个partition of tuples。

maxBy操作讲对每个partition中的fields named count取最大值,并且输出这个最大值的tuple。

mystream.maxBy(new Fields("count"))

上面代码执行后各partition的结果:

Partition 0: [113, 54]

Partition 1: [19, 174]

Partition 2: [37, 98]

你可以看在org.apache.storm.trident.Stream类中查看max和maxBy操作。

相关例子可以从下面的链接查看  TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology

Windowing

相关window分类可参照window页面。

Trident流可以处理在某个相同窗口中batch的tuples,并将聚合结果发送给下一个操作。有两种windowing,分别是基于processing时间或者tuples数量:1.Tumbling window翻滚窗口2。Sliding window滑动窗口

Tumbling window

元组根据processing时间或者tuples数量分组在某个窗口中。任何元组只属于一个窗口。

Sliding window

Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window.

在每个滑动间隔中,元组在窗口和窗口滑动中分组。一个元组可以属于多个窗口。

Common windowing API

通用windowing API:

public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)

windowConfig定义窗口的属性:window length和window sliding length。

WindowsStoreFactory用来储存接受到的tuples,并且聚合values。

partitionAggregate

partitionAggregate会在一个batch of tuples的每个partition上执行function。与上面的函数不同,由partitionAggregate发送出的tuples会将替换输入tuples。以下面这段代码为例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));

假如输入流中包含有 "a"、"b" 两个域并且有以下几个tuple块:

Partition 0: ["a", 1] ["b", 2]

Partition 1: ["a", 3] ["c", 8]

Partition 2: ["e", 1] ["d", 9] ["d", 10]

经过上面的代码之后,输出就会变成带有一个名为 "sum"的域的数据流,其中的tuple就是这样的:

Partition 0: [3]

Partition 1: [11]

Partition 2: [20]

Storm 有三个用于定义聚合器的接口:CombinerAggregatorReducerAggregator 以及 Aggregator

这是 CombinerAggregator 接口,整个CombinerAggregator会在每批次结束时将combine的结果做一次emit:

public interface CombinerAggregator extends Serializable { T init(TridentTuple tuple);//每条tuple调用一次,对tuple做预处理。 T combine(T val1, T val2);//每条tuple调用一次,和之前的聚合值做combine。如果是partition中没有tuple则返回zero值作为combine的结果。 T zero();//当partition中没有数据流时,处理逻辑。 }

CombinerAggregator 会将带有一个field的一个单独的tuple返回作为输出。CombinerAggregator会在每个输入tuple上运行初始化函数init,然后使用组合函数来组合所有输入的值。如果在某个分区中没有 tuple, CombinerAggregator 就会输出zero 方法的结果。例如,下面是 Count 的实现代码:

public class Count implements CombinerAggregator { public Long init(TridentTuple tuple) { return 1L; }

public Long combine(Long val1, Long val2) {
    return val1 + val2;
}

public Long zero() {
    return 0L;
}

}

如果你使用aggregate方法来代替partitionAggregate方法,你就会发现CombinerAggregators的好处了。在这种情况下,Trident会在发送tuple之前通过分区聚合操作来优化计算过程。

ReducerAggregator的接口:

public interface ReducerAggregator extends Serializable { T init();//用来初始化reduce函数中的参数值curr。执行一次 T reduce(T curr, TridentTuple tuple);//每条tuple调用1次,与curr进行聚合操作。 }

整个ReducerAggregator会在每batch结束时将reduce的结果做一次emit。

ReducerAggregator会使用init方法来产生一个初始化的值,然后使用该值对每个输入tuple进行遍历,并最终生成并输出一个单独的tuple,这个tuple中就包含有我们需要的计算结果值。例如,下面是将Count定义为ReducerAggregator的代码:

public class Count implements ReducerAggregator { public Long init() { return 0L; }

public Long reduce(Long curr, TridentTuple tuple) {
    return curr + 1;
}

}

ReducerAggregator 同样可以用于 persistentAggregate,你会在后面看到这一点。

最常用的聚合器接口还是下面的 Aggregator接口:

public interface Aggregator extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T state, TridentTuple tuple, TridentCollector collector); void complete(T state, TridentCollector collector); }

其中父接口Operation 还有两个方法

public interface Operation extends Serializable { void prepare(Map conf, TridentOperationContext context); void cleanup(); }

Aggregator接口,实现了上面五个方法:

  •  prepare:只在启动topolopy时调用1次,如果设置了并发度,则在每一个partition中调用一次;
  •  cleanup:只在正常关闭topolopy时调用1次,如果设置了并发度,则在每一个partition中调用1次;
  •  init:对于global aggregation来说,每个批次调用1次。如果使用的时partitionAggregate则每个批次的每一个partition调用一次。对于Group Streams来说,每个相同的key组成的数据流调用一次。需要注意的是,如果使用的是事务型的spout,同时某个批次处理失败导致该批次消息重新发送,则在接下来处理时,initu有可能调用多次,所以init里面代码逻辑要支持同一批的重复调用。
  •  aggregate:每个tuple调用1次;
  •  complete:对于global aggregation来说,每个批次调用一次。如果使用的是partitionAggregate,则每一个批次的每一个partition 调用1次。对于Grouped Streams来说,每个相同的key组成的数据流调用1次。

需要特别注意的是:当使用没有group by 的Aggregator或者ReducerAggregation计算global aggretation时,每个batch的数据流只能在1个partition(相当于storm的task)中执行,即使设置了parallelismHint的并发数n>1,实际上也只能轮循的叫不同批次aggregation执行,也就相当于串行执行,所以反而浪费了资源。

使用aggregation做global aggregation无法启动并发,但是当配合CombinerAggregator时候可以,Trident会把拓扑自动拆分成2个bolt,第一个bolt做局部聚合,类似于Hadoop中的map;第二个bolt通过接收网络传输过来的局部聚合值最后做一个全局聚合。自动优化后的第一个bolt是本地化操作,因此它可以和它前面或者后面挨着的所有each合并在同一个bolt里面。

trident.newStream(“trident_spout”, new MySpout())
.partitionAggregate(new MyAggregator(), new Fields(“testoutput1”))
.parallelismHint(5)
.aggregate(new Fields(“out1”), new MyAggregator(), new Fields(“testoutput2”));

parallelismHint(n)要写在aggregate的前面,如果写在aggregate后面,将导致本地化操作的第一个bolt的并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮循而已。

把parallelismHint(n)写在aggregate的前面会导致spout同时开启n的并发度,因此要注意自己实现的spout类是否支持并发发送。

下面是使用Aggregator来进行Count的一个实现:

public class CountAgg extends BaseAggregator { static class CountState { long count = 0; }

public CountState init(Object batchId, TridentCollector collector) {
    return new CountState();
}

public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
    state.count+=1;
}

public void complete(CountState state, TridentCollector collector) {
    collector.emit(new Values(state.count));
}

}

某些时候你需要同时计算multiple aggregators时,使用如下的方式进行连接:

mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd()

这段代码会在每个分区上分别执行 Count 和 Sum 聚合器,而输出中只会包含一个带有 ["count", "sum"] 域的单独的 tuple。

stateQuery and partitionPersist

stateQuery 与 partitionPersist 会分别查询、更新 state 数据源。你可以参考 Trident State 文档 来了解如何使用它们。

projection

projection 方法只会保留操作中指定的域。如果你有一个带有 ["a", "b", "c", "d"] 域的数据流,通过执行这段代码:

mystream.project(new Fields("b", "d"))

就会使得输出数据流中只包含有 ["b","d"] 域。

Repartitioning operations

重分区操作会执行一个用来改变在不同的任务间分配 tuple 的方式的函数。在重分区的过程中分区的数量也可能会发生变化(例如,重分区之后的并行度就有可能会增大)。重分区会产生一定的网络数据传输。下面是重分区操作的几个函数:

  1. shuffle:通过随机轮询算法来重新分配目标区块的所有 tuple。
  2. broadcast:每个 tuple 都会被复制到所有的目标区块中。这个函数在 DRPC 中很有用 —— 比如,你可以使用这个函数来获取每个区块数据的查询结果。
  3. partitionBy:该函数会接收一组域作为参数,并根据这些域来进行分区操作。可以通过对这些域进行哈希化,并对目标分区的数量取模的方法来选取目标区块。partitionBy 函数能够保证来自同一组域的结果总会被发送到相同的目标区间。
  4. global:这种方式下所有的 tuple 都会被发送到同一个目标分区中,而且数据流中的所有的块都会由这个分区处理。
  5. batchGlobal:同一个 batch 块中的所有 tuple 会被发送到同一个区块中。当然,在数据流中的不同区块仍然会分配到不同的区块中。
  6. partition:这个函数使用自定义的分区方法,该方法会实现 org.apache.storm.grouping.CustomStreamGrouping 接口。

Aggregation operations

Trident 使用 aggregate 方法和 persistentAggregate 方法来对数据流进行聚类操作。其中,aggregate 方法会分别对数据流中的每个 batch 进行处理,而 persistentAggregate 方法则会对数据流中的所有 batch 执行聚类处理,并将结果存入某个 state 中。

在数据流上执行 aggregate 方法会执行一个全局的聚类操作。在你使用 ReducerAggregator 或者 Aggregator 时,数据流首先会被重新分区成一个单独的分区,然后聚类函数就会在该分区上执行操作。而在你使用 CombinerAggregator 时,Trident 首先会计算每个分区的部分聚类结果,然后将这些结果重分区到一个单独的分区中,最后在网络数据传输完成之后结束这个聚类过程。CombinerAggregator 比其他的聚合器的运行效率更高,在聚类时应该尽可能使用CombinerAggregator

下面是一个使用 aggregate 来获取一个 batch 的全局计数值的例子:

mystream.aggregate(new Count(), new Fields("count"))

与 partitionAggregate一样,aggregate的聚合器也可以进行链式处理。然而,如果你在一个处理链中同时使用了CombinerAggregator 和non-CombinerAggregator,Trident 就不能对部分聚类操作进行优化了。

想要了解更多使用 persistentAggregate 的方法,可以参考 Trident State 文档 一文。

Operations on grouped streams

通过对指定的域执行 partitionBy 操作,groupBy 操作可以将数据流进行重分区,使得相同的域的 tuple 分组可以聚集在一起。例如,下面是一个 groupBy 操作的示例:

Trident API(翻译)

如果你在分组数据流上执行聚合操作,聚合器会在每个分组(而不是整个区块)上运行。persistentAggregate 同样可以在一个分组数据里上运行,这种情况下聚合结果会存储在 MapState 中,其中的 key 就是分组的域名。

和其他操作一样,对分组数据流的聚合操作也可以以链式的方式执行。

Merges and joins

Trident API 的最后一部分是联结不同的数据流的操作。联结数据流最简单的方式就是将所有的数据流融合到一个流中。你可以使用 TridentTopology 的 merge 方法实现该操作,比如这样:

topology.merge(stream1, stream2, stream3);

Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。

联结数据流的另外一种方法是使用 join。像 SQL 那样的标准 join 操作只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每个从 spout 中输出的小 batch。

下面是两个流的 join 操作的示例,其中一个流含有 [“key”, “val1”, “val2”] 域,另外一个流含有 [“x”, “val1”] 域:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

上面的例子会使用 "key" 和 "x" 作为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,因为输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含:

  1. join 域的列表。在这个例子里,输出的 "key" 域与 stream1 的 "key" 域以及 stream2 的 "x" 域对应。
  2. 来自所有流的非 join 域的列表。这个列表是按照传入 join 方法的流的顺序排列的。在这个例子里,"a" 和 "b" 域与 stream1 的 "val1" 和 "val2" 域对应;而 "c" 域则与 stream2 的 “val1” 域相对应。

在对不同的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每个 spout 发送出的 tuple。

到这里你大概仍然会对如何进行窗口 join 操作感到困惑。窗口操作(包括平滑窗口、滚动窗口等 —— 译者注)主要是指将当前的 tuple 与过去若干小时时间段内的 tuple 联结起来的过程。

你可以使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。然后就可以使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Easter79 Easter79
3年前
Trident学习笔记(一)
1\.Trident入门Trident\ 三叉戟 storm高级抽象,支持有状态流处理; 好处是确保消费被处理一次; 以小批次方式处理输入流,得到精准一次性处理 ; 不再使用bolt,使用functions、aggreates、filters以及states。 TridentT
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
5
获赞
1.2k