一、分布式系统的基本概念 分布式系统是一种由多个独立的计算机节点组成的系统,这些节点通过网络进行通信和协同工作。分布式系统的主要特点包括(/s/18D6LswoDj-O0jVPJvrgQ2A 提取码: yfu2): 分布式性:节点分布在不同的地理位置,可以通过网络进行通信。 并行性:多个节点可以同时执行任务,提高系统性能。 故障容错性:分布式系统具有高度的可靠性,即使某个节点出现故障,系统也能继续运行。
二、常见的消息队列 当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等,而部分数据库如Redis、MySQL以及phxsql也可实现消息队列的功能。
三、消息队列的组成 1 Broker 消息服务器,作为server提供消息核心服务。
2 Producer 消息生产者,业务的发起方,负责生产消息传输给broker。
3 Consumer 消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理。
4 Topic 主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播。
5 Queue 队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收。
6 Message 消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输。
四、消息队列的特点 采用异步处理模式 消息发送者 可以发送一个消息而无须等待响应。消息发送者 将消息发送到一条 虚拟的通道(主题 或 队列)上,消息接收者 则 订阅 或是 监听 该通道。一条信息可能最终转发给 一个或多个 消息接收者,这些接收者都无需对 消息发送者 做出 同步回应。整个过程都是 异步的。
应用系统之间解耦合 主要体现在如下两点: 发送者和接受者不必了解对方、只需要 确认消息; 发送者和接受者 不必同时在线。 比如在线交易系统为了保证数据的 最终一致,在 支付系统 处理完成后会把 支付结果 放到 消息中间件 里,通知 订单系统 修改 订单支付状态。两个系统是通过消息中间件解耦的。
五、流行消息队列系统比较 传统企业型消息队列ActiveMQ遵循了JMS规范,实现了点对点和发布订阅模型,但其他流行的消息队列RabbitMQ、Kafka并没有遵循JMS规范。 RabbitMQ RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。AQMP协议定义了消息路由规则和方式。生产端通过路由规则发送消息到不同queue,消费端根据queue名称消费消息。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后立即删除,不保留历史消息。
六、消息中间件带来的好处 1、解耦:传统模式的缺点:系统间耦合性太强;中间件模式的的优点:将消息写入消息队列,需要消息的系统自己从消息队列中订阅 2、异步:传统模式的缺点:一些非必要的业务逻辑以同步的方式运行,太耗费时间;中间件模式的的优点:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度 3、横向扩展 4、安全可靠 5、顺序保证
七、消息队列的优点 1、屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。 2、异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削峰)。 3、解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。 4、复用:一次发送多次消费。 5、可靠:一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。 6、提供路由:发送者无需与接收者建立连接,双方通过消息队列保证消息能够从发送者路由到接收者,甚至对于本来网络不易互通的两个服务,也可以提供消息路由。
八、Java环境下的RabbitMQ示例 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Producer { private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}