Trident API Overview
Trident 的核心数据模型是“流”(Stream),进行数据处理的时候,将数据作为一系列的batch(批)来进行。流被分割成多个partition分布在集群中的不同节点上来运行,而且对流的操作也是在流的各个partition上并行运行的。
Trident 中有五类操作:
针对每个小分区(partition)的本地操作,这类操作不会产生网络数据传输(each、map、faltmap、partitionAggregate等)
针对一个数据流的重新分区操作,这类操作不会改变数据流中的内容,但是会产生一定的网络传输(shuffle、partition等)
通过网络数据传输进行的聚合操作(Aggregate)
针对数据流的分组操作(groupBy)
融合与联结操作(merge、join)
Partition-local operations(本地分区操作)
本地分区操作是在每个batch partition上独立运行的操作,其中不涉及网络数据传输。
Functions
Functions函数负责接收一个input fields的集合并选择输出更多的tuple或者不输出tuple。输出tuple的fields会被添加到原始数据流的输入域中。如果一个function不输出tuple,那么原始的输入tuple就会被直接过滤掉。否则,每个输出 tuple 都会复制一份输入tuple。假设你有下面这样的函数:
public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { for(int i=0; i < tuple.getInteger(0); i++) { collector.emit(new Values(i)); } } }
假设你有一个名为 “mystream” 的数据流,这个流中包含下面几个 tuple,每个 tuple 中包含有 "a"、"b"、"c" 三个域:
[1, 2, 3] [4, 1, 6] [3, 0, 8]
如果你运行这段代码:
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
那么最终输出的结果 tuple 就会包含有 "a"、"b"、"c"、"d"4 个域,就像下面这样:
[1, 2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]
Filters
过滤器负责判断输入的 tuple 是否需要保留。以下面的过滤器为例:
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2; } }
通过使用这段代码:
mystream.each(new Fields("b", "a"), new MyFilter())
就可以将下面这样带有 "a"、"b"、"c"三个域的 tuple
[1, 2, 3] [2, 1, 1] [2, 3, 4]
最终转化成这样的结果 tuple:
[2, 1, 1]
map and flatMap
map对stream中的tuple应用map函数,并返回结果流。这可以用于对tuples进行one-one(一对一)的转换(transformation)操作。
举例,如果你想将一个stream中的单词转换成大写,你可以定义一个mapping函数,如下:
public class UpperCase extends MapFunction { @Override public Values execute(TridentTuple input) { return new Values(input.getString(0).toUpperCase()); } }
mapping函数应用到stream上并生成一个由大写单词组成的stream。
mystream.map(new UpperCase());
flatMap与map类似,但是被用来对stream中的values进行one-to-many(一对多)操作,然后会将resulting elements(结果元素)flatten平压至一个新的stream中。
public class Split extends FlatMapFunction {
@Override
public Iterable
flatMap函数被应用在一个句子stream中,生成一个单词stream。
mystream.flatMap(new Split())
当然这些操作可以被连接在一起,可以从一个sentences stream中获得一个大写单词的stream
mystream.flatMap(new Split()).map(new UpperCase())
peek
peek可以用于在每个trident tuples流经stream时执行附加的操作(主要是为了输出一些信息,并不改变tuples或者fields)。这对于调试查看在管道中某个点上的tuples是有用的。
举例,接下来的代码将打印将单词转换成大写单词后groupBy操作的结果。
mystream.flatMap(new Split()).map(new UpperCase()).peek(new Consumer() { @Override public void accept(TridentTuple input) { System.out.println(input.getString(0)); } }).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
min and minBy
min和minBy操作将返回在trident stream中一个batch of tuples的每个partition的最小值。
假设,一个trident流中包含fields["device-id", "count"],下面是每个partition of tuples。
Partition 0: [123, 2] [113, 54] [23, 28] [237, 37] [12, 23] [62, 17] [98, 42]
Partition 1: [64, 18] [72, 54] [2, 28] [742, 71] [98, 45] [62, 12] [19, 174]
Partition 2: [27, 94] [82, 23] [9, 86] [53, 71] [74, 37] [51, 49] [37, 98]
minBy操作讲对每个partition中的fields named count取最小值,并且输出这个最小值的tuple。
mystream.minBy(new Fields("count"))
上面代码执行后各partition的结果:
Partition 0: [123, 2]
Partition 1: [62, 12]
Partition 2: [82, 23]
你可以看在org.apache.storm.trident.Stream类中查看min和minBy操作。
相关例子可以从下面的链接查看 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology
max and maxBy
max和maxBy操作将返回在trident stream中一个batch of tuples的每个partition的最大值。
假设,一个trident流中包含fields["device-id", "count"],下面是每个partition of tuples。
maxBy操作讲对每个partition中的fields named count取最大值,并且输出这个最大值的tuple。
mystream.maxBy(new Fields("count"))
上面代码执行后各partition的结果:
Partition 0: [113, 54]
Partition 1: [19, 174]
Partition 2: [37, 98]
你可以看在org.apache.storm.trident.Stream类中查看max和maxBy操作。
相关例子可以从下面的链接查看 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology
Windowing
相关window分类可参照window页面。
Trident流可以处理在某个相同窗口中batch的tuples,并将聚合结果发送给下一个操作。有两种windowing,分别是基于processing时间或者tuples数量:1.Tumbling window翻滚窗口2。Sliding window滑动窗口
Tumbling window
元组根据processing时间或者tuples数量分组在某个窗口中。任何元组只属于一个窗口。
Sliding window
Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window.
在每个滑动间隔中,元组在窗口和窗口滑动中分组。一个元组可以属于多个窗口。
Common windowing API
通用windowing API:
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)
windowConfig定义窗口的属性:window length和window sliding length。
WindowsStoreFactory用来储存接受到的tuples,并且聚合values。
partitionAggregate
partitionAggregate
会在一个batch of tuples的每个partition上执行function。与上面的函数不同,由partitionAggregate
发送出的tuples会将替换输入tuples。以下面这段代码为例:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));
假如输入流中包含有 "a"、"b" 两个域并且有以下几个tuple块:
Partition 0: ["a", 1] ["b", 2]
Partition 1: ["a", 3] ["c", 8]
Partition 2: ["e", 1] ["d", 9] ["d", 10]
经过上面的代码之后,输出就会变成带有一个名为 "sum"的域的数据流,其中的tuple就是这样的:
Partition 0: [3]
Partition 1: [11]
Partition 2: [20]
Storm 有三个用于定义聚合器的接口:CombinerAggregator
,ReducerAggregator
以及 Aggregator
。
这是 CombinerAggregator
接口,整个CombinerAggregator
public interface CombinerAggregator
CombinerAggregator
会将带有一个field的一个单独的tuple返回作为输出。CombinerAggregator
会在每个输入tuple上运行初始化函数init,然后使用组合函数来组合所有输入的值。如果在某个分区中没有 tuple, CombinerAggregator
就会输出zero
方法的结果。例如,下面是 Count
的实现代码:
public class Count implements CombinerAggregator
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
public Long zero() {
return 0L;
}
}
如果你使用aggregate方法来代替partitionAggregate方法,你就会发现CombinerAggregators的好处了。在这种情况下,Trident会在发送tuple之前通过分区聚合操作来优化计算过程。
ReducerAggregator的接口:
public interface ReducerAggregator
整个ReducerAggregator
ReducerAggregator会使用init
方法来产生一个初始化的值,然后使用该值对每个输入tuple进行遍历,并最终生成并输出一个单独的tuple,这个tuple中就包含有我们需要的计算结果值。例如,下面是将Count定义为ReducerAggregator的代码:
public class Count implements ReducerAggregator
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
ReducerAggregator 同样可以用于 persistentAggregate,你会在后面看到这一点。
最常用的聚合器接口还是下面的 Aggregator接口:
public interface Aggregator
其中父接口Operation 还有两个方法
public interface Operation extends Serializable { void prepare(Map conf, TridentOperationContext context); void cleanup(); }
Aggregator
- prepare:只在启动topolopy时调用1次,如果设置了并发度,则在每一个partition中调用一次;
- cleanup:只在正常关闭topolopy时调用1次,如果设置了并发度,则在每一个partition中调用1次;
- init:对于global aggregation来说,每个批次调用1次。如果使用的时partitionAggregate则每个批次的每一个partition调用一次。对于Group Streams来说,每个相同的key组成的数据流调用一次。需要注意的是,如果使用的是事务型的spout,同时某个批次处理失败导致该批次消息重新发送,则在接下来处理时,initu有可能调用多次,所以init里面代码逻辑要支持同一批的重复调用。
- aggregate:每个tuple调用1次;
- complete:对于global aggregation来说,每个批次调用一次。如果使用的是partitionAggregate,则每一个批次的每一个partition 调用1次。对于Grouped Streams来说,每个相同的key组成的数据流调用1次。
需要特别注意的是:当使用没有group by 的Aggregator或者ReducerAggregation计算global aggretation时,每个batch的数据流只能在1个partition(相当于storm的task)中执行,即使设置了parallelismHint的并发数n>1,实际上也只能轮循的叫不同批次aggregation执行,也就相当于串行执行,所以反而浪费了资源。
使用aggregation做global aggregation无法启动并发,但是当配合CombinerAggregator
trident.newStream(“trident_spout”, new MySpout())
.partitionAggregate(new MyAggregator(), new Fields(“testoutput1”))
.parallelismHint(5)
.aggregate(new Fields(“out1”), new MyAggregator(), new Fields(“testoutput2”));
parallelismHint(n)要写在aggregate的前面,如果写在aggregate后面,将导致本地化操作的第一个bolt的并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮循而已。
把parallelismHint(n)写在aggregate的前面会导致spout同时开启n的并发度,因此要注意自己实现的spout类是否支持并发发送。
下面是使用Aggregator来进行Count的一个实现:
public class CountAgg extends BaseAggregator
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
某些时候你需要同时计算multiple aggregators时,使用如下的方式进行连接:
mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd()
这段代码会在每个分区上分别执行 Count 和 Sum 聚合器,而输出中只会包含一个带有 ["count", "sum"] 域的单独的 tuple。
stateQuery and partitionPersist
stateQuery 与 partitionPersist 会分别查询、更新 state 数据源。你可以参考 Trident State 文档 来了解如何使用它们。
projection
projection
方法只会保留操作中指定的域。如果你有一个带有 ["a", "b", "c", "d"] 域的数据流,通过执行这段代码:
mystream.project(new Fields("b", "d"))
就会使得输出数据流中只包含有 ["b","d"] 域。
Repartitioning operations
重分区操作会执行一个用来改变在不同的任务间分配 tuple 的方式的函数。在重分区的过程中分区的数量也可能会发生变化(例如,重分区之后的并行度就有可能会增大)。重分区会产生一定的网络数据传输。下面是重分区操作的几个函数:
- shuffle:通过随机轮询算法来重新分配目标区块的所有 tuple。
- broadcast:每个 tuple 都会被复制到所有的目标区块中。这个函数在 DRPC 中很有用 —— 比如,你可以使用这个函数来获取每个区块数据的查询结果。
- partitionBy:该函数会接收一组域作为参数,并根据这些域来进行分区操作。可以通过对这些域进行哈希化,并对目标分区的数量取模的方法来选取目标区块。partitionBy 函数能够保证来自同一组域的结果总会被发送到相同的目标区间。
- global:这种方式下所有的 tuple 都会被发送到同一个目标分区中,而且数据流中的所有的块都会由这个分区处理。
- batchGlobal:同一个 batch 块中的所有 tuple 会被发送到同一个区块中。当然,在数据流中的不同区块仍然会分配到不同的区块中。
- partition:这个函数使用自定义的分区方法,该方法会实现
org.apache.storm.grouping.CustomStreamGrouping
接口。
Aggregation operations
Trident 使用 aggregate 方法和 persistentAggregate 方法来对数据流进行聚类操作。其中,aggregate 方法会分别对数据流中的每个 batch 进行处理,而 persistentAggregate 方法则会对数据流中的所有 batch 执行聚类处理,并将结果存入某个 state 中。
在数据流上执行 aggregate 方法会执行一个全局的聚类操作。在你使用 ReducerAggregator
或者 Aggregator
时,数据流首先会被重新分区成一个单独的分区,然后聚类函数就会在该分区上执行操作。而在你使用 CombinerAggregator
时,Trident 首先会计算每个分区的部分聚类结果,然后将这些结果重分区到一个单独的分区中,最后在网络数据传输完成之后结束这个聚类过程。CombinerAggregator
比其他的聚合器的运行效率更高,在聚类时应该尽可能使用CombinerAggregator
。
下面是一个使用 aggregate 来获取一个 batch 的全局计数值的例子:
mystream.aggregate(new Count(), new Fields("count"))
与 partitionAggregate一样,aggregate的聚合器也可以进行链式处理。然而,如果你在一个处理链中同时使用了CombinerAggregator 和non-CombinerAggregator,Trident 就不能对部分聚类操作进行优化了。
想要了解更多使用 persistentAggregate 的方法,可以参考 Trident State 文档 一文。
Operations on grouped streams
通过对指定的域执行 partitionBy 操作,groupBy 操作可以将数据流进行重分区,使得相同的域的 tuple 分组可以聚集在一起。例如,下面是一个 groupBy 操作的示例:
如果你在分组数据流上执行聚合操作,聚合器会在每个分组(而不是整个区块)上运行。persistentAggregate 同样可以在一个分组数据里上运行,这种情况下聚合结果会存储在 MapState 中,其中的 key 就是分组的域名。
和其他操作一样,对分组数据流的聚合操作也可以以链式的方式执行。
Merges and joins
Trident API 的最后一部分是联结不同的数据流的操作。联结数据流最简单的方式就是将所有的数据流融合到一个流中。你可以使用 TridentTopology 的 merge 方法实现该操作,比如这样:
topology.merge(stream1, stream2, stream3);
Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。
联结数据流的另外一种方法是使用 join。像 SQL 那样的标准 join 操作只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每个从 spout 中输出的小 batch。
下面是两个流的 join 操作的示例,其中一个流含有 [“key”, “val1”, “val2”] 域,另外一个流含有 [“x”, “val1”] 域:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
上面的例子会使用 "key" 和 "x" 作为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,因为输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含:
- join 域的列表。在这个例子里,输出的 "key" 域与 stream1 的 "key" 域以及 stream2 的 "x" 域对应。
- 来自所有流的非 join 域的列表。这个列表是按照传入 join 方法的流的顺序排列的。在这个例子里,"a" 和 "b" 域与 stream1 的 "val1" 和 "val2" 域对应;而 "c" 域则与 stream2 的 “val1” 域相对应。
在对不同的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每个 spout 发送出的 tuple。
到这里你大概仍然会对如何进行窗口 join 操作感到困惑。窗口操作(包括平滑窗口、滚动窗口等 —— 译者注)主要是指将当前的 tuple 与过去若干小时时间段内的 tuple 联结起来的过程。
你可以使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。然后就可以使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。