在写这个文章前不得不吐槽目前国内一些blog的文章,尽是些复制粘贴的文章,提到点上但没任何的深入和例子。.........
经过测试下来总结一下RabbitMQ的Exchange的特性:
1、direct
生产者可以指定路由键,消费者可以指定路由键,但不能讲路由键设置为#(全部)。
2、topic
生产者可以指定路由键,消费者可以指定路由键,也可以不指定(或者是#)。
3、fanout
生产者和消费都忽略路由键。
在现实的场景里,通常是生产者会生产多个路由键的消费,然后多个消费来消费指定路由键的消息,但通常生产者的生产代码是同一份,如何在发消息的时候动态指定当前消息的路由键呢?
例子:门店平台系统集中处理多个门店的数据,然后分别将不同门店的数据发送到不同的门店(即:A门店只消费属于A门店的消息,B门店只消费属于B的消息)
看例子:
application.yml
1 spring:
2 cloud:
3 stream:
4 # 设置默认的binder
5 default-binder: pos
6 binders:
7 scm:
8 type: rabbit
9 environment:
10 spring:
11 rabbitmq:
12 # 连接到scm的host和exchange
13 virtual-host: scm
14 pos:
15 type: rabbit
16 environment:
17 spring:
18 rabbitmq:
19 # 连接到pos的host和exchange
20 virtual-host: pos
21
22 shop:
23 type: rabbit
24 environment:
25 spring:
26 rabbitmq:
27 # 连接到shop的host和exchange
28 virtual-host: shop
29
30 bindings:
31 # ---------消息消费------------
32
33 # 集单开始生产消费
34 order_set_start_produce_input:
35 binder: pos
36 destination: POS_ORDER_SET_STRAT_PRODUCE
37 group: pos_group
38
39 # 门店ID为1的消费者
40 shop_consumer_input_1:
41 binder: shop
42 destination: POS_ORDER_SET_STRAT_PRODUCE
43 group: shop_1_group
44
45
46 #-----------消息生产-----------
47 # 集单开始生产通知生产
48 order_set_start_produce_output:
49 binder: pos
50 destination: POS_ORDER_SET_STRAT_PRODUCE
51
52 rabbit:
53 bindings:
54 # 集单开始生产消费者
55 order_set_start_produce_input:
56 consumer:
57 exchangeType: topic
58 autoBindDlq: true
59 republishToDlq: true
60 deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_POS_DLX
61 #bindingRoutingKey: '#'
62 # 门店1的消费者
63 shop_consumer_input_1:
64 consumer:
65 exchangeType: topic
66 autoBindDlq: true
67 republishToDlq: true
68 deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_SHOP_1_DLX
69 bindingRoutingKey: 1
70 deadLetterRoutingKey: 1
71
72 # 生产者配置
73 order_set_start_produce_output:
74 producer:
75 exchangeType: topic
76 routingKeyExpression: headers.shopId
77 # routingKeyExpression: headers['shopId']
上面的配置文件配置了一个动态的基于shopId做路由的生产者配置,一个消费全部路由键的消费者,如果要配置指定路由键的可以在配置文件里设置bindingRoutingKey属性的值。
生产者java代码:
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.longge.pos.production.mq.dto.OrderSetProductionMsg;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MqSendUtil {
private static MessageChannel orderSetStartProduceChannel;
public static void setSfOrderCreateChannel(MessageChannel channel) {
sfOrderCreateProduceChannel = channel;
}
public static void sendOrderSetPrintMsg(OrderSetProductionMsg msg) {
// add kv pair - routingkeyexpression (which matches 'type') will then evaluate
// and add the value as routing key
log.info("发送开始生产的MQ:{}", JSONObject.toJSONString(msg));
orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).setHeader("shopId", msg.getOrderSet().getShopId()).build());
//orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).build());
}
}
动态路由的核心在于上面那个红色的字体的地方,这个是和配置文件里的 routingKeyExpression 的配置是匹配的。