前言
在rocketMQ里面一般有两种获取消息的模式,一种是push, 一种是pull ,其实本质上都是pull ,只不过在于两者实现的机制不太一样,在之前的文章中介绍过push模式,此处不再做赘述。
pull消息模式呢,取消息的过程需要用户自己写,获取topic的消息队列,然后循环队列获取消息,上报offset, 直到最后取完了,换下一个队列,
官方demo
public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name"); consumer.setNamesrvAddr("localhost:9876"); consumer.start(); // 获取消费者的队列 Set<MessageQueue> mqs = consumer.fetchMessageQueuesInBalance("TopicTest"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { // 获取消息 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); // 更新offset putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); }
如果看过笔者之前写的一篇文章RocketMQ系列之push(推)消息模式(六)
应该就会很明显的感受到,pull模式下的消息就是用户自己写,自己拿topic的messageQueue的集合去broker里面拉取消息,而push模式下全部是rocketMq帮我们做好了
image-20201107111124853
pull模式是获取当前consumer里面负载到的messageQueue, 然后循环拉取每个消息队列里面的消息内容,上报offset的进度,
image-20201107111508985
push模式以每个messageQueue构建一个队列任务,后台线程异步的去拉取, 根据borker阻塞的时间可以实现长轮询和短轮询,
优缺点对比
在此对比一下push和pull两种模式的优缺点
push
优点:
1.push模式采用长轮询阻塞的方式获取消息,实时性非常高,用户体验好
2.rocketMq处理了获取消息的细节,使用起来比较简单方便
缺点:
1.当消费者能力远远低于生产者能力的时候,会产生一定的消费者消息堆积,消息堆积会占用消费者服务的资源,主要在于内存资源
解决方案:
rocketMq针对push模式提供了流量控制,有三种,单个队列消息数量(默认1000),单个队列内存中的大小(默认100M), 消息跨度(2000), 通过这三种控制,可以有效的控制消息对消费者的影响,各位可以根据自己项目的实际情况进行调整。
pull
优点:
1.想消费多少就消费多少,想怎么消费就怎么消费,哈哈,灵活性较大,不存在过多占用消费者资源的问题
缺点:
1.实时性很低
2.拉取消息的间隔不好设置,太短则borker压力大,太长则实时性很低。
在实际生产环境中,笔者一直使用的push消息模式,pull模式这里随手写下,不做重点描述啦!
欢迎关注我的微信公众号:【sharedCode】
回复:“资源”、“架构”等关键词获取海量免费学习资料。
本文分享自微信公众号 - sharedCode(sharedCode)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。