Storm可以保证每一个从spout发出的消息能被完全处理。本章描述storm是如何完成这个保证以及用户如何从storm的可靠性能力获益的。
消息“完全处理”的含义
一个tuple从spout发出后可能会触发成千上万的tuple基于它而创建。以work count的topology为例考虑下:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
22133,
"sentence_queue",
new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));
这个topology从Kestrel queue中读取出句子(sentences)_【注:Kestrel为一个scala实现的消息队列组件,参考https://github.com/twitter-archive/kestrel】_, 拆分这些句子为单词,然后发射每一个单词,为了统计这些单词出现的次数。一个从spout发出的tuple会触发很多的基于他的tuple被创建,如代表句子中的每个单词的tuple,代表每个单词计数的tuple。tuple消息呈树的结构:
当tuple消息树已经遍历完,并且每一个消息都被处理,则storm认为tuple从spout发出后被完全处理。当tuple消息树上的消息在一个指定的超时时间内没有被完全处理则认为tuple处理失败。这个超时时间可以在topology创建时通过配置 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 来指定,默认为30秒。
当一个消息被“完全处理”或“未完全处理”会发生什么呢
讨论这个问题之前,我们先来了解下tuple从spout发出后的生命周期。参考spout的接口实现:
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
首先,storm通过调用Spout的nextTuple()方法请求一个tuple,Spout通过open方法中提供的 SpoutOutputCollector 发射一个tuple到输出流。当发射tuple时spout提供了一个消息ID用于后续唯一标识这个tuple,例如KestrelSpout从kestrel消息队列中读取一条消息后发射时以Kestrel提供的消息id为tuple的消息ID_【KestrelSpout可参考https://github.com/nathanmarz/storm-kestrel/blob/master/src/jvm/backtype/storm/spout/KestrelThriftSpout.java】_。SpoutOutputCollector 发射消息如下:
_collector.emit(new Values("field1", "field2", 3) , msgId);
接下来tuple会被发送给消费的bolts,并且storm会注意跟踪这个被创建的消息树。如果storm检测到一个tuple被完全处理,则会调用原发射它的spout task中的ack方法并传入该tuple被spout发射时的消息ID作为参数。同样如果tuple处理超时,storm会调用spout中的fail方法。注意不管是ack还是fail,都是调用的原来创建该tuple的spout task的方法。所以当spout在集群中以多个并行task的形式执行时,一个tuple不会被除创建它的spout task以外的其他task调用ack或fail。
让我们再以KestrelSpout来看下spout要保证消息的处理该怎么做,当KestrelSpout从Kestrel队列中读取出一条消息,它“打开”了这条消息。这里的意思是这条消息没有真正从队列中脱离,而是被放置到了一个“挂起”的状态等待消息的完成确认。在此期间,该消息不会被其他的客户端消费。此外,如果客户端断开连接了,则该客户端的所有的挂起消息会被重新放回队列。当一条消息被打开,Kestrel提供给客户端消息数据和一个唯一的消息ID,KestrelSpout用这个ID作为storm中发射tuple时的“消息ID”。后续某时刻,当KestrelSpout的ack或fail方法被调用时,KestrelSpout会通过Kestrel客户端确认消息已经被消费或是重新放回消息队列。【注:这里涉及到Kestrel这个消息队列的一个机制: 当某个客户端消费一条消息后,消息会进入一个挂起状态,只有待调用“确认”或“取消”的操作后才能真正确定消息是否真正被消费还是重新回到队列继续被其他客户端消费。】
Storm的可靠性API是什么
作为用户想使用好storm的可靠性必须做到如下两点:
- 无论什么时候你在tuple消息树上创建了新的连接你都要告知storm;
- 当你完成某个独立的tuple的处理时你必须告知storm;
这样storm就能在tuple消息树被完全处理后检测到,并恰当的调用ack或fail。Storm API提供了一个简洁的方式去完成这两点。
指定一个连接在tuple消息树上被称作“锚定”,锚定是在新的tuple被发射后执行的。我们以wordcount中的SplitSentence为例,这个bolt拆分一个包含一条句子的tuple为包含每个单词的tuple:
public class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
每个单词tuple被锚定是通过调用emit方法时将输入的tuple作为第一个参数。
看下源码注释说明:
/**
* Emits a new tuple to the default stream anchored on a single tuple. The emitted values must be
* immutable.
*
* @param anchor the tuple to anchor to
* @param tuple the new output tuple from this bolt
* @return the list of task ids that this new tuple was sent to
*/
public List
只要单词tuple被锚定,则当单词tuple在下游的流中处理失败,消息树根也就是spout中的tuple会在后续重放。作为对比,让我们看下如果像下面代码这样发射会有什么效果:
_collector.emit(new Values(word));
这样发射一个单词tuple导致它未被锚定,如果下游的tuple处理失败,根部的tuple不会重放。根据对容错机制的不同考虑,有时也会适时的用到这样非锚定的tuple。
一个输出的tuple可以锚定到多个输入的tuple上,当在处理多个流的结合和聚合时这是有用的。 一个多锚定tuple在处理失败后会引起spout中多个tuple被重放。emit方法在调用时传入多个锚点的list实现:
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
多锚定让输出tuple进入多个tuple树,也就是说多锚定会打破树的结构,从而形成有向无环图(DAG):
Storm是可以支持DAG图结构的流式处理的,早期的发行版只支持树的结构。
Storm可靠性API分两步,第一步通过锚定的作用构建一个tuple树,第二步也是最后一步是通过OutputCollector的ack和fail方法完成某个tuple的处理。可以看到上面SplitSentence例子中,首先通过锚定的方式发射所有单词tuple,然后会对输入的tuple执行ack操作,确认其已经完成处理。
你可以通过OutputCollector的fail方法使tuple树根上的Spout tuple立即失败。比如你的程序可能会在数据库客户端捕获到一个异常然后显式的使输入的tuple失败。tuple显式的失败的好处是,spout可以更快速的进行数据的重放,而不用等待tuple的超时。
在storm中你处理的每个tuple都必须执行ack 或 fail操作。Storm是通过内存跟踪每个tuple的,所以如果你不对每个tuple执行ack或fail操作的话,最终可能会导致内存溢出。
绝大部分的bolts都会遵循一个常用的模式: 首先读取输入的tuple,发送基于这个tuple创建的新的tuples,然后在execute方法结束前对这个输入的tuple执行ack操作。Bolts可以分为两类:过滤或者是函数处理。Storm有个名为IBasicBolt的接口封装了这个模式。SplitSentence的例子可以写成下面这样子:
public class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
这种实现方式比之前通过继承BaseRichBolt的方式更简单,语义也更相近。tuple发送给BasicOutputCollector时自动与输入的tuple进行锚定了, 并且当execute方法完成后会自动对输入的tuple执行ack操作。
比较起来,执行聚合或连接的bolts可能会延迟对tuple进行ack,因为这些bolt须要基于一群tuples来计算它的结果,然后才会执行ack操作。因此聚合和连接的场景下通常都是多锚定它们的输出tuples。这种场景超出了IBasicBolt这种简单模式的处理能力范围。(所以这种场景须要用到IRichBolt接口)
如果tuples可以被重放,怎样让程序正常工作呢?
对于软件设计这一答案通常是“视情况而定”。Storm 0.7.0 介绍了“事务型的拓扑”特性,在大部分的计算中可以满足完全容错恰好一次消息处理语义。更多事务型拓扑的内容可以参考http://storm.apache.org/releases/0.9.7/Transactional-topologies.html
Storm如何高效的实现可靠性
Storm拓扑有一组特殊的“acker”任务,用于跟踪每个Spout tuple的tuples DAG图。当一个acker 认为一个DAG图是完成了,它会发送一条消息给spout任务告诉它给对这个tuple执行ack操作。你可以通过 Config.TOPOLOGY_ACKERS配置一个拓扑的acker任务的数量,默认是1个。当你的拓扑须要处理大数据量的消息时,你须要增加acker的数量。
理解storm可靠性实现的最好的方式是研究下tuple和tuple构成的DAG图的生命周期。当拓扑中的一个tuple被创建,不管是在spout还是bolt中创建的,它都会分配一个64位的id,这个id被acker用于跟踪每个spout tuple的tuple DAG图。
每个tuple都知道所有的spout tuples的id,因为这些id存在于整个tuple树中。当你在bolt中发射一个新的tuple, 输入tuple的锚点中的spout tuple ids就会被拷贝到新的tuple中去。当一个tuple被执行ack操作时会发送一条消息给合适的acker 任务,这条消息中包含了tuple树发生了怎样的变化的信息, 这个消息可以解释为: “我在spout tuple树中已经完成了,树上有一些新的tuples产生是锚定在我上的”。
以下图为例,如果“D” 和 “E" tuple是基于 ”C“ 创建的, 下图阐明了当tuple ”C“ 完成执行ack时,这个tuple树发生的变化:
由于 ”C“ 从树中移除的同时 ”D“ 和 ”E“ 被添加到进来, 所以树永远不会提前完成。
这里有一些更详细的内容关于Storm是如何跟踪tuple树的。上面已经提到的你可以指定拓扑中acker任务的个数, 这会导致下面的问题:当拓扑中的一个tuple执行了ack,它怎么知道ack消息应该发送给哪个acker任务呢?
Storm是使用取模哈希算法去映射一个spout tuple id到某个acker 任务的, 因为每个tuple都附带了它们已经存在的所有树中的Spout tuple ids, 所以它们知道应该跟哪个acker任务去通信。
另一个细节问题是acker任务怎么知道它跟踪的每个spout tuple是由哪个spout任务负责的呢。 当一个spout任务提交一个新的tuple时,它会发送一个消息给合适的acker任务告知它负责这个spout tuple的spout 任务的ID,所以当一个树完成后,acker任务知道要给哪个spout 任务发送完成的消息。
Acker任务并不会显式的去跟踪tuple树,对于具有数万个(或更多)的大型tuple树,跟踪所有的tuple树可能会压垮acker使用的内存。作为替代,acker采用一种不同的策略即每个spout tuple仅要求一个固定大小的内存空间(大概20字节),那么跟踪算法就成为了Storm正常工作的关键,也是最主要的技术突破之一。
一个acker任务存储一个 以spout tuple id为KEY, 以一对值为VALUE的Map。VALUE中第一个值是创建spout tuple的任务的ID,后续被用于找到指定的spout任务发送完成消息,第二个值是一个64位的数值称作”ack val“, ”ack val“代表了整个tuple树的状态, 不管树有多大,它其实就是树上所有被创建的或者执行ack了的tuple id的异或结果。
当一个acker任务检测到”ack val“的值变为0时,它就认为tuple树已经完成处理。 tuple id是一个随机的64位的数值,所以”ack val“偶然变为0的概率是极小的。【这里我目前的理解是偶然变为0是由于随机的64位数值出现碰撞也就是出现两个相同的随机值的情况。】 如果你以每秒10K的速度进行数学计算,则需要花费5000万年才出现错误。即使如此,如果该tuple恰好在拓扑中发生故障,它也只会导致数据丢失。
现在你已经了解了可靠性算法,下面我们来看下所有的失败的情况下如何避免数据丢失的:
- 由于任务挂掉导致tuple没有被执行ack操作: 这种情况下失败的tuple的树根部的spout tuple会有超时机制,当超时时会进行消息重放;
- Acker任务挂掉:这种情况下所有该acker任务跟踪的spout tuples都会超时并进行重放;
- Spout任务挂掉:这种情况下spout任务获取数据的数据源会负责消息的重放。比如像Kestrel和RabbitMQ这些消息系统在客户端断开连接后会将所有的状态为挂起的消息重新放回队列。
如你所看到的,Storm的可靠性机制是完全分布式的、可伸缩的和容错的。
可靠性调优
Acker任务是轻量的,所以一个拓扑中不需要非常多的acker任务。你可以通过storm ui上的组件ID“__acker” 跟踪acker任务的执行情况,如果吞吐量不理想你可能须要增加更多的acker任务。【当storm ui上查看某个拓扑信息时,默认是不显示“__acker”的,页面最下方有个“Show System Stats”的按钮点击下,则在Bolts栏中可以看到"__acker"组件以及其性能情况,见下图】
如果可靠性对你来说不重要也就是说你可以接受处理失败数据丢失的情况,那么你可以通过不去跟踪spout tuple的树提升性能。由于正常情况下每个tuple树上的tuple都要有一条ack消息,所以不跟踪tuple树的话须要传递的消息的数量会减半。此外,由于跟踪tuple树,下游的tuple不需要维系上游的tuple的id信息,这样也减少了带宽占用。
有三种方式可以取消可靠性保证:
- 第一种方式是配置Config.TOPOLOGY_ACKERS的值为0,这种情况下spout发射完tuple后,Storm就会立即调用spout的ack方法,不会去等待tuple树处理完成。
- 第二种方式是基于消息来移除可靠性,可以通过SpoutOutputCollector.emit方法提交spout tuple时忽略message id,这样就关闭了消息跟踪。
- 最后一种,当你的拓扑中不在乎某些tuple的子集是否处理失败的时候,你可以不带锚定的发射它们。由于它们没有锚定到任何spout tuple上,所以它们不会引起任何spout tuple执行失败即使它们不执行ack操作。【也就是说整个storm流处理过程中某些环节的一些tuple你可以接受它处理失败丢失的情况,可以采用这种方式】