#RocketMQ Filtersrv详解 Metaq 3.0以后改名为RocketMQ,阿里云的ONS则是基于RocketMQ搭建的。今天闲着无事,把RocketMQ的代码clone到本地分析了一下,从clone下来的代码,结构还是比较清晰的,代码量也不是很多。我发现其中一个模块我之前没有听说过的。于是就抱着一探究竟的目的,去看看它是何物,这个模块就是filtersrv,是RocketMQ在3.0.11版本以后加入的模块。下面将对它进行讨论和分析,这里假设你已经对RocketMQ有一定的了解,不对如何搭建它做任何介绍。 ##RocketMQ基本组件 在RocketMQ中分为client(consumer,producer)、broker和namesrv,下面对这三者进行一下简单的介绍。
- client:是开发主要面对的模块,它主要提供consumer订阅消息,提供producer发布消息,提供简洁的API。
- broker:是每个RocketMQ中最核心的部分,该组件提供消息的存储和分发,producer将某个topic的消息发布到broker,而consumer将从broker订阅某个topic的消息
- namesrv:可以理解为注册中心,没个broker启动则将会将自己的信息发布到namesrv,发布到namesrv的信息包括broker提供的topic信息,那么client启动的时候,则将自己所需要的topic向namesrv订阅,namesrv则会返回提供订阅topic的broker信息(主要是地址相关信息),获的broker信息,那么client可以直接和broker通信。
如果按照dubbo的思维去看待,可以把topic理解为一个服务的接口,broker则是实现该服务的提供者,namesrv则是dubbo中的注册中心,这样就不难理解RocketMQ中这三个组件了。基于这个,那么这三者之间的通信是下图方式:
上面讲了一大堆貌似还没有filtersrv的内容,下面将进入今天的主题——filtersrv
##Filtersrv是何物 相信从名字可以大概看到它的目的,没错他就是过滤服务器,那是对什么进行过滤呢?以及我们要怎么配置才可以让它工作呢?
在深入介绍它之前,先看看怎么用它,上代码:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
consumer.setNamesrvAddr("localhost:20881");
/**
* 使用Java代码,在服务器做消息过滤
*/
consumer.subscribe("TopicFilter7","MessageFilterImpl","import com.alibaba.rocketmq.common.filter.MessageFilter;\n" +
"import com.alibaba.rocketmq.common.message.MessageExt;\n" +
"\n" +
"\n" +
"public class MessageFilterImpl implements MessageFilter {\n" +
"\n" +
" @Override\n" +
" public boolean match(MessageExt msg) {\n" +
" String property = msg.getUserProperty(\"SequenceId\");\n" +
" if (property != null) {\n" +
" int id = Integer.parseInt(property);\n" +
" if ((id % 3) == 0 && (id > 10)) {\n" +
" return true;\n" +
" }\n" +
" }\n" +
"\n" +
" return false;\n" +
" }\n" +
"}");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
try {
for (int i = 0; i < 6000000; i++) {
Message msg = new Message("TopicFilter7",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
msg.putUserProperty("SequenceId", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
}
catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
你可能会发现为什么这里的Consumer订阅消息的时候这么复杂?一串java类的源码,并且这个类还实现了MessageFilter
接口,如果你仔细看会发现MessageFilter
的方法match
会对当前消息中的某个自定义属性判断,返回boolean值。这个是什么意思呢?不妨看看MessageFilter
的接口定义
/**
* 服务端消息过滤接口,Consumer实现这个接口后,Consumer客户端会注册这段Java程序到Broker,由Broker来编译并执行,
* 以达到服务器消息过滤的目的
*/
public interface MessageFilter {
/**
* 过滤消息
*
* @param msg
* @return 是否可以被Consumer消费
*/
public boolean match(final MessageExt msg);
}
相信看到这个注释就应该知道这个是干嘛的吧?其实就是对当前消费者接受的消息进行一次过滤。如果在延伸点就是可以自定义负载均衡,上面的MessageFilter
既然是字符串,那是不是表示可以动态生成这个代码呢?对的,那么基于这个可以动态对同一个Group下面的消费者提供不同的过滤原则。那么下面让我们在看深点,看看consumer是什么时候通知filtersrv,以及filtersrv是在整个RocketMQ定位是什么?分析如果有了filtersrv之后,整个订阅消息和发布消息又和异常。
##Broker和Filtersrv绑定
###Broker和Filtersrv的关系
public class FiltersrvConfig {
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
@ImportantField
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
System.getenv(MixAll.NAMESRV_ADDR_ENV));
// 连接到哪个Broker
private String connectWhichBroker = "127.0.0.1:10911";
// Filter Server对外服务的IP
private String filterServerIP = RemotingUtil.getLocalAddress();
// 消息超过指定大小,开始压缩
private int compressMsgBodyOverHowmuch = 1024 * 8;
// 压缩Level
private int zipCompressLevel = 5;
// 是否允许客户端上传Java类
private boolean clientUploadFilterClassEnable = true;
// 过滤类的仓库地址
private String filterClassRepertoryUrl = "http://fsrep.tbsite.net/filterclass";
private int fsServerAsyncSemaphoreValue = 2048;
private int fsServerCallbackExecutorThreads = 64;
private int fsServerWorkerThreads = 64;
.....
}
上面是FiltersrvConfig
的定义,所有关于filtersrv的配置信息都会封装在这个里面。可以看到主要要配置三个参数namesrvAddr
、namesrvAddr
和connectWhichBroker
(此处补充一下:RocketMQ每个模块具体有哪些配置项的规则还是比较简单的可以看看MixAll
里面的properties2Object
方法,这个是自动将properties文件自动填充到FiltersrvConfig
,BrokerConfig
、NamesrvConfig
、NettyServerConfig
和NettySystemConfig
等配置对象中),其中namesrvAddr
和namesrvAddr
两个属性和broker的一样,而connectWhichBroker
表示当前filtersrv是属于哪个broker的,这里可以看出一个filtersrv只能属于一个broker,那是不是一个broker可以有多个filtersrv呢?这个后面介绍。
为了探究broker和filtersrv之间的关系,有必要看看broker的启动和注册到namesrv的过程,RocketMQ的一点好处就是基本每个模块都是通过XXXStartup
加上XXController
,那么broker也不例外,它的入口是BrokerStartup
和BrokerController
。如果你展开了rocketmq-broker的包接口,一个很醒目的filtersrv
包名,不用想也知道,这就是broker和filtersrv关联的主要地方。
代码看上去并没有很多,如果你打开BrokerController
不难发现它有一个filterServerManager
属性,就是FilterServerManager
。看broker的启动,需要看看BrokerController
的start
方法,看里面做了什么。
if (this.messageStore != null) {
this.messageStore.start();
}
if (this.remotingServer != null) {
this.remotingServer.start();
}
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
this.registerBrokerAll(true, false);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false);
}
catch (Exception e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
this.addDeleteTopicTask();
发现是启动了各个组件,其中也触发了filterServerManager
的start
方法,另外还设置了一个定时器定时器将当前broker注册到namesrv。上面看到是在filtersrv需要配置broker的地址信息,而broker并没有配置filtersrv的地方,那可以猜测是filtersrv启动的时候向broker发送了一个消息,表示告诉broker你有一个filtersrv了。我就抱着这个猜想去看看是不是真的这样,要看broker监听请求,那就需要看看remotingServer
了,这里先说明一下RocketMQ用来处理远程的请求的均是NettyRequestProcessor
接口的实现,那么可以发现在BrokerController
的initialize
方法调用了registerProcessor
方法,可以看到在方法registerProcessor
中,向remotingServer
注册了各种NettyRequestProcessor
实现,其中有一个AdminBrokerProcessor
做为默认的processor。看看AdminBrokerProcessor
里面的实现,发现里面有注册filtersrv的实现:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
......
case RequestCode.REGISTER_FILTER_SERVER:
return this.registerFilterServer(ctx, request);
// 根据 topic 和 group 获取消息的时间跨度
......
default:
break;
}
return null;
}
果然是我们猜测的那样,是filtersrv向broker注册。下面我就不卖关子了,直接告诉filtersrv在哪里向broker注册了自己,在FiltersrvController
的initialize
方法的时候调用了FilterServerOuterAPI
的registerFilterServerToBroker
方法将自己注册到broker里面去了。上面说了一个filtersrv只能属于一个broker,那一个broker可不可以有多个filtersrv呢?下面揭开这个谜底。上面已经知道broker在哪里处理filtersrv的注册请求,那么我们不妨看看registerFilterServer
方法做了什么事情:
private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
final RegisterFilterServerResponseHeader responseHeader =
(RegisterFilterServerResponseHeader) response.readCustomHeader();
final RegisterFilterServerRequestHeader requestHeader =
(RegisterFilterServerRequestHeader) request
.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(),
requestHeader.getFilterServerAddr());
responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId());
responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
其实最后还是交给了FilterServerManager
的registerFilterServer
方法,继续深入进去,看看这个方法里面做了什么。
public void registerFilterServer(final Channel channel, final String filterServerAddr) {
FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
if (filterServerInfo != null) {
filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
}
else {
filterServerInfo = new FilterServerInfo();
filterServerInfo.setFilterServerAddr(filterServerAddr);
filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
this.filterServerTable.put(channel, filterServerInfo);
log.info("Receive a New Filter Server<{}>", filterServerAddr);
}
}
发现是将filtersrv信息封装成了FilterServerInfo
并且放到了filterServerTable
一个 Map
对象中,既然这里是一个集合,那就不难知道一个broker可以有多个filtersrv,这个问题就解决了。
上面一直是在说filtersrv和broker之间的关系,以及filtersrv如果将自己注册到broker中。下面我们看看有了filtersrv的broker又如何将自己发布到namesrv中。
###Broker如何将自己注册到Namesrv
要回答上面的问题,其实就要知道Broker如何将自己注册到Namesrv中。如果没记错,上面以及说过在 BrokerController
有定时将自己注册到Namersrv中,那这个就简单了,看看那个里面做了什么,这个问题基本就解决了。在 BrokerController
的registerBrokerAll
方法便是将broker发布到所有的namesrv中的逻辑。
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
TopicConfigSerializeWrapper topicConfigWrapper =
this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(topicConfigWrapper.getTopicConfigTable());
for (TopicConfig topicConfig : topicConfigTable.values()) {
topicConfig.setPerm(this.getBrokerConfig().getBrokerPermission());
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(//
this.brokerConfig.getBrokerClusterName(), //
this.getBrokerAddr(), //
this.brokerConfig.getBrokerName(), //
this.brokerConfig.getBrokerId(), //
this.getHAServerAddr(), //
topicConfigWrapper,//
this.filterServerManager.buildNewFilterServerList(),//
oneway);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
public List<String> buildNewFilterServerList() {
List<String> addr = new ArrayList<String>();
Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, FilterServerInfo> next = it.next();
addr.add(next.getValue().getFilterServerAddr());
}
return addr;
}
可以看到上面的方法调用了filterServerManager
的buildNewFilterServerList
方法,而buildNewFilterServerList
方法则是从上面说的filterServerTable
集合中把filtersrv的地址拷贝出来,不得不说世界是圆的。到这里我们知道broker将自己注册到namesrv的时候,其实是将属于自己的filtersrv也注册到了namesrv中,那我这里又猜测一下,client端通过topic查找某个broker的时候也会将附带的filtersrv提供给client端,因为这也是属于broker的一部分。
##Client端通过Topic查找broker
要看client通过topic查找broker,就需要去看看namesrv端的处理了。在整个RocketMQ中,我个人觉得namesrv实现比较简洁,不光是代码量,实现比较直接,并没有过多的嵌套。整个namesrv处理来自broker和client的请求均是在DefaultRequestProcessor
类中实现。那么我们找一下在哪里是通过topic查找broker的。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",//
request.getCode(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
request);
}
switch (request.getCode()) {
....
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 新版本Broker,支持Filter Server
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
// 低版本Broker,不支持Filter Server
else {
return this.registerBroker(ctx, request);
}
....
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
.....
default:
break;
}
return null;
}
此处顺便也说一下broker注册的处理,可以看到在处理broker注册的时候有进行版本判断,可以看到如果版本大于3.0.11,那么就有通过filtersrv,注册的实现方法是registerBrokerWithFilterServer
里面的具体实现,可以自行去看看。接下来看看通过topic查找broker的处理,其实就是getRouteInfoByTopic
方法的逻辑。
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request
.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData =
this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(
NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
可以看到是通过RouteInfoManager
的pickupTopicRouteData
方法获取某个topic的broker信息,如果没有则提示没有该topic匹配的broker。看看在pickupTopicRouteData
方法中的实现:
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
this.lock.readLock().lockInterruptibly();
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
// BrokerName去重
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData();
brokerDataClone.setBrokerName(brokerData.getBrokerName());
brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
// 增加Filter Server
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
}
finally {
this.lock.readLock().unlock();
}
}
catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
if (log.isDebugEnabled()) {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
}
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
可以看到当前namesrv的filterServerTable
中获取了某个broker的filtersrv列表,并且整个信息是封装在TopicRouteData
返回给client查找的请求,那么client端拿到的TopicRouteData
里面就包含了filtersrv列表,那么接下来看看client怎么来处理这些filtersrv的。
这里再补充一下,其实只有consumer才会需要filtersrv,所以只有consumer端才会利用这个信息,而producer不会对这些信息。而consumer通过topic获取filtersrv的方式是先同步获取一次,后面还会有定时器定时发起查询topic的broker信息。这里如果有兴趣可以看看MQClientInstance
的updateTopicRouteInfoFromNameServer
方法,定时调用是在 startScheduledTask
方法触发,而这个方法是在消费端start的时候,而一次同步触发是在触发订阅某个topic的方法时候。
下面需要看看consumer如何利用TopicRouteData
实体处理filtersrv集合的。
##Consumer如何处理filtersrv的
###consumer如何将本地的MessageFilter同步到filtersrv
我们知道执行MessageFilter
实现逻辑是在filtersrv,那要怎么告诉filtersrv呢?同样也是需要递归namesrv返回的TopicRouteData
中filtersrv地址列表,将信息同步到filtersrv中。
这部分逻辑是在MQClientInstance
的sendHeartbeatToAllBrokerWithLock
,而这个方法同样也是会有一次同步调用,以及后面会定时出发,和updateTopicRouteInfoFromNameServer
方法一样。
由于篇幅问题,这里就不贴代码,感兴趣可以去看看这部分逻辑。
###filtersrv如何处理来自consumer的MessageFilter的实现逻辑
上面说过,RocketMQ处理来自远程的请求是NettyRequestProcessor
的子类实现的,filtersrv也不例外,处理它这部分逻辑的是DefaultRequestProcessor
。下面看看他的基本实现:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws Exception {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",//
request.getCode(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
request);
}
switch (request.getCode()) {
case RequestCode.REGISTER_MESSAGE_FILTER_CLASS:
return registerMessageFilterClass(ctx, request);
case RequestCode.PULL_MESSAGE:
return pullMessageForward(ctx, request);
}
return null;
}
可以看到它这里只处理两种请求,一种是注册MessageFilter
实现类的,另一个则是consumer发起的拉去消息,这就是为什么 MessageFilter
只能处理来自消费端的请求了。处理注册MessageFilter
实现类是方法registerMessageFilterClass
,最终会在FilterClassManager
动态编译来自consumer的java源码生成一个MessageFilter
的类对象,然后反射生成对象。具体逻辑如下:
public boolean registerFilterClass(final String consumerGroup, final String topic,
final String className, final int classCRC, final byte[] filterSourceBinary) {
final String key = buildKey(consumerGroup, topic);
// 先检查是否存在,是否CRC相同
boolean registerNew = false;
FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
if (null == filterClassInfoPrev) {
registerNew = true;
}
else {
if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) {
registerNew = true;
}
}
}
// 注册新的Class
if (registerNew) {
synchronized (this.compileLock) {
filterClassInfoPrev = this.filterClassTable.get(key);
if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
return true;
}
try {
FilterClassInfo filterClassInfoNew = new FilterClassInfo();
filterClassInfoNew.setClassName(className);
filterClassInfoNew.setClassCRC(0);
filterClassInfoNew.setMessageFilter(null);
if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
Object newInstance = newClass.newInstance();
filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
filterClassInfoNew.setClassCRC(classCRC);
}
this.filterClassTable.put(key, filterClassInfoNew);
}
catch (Throwable e) {
String info =
String
.format(
"FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
consumerGroup, topic, className);
log.error(info, e);
return false;
}
}
}
return true;
}
那么这里就有问题需要注意了。由于MessageFilter
是在Consumer端实现的,如果在实现类中用了第三方的api,并且在filtersrv中不存在,那么将会导致传输到filtersrv的java源码无法编译,如果需要能够编译通过,需要将该第三方api的jar包也放到filtersrv的classpath下面。同时还需要注意的是,你的实现类不能有包名,不然也会导致编译不通过。个人觉得这部分实现有点粗糙,我还是第一次看到将java源码传输到远程服务编译再执行的,这部分实现欠妥。
##Consumer如何触发filtersrv
###Consumer“狸猫换太子”
上面说了那么多,这里就要回到重点,消费端获取消息的时候怎么触发filtersrv呢?其实为了将filtersrv整合到Consumer感觉对RocketMQ的改动还是比较大的,在PullApiWrapper
中的方法pullKernelImpl
有调用computPullFromWhichFilterServer
类,不妨看看这两个方法的实现:
public PullResult pullKernelImpl(//
final MessageQueue mq,// 1
final String subExpression,// 2
final long subVersion,// 3
final long offset,// 4
final int maxNums,// 5
final int sysFlag,// 6
final long commitOffset,// 7
final long brokerSuspendMaxTimeMillis,// 8
final long timeoutMillis,// 9
final CommunicationMode communicationMode,// 10
final PullCallback pullCallback// 11
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
.........
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(//
brokerAddr,//
requestHeader,//
timeoutMillis,//
communicationMode,//
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
throws MQClientException {
ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
if (topicRouteTable != null) {
TopicRouteData topicRouteData = topicRouteTable.get(topic);
List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
if (list != null && !list.isEmpty()) {
return list.get(randomNum() % list.size());
}
}
throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
+ topic, null);
}
第一个方法主要看参数brokerAddr
,发现会判断当前订阅的topic是不是有filtersrv有则会调用computPullFromWhichFilterServer
,而这个方法里面则是从当前topic的TopicRouteData
中filtersrv的集合中随机获取一个返回并赋给brokerAddr
,那么此时请求的不是broker了,而是filtersrv,于是你也就不奇怪为什么在filtersrv的DefaultRequestProcessor
里面会处理拉去消息的请求了。这里就很容易理解在consumer端对filtersrv的处理,其实就是“狸猫换太子”的方式。
###filtersrv反向代理
为了看清楚在consumer“狸猫换太子”之后filtersrv是如何处理的,所以还需要看看filtersrv如何处理来自consumer获取消息的。这就需要看看DefaultRequestProcessor
的pullMessageForward
方法
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request)
throws Exception {
final RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
// 由于异步返回,所以必须要设置
response.setOpaque(request.getOpaque());
DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
final FilterClassInfo findFilterClass =
this.filtersrvController.getFilterClassManager().findFilterClass(
requestHeader.getConsumerGroup(), requestHeader.getTopic());
if (null == findFilterClass) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Find Filter class failed, not registered");
return response;
}
if (null == findFilterClass.getMessageFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Find Filter class failed, registered but no class");
return response;
}
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
// 构造从Broker拉消息的参数
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.filtersrvController.getBrokerName());
long offset = requestHeader.getQueueOffset();
int maxNums = requestHeader.getMaxMsgNums();
final PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
responseHeader.setMaxOffset(pullResult.getMaxOffset());
responseHeader.setMinOffset(pullResult.getMinOffset());
responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
response.setRemark(null);
switch (pullResult.getPullStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
List<MessageExt> msgListOK = new ArrayList<MessageExt>();
try {
for (MessageExt msg : pullResult.getMsgFoundList()) {
boolean match = findFilterClass.getMessageFilter().match(msg);
if (match) {
msgListOK.add(msg);
}
}
// 有消息返回
if (!msgListOK.isEmpty()) {
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
response, msgListOK);
return;
}
// 全部都被过滤掉了
else {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
}
}
// 只要抛异常,就终止过滤,并返回客户端异常
catch (Throwable e) {
final String error =
String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
requestHeader.getConsumerGroup(), requestHeader.getTopic());
log.error(error, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx,
response, null);
return;
}
break;
case NO_MATCHED_MSG:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_NEW_MSG:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_ILLEGAL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
break;
default:
break;
}
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
null);
}
@Override
public void onException(Throwable e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response,
null);
return;
}
};
pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
return null;
}
你会发现在filtersrv中也有一个DefaultMQPullConsumer
,你是否明白了?其实这个时候filtersrv自己也是一个消费端,替consumer请求它所需要的topic的消息,你是否和nginx,apache或者f5这类反向代理对应上了呢?其实filtersrv其实就是一层夹在broker和consumer之间的代理。
最后还有一个重点没说,说了半天的MessageFilter
,它应该在哪里发挥作用呢?看到方法pullMessageForward
方法中匿名类PullCallback
没?在它的onSuccess
方法FOUND
情况下会调用MessageFilter
的match
方法将当前返回给消息进行一次过滤。
这里需要注意一点的是,这个MessageFilter
不能拿来做负载,因为如果通过MessageFilter
的match
过滤之后,对于broker来说,过滤掉的消息是已经消费掉的,其他的消费者是无法再消费的(消息模式为CLUSTERING
)。那这个MessageFilter
还有什么用呢?可以简单理解为他就是一个过滤器!过滤消费端感兴趣的数据。我这里列举一个实例:
比如当前消息模式是BROADCASTING
,由于该模式同一个组的消费者,每条消息都会收到,可能订阅这个topic的消费端不是对所有的消息都感兴趣,那么就有必要加一个过滤器将该消费者感兴趣的数据提供才传给该消费者。这样从通信成本来说会相对减少,因为按照常规filtersrv将会和broker部署在一起,那么它们之间的传输比和consumer之间的传输成本肯定低。
上面对filtersrv在namesrv,broker和consumer中的体现进行了介绍和讨论,那么加入filtersrv之后,整个通信图会是怎么样呢?下图给出了有了filtersrv之后,该是怎么样的情况。